import atexit
import json
import logging
from pathlib import Path
from typing import Union, Dict, List, Callable, Tuple, Any, Optional

import discord

import utils
from . import vNoneTov1_0
from . import v1_0Tov1_1

logger = logging.getLogger(__name__)

# Functions used to convert between configuration versions, in a dictionary of: old_version -> converter
# Each converter takes the old configuration and returns the converted configuration and the new version
config_version_converters: Dict[Union[None, str], Callable[[Dict], Tuple[Dict, str]]] = {
    None: vNoneTov1_0.convert,
    "1.0": v1_0Tov1_1.convert,
}


class ChannelsConfigFile:
    """
    Wrapper for the channels configuration file, telling which channels to watch and which channels were created
    by the bot.

    The configuration is kept in memory in ``self.config`` and mirrored to a JSON file that stays open for the
    lifetime of the object (``self.config_file``).
    """

    # Version of the configuration layout produced by this class
    version = "1.1"

    @staticmethod
    def empty_config() -> Dict:
        """
        :return: An empty configuration, used to initialise the configuration file
        """
        return {
            "__version__": ChannelsConfigFile.version,
        }

    def __init__(self, config_file_path: Union[str, Path]):
        """
        Open (or create) the configuration file and load its content in memory.

        :param config_file_path: Path to the on-disk configuration file
        :except ValueError: if the path exists but is not a regular file, or if the on-file configuration can
                            not be converted to the current version
        :except json.JSONDecodeError: if the file is not empty and is not valid json
        """
        self.config_file = None  # Open file object for the on-disk config file
        self.config = None  # In-memory configuration

        # Loading configuration from disk
        logger.debug("Loading channels configuration file")
        config_file_path = Path(config_file_path).absolute()

        file_already_exists = config_file_path.exists()
        if file_already_exists and not config_file_path.is_file():
            raise ValueError(f"Config file {config_file_path} exists but is not a regular file")

        # "r+" keeps the existing content, "w+" creates the missing file
        self.config_file = config_file_path.open("r+" if file_already_exists else "w+")
        try:
            if not file_already_exists:
                # Config file does not exist
                self.config = self.empty_config()
                self.save_to_file()  # So that the file is proper json
            elif config_file_path.stat().st_size == 0:
                # Config file is empty
                logger.warning(f"Config file {config_file_path} is empty, using empty config")
                self.config = self.empty_config()
                self.save_to_file()  # So that the file is proper json
            else:
                # Config file is not empty
                has_been_converted = self.reload_from_disk()
                if has_been_converted:
                    self.save_to_file()
        except Exception:
            # Do not leak the open file when loading or writing the configuration fails
            self.config_file.close()
            raise

        # Verifying that attributes have been initialised properly
        assert self.config_file is not None and self.config is not None
        logger.debug("Channels configuration file successfully initialised")

        # Make sure in-memory changes that were not written yet are persisted when the interpreter exits
        atexit.register(self.save_to_file)

    def reload_from_disk(self) -> bool:
        """
        Reload the configuration from the on-disk file

        :return: Whether the configuration had to be converted to a more recent version because the on-file
                 configuration was an earlier version
        :except json.JSONDecodeError: if the file is not valid json
        :except ValueError: if the version of the configuration in the file is not known or can not be converted
                            to a more recent one
        """
        logger.debug("Loading channels configuration file from disk")
        self.config_file.seek(0)  # Moving to beginning of file
        try:
            self.config = json.load(self.config_file)
        except json.JSONDecodeError as e:
            logger.critical(f"JSON Error when parsing channels config file: {e}")
            raise

        # Checking configuration version and converting if needed
        # A configuration without "__version__" is treated as version None
        config_version = self.config["__version__"] if "__version__" in self.config else None
        has_been_converted = False
        if config_version != self.version:
            logger.info(f"Channels configuration file is an older version, converting (on-file: {config_version}, current: {self.version})...")
            # Performing the conversion, one converter at a time, until the current version is reached
            while config_version != self.version:
                if config_version not in config_version_converters:
                    logger.critical(f"Impossible to find converter to convert from {config_version}")
                    raise ValueError("Configuration loading: impossible to convert on-file config to current version")
                logger.debug(f"Converting from {config_version}")
                self.config, config_version = config_version_converters[config_version](self.config)
                logger.debug(f"Converted to {config_version}")
            has_been_converted = True

        assert self.config["__version__"] == self.version
        logger.debug("Loaded channels configuration file from disk")
        return has_been_converted

    def save_to_file(self, indent: Optional[int] = 2) -> None:
        """
        Save the configuration to the on-disk file

        :param indent: Indentation to use for pretty-printing json
        """
        logger.debug("Writing channels configuration file to disk")
        # Overwrite the whole file: go back to the start and drop the previous content
        self.config_file.seek(0)
        self.config_file.truncate()
        json.dump(self.config, self.config_file, indent=indent)
        self.config_file.flush()
        logger.debug("Written channels configuration file to disk")

    def _get_guild_configuration(self, guild: discord.Guild) -> Dict:
        """
        :return: The configuration of the given guild, created with default values if it does not exist yet
        """
        # Guild IDs are stored as strings because json object keys are strings
        str_guild_id = str(guild.id)
        if str_guild_id not in self.config:
            self.config[str_guild_id] = {
                "_name": guild.name,
                "watched_channels": [],
                "created_channels": [],
                "user_config": {},
            }
        return self.config[str_guild_id]

    def get_watched_channels_ids(self, guild: discord.Guild) -> List[int]:
        """
        :return: The list of IDs of watched channels for the given guild.
                 Beware that the returned list is not a copy, it should not be modified.
        """
        guild_configuration = self._get_guild_configuration(guild)
        if "watched_channels" not in guild_configuration:
            guild_configuration["watched_channels"] = []
        return guild_configuration["watched_channels"]

    def add_channel_to_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
        """
        Add a voice channel to the list of channels that the bot should watch

        :return: True if the channel has been added, False if it was already present
        """
        watched_channels = self.get_watched_channels_ids(guild)
        if channel.id in watched_channels:
            return False
        watched_channels.append(channel.id)
        self.save_to_file()
        return True

    def remove_channel_from_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
        """
        Attempt to remove a voice channel from the list of channels that the bot should watch

        :return: True if the channel was in the list and has been removed, False if it was not in the list
        """
        watched_channels = self.get_watched_channels_ids(guild)
        size_before = len(watched_channels)
        # Removing all occurrences of the channel, just in case
        utils.list_remove_all_occurrences(watched_channels, channel.id)
        if len(watched_channels) == size_before:
            # Same size: list has not been modified
            return False
        # Size differs: something must have been removed
        self.save_to_file()
        return True

    def clear_watched_channels(self, guild: discord.Guild) -> None:
        """
        Remove all channels from the watched channels list
        """
        self.get_watched_channels_ids(guild).clear()
        self.save_to_file()

    def is_watched(self, guild: discord.Guild, channel: discord.VoiceChannel) -> bool:
        """
        :return: Whether the given channel is watched
        """
        return channel.id in self.get_watched_channels_ids(guild)

    def _get_created_channels(self, guild: discord.Guild) -> List[int]:
        """
        :return: The list of IDs of the channels created by the bot in the given guild (not a copy)
        """
        guild_configuration = self._get_guild_configuration(guild)
        if "created_channels" not in guild_configuration:
            guild_configuration["created_channels"] = []
        return guild_configuration["created_channels"]

    def add_channel_to_created(self, guild: discord.Guild, channel: discord.VoiceChannel) -> None:
        """
        Add a voice channel to the list of channels that the bot has created and manages.
        """
        self._get_created_channels(guild).append(channel.id)
        self.save_to_file()

    def remove_channel_from_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> None:
        """
        Remove a voice channel from the list of channels that have been created by the bot
        """
        created_channels = self._get_created_channels(guild)
        utils.list_remove_all_occurrences(created_channels, channel.id)
        # The on-disk file is deliberately not updated here, to limit the number of writes to disk.
        # It is not critical if this information is lost, and it will probably be updated properly anyway
        # the next time the file is written.

    def is_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
        """
        :return: Whether the given channel is one of the channels that have been created by the bot
        """
        return channel.id in self._get_created_channels(guild)

    def _get_user_configs(self, guild: discord.Guild) -> Dict[str, Dict]:
        """
        :return: The configuration for all users in a guild
        """
        guild_configuration = self._get_guild_configuration(guild)
        if "user_config" not in guild_configuration:
            guild_configuration["user_config"] = {}
        return guild_configuration["user_config"]

    def _get_user_config(self, guild: discord.Guild, user: discord.Member) -> Dict[str, Any]:
        """
        :return: The configuration for a user in a guild, created with default values if it does not exist yet
        """
        all_users_configs = self._get_user_configs(guild)
        user_id_str = str(user.id)
        if user_id_str not in all_users_configs:
            all_users_configs[user_id_str] = {
                "__username": f"{user.name}#{user.discriminator}",
                "channel_name": None,
            }
        return all_users_configs[user_id_str]

    def _clean_user_config_if_useless(self, guild: discord.Guild, user: discord.Member) -> None:
        """
        If some of the information for a user in the user config of a guild is useless, remove it.
        If the whole entry for a user in the user config of a guild is useless (i.e. it contains only its username),
        delete it.
        Does nothing if there is no entry for this user.
        """
        all_users_config = self._get_user_configs(guild)
        user_id_str = str(user.id)
        if user_id_str not in all_users_config:
            # Nothing is stored for this user, so there is nothing to clean (and nothing to delete)
            return
        user_config = all_users_config[user_id_str]

        # If the default channel name for this user is set to None, it is the same as not being set
        if "channel_name" in user_config and user_config["channel_name"] is None:
            del user_config["channel_name"]

        # If the only information remaining is the username (or nothing), delete the entry
        if list(user_config.keys()) in ([], ["__username"]):
            del all_users_config[user_id_str]
            logger.debug(f"[{guild.name}({guild.id})] Deleted user {user.display_name}({user.id}) configuration because it was useless")

    def set_user_channel_name(self, guild: discord.Guild, user: discord.Member, channel_name: str) -> None:
        """
        Set the initial name for the channels created for a specific user

        The change is only made in memory; it reaches the disk the next time save_to_file() is called.
        """
        user_config = self._get_user_config(guild, user)
        user_config["channel_name"] = channel_name

    def get_user_channel_name(self, guild: discord.Guild, user: discord.Member) -> Optional[str]:
        """
        :return: The initial name for the channels created for a specific user, if the user set one
        """
        # _get_user_config creates a default entry when there is none; it is cleaned up again below
        user_config = self._get_user_config(guild, user)
        channel_name = user_config["channel_name"] if "channel_name" in user_config else None
        self._clean_user_config_if_useless(guild, user)
        return channel_name

    def clear_user_channel_name(self, guild: discord.Guild, user: discord.Member) -> None:
        """
        Remove the user preferences about the names for their channels

        The change is only made in memory; it reaches the disk the next time save_to_file() is called.
        """
        user_config = self._get_user_config(guild, user)
        user_config["channel_name"] = None
        self._clean_user_config_if_useless(guild, user)