313 lines
14 KiB
Python
Executable File
313 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Tuple, Coroutine
|
|
|
|
import discord
|
|
import discord.utils
|
|
from discord.ext import commands
|
|
|
|
import utils
|
|
from GameFiles import Game, GamesFile, Policy
|
|
|
|
logger = logging.getLogger("SecretBot")
|
|
|
|
|
|
class SecretBot(commands.Cog):
|
|
|
|
def __init__(self, games_file: GamesFile):
|
|
super().__init__()
|
|
self.bot = commands.Bot(
|
|
"!",
|
|
help_command = commands.MinimalHelpCommand(),
|
|
intents = discord.Intents(guild_messages = True, guilds = True, members = True, reactions = True),
|
|
allowed_mentions = discord.AllowedMentions.none(), # By default we do not allow mentions, as we will white-list them on each required message
|
|
)
|
|
self.bot.add_cog(self)
|
|
self.games_file = games_file
|
|
# Each guild can have a confirmation message to execute an action. We store:
|
|
# The confirmation message, the coroutine to execute if the user reacts with :thumbsup: to the confirmation message, a task that will delete the message after a certain time (so that we can stop it if the user confirms instead)
|
|
self.confirmation_messages: Dict[discord.guild, Tuple[discord.Message, Coroutine, asyncio.Task]] = {}
|
|
|
|
def run(self, token):
|
|
self.bot.run(token)
|
|
|
|
def get_command_usage_message(self, command: commands.Command, prepend = "Usage: ") -> str:
|
|
command_elements = [str(cmd) for cmd in command.parents] + [command.name]
|
|
return f"{prepend}`{self.bot.command_prefix}{' '.join(command_elements)} {command.signature}`"
|
|
|
|
@commands.Cog.listener()
|
|
async def on_ready(self):
|
|
logger.info("Connected and ready!")
|
|
logger.info(f"Logged in as {utils.to_string(self.bot.user)}")
|
|
|
|
oauth_url = discord.utils.oauth_url(
|
|
self.bot.user.id,
|
|
discord.Permissions(read_messages = True, send_messages = True, manage_channels = True, manage_roles = True, mention_everyone = True, manage_messages = True)
|
|
)
|
|
logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
|
|
if isinstance(error, commands.CommandNotFound):
|
|
return await ctx.reply(f":x: The requested command does not exist")
|
|
elif isinstance(error, commands.MissingRequiredArgument):
|
|
return await ctx.reply(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.TooManyArguments):
|
|
return await ctx.reply(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.BadArgument):
|
|
if hasattr(error, "argument"):
|
|
return await ctx.reply(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
|
|
else:
|
|
return await ctx.reply(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")
|
|
elif isinstance(error, commands.ArgumentParsingError):
|
|
return await ctx.reply(f":x: Error when parsing the command: {error}")
|
|
elif isinstance(error, commands.CheckFailure):
|
|
if isinstance(error, utils.CheckFailDoNotNotify): # A check failed, but the check function asked that we do not notify the user
|
|
return
|
|
else:
|
|
return await ctx.reply(f":x: this command can not be run: {error}")
|
|
else:
|
|
logger.error(f"Error when running command '{ctx.message.content}' by {utils.to_string(ctx.author)} in {utils.to_string(ctx.guild)}", exc_info = error)
|
|
await ctx.reply(f":x: There was an error with the command :dizzy_face:")
|
|
|
|
def cog_check(self, ctx: commands.Context):
|
|
# We silently ignore messages from bots, since we do not want bots to command us
|
|
if ctx.author.bot or ctx.message.is_system():
|
|
logger.info(f"[{utils.to_string(ctx.guild)}] Ignored command message from bot or system: {ctx.message.content}")
|
|
raise utils.CheckFailDoNotNotify
|
|
return True
|
|
|
|
@staticmethod
|
|
async def check_is_administrator(ctx: commands.Context):
|
|
"""
|
|
If the user that sent the command is not an administrator of the server, notify them and raise utils.CheckFailDoNotNotify.
|
|
"""
|
|
if not utils.is_administrator(ctx.author):
|
|
await ctx.reply(f":dragon_face: You have no power here!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def check_is_administrator_or_gm(self, ctx: commands.Context):
|
|
"""
|
|
If the user that sent the command is not either an administrator of the server or a gm of the game, notify them and raise utils.CheckFailDoNotNotify.
|
|
"""
|
|
game = self.games_file[ctx.guild]
|
|
if not (utils.is_administrator(ctx.author) or (game.is_started() and game.is_gm(ctx.author))):
|
|
await ctx.reply(f":dragon_face: You have no power here!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def get_running_game_or_error_message(self, ctx: commands.Context) -> Game:
|
|
"""
|
|
Return the game running on the guild on which a command was executed or print an error message and raise an exception if there is no game running
|
|
"""
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started():
|
|
return game
|
|
else:
|
|
await ctx.reply(":x: Game is not running")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
@staticmethod
|
|
async def check_in_admin_channel_or_error_message(ctx: commands.Context, game: Game):
|
|
"""
|
|
If the message has not been sent in the game's admin channel, send an error message and raise utils.CheckFailDoNotNotify
|
|
"""
|
|
if ctx.channel != game.get_gm_channel():
|
|
await ctx.reply(":warning: You should do this in the admin channel!")
|
|
raise utils.CheckFailDoNotNotify
|
|
|
|
async def confirm_action(self, message_text, text_channel: discord.TextChannel, action: Coroutine, reply_to: discord.Message = None, max_delay = 60):
|
|
"""
|
|
Asks the user to confirm an action by reacting to a message.
|
|
If the user does not react in max_delay seconds, the confirmation message is deleted and the action is canceled.
|
|
For now, only one such confirmation per server is allowed. If another confirmation is asked while a previous one exists, the previous one is treated as if the delay passed.
|
|
"""
|
|
if text_channel.guild in self.confirmation_messages: # If there was already a pending confirmation
|
|
# We delete it
|
|
message, action, task = self.confirmation_messages[text_channel.guild]
|
|
asyncio.create_task(message.delete())
|
|
action.close()
|
|
task.cancel()
|
|
|
|
message = await text_channel.send(message_text + "\nReact to this message with :thumbsup: to confirm or :thumbsdown: to cancel", reference = reply_to)
|
|
|
|
async def add_reaction_prompts():
|
|
await message.add_reaction("👍")
|
|
await message.add_reaction("👎")
|
|
asyncio.create_task(add_reaction_prompts())
|
|
|
|
async def delete_confirmation_message_after_max_delay():
|
|
await asyncio.sleep(max_delay)
|
|
del self.confirmation_messages[text_channel.guild]
|
|
action.close()
|
|
await message.delete()
|
|
|
|
self.confirmation_messages[text_channel.guild] = (message, action, asyncio.create_task(delete_confirmation_message_after_max_delay()))
|
|
|
|
@commands.Cog.listener()
|
|
async def on_reaction_add(self, reaction: discord.Reaction, user: discord.Member):
|
|
if user.bot: # We ignore reactions added by bots
|
|
return
|
|
if reaction.message.guild in self.confirmation_messages:
|
|
message, action, task = self.confirmation_messages[reaction.message.guild]
|
|
if reaction.message == message:
|
|
# Note that we do not check which user added the reaction.
|
|
# The confirmation message is just supposed to be a safeguard against typing mistakes
|
|
if reaction.emoji == "👍":
|
|
del self.confirmation_messages[reaction.message.guild]
|
|
task.cancel()
|
|
asyncio.create_task(message.delete())
|
|
await action
|
|
elif reaction.emoji == "👎":
|
|
del self.confirmation_messages[reaction.message.guild]
|
|
task.cancel()
|
|
action.close()
|
|
await message.delete()
|
|
|
|
@commands.command(help = "See if I'm alive")
|
|
async def ping(self, ctx: commands.Context):
|
|
await ctx.reply(":ping_pong:", mention_author = True)
|
|
|
|
@commands.command("StartGame")
|
|
async def start_game(self, ctx: commands.Context, player_role: discord.Role):
|
|
await self.check_is_administrator(ctx)
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started():
|
|
await ctx.reply(":x: a game is already running")
|
|
else:
|
|
await game.start(player_role, self.bot.user.id)
|
|
await ctx.reply(":white_check_mark: Game started!")
|
|
|
|
@commands.command("DeleteGame", help = "Delete a running game and all of its associated channels")
|
|
async def delete_game_with_confirmation(self, ctx: commands.Context):
|
|
await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.confirm_action("Do you really want to delete the current game?", ctx.channel, self.delete_game(ctx), ctx.message)
|
|
|
|
async def delete_game(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await game.delete()
|
|
try:
|
|
await ctx.reply(":white_check_mark: Game deleted!")
|
|
except discord.NotFound: # If the command is executed in a game channel, then the channel won't exist anymore for the reply
|
|
pass
|
|
|
|
@commands.command("StartVote")
|
|
async def start_vote(self, ctx: commands.Context, president: discord.Member, chancellor: discord.Member):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if game.is_vote_running():
|
|
await ctx.reply(":x: A vote is already running")
|
|
return
|
|
await game.start_vote(president, chancellor)
|
|
await ctx.message.delete()
|
|
|
|
async def cast_vote(self, ctx: commands.Context, vote: bool):
|
|
game = self.games_file[ctx.guild]
|
|
if game.is_started() and game.is_vote_running():
|
|
if ctx.author.id in game.get_players_id():
|
|
if ctx.channel == game.get_player_channel(ctx.author):
|
|
await game.cast_vote(ctx.author, vote)
|
|
await ctx.reply(":pencil: Your vote has been registered. Thank you citizen.")
|
|
else:
|
|
await ctx.reply(":x: Please cast your vote in your dedicated channel.")
|
|
else:
|
|
await ctx.reply(":x: Only players can cast votes")
|
|
else:
|
|
await ctx.reply(":x: There isn't a vote running")
|
|
|
|
@commands.command("ja")
|
|
async def vote_yes(self, ctx: commands.Context):
|
|
await self.cast_vote(ctx, True)
|
|
|
|
@commands.command("nein")
|
|
async def vote_no(self, ctx: commands.Context):
|
|
await self.cast_vote(ctx, False)
|
|
|
|
@commands.command("StopTheCount")
|
|
async def stop_vote_with_confirmation(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
if not game.is_vote_running():
|
|
await ctx.reply(":x: No vote is running")
|
|
return
|
|
await self.confirm_action("Do you really want to stop the vote and reveal all votes?", ctx.channel, self.stop_vote(ctx), ctx.message)
|
|
|
|
async def stop_vote(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
if not game.is_vote_running():
|
|
await ctx.reply(":x: No vote is running")
|
|
return
|
|
await game.stop_vote()
|
|
await ctx.message.delete()
|
|
|
|
@commands.command("Legislate", help = "Start the legislative session by drawing three policies")
|
|
async def draw_policies(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
policies = await game.draw_policies()
|
|
message_content = [
|
|
"The following policies have been drawn:",
|
|
" ".join([f"{num + 1}) {':blue_square:' if policy == Policy.LIBERAL else ':red_square:'} " for num, policy in enumerate(policies)]),
|
|
"Send them to the president and chancellor and type `!Enact <number>` to enact one of them when you are finished"
|
|
]
|
|
await ctx.reply("\n".join(message_content))
|
|
|
|
@commands.command("Enact", help = "Legislative session only: enact one of the previously drawn policies")
|
|
async def enact_drawn_policy(self, ctx: commands.Context, policy_number: int):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
policy_number = policy_number - 1
|
|
await game.enact_drawn_policy(policy_number)
|
|
await ctx.reply(":white_check_mark: Done")
|
|
|
|
@commands.command("Peek", help = "Look at the top three cards of the deck without drawing them")
|
|
async def peek_policies(self, ctx: commands.Context):
|
|
game = await self.get_running_game_or_error_message(ctx)
|
|
await self.check_is_administrator_or_gm(ctx)
|
|
await self.check_in_admin_channel_or_error_message(ctx, game)
|
|
message_content = [
|
|
"The top three cards of the deck are the following:",
|
|
" ".join([":blue_square:" if policy == Policy.LIBERAL else ":red_square:" for policy in await game.peek_policies()])
|
|
]
|
|
await ctx.reply("\n".join(message_content))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
argparser = argparse.ArgumentParser(description = "Secret Hitler helper bot", formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
|
argparser.add_argument("-t", "--token-file", default = "token", help = "File where the discord bot token is stored")
|
|
argparser.add_argument("-g", "--games-file", default = "games.json", help = "File used to store game information so that the bot can be stopped safely")
|
|
argparser.add_argument("-v", "--verbose", action = "count", default = 0, help = "Specify once to print bot debug messages, specify twice to print discord.py debug messages as well")
|
|
|
|
ARGS = argparser.parse_args()
|
|
|
|
logging.basicConfig(
|
|
format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
|
|
)
|
|
|
|
if ARGS.verbose == 0:
|
|
logging.root.setLevel(logging.INFO)
|
|
elif ARGS.verbose == 1:
|
|
logging.root.setLevel(logging.DEBUG)
|
|
logging.getLogger("discord").setLevel(logging.INFO)
|
|
else:
|
|
logging.root.setLevel(logging.DEBUG)
|
|
|
|
logger.info(f"Using discord.py version {discord.__version__}")
|
|
|
|
token_file_path = Path(ARGS.token_file).absolute()
|
|
if not token_file_path.is_file():
|
|
logger.error(f"Token file {token_file_path} does not exist")
|
|
sys.exit(1)
|
|
with token_file_path.open("r") as token_file:
|
|
token = token_file.readline().strip()
|
|
|
|
games_file = GamesFile(ARGS.games_file)
|
|
bot = SecretBot(games_file)
|
|
bot.run(token)
|