270 lines
11 KiB
Python
270 lines
11 KiB
Python
import atexit
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Union, Dict, List, Callable, Tuple, Any, Optional
|
|
|
|
import discord
|
|
|
|
import utils
|
|
from . import vNoneTov1_0
|
|
from . import v1_0Tov1_1
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Functions used to convert between configuration versions, in a dictionary of: old_version -> converter
|
|
# Each converter takes the old configuration and returns the converted configuration and the new version
|
|
config_version_converters: Dict[Union[None, str], Callable[[Dict], Tuple[Dict, str]]] = {
|
|
None: vNoneTov1_0.convert,
|
|
"1.0": v1_0Tov1_1.convert,
|
|
}
|
|
|
|
|
|
class ChannelsConfigFile:
|
|
"""
|
|
Wrapper for the channels configuration file, telling which channels to watch and which channels were created by the bot.
|
|
"""
|
|
version = "1.1"
|
|
|
|
@staticmethod
|
|
def empty_config() -> Dict:
|
|
"""
|
|
:return: An empty configuration, used to initialise the configuration file
|
|
"""
|
|
return {
|
|
"__version__": ChannelsConfigFile.version,
|
|
}
|
|
|
|
def __init__(self, config_file_path: Union[str, Path]):
|
|
self.config_file = None # File descriptor to the on-disk config file
|
|
self.config = None # In-memory configuration
|
|
|
|
# Loading configuration from disk
|
|
logger.debug("Loading channels configuration file")
|
|
config_file_path = Path(config_file_path).absolute()
|
|
if config_file_path.exists():
|
|
if not config_file_path.is_file():
|
|
raise ValueError(f"Config file {config_file_path} exists but is not a regular file")
|
|
|
|
self.config_file = config_file_path.open("r+")
|
|
|
|
if config_file_path.stat().st_size == 0: # Config file is empty
|
|
logger.warning(f"Config file {config_file_path} is empty, using empty config")
|
|
self.config = self.empty_config()
|
|
self.save_to_file() # So that the file is proper json
|
|
else: # Config file is not empty
|
|
has_been_converted = self.reload_from_disk()
|
|
if has_been_converted:
|
|
self.save_to_file()
|
|
else: # Config file does not exist
|
|
self.config_file = config_file_path.open("w+")
|
|
self.config = self.empty_config()
|
|
self.save_to_file() # So that the file is proper json
|
|
|
|
# Verifying that attributes have been initialised properly
|
|
assert self.config_file is not None and self.config is not None
|
|
logger.debug("Channels configuration file successfully initialised")
|
|
|
|
atexit.register(self.save_to_file)
|
|
|
|
def reload_from_disk(self) -> bool:
|
|
"""
|
|
Reload the configuration from the on-disk file
|
|
|
|
:return: Whether the configuration had to be converted to a more recent version because the on-file was an earlier version
|
|
:except json.JSONDecodeError: if the file is not valid json
|
|
:except ValueError: if the version of the configuration in the file is not known or can not be converted to a more recent one
|
|
"""
|
|
logger.debug("Loading channels configuration file from disk")
|
|
self.config_file.seek(0) # Moving to beginning of file
|
|
try:
|
|
self.config = json.load(self.config_file)
|
|
except json.JSONDecodeError as e:
|
|
logger.critical(f"JSON Error when parsing channels config file: {e}")
|
|
raise e
|
|
# Checking configuration version and converting if needed
|
|
config_version = self.config["__version__"] if "__version__" in self.config else None
|
|
has_been_converted = False
|
|
if config_version != self.version:
|
|
logger.info(f"Channels configuration file is an older version, converting (on-file: {config_version}, current: {self.version})...")
|
|
|
|
# Performing the conversion
|
|
while config_version != self.version:
|
|
if config_version in config_version_converters:
|
|
logger.debug(f"Converting from {config_version}")
|
|
self.config, config_version = config_version_converters[config_version](self.config)
|
|
logger.debug(f"Converted to {config_version}")
|
|
else:
|
|
logger.critical(f"Impossible to find converter to convert from {config_version}")
|
|
raise ValueError(f"Configuration loading: impossible to convert on-file config to current version")
|
|
has_been_converted = True
|
|
|
|
assert self.config["__version__"] == self.version
|
|
|
|
logger.debug("Loaded channels configuration file from disk")
|
|
return has_been_converted
|
|
|
|
def save_to_file(self, indent = 2) -> None:
|
|
"""
|
|
Save the configuration to the on-fisk file
|
|
:param indent: Indentation to use for pretty-printing json
|
|
"""
|
|
logger.debug("Writing channels configuration file to disk")
|
|
self.config_file.seek(0)
|
|
self.config_file.truncate()
|
|
json.dump(self.config, self.config_file, indent = indent)
|
|
self.config_file.flush()
|
|
logger.debug("Written channels configuration file to disk")
|
|
|
|
def _get_guild_configuration(self, guild: discord.Guild) -> Dict:
|
|
str_guild_id = str(guild.id)
|
|
if str_guild_id not in self.config:
|
|
self.config[str_guild_id] = {
|
|
"_name": guild.name,
|
|
"watched_channels": [],
|
|
"created_channels": [],
|
|
"user_config": {},
|
|
}
|
|
return self.config[str_guild_id]
|
|
|
|
def get_watched_channels_ids(self, guild: discord.Guild) -> List[int]:
|
|
"""
|
|
:return: The list of IDs of watched channels for the given guild. Beware that the returned list is not a copy, it should not be modified.
|
|
"""
|
|
guild_configuration = self._get_guild_configuration(guild)
|
|
if "watched_channels" not in guild_configuration:
|
|
guild_configuration["watched_channels"] = []
|
|
return guild_configuration["watched_channels"]
|
|
|
|
def add_channel_to_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
|
|
"""
|
|
Add a voice channel to the list of channels that the bot should watch
|
|
:return: True if the channel has been added, False if it was already present
|
|
"""
|
|
watched_channels = self.get_watched_channels_ids(guild)
|
|
if channel.id in watched_channels:
|
|
return False
|
|
else:
|
|
watched_channels.append(channel.id)
|
|
self.save_to_file()
|
|
return True
|
|
|
|
def remove_channel_from_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
|
|
"""
|
|
Attempt to remove a voice channel from the list of channels that the bot should watch
|
|
:return: True if the channel was in the list and has been removed, False if it was not in the list
|
|
"""
|
|
watched_channels = self.get_watched_channels_ids(guild)
|
|
size_before = len(watched_channels)
|
|
utils.list_remove_all_occurrences(watched_channels, channel.id) # Removing all occurrences of the channel, just in case
|
|
if len(watched_channels) != size_before: # Size differs: something must have been removed
|
|
self.save_to_file()
|
|
return True
|
|
else: # Same size: list has not been modified
|
|
return False
|
|
|
|
def clear_watched_channels(self, guild: discord.Guild) -> None:
|
|
"""
|
|
Remove all channels from the watched channels list
|
|
"""
|
|
self.get_watched_channels_ids(guild).clear()
|
|
self.save_to_file()
|
|
|
|
def is_watched(self, guild: discord.Guild, channel: discord.VoiceChannel) -> bool:
|
|
"""
|
|
:return: Whether the given channel is watched
|
|
"""
|
|
return channel.id in self.get_watched_channels_ids(guild)
|
|
|
|
def _get_created_channels(self, guild: discord.Guild) -> List[int]:
|
|
guild_configuration = self._get_guild_configuration(guild)
|
|
if "created_channels" not in guild_configuration:
|
|
guild_configuration["created_channels"] = []
|
|
return guild_configuration["created_channels"]
|
|
|
|
def add_channel_to_created(self, guild: discord.Guild, channel: discord.VoiceChannel) -> None:
|
|
"""
|
|
Add a voice channel to the list of channels that the bot has created and manages.
|
|
"""
|
|
self._get_created_channels(guild).append(channel.id)
|
|
self.save_to_file()
|
|
|
|
def remove_channel_from_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]):
|
|
"""
|
|
Remove a voice channel from the list of channels that have been created by the bot
|
|
"""
|
|
created_channels = self._get_created_channels(guild)
|
|
utils.list_remove_all_occurrences(created_channels, channel.id)
|
|
# We could update the on-disk file, but we avoid it to limit the number of writes to disk.
|
|
# It is not critical if this information is lost, and it will probably be updated properly anyway the next time the file is written
|
|
# self.save_to_file()
|
|
pass
|
|
|
|
def is_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]):
|
|
"""
|
|
:return: Whether the given channel is one of the channels that have been created by the bot
|
|
"""
|
|
return channel.id in self._get_created_channels(guild)
|
|
|
|
def _get_user_configs(self, guild: discord.Guild) -> Dict[str, Dict]:
|
|
"""
|
|
:return: The configuration for all users in a guild
|
|
"""
|
|
guild_configuration = self._get_guild_configuration(guild)
|
|
if "user_config" not in guild_configuration:
|
|
guild_configuration["user_config"] = {}
|
|
return guild_configuration["user_config"]
|
|
|
|
def _get_user_config(self, guild: discord.Guild, user: discord.Member) -> Dict[str, Any]:
|
|
"""
|
|
:return: The configuration for a user in a guild
|
|
"""
|
|
all_users_configs = self._get_user_configs(guild)
|
|
user_id_str = str(user.id)
|
|
if user_id_str not in all_users_configs:
|
|
all_users_configs[user_id_str] = {
|
|
"__username": f"{user.name}#{user.discriminator}",
|
|
"channel_name": None,
|
|
}
|
|
return all_users_configs[user_id_str]
|
|
|
|
def _clean_user_config_if_useless(self, guild: discord.Guild, user: discord.Member):
|
|
"""
|
|
If some of the information for a user in the user config of a guild is useless, remove it.
|
|
If the whole entry for a user in the user config of a guild is useless (i.e. it contains only its username), delete it.
|
|
"""
|
|
all_users_config = self._get_user_configs(guild)
|
|
user_id_str = str(user.id)
|
|
user_config = all_users_config[user_id_str] if user_id_str in all_users_config else {}
|
|
# If the default channel name for this user is set to None, it is the same as not being set
|
|
if "channel_name" in user_config and user_config["channel_name"] is None:
|
|
del user_config["channel_name"]
|
|
# If the only information remaining is the username (or nothing), delete the entry
|
|
if list(user_config.keys()) in ([], ["__username"]):
|
|
del all_users_config[user_id_str]
|
|
logger.debug(f"[{guild.name}({guild.id})] Deleted user {user.display_name}({user.id}) configuration because it was useless")
|
|
|
|
def set_user_channel_name(self, guild: discord.Guild, user: discord.Member, channel_name: str) -> None:
|
|
"""
|
|
Set the initial name for the channels created for a specific user
|
|
"""
|
|
user_config = self._get_user_config(guild, user)
|
|
user_config["channel_name"] = channel_name
|
|
|
|
def get_user_channel_name(self, guild: discord.Guild, user: discord.Member) -> Optional[str]:
|
|
"""
|
|
:return: The initial name for the channels created for a specific user, if the user set one
|
|
"""
|
|
user_config = self._get_user_config(guild, user)
|
|
channel_name = user_config["channel_name"] if "channel_name" in user_config else None
|
|
self._clean_user_config_if_useless(guild, user)
|
|
return channel_name
|
|
|
|
def clear_user_channel_name(self, guild: discord.Guild, user: discord.Member):
|
|
"""
|
|
Remove the user preferences about the names for their channels
|
|
"""
|
|
user_config = self._get_user_config(guild, user)
|
|
user_config["channel_name"] = None
|
|
self._clean_user_config_if_useless(guild, user)
|