314 lines
11 KiB
Python
314 lines
11 KiB
Python
import logging
|
|
import random
|
|
from typing import Dict, Callable, List, Union
|
|
from functools import wraps
|
|
from collections import defaultdict
|
|
from enum import Enum
|
|
|
|
import discord
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Policy(Enum):
|
|
LIBERAL = "L"
|
|
FASCIST = "F"
|
|
|
|
|
|
def game_started(func):
|
|
"""
|
|
Decorator for *methods* of Game that need the game to be started.
|
|
"""
|
|
|
|
@wraps(func)
|
|
def decorated(obj: 'Game', *args, **kwargs):
|
|
if not obj.is_started():
|
|
raise RuntimeError("This function only works on running games!")
|
|
return func(obj, *args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
|
|
def vote_running(func):
|
|
"""
|
|
Decorator for *methods* of Game that need a vote to be running
|
|
"""
|
|
|
|
@wraps(func)
|
|
def decorated(obj: 'Game', *args, **kwargs):
|
|
if not obj.is_vote_running():
|
|
raise RuntimeError("This function only works if a vote is running!")
|
|
return func(obj, *args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
|
|
class Game:
|
|
"""
|
|
Game state for one guild
|
|
"""
|
|
|
|
def __init__(self, config_dict: Dict, save_function: Callable, guild: discord.Guild):
|
|
self.config = config_dict
|
|
self.save_function = save_function
|
|
self.guild = guild
|
|
|
|
@staticmethod
|
|
def new_dict():
|
|
return {
|
|
"game_started": False,
|
|
}
|
|
|
|
def is_started(self):
|
|
return self.config["game_started"]
|
|
|
|
@game_started
|
|
def get_players_id(self) -> List[int]:
|
|
return self.config["players"]
|
|
|
|
@game_started
|
|
def get_players(self) -> List[discord.Member]:
|
|
return [self.guild.get_member(player) for player in self.get_players_id()]
|
|
|
|
@game_started
|
|
def get_player_info(self, player: Union[int, discord.Member]):
|
|
if isinstance(player, discord.Member):
|
|
player = player.id
|
|
return self.config["player_info"][str(player)]
|
|
|
|
@game_started
|
|
def get_gm_role_id(self) -> int:
|
|
return self.config["gm_role"]
|
|
|
|
@game_started
|
|
def get_gm_role(self) -> discord.Role:
|
|
return self.guild.get_role(self.get_gm_role_id())
|
|
|
|
@game_started
|
|
def is_gm(self, user: discord.Member) -> bool:
|
|
return self.get_gm_role() in user.roles
|
|
|
|
@game_started
|
|
def get_player_role_id(self) -> int:
|
|
return self.config["player_role"]
|
|
|
|
@game_started
|
|
def get_player_role(self) -> discord.Role:
|
|
return self.guild.get_role(self.get_player_role_id())
|
|
|
|
@game_started
|
|
def get_game_category_id(self) -> int:
|
|
return self.config["category"]
|
|
|
|
@game_started
|
|
def get_game_category(self) -> discord.CategoryChannel:
|
|
return self.guild.get_channel(self.get_game_category_id())
|
|
|
|
@game_started
|
|
def get_announcements_channel_id(self) -> int:
|
|
return self.config["announce_chan"]
|
|
|
|
@game_started
|
|
def get_announcements_channel(self) -> discord.TextChannel:
|
|
return self.guild.get_channel(self.get_announcements_channel_id())
|
|
|
|
@game_started
|
|
def get_votes_channel_id(self) -> int:
|
|
return self.config["votes_chan"]
|
|
|
|
@game_started
|
|
def get_votes_channel(self) -> discord.TextChannel:
|
|
return self.guild.get_channel(self.get_votes_channel_id())
|
|
|
|
@game_started
|
|
def get_gm_channel_id(self) -> int:
|
|
return self.config["admin_chan"]
|
|
|
|
@game_started
|
|
def get_gm_channel(self) -> discord.TextChannel:
|
|
return self.guild.get_channel(self.get_gm_channel_id())
|
|
|
|
@game_started
|
|
def is_vote_running(self) -> bool:
|
|
return self.config["vote"] is not None
|
|
|
|
@vote_running
|
|
def is_vote_passing(self) -> bool:
|
|
vote_count = defaultdict(int)
|
|
for player_id in self.get_players_id():
|
|
vote_count[self.config["vote"][str(player_id)]] += 1
|
|
return vote_count[True] > vote_count[False]
|
|
|
|
@game_started
|
|
def get_player_channel_id(self, player: Union[int, discord.Member]) -> int:
|
|
if isinstance(player, discord.Member):
|
|
player = player.id
|
|
return self.config["player_info"][str(player)]["channel"]
|
|
|
|
@game_started
|
|
def get_player_channel(self, player: Union[int, discord.Member]) -> discord.TextChannel:
|
|
return self.guild.get_channel(self.get_player_channel_id(player))
|
|
|
|
async def start(self, gm_role: discord.Role, player_role: discord.Role):
|
|
if self.is_started():
|
|
raise ValueError("Game already started")
|
|
logger.info(f"[{self.guild.name}] Starting game")
|
|
self.config["gm_role"] = gm_role.id
|
|
self.config["player_role"] = player_role.id
|
|
self.config["players"] = [member.id for member in player_role.members]
|
|
self.config["player_info"] = {str(player): {} for player in self.config["players"]}
|
|
self.config["vote"] = None
|
|
self.config["deck"] = [Policy.FASCIST.value] * 11 + [Policy.LIBERAL.value] * 6
|
|
random.shuffle(self.config["deck"])
|
|
self.config["discard"] = []
|
|
self.config["drawn"] = None
|
|
self.config["enacted"] = []
|
|
|
|
permissions = {
|
|
self.guild.default_role: discord.PermissionOverwrite(send_messages = False),
|
|
gm_role: discord.PermissionOverwrite(send_messages = True),
|
|
player_role: discord.PermissionOverwrite(send_messages = True),
|
|
}
|
|
game_category = await self.guild.create_category("In-game", overwrites = permissions)
|
|
self.config["category"] = game_category.id
|
|
logger.debug(f"[{self.guild.name}] Created game category")
|
|
|
|
permissions = {
|
|
self.guild.default_role: discord.PermissionOverwrite(read_messages = False),
|
|
gm_role: discord.PermissionOverwrite(read_messages = True),
|
|
}
|
|
self.config["admin_chan"] = (await game_category.create_text_channel("admin", overwrites = permissions)).id
|
|
logger.debug(f"[{self.guild.name}] Created admin channel")
|
|
|
|
permissions = {
|
|
self.guild.default_role: discord.PermissionOverwrite(send_messages = False),
|
|
gm_role: discord.PermissionOverwrite(send_messages = True),
|
|
}
|
|
self.config["announce_chan"] = (await game_category.create_text_channel("announce", overwrites = permissions)).id
|
|
self.config["votes_chan"] = (await game_category.create_text_channel("votes", overwrites = permissions)).id
|
|
logger.debug(f"[{self.guild.name}] Created announcements and votes channels")
|
|
|
|
self.config["discussion_chan"] = (await game_category.create_text_channel("discussion")).id # Permissions are inherited from the category
|
|
logger.debug(f"[{self.guild.name}] Created discussion channel")
|
|
|
|
for player in player_role.members:
|
|
channel_permissions = {
|
|
self.guild.default_role: discord.PermissionOverwrite(read_messages = False),
|
|
player: discord.PermissionOverwrite(read_messages = True),
|
|
gm_role: discord.PermissionOverwrite(read_messages = True),
|
|
}
|
|
player_channel = await game_category.create_text_channel(player.name, overwrites = channel_permissions)
|
|
self.config["player_info"][str(player.id)]["channel"] = player_channel.id
|
|
logger.debug(f"[{self.guild.name}] Created player channels")
|
|
|
|
self.config["game_started"] = True
|
|
self.save_function()
|
|
|
|
@game_started
|
|
async def delete(self):
|
|
category = self.get_game_category()
|
|
for channel in category.channels:
|
|
await channel.delete()
|
|
await category.delete()
|
|
self.config.clear()
|
|
self.config.update(self.new_dict())
|
|
self.save_function()
|
|
|
|
async def start_vote(self, president: discord.Member, chancellor: discord.Member):
|
|
if self.is_vote_running():
|
|
raise RuntimeError("A vote is already running")
|
|
logging.debug(f"[{self.guild.name}] Starting vote")
|
|
|
|
self.config["vote"] = {
|
|
"president": president.id,
|
|
"chancellor": chancellor.id,
|
|
"message": None,
|
|
"revealed": False,
|
|
}
|
|
self.config["vote"].update({str(player_id): None for player_id in self.get_players_id()})
|
|
self.save_function()
|
|
await self.update_vote_message()
|
|
|
|
@vote_running
|
|
async def update_vote_message(self):
|
|
logging.debug(f"[{self.guild.name}] Updating vote message")
|
|
president = self.config["vote"]["president"]
|
|
chancellor = self.config["vote"]["chancellor"]
|
|
message_content = [
|
|
"**Citizens are called to vote**",
|
|
"Do you want to elect the following government?",
|
|
f":crown: <@{president}> as president",
|
|
f":person_in_tuxedo: <@{chancellor}> as chancellor",
|
|
"You can vote by typing `!ja` or `!nein` in your channel"
|
|
"",
|
|
]
|
|
for player_id in self.get_players_id():
|
|
player_vote = self.config["vote"][str(player_id)]
|
|
if player_vote is None:
|
|
message_content.append(f":black_large_square: <@{player_id}> has not voted")
|
|
else:
|
|
if self.config["vote"]["revealed"]:
|
|
if player_vote:
|
|
message_content.append(f":green_square: <@{player_id}> has voted JA")
|
|
else:
|
|
message_content.append(f":red_square: <@{player_id}> has voted NEIN")
|
|
else: # Player has voted but the vote should not be revealed
|
|
message_content.append(f":white_large_square: <@{player_id}> has voted")
|
|
|
|
message_content_str = "\n".join(message_content)
|
|
if self.config["vote"]["message"] is None:
|
|
self.config["vote"]["message"] = (await self.get_votes_channel().send(message_content_str, allowed_mentions = discord.AllowedMentions.none())).id
|
|
else:
|
|
await (await self.get_votes_channel().fetch_message(self.config["vote"]["message"])).edit(content = message_content_str, allowed_mentions = discord.AllowedMentions.none())
|
|
|
|
@vote_running
|
|
async def cast_vote(self, user: discord.Member, vote: Union[bool, None]):
|
|
logging.debug(f"[{self.guild.name}] Casting vote with value {vote} for user {user.display_name}")
|
|
self.config["vote"][str(user.id)] = vote
|
|
await self.update_vote_message()
|
|
self.save_function()
|
|
|
|
@vote_running
|
|
async def stop_vote(self):
|
|
logging.debug(f"[{self.guild.name}] Stopping the vote")
|
|
passed = self.is_vote_passing()
|
|
self.config["vote"]["revealed"] = True
|
|
await self.update_vote_message()
|
|
await self.get_votes_channel().send("**The vote has ended**")
|
|
announcement_content = [
|
|
f"{self.get_player_role().mention} the vote has ended!",
|
|
f"{':green_square:' if passed else ':red_square:'} The vote has **{'' if passed else 'not '}passed**"
|
|
]
|
|
if passed:
|
|
president = self.config["vote"]["president"]
|
|
chancellor = self.config["vote"]["chancellor"]
|
|
announcement_content.append(f"Congratulations to president <@{president}> and chancellor <@{chancellor}>!")
|
|
await self.get_announcements_channel().send("\n".join(announcement_content), allowed_mentions = discord.AllowedMentions(roles = True))
|
|
self.config["vote"] = None
|
|
self.save_function()
|
|
|
|
@game_started
|
|
async def draw_policies(self) -> List[Policy]:
|
|
self.config["drawn"] = [self.config["deck"].pop() for _ in range(3)]
|
|
self.save_function()
|
|
return [Policy(p) for p in self.config["drawn"]]
|
|
|
|
async def enact_drawn_policy(self, index: int):
|
|
if self.config["drawn"] is None:
|
|
raise RuntimeError("Can only enact a policy when they have been drawn")
|
|
if not (0 <= index < len(self.config["drawn"])):
|
|
raise IndexError(f"Expected policy index between 0 and {len(self.config['drawn'])}, got {index}")
|
|
for i, policy_str in enumerate(self.config["drawn"]):
|
|
if i == index:
|
|
self.config["enacted"].append(policy_str)
|
|
else:
|
|
self.config["discard"].append(policy_str)
|
|
self.config["drawn"] = None
|
|
if len(self.config["deck"]) < 3:
|
|
self.config["deck"].extend(self.config["discard"])
|
|
self.config["discard"] = []
|
|
random.shuffle(self.config["deck"])
|
|
self.save_function()
|
|
enacted = Policy(self.config["enacted"][-1])
|
|
await self.get_announcements_channel().send(f"{self.get_player_role().mention} A **{enacted.name}** policy has been enacted!")
|