SecretBot/GameFiles/Game.py

398 lines
15 KiB
Python

import asyncio
import logging
import random
from collections import defaultdict
from enum import Enum
from functools import wraps
from typing import Dict, Callable, List, Union
import discord
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class Policy(Enum):
    """
    The two kinds of policy. Each value is a one-letter code ("L" / "F").
    """
    LIBERAL = "L"
    FASCIST = "F"

    def square_emoji(self):
        """
        Return the emoji shortcode of the coloured square that represents this policy.
        """
        # Compare against the class attribute: reaching one member through another
        # (`self.LIBERAL`) is discouraged by the enum documentation.
        if self is Policy.LIBERAL:
            return ":blue_square:"
        return ":red_square:"
def game_started(func):
    """
    Guard for *methods* of Game: the wrapped method only runs when the game is started.

    The check happens when the method is called; a RuntimeError is raised if
    `is_started()` is falsy.
    """
    @wraps(func)
    def guarded(game: 'Game', *args, **kwargs):
        if game.is_started():
            return func(game, *args, **kwargs)
        raise RuntimeError("This function only works on running games!")

    return guarded
def vote_running(func):
    """
    Guard for *methods* of Game: the wrapped method only runs while a vote is running.

    The check happens when the method is called; a RuntimeError is raised if
    `is_vote_running()` is falsy.
    """
    @wraps(func)
    def guarded(game: 'Game', *args, **kwargs):
        if game.is_vote_running():
            return func(game, *args, **kwargs)
        raise RuntimeError("This function only works if a vote is running!")

    return guarded
def save_on_success(func):
    """
    Decorator for *async methods* of Game: once the method has completed without raising,
    call the game's `save_function()` and hand back the method's return value.
    """
    @wraps(func)
    async def saving(game: 'Game', *args, **kwargs):
        outcome = await func(game, *args, **kwargs)
        # Only reached when the awaited method did not raise
        game.save_function()
        return outcome

    return saving
class Game:
"""
Game state for one guild
"""
def __init__(self, config_dict: Dict, save_function: Callable, guild: discord.Guild):
    """
    Keep references to the guild, the state dictionary and the save callback.

    :param config_dict: dictionary holding the game state (stored as-is, not copied)
    :param save_function: callable invoked without arguments to save the state
    :param guild: guild this game belongs to
    """
    self.guild = guild
    self.save_function = save_function
    self.config = config_dict
@staticmethod
def new_dict():
return {
"game_started": False,
}
def is_started(self):
    """
    Return the "game_started" flag of the game state.
    """
    started_flag = self.config["game_started"]
    return started_flag
@game_started
def get_players_id(self) -> List[int]:
    """
    Return the list of player ids stored in the game state (the stored list itself, not a copy).
    """
    player_ids = self.config["players"]
    return player_ids
@game_started
def get_players(self) -> List[discord.Member]:
    """
    Look up every stored player id in the guild, in the stored order.

    Each entry is whatever `guild.get_member` returns for that id.
    """
    members = []
    for player_id in self.get_players_id():
        members.append(self.guild.get_member(player_id))
    return members
@game_started
def get_player_info(self, player: Union[int, discord.Member]):
    """
    Return the per-player entry of the game state.

    :param player: a member, or directly the player's id
    :raises KeyError: if there is no entry for that id
    """
    player_id = player.id if isinstance(player, discord.Member) else player
    # Entries are keyed by the id converted to a string
    return self.config["player_info"][str(player_id)]
@game_started
def get_gm_role_id(self) -> int:
    """
    Return the id of the GM role stored in the game state.
    """
    role_id = self.config["gm_role"]
    return role_id
@game_started
def get_gm_role(self) -> discord.Role:
    """
    Resolve the stored GM role id through the guild.
    """
    role_id = self.get_gm_role_id()
    return self.guild.get_role(role_id)
@game_started
def is_gm(self, user: discord.Member) -> bool:
    """
    Tell whether the GM role is among the roles of `user`.
    """
    gm_role = self.get_gm_role()
    return gm_role in user.roles
@game_started
def get_player_role_id(self) -> int:
    """
    Return the id of the player role stored in the game state.
    """
    role_id = self.config["player_role"]
    return role_id
@game_started
def get_player_role(self) -> discord.Role:
    """
    Resolve the stored player role id through the guild.
    """
    role_id = self.get_player_role_id()
    return self.guild.get_role(role_id)
@game_started
def get_observer_role_id(self) -> int:
    """
    Return the id of the observer role stored in the game state.
    """
    role_id = self.config["observer_role"]
    return role_id
@game_started
def get_observer_role(self) -> discord.Role:
    """
    Resolve the stored observer role id through the guild.
    """
    role_id = self.get_observer_role_id()
    return self.guild.get_role(role_id)
@game_started
def get_game_category_id(self) -> int:
    """
    Return the id of the game category stored in the game state.
    """
    category_id = self.config["category"]
    return category_id
@game_started
def get_game_category(self) -> discord.CategoryChannel:
    """
    Resolve the stored game category id through the guild.
    """
    category_id = self.get_game_category_id()
    return self.guild.get_channel(category_id)
@game_started
def get_announcements_channel_id(self) -> int:
    """
    Return the id of the announcements channel stored in the game state.
    """
    channel_id = self.config["announce_chan"]
    return channel_id
@game_started
def get_announcements_channel(self) -> discord.TextChannel:
    """
    Resolve the stored announcements channel id through the guild.
    """
    channel_id = self.get_announcements_channel_id()
    return self.guild.get_channel(channel_id)
@game_started
def get_votes_channel_id(self) -> int:
    """
    Return the id of the votes channel stored in the game state.
    """
    channel_id = self.config["votes_chan"]
    return channel_id
@game_started
def get_votes_channel(self) -> discord.TextChannel:
    """
    Resolve the stored votes channel id through the guild.
    """
    channel_id = self.get_votes_channel_id()
    return self.guild.get_channel(channel_id)
@game_started
def get_gm_channel_id(self) -> int:
    """
    Return the id of the GM channel, stored under the "admin_chan" key of the game state.
    """
    channel_id = self.config["admin_chan"]
    return channel_id
@game_started
def get_gm_channel(self) -> discord.TextChannel:
    """
    Resolve the stored GM channel id through the guild.
    """
    channel_id = self.get_gm_channel_id()
    return self.guild.get_channel(channel_id)
@game_started
def is_vote_running(self) -> bool:
    """
    Tell whether a vote is in progress, i.e. the "vote" entry of the game state is not None.
    """
    current_vote = self.config["vote"]
    return current_vote is not None
@vote_running
def is_vote_passing(self) -> bool:
    """
    Tell whether the running vote currently passes.

    The vote passes when strictly more players voted True than False; players whose
    vote is still None count for neither side.
    """
    tally = defaultdict(int)
    for voter_id in self.get_players_id():
        choice = self.config["vote"][str(voter_id)]
        tally[choice] = tally[choice] + 1
    ja_total, nein_total = tally[True], tally[False]
    return ja_total > nein_total
@game_started
def get_player_channel_id(self, player: Union[int, discord.Member]) -> int:
    """
    Return the id of the channel stored for a player.

    :param player: a member, or directly the player's id
    :raises KeyError: if nothing is stored for that id
    """
    player_id = player.id if isinstance(player, discord.Member) else player
    player_entry = self.config["player_info"][str(player_id)]
    return player_entry["channel"]
@game_started
def get_player_channel(self, player: Union[int, discord.Member]) -> discord.TextChannel:
    """
    Resolve the channel stored for a player (member or id) through the guild.
    """
    channel_id = self.get_player_channel_id(player)
    return self.guild.get_channel(channel_id)
@save_on_success
async def start(self, player_role: discord.Role, bot_user_id: int):
    """
    Start a game: initialise the game state, then create the GM and Observer roles,
    the "In-game" category and all its channels (one private channel per player).

    :param player_role: role whose current members become the players
    :param bot_user_id: id of the member that receives the GM role (needed to set up private channels)
    :raises ValueError: if the game is already started
    :raises Exception: any error raised while creating a role or a channel is propagated,
        so the state is not saved after a failed setup (the in-memory state is not rolled back)
    """
    if self.is_started():
        raise ValueError("Game already started")
    logger.info(f"[{self.guild.name}] Starting game")

    # Snapshot the members once: the role is read again after several awaits, and a
    # membership change in between would make the channel list disagree with "player_info"
    players = list(player_role.members)

    tasks = []  # Asyncio tasks scheduled to run in parallel (e.g. channel creation), so that we can wait them all at the end of the function
    self.config["game_started"] = True
    self.config["player_role"] = player_role.id
    self.config["players"] = [member.id for member in players]
    self.config["player_info"] = {str(player): {} for player in self.config["players"]}
    self.config["vote"] = None
    self.config["deck"] = [Policy.FASCIST.value] * 11 + [Policy.LIBERAL.value] * 6
    random.shuffle(self.config["deck"])
    self.config["discard"] = []
    self.config["drawn"] = None
    self.config["enacted"] = []

    gm_role = await self.guild.create_role(name = "GM", hoist = True, mentionable = True, permissions = self.guild.default_role.permissions)
    self.config["gm_role"] = gm_role.id
    await self.guild.get_member(bot_user_id).add_roles(gm_role)  # Need to be here otherwise the bot won't have the permissions needed to set up private channels
    logger.debug("Created GM role")

    observer_role = await self.guild.create_role(name = "Observer", mentionable = True, permissions = self.guild.default_role.permissions)
    self.config["observer_role"] = observer_role.id
    logger.debug("Created Observer role")

    category_permissions = {
        self.guild.default_role: discord.PermissionOverwrite(send_messages = False),
        gm_role: discord.PermissionOverwrite(send_messages = True),
        player_role: discord.PermissionOverwrite(send_messages = True),
        observer_role: discord.PermissionOverwrite(send_messages = False, read_messages = True),
    }
    game_category = await self.guild.create_category("In-game", overwrites = category_permissions)
    self.config["category"] = game_category.id
    logger.debug(f"[{self.guild.name}] Created game category")

    async def create_admin_chan():
        perms = {
            self.guild.default_role: discord.PermissionOverwrite(read_messages = False),
            gm_role: discord.PermissionOverwrite(read_messages = True),
        }
        self.config["admin_chan"] = (await game_category.create_text_channel("admin", overwrites = perms, position = 0)).id
        logger.debug(f"[{self.guild.name}] Created admin channel")
    tasks.append(asyncio.create_task(create_admin_chan()))

    async def create_announcements_and_vote_chans():
        perms = {
            self.guild.default_role: discord.PermissionOverwrite(send_messages = False),
            gm_role: discord.PermissionOverwrite(send_messages = True),
        }
        channels = await asyncio.gather(
            game_category.create_text_channel("announcements", overwrites = perms, position = 1),
            game_category.create_text_channel("votes", overwrites = perms, position = 2)
        )
        self.config["announce_chan"] = channels[0].id
        self.config["votes_chan"] = channels[1].id
        logger.debug(f"[{self.guild.name}] Created announcements and votes channels")
    tasks.append(asyncio.create_task(create_announcements_and_vote_chans()))

    async def create_discussion_chan():
        # Permissions are inherited from the category so they are synced to it
        self.config["discussion_chan"] = (await game_category.create_text_channel("discussion", position = 3)).id
        logger.debug(f"[{self.guild.name}] Created discussion channel")
    tasks.append(asyncio.create_task(create_discussion_chan()))

    async def create_observers_chan():
        perms = {
            self.guild.default_role: discord.PermissionOverwrite(read_messages = False),
            gm_role: discord.PermissionOverwrite(read_messages = True),
            observer_role: discord.PermissionOverwrite(read_messages = True),
        }
        self.config["observer_chan"] = (await game_category.create_text_channel("observers", overwrites = perms, position = 4)).id
        logger.debug(f"[{self.guild.name}] Created observers channel")
    tasks.append(asyncio.create_task(create_observers_chan()))

    async def create_player_channel(player: discord.Member, position: int):
        perms = {
            self.guild.default_role: discord.PermissionOverwrite(read_messages = False),
            player: discord.PermissionOverwrite(read_messages = True),
            gm_role: discord.PermissionOverwrite(read_messages = True),
            observer_role: discord.PermissionOverwrite(read_messages = True, send_messages = False),
        }
        self.config["player_info"][str(player.id)]["channel"] = (await game_category.create_text_channel(player.name, overwrites = perms, position = position)).id
        logger.debug(f"[{self.guild.name}] Created channel for player {player.name}")
    for i, player in enumerate(players):
        tasks.append(asyncio.create_task(create_player_channel(player, 5 + i)))

    # Wait for every task to finish, then surface the first failure: asyncio.wait would
    # leave task exceptions unretrieved, and the broken game would be saved as started
    outcomes = await asyncio.gather(*tasks, return_exceptions = True)
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            raise outcome
@game_started
@save_on_success
async def delete(self):
    """
    Delete everything the game created (channels of the game category, the category itself,
    the GM and Observer roles) and reset the game state to a not-started one.

    :raises RuntimeError: if the game is not started
    :raises Exception: any error raised by a deletion is propagated; the state is then left untouched
    """
    category = self.get_game_category()
    # asyncio.gather accepts bare coroutines and an empty argument list, whereas
    # asyncio.wait rejects coroutines on Python 3.11+ and raises ValueError when given nothing
    await asyncio.gather(*(channel.delete() for channel in category.channels))
    await category.delete()
    # Need to delete the roles last
    await asyncio.gather(self.get_gm_role().delete(), self.get_observer_role().delete())
    self.config.clear()
    self.config.update(self.new_dict())
    logger.debug(f"[{self.guild.name}] Game deleted")
@save_on_success
async def start_vote(self, president: discord.Member, chancellor: discord.Member):
    """
    Open a vote on the proposed government and post the vote message.

    Every player starts with a vote of None (not voted yet).

    :param president: proposed president
    :param chancellor: proposed chancellor
    :raises RuntimeError: if a vote is already running, or if the game is not started
    """
    if self.is_vote_running():
        raise RuntimeError("A vote is already running")
    # Use the module logger (not the root `logging` functions) like the rest of the file
    logger.debug(f"[{self.guild.name}] Starting vote")
    self.config["vote"] = {
        "president": president.id,
        "chancellor": chancellor.id,
        "message": None,
        "revealed": False,
    }
    self.config["vote"].update({str(player_id): None for player_id in self.get_players_id()})
    await self.update_vote_message()
@vote_running
async def update_vote_message(self):
    """
    Send the vote status message to the votes channel, or edit it if it was already sent.

    The message lists, for each player, whether they voted; the actual votes are only
    shown once the vote is marked as revealed.

    :raises RuntimeError: if no vote is running
    """
    # Use the module logger (not the root `logging` functions) like the rest of the file
    logger.debug(f"[{self.guild.name}] Updating vote message")
    vote = self.config["vote"]
    president = vote["president"]
    chancellor = vote["chancellor"]
    message_content = [
        "**Citizens are called to vote**",
        "Do you want to elect the following government?",
        f":crown: <@{president}> as president",
        f":person_in_tuxedo: <@{chancellor}> as chancellor",
        "You can vote by typing `!ja` or `!nein` in your channel",
        "",  # Blank line between the instructions and the per-player status
    ]
    for player_id in self.get_players_id():
        player_vote = vote[str(player_id)]
        if player_vote is None:
            message_content.append(f":black_large_square: <@{player_id}> has not voted")
        elif not vote["revealed"]:  # Player has voted but the vote should not be revealed
            message_content.append(f":white_large_square: <@{player_id}> has voted")
        elif player_vote:
            message_content.append(f":green_square: <@{player_id}> has voted JA")
        else:
            message_content.append(f":red_square: <@{player_id}> has voted NEIN")
    message_content_str = "\n".join(message_content)

    votes_channel = self.get_votes_channel()
    no_mentions = discord.AllowedMentions.none()
    if vote["message"] is None:
        sent_message = await votes_channel.send(message_content_str, allowed_mentions = no_mentions)
        vote["message"] = sent_message.id
    else:
        existing_message = await votes_channel.fetch_message(vote["message"])
        await existing_message.edit(content = message_content_str, allowed_mentions = no_mentions)
@vote_running
@save_on_success
async def cast_vote(self, user: discord.Member, vote: Union[bool, None]):
    """
    Record the vote of `user` in the running vote and refresh the vote message.

    :param user: member casting the vote
    :param vote: True or False as shown as JA / NEIN in the vote message, None for "has not voted"
    :raises RuntimeError: if no vote is running
    """
    # Use the module logger (not the root `logging` functions) like the rest of the file
    logger.debug(f"[{self.guild.name}] Casting vote with value {vote} for user {user.display_name}")
    self.config["vote"][str(user.id)] = vote
    await self.update_vote_message()
@vote_running
@save_on_success
async def stop_vote(self):
    """
    Close the running vote: reveal the votes, announce the result and clear the vote.

    :raises RuntimeError: if no vote is running
    :raises Exception: an error while sending either message is propagated; the vote is
        then left in place (revealed) and the state is not saved
    """
    # Use the module logger (not the root `logging` functions) like the rest of the file
    logger.debug(f"[{self.guild.name}] Stopping the vote")
    passed = self.is_vote_passing()
    self.config["vote"]["revealed"] = True
    await self.update_vote_message()

    announcement_content = [
        f"{self.get_player_role().mention} the vote has ended!",
        f"{':green_square:' if passed else ':red_square:'} The vote has **{'' if passed else 'not '}passed**"
    ]
    if passed:
        president = self.config["vote"]["president"]
        chancellor = self.config["vote"]["chancellor"]
        announcement_content.append(f"Congratulations to president <@{president}> and chancellor <@{chancellor}>!")

    # Send both messages in parallel; unlike asyncio.wait, gather re-raises a failed send
    await asyncio.gather(
        self.get_votes_channel().send("**The vote has ended**"),
        self.get_announcements_channel().send("\n".join(announcement_content), allowed_mentions = discord.AllowedMentions(roles = True)),
    )
    self.config["vote"] = None
@game_started
@save_on_success
async def draw_policies(self) -> List[Policy]:
    """
    Move the top 3 policies of the deck to the "drawn" pile and return them.

    NOTE(review): policies that are still in "drawn" from a previous call are overwritten
    by this one - confirm callers never draw twice without enacting in between.

    :raises RuntimeError: if the game is not started
    :raises IndexError: if fewer than 3 policies are left in the deck (nothing is drawn then)
    """
    deck = self.config["deck"]
    # Check before touching the deck: popping one card at a time would leave the deck
    # partially consumed (and the popped cards lost) when it runs out midway
    if len(deck) < 3:
        raise IndexError(f"Cannot draw 3 policies, only {len(deck)} left in the deck")
    self.config["drawn"] = deck[:3]
    del deck[:3]
    return [Policy(p) for p in self.config["drawn"]]
@game_started
async def peek_policies(self) -> List[Policy]:
    """
    Return the top 3 policies of the deck without removing them.

    :raises IndexError: if fewer than 3 policies are left in the deck
    """
    deck = self.config["deck"]
    peeked = []
    for position in range(3):
        peeked.append(Policy(deck[position]))
    return peeked
@game_started
@save_on_success
async def enact_drawn_policy(self, index: int):
    """
    Enact one of the drawn policies, discard the others and announce the result.

    When fewer than 3 policies remain in the deck afterwards, the discard pile is
    merged back into the deck and the deck is reshuffled.

    :param index: position of the policy to enact in the "drawn" pile
    :raises RuntimeError: if no policies have been drawn, or if the game is not started
    :raises IndexError: if `index` is not a valid position in the "drawn" pile
    """
    drawn = self.config["drawn"]
    if drawn is None:
        raise RuntimeError("Can only enact a policy when they have been drawn")
    if not (0 <= index < len(drawn)):
        # The highest valid index is len - 1; reporting len itself was off by one
        raise IndexError(f"Expected policy index between 0 and {len(drawn) - 1}, got {index}")
    for i, policy_str in enumerate(drawn):
        if i == index:
            self.config["enacted"].append(policy_str)
        else:
            self.config["discard"].append(policy_str)
    self.config["drawn"] = None
    if len(self.config["deck"]) < 3:
        self.config["deck"].extend(self.config["discard"])
        self.config["discard"] = []
        random.shuffle(self.config["deck"])
    await self.announce_latest_enacted_policy()
@game_started
async def announce_latest_enacted_policy(self):
    """
    Announce the most recently enacted policy in the announcements channel, mentioning
    the player role, together with the totals and one row of squares per policy kind.

    :raises IndexError: if no policy has been enacted yet
    """
    def render_track(policy: Policy, enacted_total: int, track_length: int) -> str:
        # Coloured squares for enacted policies, black squares for the remaining slots
        filled = [policy.square_emoji()] * enacted_total
        remaining = [":black_small_square:"] * (track_length - enacted_total)
        return " ".join(filled + remaining)

    last_enacted = Policy(self.config["enacted"][-1])
    enacted_policies = [Policy(policy_str) for policy_str in self.config["enacted"]]
    liberal_total = enacted_policies.count(Policy.LIBERAL)
    fascist_total = enacted_policies.count(Policy.FASCIST)

    headline = f"{self.get_player_role().mention} A **{last_enacted.name}** policy {last_enacted.square_emoji()} has been enacted!"
    totals = f"In total, **{liberal_total} {Policy.LIBERAL.name}** policies and **{fascist_total} {Policy.FASCIST.name}** policies have been enacted"
    message_lines = [
        headline,
        totals,
        render_track(Policy.LIBERAL, liberal_total, 5),
        render_track(Policy.FASCIST, fascist_total, 6),
    ]
    announcements = self.get_announcements_channel()
    role_mentions = discord.AllowedMentions(roles = True)
    await announcements.send("\n".join(message_lines), allowed_mentions = role_mentions)