diff --git a/ChannelsConfigFile/ChannelsConfigFile.py b/ChannelsConfigFile/ChannelsConfigFile.py new file mode 100644 index 0000000..129ba8f --- /dev/null +++ b/ChannelsConfigFile/ChannelsConfigFile.py @@ -0,0 +1,160 @@ +from pathlib import Path +from typing import Union, Dict, List +import logging +import json +import atexit +import discord + +import utils + +logger = logging.getLogger(__name__) + + +class ChannelsConfigFile: + """ + Wrapper for the channels configuration file, telling which channels to watch and which channels were created by the bot. + """ + + def __init__(self, config_file_path: Union[str, Path]): + self.config_file = None # File descriptor to the on-disk config file + self.config = None # In-memory configuration + + # Loading configuration from disk + logger.debug("Loading channels configuration file") + config_file_path = Path(config_file_path).absolute() + if config_file_path.exists(): + if not config_file_path.is_file(): + raise ValueError(f"Config file {config_file_path} exists but is not a regular file") + + self.config_file = config_file_path.open("r+") + + if config_file_path.stat().st_size == 0: # Config file is empty + logger.warning(f"Config file {config_file_path} is empty, using empty config") + self.config = {} + self.save_to_file() # So that the file is proper json + else: # Config file is not empty + self.reload_from_disk() + else: # Config file does not exist + self.config_file = config_file_path.open("w+") + self.config = {} + self.save_to_file() # So that the file is proper json + + # Verifying that attributes have been initialised properly + assert self.config_file is not None and self.config is not None + logger.debug("Channels configuration file successfully initialised") + + atexit.register(self.save_to_file) + + def reload_from_disk(self) -> None: + """ + Reload the configuration from the on-disk file + """ + logger.debug("Loading channels configuration file from disk") + self.config_file.seek(0) # Moving to beginning of file + try: + self.config = json.load(self.config_file) + except json.JSONDecodeError as e: + logger.critical(f"JSON Error when parsing channels config file: {e}") + raise e + logger.debug("Loaded channels configuration file from disk") + + def save_to_file(self, indent = 2) -> None: + """ + Save the configuration to the on-fisk file + :param indent: Indentation to use for pretty-printing json + """ + logger.debug("Writing channels configuration file to disk") + self.config_file.seek(0) + self.config_file.truncate() + json.dump(self.config, self.config_file, indent = indent) + self.config_file.flush() + logger.debug("Written channels configuration file to disk") + + def _get_guild_configuration(self, guild: discord.Guild) -> Dict: + str_guild_id = str(guild.id) + if str_guild_id not in self.config: + self.config[str_guild_id] = { + "_name": guild.name, + "watched_channels": [], + "created_channels": [], + } + return self.config[str_guild_id] + + def get_watched_channels_ids(self, guild: discord.Guild) -> List[int]: + """ + :return: The list of IDs of watched channels for the given guild. Beware that the returned list is not a copy, it should not be modified. + """ + guild_configuration = self._get_guild_configuration(guild) + if "watched_channels" not in guild_configuration: + guild_configuration["watched_channels"] = [] + return guild_configuration["watched_channels"] + + def add_channel_to_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool: + """ + Add a voice channel to the list of channels that the bot should watch + :return: True if the channel has been added, False if it was already present + """ + watched_channels = self.get_watched_channels_ids(guild) + if channel.id in watched_channels: + return False + else: + watched_channels.append(channel.id) + self.save_to_file() + return True + + def remove_channel_from_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool: + """ + Attempt to remove a voice channel from the list of channels that the bot should watch + :return: True if the channel was in the list and has been removed, False if it was not in the list + """ + watched_channels = self.get_watched_channels_ids(guild) + size_before = len(watched_channels) + utils.list_remove_all_occurrences(watched_channels, channel.id) # Removing all occurrences of the channel, just in case + if len(watched_channels) != size_before: # Size differs: something must have been removed + self.save_to_file() + return True + else: # Same size: list has not been modified + return False + + def clear_watched_channels(self, guild: discord.Guild) -> None: + """ + Remove all channels from the watched channels list + """ + self.get_watched_channels_ids(guild).clear() + self.save_to_file() + + def is_watched(self, guild: discord.Guild, channel: discord.VoiceChannel) -> bool: + """ + :return: Whether the given channel is watched + """ + return channel.id in self.get_watched_channels_ids(guild) + + def _get_created_channels(self, guild: discord.Guild) -> List[int]: + guild_configuration = self._get_guild_configuration(guild) + if "created_channels" not in guild_configuration: + guild_configuration["created_channels"] = [] + return guild_configuration["created_channels"] + + def add_channel_to_created(self, guild: discord.Guild, channel: discord.VoiceChannel) -> None: + """ + Add a voice channel to the list of channels that the bot has created and manages. + """ + self._get_created_channels(guild).append(channel.id) + self.save_to_file() + + def remove_channel_from_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]): + """ + Remove a voice channel from the list of channels that have been created by the bot + """ + created_channels = self._get_created_channels(guild) + utils.list_remove_all_occurrences(created_channels, channel.id) + # We could update the on-disk file, but we avoid it to limit the number of writes to disk. + # It is not critical if this information is lost, and it will probably be updated properly anyway the next time the file is written + # self.save_to_file() + pass + + def is_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]): + """ + :return: Whether the given channel is one of the channels that have been created by the bot + """ + return channel.id in self._get_created_channels(guild) diff --git a/ChannelsConfigFile/__init__.py b/ChannelsConfigFile/__init__.py new file mode 100644 index 0000000..60bde24 --- /dev/null +++ b/ChannelsConfigFile/__init__.py @@ -0,0 +1 @@ +from .ChannelsConfigFile import ChannelsConfigFile diff --git a/VocalMaisBot.py b/VocalMaisBot.py index 083033e..8bf50b3 100755 --- a/VocalMaisBot.py +++ b/VocalMaisBot.py @@ -2,29 +2,25 @@ import argparse import logging import sys -from pathlib import Path -import json -from typing import TextIO, Any, List, Dict, Union import textwrap +from pathlib import Path +from typing import Any, List, Dict, Union import discord import discord.utils +from ChannelsConfigFile import ChannelsConfigFile + logger = logging.getLogger("VocalMaisBot") class VocalMaisBot(discord.Client): - def __init__(self, watched_channels_file: TextIO): + def __init__(self, channels_config: ChannelsConfigFile): super().__init__( intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True) ) - self.watched_channels_file = watched_channels_file - try: - self.watched_channels = json.load(watched_channels_file) - except json.JSONDecodeError as e: - logger.critical(f"Impossible to parse watched channels file, JSON error: {e}") - sys.exit(1) + self.channels_config = channels_config self.owner = None # Fetched by on_ready async def _check_administrator_for_command(self, message: discord.Message) -> bool: @@ -34,14 +30,6 @@ class VocalMaisBot(discord.Client): await message.channel.send(f":dragon_face: Only an administrator can control me!") return False - def write_watched_channels_file(self): - logger.debug("Writing watched channels information to file") - self.watched_channels_file.seek(0) - self.watched_channels_file.truncate() - json.dump(self.watched_channels, self.watched_channels_file, indent = 2) - self.watched_channels_file.flush() - logger.debug("Written watched channels information to file") - async def sorry_do_not_understand(self, message: discord.Message): await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help") @@ -100,24 +88,15 @@ class VocalMaisBot(discord.Client): # Retrieving the channel channel = self.get_channel(channel_id) - if channel is None: + if channel is None or channel.guild != message.guild: return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:") - # Adding the channel to the list of watched channels - guild_id = str(message.guild.id) - if guild_id not in self.watched_channels: - self.watched_channels[guild_id] = { - "_name": message.guild.name, - "watched_channels": [], - } - watched_channels_for_this_guild = self.watched_channels[guild_id]["watched_channels"] - - if channel_id in watched_channels_for_this_guild: - return await message.channel.send(":thumbsup: I was already watching this channel") + # Adding the channel to the list of watched channels (if needed) + has_been_added = self.channels_config.add_channel_to_watched(message.guild, channel) + if has_been_added: + await message.channel.send(f":white_check_mark: I am now watching {channel.name}") else: - watched_channels_for_this_guild.append(channel_id) - self.loop.call_soon(self.write_watched_channels_file) - return await message.channel.send(f":white_check_mark: I am now watching {channel.name}") + await message.channel.send(":thumbsup: I was already watching this channel") async def forget_channel(self, message: discord.Message): channel_id = await self._get_channel_id_for_command(message) @@ -126,27 +105,15 @@ class VocalMaisBot(discord.Client): if not await self._check_administrator_for_command(message): return - did_something = False - guild_id = str(message.guild.id) - if guild_id in self.watched_channels: - guild_info = self.watched_channels[guild_id] - if "watched_channels" in guild_info: - watched_channels_ids = guild_info["watched_channels"] - try: - watched_channels_ids.remove(channel_id) - did_something = True - except ValueError: - pass # The channel already was not in the list - - # Getting the channel name (if it exists), for pretty printing channel = self.get_channel(channel_id) - if channel is None: # Channel does not exist + if channel is None: # The channel does not exist (maybe it has been deleted) + channel = discord.Object(channel_id) channel_name = str(channel_id) else: channel_name = f"{channel.name} ({channel_id})" - if did_something: - self.write_watched_channels_file() + has_been_removed = self.channels_config.remove_channel_from_watched(message.guild, channel) + if has_been_removed: await message.channel.send(f":white_check_mark: I am no longer watching {channel_name}") else: await message.channel.send(f":thumbsup: I already was not watching {channel_name}") @@ -167,22 +134,16 @@ class VocalMaisBot(discord.Client): return None async def list_watched_channels(self, message: discord.Message): - guild_id = str(message.guild.id) - if guild_id in self.watched_channels: - guild_info = self.watched_channels[guild_id] - if "watched_channels" in guild_info: - watched_channels_id = guild_info["watched_channels"] - else: - watched_channels_id = [] - else: - watched_channels_id = [] - - if len(watched_channels_id) > 0: + watched_channels_ids = self.channels_config.get_watched_channels_ids(message.guild) + if len(watched_channels_ids) > 0: answer_lines = [] - for channel_id in watched_channels_id: + for channel_id in watched_channels_ids: channel = self.get_channel(channel_id) if channel is not None: # Does the channel still exist? answer_lines.append(f":eyes: {channel.name} ({channel_id})") + else: + # We just ignore non-existing channels + pass await message.channel.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines))) else: await message.channel.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.user.display_name} register channel_id`") @@ -191,66 +152,49 @@ class VocalMaisBot(discord.Client): if not await self._check_administrator_for_command(message): return - guild_id = str(message.guild.id) - if guild_id in self.watched_channels: - self.watched_channels[guild_id]["watched_channels"] = [] - self.write_watched_channels_file() + self.channels_config.clear_watched_channels(message.guild) await message.channel.send(":white_check_mark: I am no longer watching any channel") async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): - if member.bot: - return if before.channel is not None: # They left a channel await self.handle_user_left_voice(before.channel) if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another await self.handle_user_connected_to_voice(member, after.channel) async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel): - guild_id = str(channel.guild.id) - if guild_id not in self.watched_channels: - return # We do not have an entry for this guild, so we surely do not have anything to do here - if channel.id not in self.watched_channels[guild_id]["watched_channels"]: - return # It's not one of our special watched join-to-create channel + if user.bot: + return # We ignore bots connecting to channels, we do not want them asking us to create voice channels + if not self.channels_config.is_watched(channel.guild, channel): + return # It's not one of our special watched join-to-create channels - # The user connected to one of our special watched channels + # The user is not a bot and connected to one of our special watched channels + # We create a channel for them and move them into it category = channel.category user_name = user.display_name user_channel = await category.create_voice_channel(f"{user_name}'s channel", overwrites = {user: discord.PermissionOverwrite(manage_channels = True)}) await user.move_to(user_channel) logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})") - # Updating channels information - guild_info = self.watched_channels[guild_id] - if "created_channels" not in guild_info: - guild_info["created_channels"] = [] - guild_info["created_channels"].append(user_channel.id) - self.loop.call_soon(self.write_watched_channels_file) + # Saving the channel in the configuration + self.channels_config.add_channel_to_created(user_channel.guild, user_channel) async def handle_user_left_voice(self, channel: discord.VoiceChannel): - guild_id = str(channel.guild.id) - if guild_id not in self.watched_channels: - return # We do not have an entry for this guild, so we surely do not have anything to do here - guild_info = self.watched_channels[guild_id] - if "created_channels" not in guild_info: - return # We did not create any channel in this guild, so we do not have anything to do when a user disconnects + # We do not verify if the user is a bot, because a bot could be the last one to leave a channel (e.g. music bot) - if channel.id in guild_info["created_channels"]: # This is a temporary channel that we created - if len(channel.members) > 0: - return # There are still people inside it, nothing to do - try: - await channel.delete() - except discord.NotFound: - pass # The channel already does not exist, that's not a problem, that's what we want - logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty") - try: - guild_info["created_channels"].remove(channel.id) - except ValueError: - pass # For some reason the value was not in the list, that's not a problem, that's what we want - # We could update the json file, but we avoid it to limit the number of writes to disk. - # It is not critical if this information is lost, and it will probably be update properly anyway the next time the file is written - pass - else: - return + if not self.channels_config.is_created(channel.guild, channel): + return # It is not a channel that we manage, nothing to do + + logger.debug(f"User left temporary channel {channel.name} ({channel.id})") + # It is a channel that we manage + if len(channel.members) > 0: + return # There are still people inside it, nothing to do + try: + await channel.delete() + except discord.NotFound: + pass # The channel already does not exist, that's not a problem, that's what we want + logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty") + # updating the configuration + self.channels_config.remove_channel_from_created(channel.guild, channel) def _check_list_element(l: List, index: int, expected_value: Any) -> bool: @@ -289,14 +233,6 @@ if __name__ == '__main__': with token_file_path.open("r") as token_file: token = token_file.readline().strip() - watched_channels_file_path = Path(ARGS.watched_channels_file).absolute() - if not watched_channels_file_path.exists(): - logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...") - with watched_channels_file_path.open("w") as watched_channels_file: - json.dump({}, watched_channels_file) - - with watched_channels_file_path.open("r+") as watched_channels_file: - bot = VocalMaisBot(watched_channels_file) - bot.run(token) - bot.write_watched_channels_file() - watched_channels_file.flush() + channels_config_file = ChannelsConfigFile(ARGS.watched_channels_file) + bot = VocalMaisBot(channels_config_file) + bot.run(token) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..d3b31da --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,15 @@ +from typing import List, Any + + +def list_remove_all_occurrences(l: List[Any], value: Any) -> None: + """ + Remove all occurrences of a given value from a list **in place** + + See https://stackoverflow.com/a/48253792 + """ + i = -1 + # Shift elements that should not be removed + for i, v in enumerate(filter(value.__ne__, l)): + l[i] = v + # Remove the tail + del l[i + 1: len(l)]