453 lines
21 KiB
Python
Executable File
453 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Tuple, Coroutine
|
|
|
|
import discord
|
|
import discord.utils
|
|
from discord.ext import commands
|
|
|
|
import utils
|
|
from GameFiles import Game, GamesFile, Policy
|
|
|
|
logger = logging.getLogger("SecretBot")
|
|
|
|
|
|
class SecretBot(commands.Cog):
|
|
|
|
def __init__(self, games_file: GamesFile):
|
|
super().__init__()
|
|
self.bot = commands.Bot(
|
|
"!",
|
|
help_command = commands.MinimalHelpCommand(),
|
|
intents = discord.Intents(guild_messages = True, guilds = True, members = True, reactions = True),
|
|
allowed_mentions = discord.AllowedMentions.none(), # By default we do not allow mentions, as we will white-list them on each required message
|
|
)
|
|
self.bot.add_cog(self)
|
|
self.games_file = games_file
|
|
# Each guild can have a confirmation message to execute an action. We store:
|
|
# The confirmation message, the coroutine to execute if the user reacts with :thumbsup: to the confirmation message, a task that will delete the message after a certain time (so that we can stop it if the user confirms instead)
|
|
self.confirmation_messages: Dict[discord.guild, Tuple[discord.Message, Coroutine, asyncio.Task]] = {}
|
|
|
|
def run(self, token):
|
|
self.bot.run(token)
|
|
|
|
def get_command_usage_message(self, command: commands.Command, prepend = "Usage: ") -> str:
|
|
command_elements = [str(cmd) for cmd in command.parents] + [command.name]
|
|
return f"{prepend}`{self.bot.command_prefix}{' '.join(command_elements)} {command.signature}`"
|
|
|
|
@commands.Cog.listener()
|
|
async def on_ready(self):
|
|
logger.info("Connected and ready!")
|
|
logger.info(f"Logged in as {utils.to_string(self.bot.user)}")
|
|
|
|
oauth_url = discord.utils.oauth_url(
|
|
self.bot.user.id,
|
|
discord.Permissions(read_messages = True, send_messages = True, manage_channels = True, manage_roles = True, mention_everyone = True, manage_messages = True)
|
|
)
|
|
logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
|
|
if isinstance(error, commands.CommandNotFound):
|
|
return await ctx.reply(f":x: The requested command does not exist")
|
|
elif isinstance(error, commands.MissingRequiredArgument):
|
|
return await ctx.reply(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.TooManyArguments):
|
|
return await ctx.reply(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.BadArgument):
|
|
if hasattr(error, "argument"):
|
|
return await ctx.reply(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
|
|
else:
|
|
return await ctx.reply(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.ArgumentParsingError):
|
|
return await ctx.reply(f":x: Error when parsing the command: {error}")
|
|
elif isinstance(error, commands.CheckFailure):
|
|
if isinstance(error, utils.CheckFailDoNotNotify): # A check failed, but the check function asked that we do not notify the user
|
|
return
|
|
else:
|
|
return await ctx.reply(f":x: this command can not be run: {error}")
|
|
else:
|
|
logger.error(f"Error when running command '{ctx.message.content}' by {utils.to_string(ctx.author)} in {utils.to_string(ctx.guild)}", exc_info = error)
|
|
await ctx.reply(f":x: There was an error with the command :dizzy_face:")
|
|
|
|
def cog_check(self, ctx: commands.Context):
|
|
# We silently ignore messages from bots, since we do not want bots to command us
|
|
if ctx.author.bot or ctx.message.is_system():
|
|
logger.info(f"[{utils.to_string(ctx.guild)}] Ignored command message from bot or system: {ctx.message.content}")
|
|
raise utils.CheckFailDoNotNotify
|
|
return True
|
|
|
|
@staticmethod
|
|
async def check_is_administrator(ctx: commands.Context):
|
|
"""
|
|
If the user that sent the command is not an administrator of the server, notify them and raise utils.CheckFailDoNotNotify.
|
|
"""
|
|
if not utils.is_administrator(ctx.author):
|
|
await ctx.reply(f":dragon_face: You have no power here!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def check_is_administrator_or_gm(self, ctx: commands.Context):
|
|
"""
|
|
If the user that sent the command is not either an administrator of the server or a gm of the game, notify them and raise utils.CheckFailDoNotNotify.
|
|
"""
|
|
game = self.games_file[ctx.guild]
|
|
if not (utils.is_administrator(ctx.author) or (game.is_started() and game.is_gm(ctx.author))):
|
|
await ctx.reply(f":dragon_face: You have no power here!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def get_running_game_or_error_message(self, ctx: commands.Context) -> Game:
|
|
"""
|
|
Return the game running on the guild on which a command was executed or print an error message and raise an exception if there is no game running
|
|
"""
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started():
|
|
return game
|
|
else:
|
|
await ctx.reply(":x: Game is not running")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
@staticmethod
|
|
async def check_in_admin_channel_or_error_message(ctx: commands.Context, game: Game):
|
|
"""
|
|
If the message has not been sent in the game's admin channel, send an error message and raise utils.CheckFailDoNotNotify
|
|
"""
|
|
if ctx.channel != game.get_gm_channel():
|
|
await ctx.reply(":warning: You should do this in the admin channel!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def confirm_action(self, message_text, text_channel: discord.TextChannel, action: Coroutine, reply_to: discord.Message = None, max_delay = 60):
|
|
"""
|
|
Asks the user to confirm an action by reacting to a message.
|
|
If the user does not react in max_delay seconds, the confirmation message is deleted and the action is canceled.
|
|
For now, only one such confirmation per server is allowed. If another confirmation is asked while a previous one exists, the previous one is treated as if the delay passed.
|
|
"""
|
|
if text_channel.guild in self.confirmation_messages: # If there was already a pending confirmation
|
|
# We delete it
|
|
message, action, task = self.confirmation_messages[text_channel.guild]
|
|
asyncio.create_task(message.delete())
|
|
action.close()
|
|
task.cancel()
|
|
|
|
message = await text_channel.send(message_text + "\nReact to this message with :thumbsup: to confirm or :thumbsdown: to cancel", reference = reply_to)
|
|
|
|
async def add_reaction_prompts():
|
|
await message.add_reaction("👍")
|
|
await message.add_reaction("👎")
|
|
asyncio.create_task(add_reaction_prompts())
|
|
|
|
async def delete_confirmation_message_after_max_delay():
|
|
await asyncio.sleep(max_delay)
|
|
del self.confirmation_messages[text_channel.guild]
|
|
action.close()
|
|
await message.delete()
|
|
|
|
self.confirmation_messages[text_channel.guild] = (message, action, asyncio.create_task(delete_confirmation_message_after_max_delay()))
|
|
|
|
@commands.Cog.listener()
|
|
async def on_reaction_add(self, reaction: discord.Reaction, user: discord.Member):
|
|
if user.bot: # We ignore reactions added by bots
|
|
return
|
|
if reaction.message.guild in self.confirmation_messages:
|
|
message, action, task = self.confirmation_messages[reaction.message.guild]
|
|
if reaction.message == message:
|
|
# Note that we do not check which user added the reaction.
|
|
# The confirmation message is just supposed to be a safeguard against typing mistakes
|
|
if reaction.emoji == "👍":
|
|
del self.confirmation_messages[reaction.message.guild]
|
|
task.cancel()
|
|
asyncio.create_task(message.delete())
|
|
await action
|
|
elif reaction.emoji == "👎":
|
|
del self.confirmation_messages[reaction.message.guild]
|
|
task.cancel()
|
|
action.close()
|
|
await message.delete()
|
|
|
|
@commands.command(help = "See if I'm alive")
|
|
async def ping(self, ctx: commands.Context):
|
|
await ctx.reply(":ping_pong:", mention_author = True)
|
|
|
|
@commands.command("StartGame")
|
|
async def start_game(self, ctx: commands.Context, player_role: discord.Role):
|
|
await self.check_is_administrator(ctx)
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started():
|
|
await ctx.reply(":x: a game is already running")
|
|
else:
|
|
await game.start(player_role, self.bot.user.id)
|
|
await ctx.reply(":white_check_mark: Game started!")
|
|
|
|
@commands.command("StopGame", help = "Open all secret channels, to be done when a game has ended")
|
|
async def stop_game_with_confirmation(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.confirm_action("Do you really want to open all secret channels, giving everyone access to all information?", ctx.channel, self.open_all_channels(ctx), ctx.message)
|
|
|
|
async def open_all_channels(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await game.open_secret_channels()
|
|
await ctx.reply(":white_check_mark: All secret channels have been opened")
|
|
|
|
@commands.command("DeleteGame", help = "Delete a running game and all of its associated channels")
|
|
async def delete_game_with_confirmation(self, ctx: commands.Context):
|
|
await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.confirm_action("Do you really want to delete the current game?", ctx.channel, self.delete_game(ctx), ctx.message)
|
|
|
|
async def delete_game(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await game.delete()
|
|
try:
|
|
await ctx.reply(":white_check_mark: Game deleted!")
|
|
except discord.NotFound: # If the command is executed in a game channel, then the channel won't exist anymore for the reply
|
|
pass
|
|
|
|
@commands.command("StartVote")
|
|
async def start_vote(self, ctx: commands.Context, president: discord.Member, chancellor: discord.Member):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_vote_running():
|
|
await ctx.reply(":x: A vote is already running")
|
|
return
|
|
await game.start_vote(president, chancellor)
|
|
await ctx.message.delete()
|
|
|
|
async def cast_vote(self, ctx: commands.Context, vote: bool):
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started() and game.is_vote_running() and game.can_cast_votes():
|
|
if ctx.author.id in game.get_players_id():
|
|
if ctx.channel == game.get_player_channel(ctx.author):
|
|
await game.cast_vote(ctx.author, vote)
|
|
await ctx.reply(":pencil: Your vote has been registered. Thank you citizen.")
|
|
else:
|
|
await ctx.reply(":x: Please cast your vote in your dedicated channel.")
|
|
else:
|
|
await ctx.reply(":x: Only players can cast votes")
|
|
else:
|
|
await ctx.reply(":x: There isn't a vote running")
|
|
|
|
@commands.command("ja")
|
|
async def vote_yes(self, ctx: commands.Context):
|
|
await self.cast_vote(ctx, True)
|
|
|
|
@commands.command("nein")
|
|
async def vote_no(self, ctx: commands.Context):
|
|
await self.cast_vote(ctx, False)
|
|
|
|
@commands.command("StopTheCount")
|
|
async def stop_vote_with_confirmation(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if not game.is_vote_running():
|
|
await ctx.reply(":x: No vote is running")
|
|
return
|
|
await self.confirm_action("Do you really want to stop the vote and reveal all votes?", ctx.channel, self.stop_vote(ctx), ctx.message)
|
|
|
|
async def stop_vote(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
if not game.is_vote_running():
|
|
await ctx.reply(":x: No vote is running")
|
|
return
|
|
await game.stop_vote()
|
|
await ctx.message.delete()
|
|
|
|
@commands.command("CancelVote", help = "Cancel the current vote, applying no consequences")
|
|
async def cancel_vote_with_confirmation(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_vote_running():
|
|
await self.confirm_action("Do you really want to cancel the vote (applying no consequences)?", ctx.channel, self.cancel_vote(ctx), ctx.message)
|
|
else:
|
|
await ctx.reply(":x: No vote is running")
|
|
|
|
async def cancel_vote(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_vote_running():
|
|
await game.cancel_vote()
|
|
await ctx.reply(":white_check_mark: The current vote has been canceled.")
|
|
else:
|
|
await ctx.reply(":x: Not vote is running")
|
|
|
|
@commands.command("AutoEndVote", help = "Set whether votes of the current game should end automatically when everybody has voted", usage = "true|false|get")
|
|
async def auto_end_vote(self, ctx: commands.Context, value):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if value in ["true", "false"]:
|
|
game.set_auto_end_vote(value == "true")
|
|
await ctx.reply(":white_check_mark: Done.")
|
|
elif value == "get":
|
|
await ctx.reply(f"AutoEndVote currently set to {str(game.get_auto_end_vote()).lower()}.")
|
|
else:
|
|
await ctx.reply(":dizzy_face: You should give either 'true', 'false' or 'get'")
|
|
|
|
@commands.command("Legislate", help = "Start the legislative session by drawing three policies")
|
|
async def draw_policies(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
if game.is_legislative_phase():
|
|
await ctx.reply(":x: The game is already in a legislative phase. Enact a policy or use veto power")
|
|
else:
|
|
policies = await game.draw_policies()
|
|
message_content = [
|
|
"The following policies have been drawn:",
|
|
" ".join([f"{num + 1}) {':blue_square:' if policy == Policy.LIBERAL else ':red_square:'} " for num, policy in enumerate(policies)]),
|
|
"Send them to the president and chancellor and type `!Enact <number>` to enact one of them when you are finished"
|
|
]
|
|
await ctx.reply("\n".join(message_content))
|
|
|
|
@commands.command("CancelLegislate", help = "Cancel a legislative phase and put the policies back on top of the deck")
|
|
async def cancel_legislate_with_confirmation(self, ctx: commands.Context):
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.confirm_action("Are you sure that you want to cancel the legislative phase?", ctx.channel, self.cancel_legislate(ctx), ctx.message)
|
|
|
|
async def cancel_legislate(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_legislative_phase():
|
|
await game.cancel_draw()
|
|
await ctx.reply(":white_check_mark: The legislative phase has been canceled and the policy cards put on top of the deck")
|
|
else:
|
|
await ctx.reply(":x: The game is not in a legislative phase")
|
|
|
|
@commands.command("Enact", help = "Legislative session only: enact one of the previously drawn policies")
|
|
async def enact_drawn_policy(self, ctx: commands.Context, policy_number: int):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
policy_number = policy_number - 1
|
|
await game.enact_drawn_policy(policy_number)
|
|
await ctx.reply(":white_check_mark: Done")
|
|
|
|
@commands.command("Peek", help = "Look at the top three cards of the deck without drawing them")
|
|
async def peek_policies(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
message_content = [
|
|
"The top three cards of the deck are the following:",
|
|
" ".join([":blue_square:" if policy == Policy.LIBERAL else ":red_square:" for policy in await game.peek_top_3_policies()])
|
|
]
|
|
await ctx.reply("\n".join(message_content))
|
|
|
|
@commands.command("Kill", help = "Kill a player and remove them from the game")
|
|
async def kill_player_with_confirmation(self, ctx: commands.Context, player: discord.Member):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if player.id in game.get_players_id():
|
|
await self.confirm_action(f"Do you really want to kill <@{player.id}>? This will completely remove them from the game", ctx.channel, self.kill_player(ctx, player), ctx.message)
|
|
else:
|
|
await ctx.reply(f":x: <@{player.id}> is not in the game", allowed_mentions = discord.AllowedMentions.none())
|
|
|
|
async def kill_player(self, ctx: commands.Context, player: discord.Member):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await game.kill_player(player)
|
|
await ctx.reply(":dagger: The order has been executed.")
|
|
|
|
@commands.command("Veto")
|
|
async def veto_policy_with_confirmation_if_veto_locked(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_legislative_phase():
|
|
nb_fascist_policies = len([policy for policy in game.get_enacted_policies() if policy == Policy.FASCIST])
|
|
if nb_fascist_policies < 5:
|
|
await self.confirm_action(
|
|
f"Are you sure that you want to use the veto power with less than 5 enacted fascist policies? ({nb_fascist_policies} enacted)",
|
|
ctx.channel,
|
|
self.veto_policy(ctx),
|
|
ctx.message
|
|
)
|
|
else:
|
|
await self.veto_policy(ctx)
|
|
else:
|
|
await ctx.reply(":x: We are not in a legislative phase")
|
|
|
|
async def veto_policy(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await game.veto()
|
|
|
|
@commands.command("ForceShuffle", help = "Force a shuffle of the discard into the draw pile")
|
|
async def force_shuffle_with_confirmation(self, ctx: commands.Context):
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.confirm_action("Are you sure that you want to force a shuffle of the discard and draw pile?", ctx.channel, self.force_shuffle(ctx), ctx.message)
|
|
|
|
async def force_shuffle(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await game.shuffle_discard_into_deck()
|
|
await ctx.reply(":white_check_mark: The discard and draw pile have been shuffled")
|
|
|
|
@commands.command("ForceUpdateVoteMessage", help = "Trigger an update of the vote message")
|
|
async def force_update_vote_message(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_vote_running():
|
|
await game.update_vote_message()
|
|
await ctx.reply(":white_check_mark: Done.")
|
|
else:
|
|
await ctx.reply(":x: There is no vote running")
|
|
|
|
@commands.command("PeekAll", help = "Show the content of the whole deck, in order")
|
|
async def peek_all_deck(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
if ctx.channel in (game.get_observer_channel(), game.get_gm_channel()):
|
|
policies = await game.peek_deck()
|
|
policies_text_elements = []
|
|
for i, policy in enumerate(policies):
|
|
if policy == Policy.LIBERAL:
|
|
policies_text_elements.append(":blue_square:")
|
|
else:
|
|
policies_text_elements.append(":red_square:")
|
|
if i % 3 == 2:
|
|
policies_text_elements.append(" ")
|
|
message = [
|
|
":eyes: The deck contains the following policies",
|
|
f"||{' '.join(policies_text_elements).strip()}||",
|
|
]
|
|
await ctx.reply("\n".join(message))
|
|
else:
|
|
await ctx.reply(":warning: You should do this in a channel that is hidden from the players!")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
argparser = argparse.ArgumentParser(description = "Secret Hitler helper bot", formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
|
argparser.add_argument("-t", "--token-file", default = "token", help = "File where the discord bot token is stored")
|
|
argparser.add_argument("-g", "--games-file", default = "games.json", help = "File used to store game information so that the bot can be stopped safely")
|
|
argparser.add_argument("-v", "--verbose", action = "count", default = 0, help = "Specify once to print bot debug messages, specify twice to print discord.py debug messages as well")
|
|
|
|
ARGS = argparser.parse_args()
|
|
|
|
logging.basicConfig(
|
|
format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
|
|
)
|
|
|
|
if ARGS.verbose == 0:
|
|
logging.root.setLevel(logging.INFO)
|
|
elif ARGS.verbose == 1:
|
|
logging.root.setLevel(logging.DEBUG)
|
|
logging.getLogger("discord").setLevel(logging.INFO)
|
|
else:
|
|
logging.root.setLevel(logging.DEBUG)
|
|
|
|
logger.info(f"Using discord.py version {discord.__version__}")
|
|
|
|
token_file_path = Path(ARGS.token_file).absolute()
|
|
if not token_file_path.is_file():
|
|
logger.error(f"Token file {token_file_path} does not exist")
|
|
sys.exit(1)
|
|
with token_file_path.open("r") as token_file:
|
|
token = token_file.readline().strip()
|
|
|
|
games_file = GamesFile(ARGS.games_file)
|
|
bot = SecretBot(games_file)
|
|
bot.run(token)
|