import asyncio
import logging
import random
from collections import defaultdict
from enum import Enum
from functools import wraps
from typing import Dict, Callable, List, Union

import discord

logger = logging.getLogger(__name__)


class Policy(Enum):
    """
    The two kinds of policy cards. The value is the one-letter code stored in the game config.
    """
    LIBERAL = "L"
    FASCIST = "F"

    def square_emoji(self):
        """
        Return the Discord emoji shortcode used to draw this policy on the board.
        """
        if self is Policy.LIBERAL:
            return ":blue_square:"
        return ":red_square:"


def game_started(func):
    """
    Decorator for *methods* of Game that need the game to be started.

    The wrapper is synchronous: when it decorates a coroutine method, the RuntimeError is raised when the
    method is called, not when the returned coroutine is awaited.
    """
    @wraps(func)
    def decorated(obj: 'Game', *args, **kwargs):
        if not obj.is_started():
            raise RuntimeError("This function only works on running games!")
        return func(obj, *args, **kwargs)
    return decorated


def vote_running(func):
    """
    Decorator for *methods* of Game that need a vote to be running

    Raises RuntimeError at call time otherwise (and, through is_vote_running, also when the game is not started).
    """
    @wraps(func)
    def decorated(obj: 'Game', *args, **kwargs):
        if not obj.is_vote_running():
            raise RuntimeError("This function only works if a vote is running!")
        return func(obj, *args, **kwargs)
    return decorated


def policies_drawn(func):
    """
    Decorator for *methods* of Game that need policies to have been drawn (i.e. the legislative phase has started)

    Raises RuntimeError at call time otherwise.
    """
    @wraps(func)
    def decorated(obj: 'Game', *args, **kwargs):
        if obj.config["drawn"] is None:
            raise RuntimeError("This function only works when policies have been drawn (i.e. the legislative phase has started)")
        return func(obj, *args, **kwargs)
    return decorated


def save_on_success(func):
    """
    Decorator for *async methods* of game that calls the save function after the method has executed without exceptions
    """
    @wraps(func)
    async def decorated(obj: 'Game', *args, **kwargs):
        return_value = await func(obj, *args, **kwargs)
        # Only reached when func did not raise
        obj.save_function()
        return return_value
    return decorated


class Game:
    """
    Game state for one guild

    All state lives in the `config` dict handed to the constructor. Once a game is started it holds the keys
    written by `start`: "players", "player_info", "vote", "auto_end_vote", "deck", "discard", "drawn",
    "enacted", "chaos", "nb_policies_to_victory", the role ids ("player_role", "gm_role", "observer_role")
    and the channel ids ("category", "admin_chan", "announce_chan", "discussion_chan", "observer_chan").
    Policies are stored as their one-letter `Policy` value.
    """

    def __init__(self, config_dict: Dict, save_function: Callable, guild: discord.Guild):
        """
        :param config_dict: dict holding the game state; it is mutated in place
        :param save_function: called without arguments whenever the state should be saved
        :param guild: the guild this game belongs to
        """
        self.config = config_dict
        self.save_function = save_function
        self.guild = guild

    @staticmethod
    def new_dict():
        """
        Return the config of a guild in which no game has been started.
        """
        return {
            "game_started": False,
        }

    def is_started(self):
        """
        Tell whether a game is currently started in this guild.
        """
        return self.config["game_started"]

    # Players

    @game_started
    def get_players_id(self) -> List[int]:
        """
        Return the ids of the players still in the game, in turn order.
        """
        return self.config["players"]

    @game_started
    def get_players(self) -> List[discord.Member]:
        """
        Return the players still in the game, looked up in the guild by id.
        """
        return [self.guild.get_member(player) for player in self.get_players_id()]

    @game_started
    def get_player_info(self, player: Union[int, discord.Member]):
        """
        Return the per-player info dict of a player, given either the member or its id.
        """
        if isinstance(player, discord.Member):
            player = player.id
        return self.config["player_info"][str(player)]

    # Roles

    @game_started
    def get_gm_role_id(self) -> int:
        """
        Return the id of the GM role.
        """
        return self.config["gm_role"]

    @game_started
    def get_gm_role(self) -> discord.Role:
        """
        Return the GM role.
        """
        return self.guild.get_role(self.get_gm_role_id())

    @game_started
    def is_gm(self, user: discord.Member) -> bool:
        """
        Tell whether the given member has the GM role.
        """
        return self.get_gm_role() in user.roles

    @game_started
    def get_player_role_id(self) -> int:
        """
        Return the id of the player role.
        """
        return self.config["player_role"]

    @game_started
    def get_player_role(self) -> discord.Role:
        """
        Return the player role.
        """
        return self.guild.get_role(self.get_player_role_id())

    @game_started
    def get_observer_role_id(self) -> int:
        """
        Return the id of the observer role.
        """
        return self.config["observer_role"]

    @game_started
    def get_observer_role(self) -> discord.Role:
        """
        Return the observer role.
        """
        return self.guild.get_role(self.get_observer_role_id())

    # Channels

    @game_started
    def get_game_category_id(self) -> int:
        """
        Return the id of the category holding the game channels.
        """
        return self.config["category"]

    @game_started
    def get_game_category(self) -> discord.CategoryChannel:
        """
        Return the category holding the game channels.
        """
        return self.guild.get_channel(self.get_game_category_id())

    @game_started
    def get_announcements_channel_id(self) -> int:
        """
        Return the id of the announcements channel.
        """
        return self.config["announce_chan"]

    @game_started
    def get_announcements_channel(self) -> discord.TextChannel:
        """
        Return the announcements channel.
        """
        return self.guild.get_channel(self.get_announcements_channel_id())

    @game_started
    def get_gm_channel_id(self) -> int:
        """
        Return the id of the GM ("admin") channel.
        """
        return self.config["admin_chan"]

    @game_started
    def get_gm_channel(self) -> discord.TextChannel:
        """
        Return the GM ("admin") channel.
        """
        return self.guild.get_channel(self.get_gm_channel_id())

    @game_started
    def get_observer_channel_id(self) -> int:
        """
        Return the id of the observers channel.
        """
        return self.config["observer_chan"]

    @game_started
    def get_observer_channel(self) -> discord.TextChannel:
        """
        Return the observers channel.
        """
        return self.guild.get_channel(self.get_observer_channel_id())

    @game_started
    def get_discussion_channel_id(self) -> int:
        """
        Return the id of the discussion channel.
        """
        return self.config["discussion_chan"]

    @game_started
    def get_discussion_channel(self) -> discord.TextChannel:
        """
        Return the discussion channel.
        """
        return self.guild.get_channel(self.get_discussion_channel_id())

    @game_started
    def get_player_channel_id(self, player: Union[int, discord.Member]) -> int:
        """
        Return the id of the private channel of a player, given either the member or its id.
        """
        if isinstance(player, discord.Member):
            player = player.id
        return self.config["player_info"][str(player)]["channel"]

    @game_started
    def get_player_channel(self, player: Union[int, discord.Member]) -> discord.TextChannel:
        """
        Return the private channel of a player, given either the member or its id.
        """
        return self.guild.get_channel(self.get_player_channel_id(player))

    # Simple state accessors

    @game_started
    def is_vote_running(self) -> bool:
        """
        Tell whether a vote exists (it may already be closed to new ballots, see can_cast_votes).
        """
        return self.config["vote"] is not None

    @game_started
    def can_cast_votes(self) -> bool:
        """
        Tell whether a vote is running and still accepts ballots.
        """
        return self.is_vote_running() and self.config["vote"]["can_cast_votes"]

    @vote_running
    def is_vote_passing(self) -> bool:
        """
        Tell whether strictly more current players voted JA (True) than NEIN (False).

        Players who have not voted (None) count for neither side.
        """
        vote_count = defaultdict(int)
        for player_id in self.get_players_id():
            vote_count[self.config["vote"][str(player_id)]] += 1
        return vote_count[True] > vote_count[False]

    @game_started
    def get_auto_end_vote(self) -> bool:
        """
        Tell whether votes end automatically once every player has voted.
        """
        return self.config["auto_end_vote"]

    @game_started
    def set_auto_end_vote(self, auto_end_vote: bool):
        """
        Set whether votes end automatically once every player has voted, and save.
        """
        self.config["auto_end_vote"] = auto_end_vote
        self.save_function()  # We do not use save_on_success decorator since this is not a coroutine

    @game_started
    def get_chaos(self) -> int:
        """
        Return the current value of the chaos counter.
        """
        return self.config["chaos"]

    @game_started
    def is_legislative_phase(self) -> bool:
        """
        Tell whether policies are currently drawn.
        """
        return self.config["drawn"] is not None

    @game_started
    def get_enacted_policies(self) -> List[Policy]:
        """
        Return the enacted policies, oldest first.
        """
        return [Policy(policy_str) for policy_str in self.config["enacted"]]

    @game_started
    def get_nb_policies_to_victory(self, faction: Policy) -> int:
        """
        Return how many enacted policies the given faction needs to win.

        Anything that is not Policy.LIBERAL is treated as Policy.FASCIST.
        """
        if faction == Policy.LIBERAL:
            return self.config["nb_policies_to_victory"][Policy.LIBERAL.value]
        else:
            return self.config["nb_policies_to_victory"][Policy.FASCIST.value]

    @game_started
    @save_on_success
    async def set_nb_policies_to_victory(self, faction: Policy, value: int):
        """
        Set how many enacted policies the given faction needs to win.
        """
        self.config["nb_policies_to_victory"][faction.value] = value

    # Game lifecycle

    @save_on_success
    async def start(self, player_role: discord.Role, bot_user_id: int):
        """
        Start a game: initialise the state, then create the roles, the category and all the channels.

        :param player_role: role whose current members become the players (in a random turn order)
        :param bot_user_id: id of the bot user, which receives the GM role
        :raises ValueError: if a game is already started
        Any exception raised while creating a role or a channel is propagated; the state is then not saved
        (but the in-memory config has already been modified).
        """
        if self.is_started():
            raise ValueError("Game already started")
        logger.info(f"[{self.guild.name}] Starting game")

        self.config["game_started"] = True
        self.config["player_role"] = player_role.id
        players = [member.id for member in player_role.members]
        random.shuffle(players)
        self.config["players"] = players
        self.config["player_info"] = {str(player): {} for player in self.config["players"]}
        self.config["vote"] = None
        self.config["auto_end_vote"] = True
        self.config["deck"] = [Policy.FASCIST.value] * 11 + [Policy.LIBERAL.value] * 6
        random.shuffle(self.config["deck"])
        self.config["discard"] = []
        self.config["drawn"] = None
        self.config["enacted"] = []
        self.config["chaos"] = 0
        self.config["nb_policies_to_victory"] = {
            Policy.LIBERAL.value: 5,
            Policy.FASCIST.value: 6,
        }

        gm_role = await self.guild.create_role(
            name="GM", hoist=True, mentionable=True, permissions=self.guild.default_role.permissions
        )
        self.config["gm_role"] = gm_role.id
        # Need to be here otherwise the bot won't have the permissions needed to set up private channels
        await self.guild.get_member(bot_user_id).add_roles(gm_role)
        logger.debug("Created GM role")

        observer_role = await self.guild.create_role(
            name="Observer", mentionable=True, permissions=self.guild.default_role.permissions
        )
        self.config["observer_role"] = observer_role.id
        logger.debug("Created Observer role")

        category_permissions = {
            self.guild.default_role: discord.PermissionOverwrite(send_messages=False),
            gm_role: discord.PermissionOverwrite(send_messages=True),
            player_role: discord.PermissionOverwrite(send_messages=True),
            observer_role: discord.PermissionOverwrite(send_messages=False, read_messages=True),
        }
        game_category = await self.guild.create_category("In-game", overwrites=category_permissions)
        self.config["category"] = game_category.id
        logger.debug(f"[{self.guild.name}] Created game category")

        async def create_admin_chan():
            perms = {
                self.guild.default_role: discord.PermissionOverwrite(read_messages=False),
                gm_role: discord.PermissionOverwrite(read_messages=True),
            }
            channel = await game_category.create_text_channel("admin", overwrites=perms, position=0)
            self.config["admin_chan"] = channel.id
            logger.debug(f"[{self.guild.name}] Created admin channel")

        async def create_announcements_chan():
            perms = {
                self.guild.default_role: discord.PermissionOverwrite(send_messages=False),
                gm_role: discord.PermissionOverwrite(send_messages=True),
            }
            announcements_chan = await game_category.create_text_channel("announcements", overwrites=perms, position=1)
            self.config["announce_chan"] = announcements_chan.id
            logger.debug(f"[{self.guild.name}] Created announcements channel")

            async def announce_game_start():
                message_content = [
                    f"<@&{self.get_player_role_id()}> <@&{self.get_gm_role_id()}>",
                    "**A new game has started!**",
                    "The turn order is the following:",
                ]
                # \u00b7 is a middle dot, used as a bullet
                message_content.extend([f"\u00b7 <@{player_id}>" for player_id in self.get_players_id()])
                message_content.append("Have a nice game!")
                message = await announcements_chan.send(
                    "\n".join(message_content),
                    allowed_mentions=discord.AllowedMentions(roles=True, users=True),
                )
                await message.pin()
                logger.debug(f"[{self.guild.name}] Announced game start in announcements")

            # We do not wait for it before game creation finishes because we do not care if it is executed after a while
            asyncio.create_task(announce_game_start())

        async def create_discussion_chan():
            # Permissions are inherited from the category so they are synced to it
            channel = await game_category.create_text_channel("discussion", position=3)
            self.config["discussion_chan"] = channel.id
            logger.debug(f"[{self.guild.name}] Created discussion channel")

        async def create_observers_chan():
            perms = {
                self.guild.default_role: discord.PermissionOverwrite(read_messages=False),
                gm_role: discord.PermissionOverwrite(read_messages=True),
                observer_role: discord.PermissionOverwrite(read_messages=True),
            }
            channel = await game_category.create_text_channel("observers", overwrites=perms, position=4)
            self.config["observer_chan"] = channel.id
            logger.debug(f"[{self.guild.name}] Created observers channel")

        # The common channels are created in parallel. We wait for all of them before creating player channels,
        # since those must be placed after them. gather (unlike asyncio.wait) re-raises a failed creation
        # instead of letting the game start with a missing channel.
        await asyncio.gather(
            create_admin_chan(),
            create_announcements_chan(),
            create_discussion_chan(),
            create_observers_chan(),
        )

        # Discord channel positions are not relative to the category and not necessarily absolute, but a channel in
        # same category than another with a higher position attribute will be sorted lower.
        # See https://github.com/Rapptz/discord.py/issues/2392#issuecomment-707455919
        observer_channel_position = self.get_observer_channel().position

        async def create_player_channel(player: discord.Member, channel_position: int):
            perms = {
                self.guild.default_role: discord.PermissionOverwrite(read_messages=False),
                player: discord.PermissionOverwrite(read_messages=True),
                gm_role: discord.PermissionOverwrite(read_messages=True),
                observer_role: discord.PermissionOverwrite(read_messages=True, send_messages=False),
            }
            player_channel = await game_category.create_text_channel(
                player.name, overwrites=perms, position=channel_position
            )
            self.config["player_info"][str(player.id)]["channel"] = player_channel.id
            logger.debug(f"[{self.guild.name}] Created channel for player {player.name}")
            # The greeting is sent in the background, it is not awaited
            asyncio.create_task(player_channel.send(
                "Hello! This is your private channel.\n"
                f"In here you can cast your votes, interact with the <@&{self.get_gm_role_id()}>, and write freely.\n"
                "Have a nice game!",
                allowed_mentions=discord.AllowedMentions(roles=True),
            ))

        # gather accepts an empty argument list, so a role without members does not make this fail
        await asyncio.gather(*(
            create_player_channel(player, observer_channel_position + i + 1)
            for i, player in enumerate(self.get_players())
        ))

    @game_started
    @save_on_success
    async def open_secret_channels(self):
        """
        Remove every permission overwrite from the discussion, observers and player channels.

        Any exception raised by a channel edit is propagated.
        """
        channels = [self.get_discussion_channel(), self.get_observer_channel()]
        channels += [self.get_player_channel(player_id) for player_id in self.get_players_id()]
        await asyncio.gather(*(channel.edit(overwrites={}) for channel in channels))

    @game_started
    @save_on_success
    async def delete(self):
        """
        Delete the game channels, category and roles (GM and observer), then reset the config to a not-started game.

        Any exception raised by a deletion is propagated, in which case the config is left untouched.
        """
        category = self.get_game_category()
        # gather takes bare coroutines (asyncio.wait no longer does) and copes with an empty category
        await asyncio.gather(*(channel.delete() for channel in category.channels))
        await category.delete()
        # Need to delete the roles last
        await asyncio.gather(self.get_gm_role().delete(), self.get_observer_role().delete())
        self.config.clear()
        self.config.update(self.new_dict())
        logger.debug(f"[{self.guild.name}] Game deleted")

    # Votes

    @save_on_success
    async def start_vote(self, president: discord.Member, chancellor: discord.Member):
        """
        Start a vote on the given government and post the vote message in the announcements channel.

        :raises RuntimeError: if a vote is already running (or the game is not started)
        """
        if self.is_vote_running():
            raise RuntimeError("A vote is already running")
        logger.debug(f"[{self.guild.name}] Starting vote")
        self.config["vote"] = {
            "president": president.id,
            "chancellor": chancellor.id,
            "message": None,
            "revealed": False,
            "can_cast_votes": True,
        }
        # One entry per player, keyed by the player id as a string: None (not voted), True (JA) or False (NEIN)
        self.config["vote"].update({str(player_id): None for player_id in self.get_players_id()})
        await self.update_vote_message()

    @vote_running
    async def update_vote_message(self):
        """
        Post the vote message in the announcements channel, or edit it if it was already posted.

        This does not save the state by itself.
        """
        logger.debug(f"[{self.guild.name}] Updating vote message")
        # We keep a reference to the vote dict: self.config["vote"] may be set to None by stop_vote/cancel_vote
        # while we are awaiting Discord below
        vote = self.config["vote"]
        president = vote["president"]
        chancellor = vote["chancellor"]
        can_cast_votes = self.can_cast_votes()

        message_content = [
            "**Citizens are called to vote**",
            "Do you want to elect the following government?",
            f":crown: <@{president}> as president",
            f":person_in_tuxedo: <@{chancellor}> as chancellor",
        ]
        if can_cast_votes:
            message_content.append("You can vote by typing `!ja` or `!nein` in your channel")
        message_content.append("")  # Just to mark a separation between the message header and the votes

        for player_id in self.get_players_id():
            player_vote = vote[str(player_id)]
            if player_vote is None:
                message_content.append(f":black_large_square: <@{player_id}> has not voted")
            elif not vote["revealed"]:
                # Player has voted but the vote should not be revealed
                message_content.append(f":white_large_square: <@{player_id}> has voted")
            elif player_vote:
                message_content.append(f":green_square: <@{player_id}> has voted JA")
            else:
                message_content.append(f":red_square: <@{player_id}> has voted NEIN")

        if self.get_auto_end_vote() and can_cast_votes:
            message_content.append("The vote will automatically end once everybody has voted")
        if not can_cast_votes:
            message_content.append("**The vote has ended**")
        message_content_str = "\n".join(message_content)

        announcements_channel = self.get_announcements_channel()
        allowed_mentions = discord.AllowedMentions(users=True)
        if vote["message"] is None:
            message = await announcements_channel.send(message_content_str, allowed_mentions=allowed_mentions)
            vote["message"] = message.id
        else:
            message = await announcements_channel.fetch_message(vote["message"])
            await message.edit(content=message_content_str, allowed_mentions=allowed_mentions)

    @vote_running
    @save_on_success
    async def cast_vote(self, user: discord.Member, vote: Union[bool, None]):
        """
        Record the vote of a user and update the vote message.

        :param vote: True for JA, False for NEIN, None to withdraw the vote
        :raises RuntimeError: if the vote does not accept ballots anymore
        If auto end is enabled and every player has voted, stop_vote is scheduled in the background.
        """
        logger.debug(f"[{self.guild.name}] Casting vote with value {vote} for user {user.display_name}")
        if not self.can_cast_votes():
            raise RuntimeError("Votes can not be casted right now")
        self.config["vote"][str(user.id)] = vote
        everybody_voted = all(
            self.config["vote"][str(player_id)] is not None for player_id in self.get_players_id()
        )
        if self.get_auto_end_vote() and everybody_voted:
            # Not awaited: stopping the vote can take a while (announcements, chaos, delayed policy enactment)
            asyncio.create_task(self.stop_vote())
        await self.update_vote_message()

    @vote_running
    @save_on_success
    async def stop_vote(self):
        """
        Close the vote, reveal the ballots, announce the result, and increase the chaos if the vote did not pass.
        """
        logger.debug(f"[{self.guild.name}] Stopping the vote")
        self.config["vote"]["can_cast_votes"] = False
        self.config["vote"]["revealed"] = True
        await self.update_vote_message()

        passed = self.is_vote_passing()
        announcement_content = [
            f"<@&{self.get_player_role_id()}> <@&{self.get_gm_role_id()}> the vote has ended!",
            f"{':green_square:' if passed else ':red_square:'} The vote has **{'' if passed else 'not '}passed**"
        ]
        if passed:
            president = self.config["vote"]["president"]
            chancellor = self.config["vote"]["chancellor"]
            announcement_content.append(f"Congratulations to president <@{president}> and chancellor <@{chancellor}>!")
            if self.config["chaos"] > 0:  # If there was some chaos
                announcement_content.append(":relaxed: The country has calmed and the chaos counter has been reset")
            self.config["chaos"] = 0  # Anyway, the chaos is reset by a successful vote
        self.config["vote"] = None
        await self.get_announcements_channel().send(
            "\n".join(announcement_content),
            allowed_mentions=discord.AllowedMentions(roles=True, users=True),
        )
        # After announcing a non-passing vote, we increase the chaos and announce it
        if not passed:
            await self.increase_chaos()

    @vote_running
    @save_on_success
    async def cancel_vote(self):
        """
        Drop the running vote without any result and strike through the vote message (if it was posted).
        """
        logger.debug(f"[{self.guild.name}] Cancelling the vote")
        vote_message_id = self.config["vote"]["message"]
        self.config["vote"] = None
        if vote_message_id is not None:
            message = await self.get_announcements_channel().fetch_message(vote_message_id)
            await message.edit(content="~~The vote has been canceled~~")

    # Chaos

    @game_started
    @save_on_success
    async def increase_chaos(self):
        """
        Increase the chaos counter by one and announce it.

        When the counter would reach 3, the top policy of the deck is enacted instead (after a 10 seconds delay)
        and the counter is reset to 0.
        """
        new_chaos = self.config["chaos"] + 1
        if new_chaos < 3:
            self.config["chaos"] = new_chaos
            gauge = [":fire:"] * new_chaos + [":black_small_square:"] * (3 - new_chaos)
            await self.get_announcements_channel().send(
                ":fire: The country slowly descends into chaos " + " ".join(gauge)
            )
        else:
            # Chaos is too high!
            await self.get_announcements_channel().send(":fire: :fire: :fire: **The country is thrown into chaos** :fire: :fire: :fire:")
            await self.enact_top_policy(delay=10)
            await self.get_announcements_channel().send(":relaxed: The country has calmed and the chaos counter has been reset")
            self.config["chaos"] = 0  # We reset the counter

    # Policies

    @game_started
    @save_on_success
    async def draw_policies(self) -> List[Policy]:
        """
        Move the 3 top policies of the deck to the drawn policies and return them.

        :raises RuntimeError: if policies are already drawn
        :raises IndexError: if the deck holds less than 3 policies; the deck is then left untouched
        """
        if self.is_legislative_phase():
            raise RuntimeError("Can not draw cards if some are already in hand. Enact one or veto instead.")
        deck = self.config["deck"]
        if len(deck) < 3:
            # Checked beforehand so that we never remove cards from the deck without putting them in hand
            raise IndexError(f"Can not draw 3 policies from a deck of {len(deck)}")
        drawn = deck[:3]
        del deck[:3]
        self.config["drawn"] = drawn
        return [Policy(p) for p in drawn]

    @game_started
    @policies_drawn
    @save_on_success
    async def cancel_draw(self):
        """
        Put the drawn policies back on top of the deck, in the order they were drawn.
        """
        logger.info(f"[{self.guild.name}] Cancelling draw")
        new_deck = self.config["drawn"] + self.config["deck"]
        self.config["drawn"] = None
        self.config["deck"] = new_deck

    @game_started
    async def peek_deck(self) -> List[Policy]:
        """
        Return the whole deck, top policy first, without modifying it.
        """
        return [Policy(policy_code) for policy_code in self.config["deck"]]

    @game_started
    async def peek_top_3_policies(self) -> List[Policy]:
        """
        Return the (at most) 3 top policies of the deck without drawing them.
        """
        return (await self.peek_deck())[:3]

    @game_started
    @save_on_success
    async def add_to_deck(self, policies: List[Policy]):
        """
        Add the given policies to the deck, then shuffle the whole deck.
        """
        self.config["deck"].extend([policy.value for policy in policies])
        random.shuffle(self.config["deck"])

    @game_started
    @policies_drawn
    @save_on_success
    async def enact_drawn_policy(self, index: int):
        """
        Enact one of the drawn policies and discard the others, then announce it.

        :param index: 0-based position of the policy to enact among the drawn policies
        :raises IndexError: if index does not designate a drawn policy
        The discard pile is shuffled into the deck if the deck is left with less than 3 policies.
        """
        drawn = self.config["drawn"]
        if not (0 <= index < len(drawn)):
            raise IndexError(f"Expected policy index between 0 and {len(drawn) - 1}, got {index}")
        for i, policy_str in enumerate(drawn):
            destination = "enacted" if i == index else "discard"
            self.config[destination].append(policy_str)
        self.config["drawn"] = None
        await self.announce_latest_enacted_policy()
        if len(self.config["deck"]) < 3:
            await self.shuffle_discard_into_deck()

    @game_started
    @save_on_success
    async def shuffle_discard_into_deck(self):
        """
        Move the discard pile into the deck, shuffle the deck and announce it.
        """
        self.config["deck"].extend(self.config["discard"])
        self.config["discard"] = []
        random.shuffle(self.config["deck"])
        await self.get_announcements_channel().send("*The policy deck has been shuffled with the discard pile*")

    @game_started
    @save_on_success
    async def enact_top_policy(self, delay = 10):
        """
        Wait for `delay` seconds, then enact the top policy of the deck and announce it.

        The discard pile is shuffled into the deck if the deck is left with less than 3 policies.
        :raises IndexError: if the deck is empty
        """
        logger.debug(f"[{self.guild.name}] Enacting top policy in {delay} seconds...")
        await asyncio.sleep(delay)
        self.config["enacted"].append(self.config["deck"].pop(0))
        await self.announce_latest_enacted_policy()
        if len(self.config["deck"]) < 3:
            await self.shuffle_discard_into_deck()

    @game_started
    async def announce_latest_enacted_policy(self):
        """
        Announce the most recently enacted policy along with both policy tracks.

        A warning is added when a fascist policy brings the fascist track to half (integer division) of the
        number needed for victory, and a veto notice when it brings it to one below that number.
        """
        last_enacted = Policy(self.config["enacted"][-1])
        enacted_count = defaultdict(int)
        for policy in self.get_enacted_policies():
            enacted_count[policy] += 1

        def track(faction: Policy) -> str:
            # One coloured square per enacted policy, then one black square per policy missing for victory
            enacted = enacted_count[faction]
            missing = self.get_nb_policies_to_victory(faction) - enacted
            return " ".join([faction.square_emoji()] * enacted + [":black_small_square:"] * missing)

        message_content = [
            f"{self.get_player_role().mention} A **{last_enacted.name}** policy {last_enacted.square_emoji()} has been enacted!",
            f"In total, **{enacted_count[Policy.LIBERAL]} {Policy.LIBERAL.name}** policies and **{enacted_count[Policy.FASCIST]} {Policy.FASCIST.name}** policies have been enacted",
            track(Policy.LIBERAL),
            track(Policy.FASCIST),
        ]
        if last_enacted == Policy.FASCIST:
            fascist_goal = self.get_nb_policies_to_victory(Policy.FASCIST)
            if enacted_count[Policy.FASCIST] == fascist_goal // 2:
                message_content.append("Be careful about who you elect as chancellor!")
            elif enacted_count[Policy.FASCIST] == fascist_goal - 1:
                message_content.append(":person_gesturing_no: Veto power unlocked :person_gesturing_no:")
        await self.get_announcements_channel().send(
            "\n".join(message_content),
            allowed_mentions=discord.AllowedMentions(roles=True),
        )

    @game_started
    @policies_drawn
    @save_on_success
    async def veto(self):
        """
        Discard all the drawn policies without enacting any, announce it and increase the chaos.

        The discard pile is shuffled into the deck if the deck holds less than 3 policies.
        """
        self.config["discard"].extend(self.config["drawn"])
        self.config["drawn"] = None
        await self.get_announcements_channel().send(
            f"<@&{self.get_player_role_id()}>\n:person_gesturing_no: The government used the veto power! No policies have been enacted. :person_gesturing_no:",
            allowed_mentions=discord.AllowedMentions(roles=True),
        )
        if len(self.config["deck"]) < 3:
            await self.shuffle_discard_into_deck()
        await self.increase_chaos()

    @game_started
    @save_on_success
    async def kill_player(self, player: discord.Member):
        """
        Remove a player from the game, take the player role away from them and announce their death.

        :raises ValueError: if the member is not (or no longer) a player
        """
        if player.id not in self.get_players_id():
            raise ValueError(f"Trying to kill a player ({player.name}) which is not in the game")
        self.config["players"].remove(player.id)
        await player.remove_roles(self.get_player_role())
        await self.get_announcements_channel().send(
            f":skull: <@{player.id}> has been killed! :skull:",
            allowed_mentions=discord.AllowedMentions(users=True),
        )
        await self.get_player_channel(player).send("u ded lol")