Created class to handle channels configuration file management

This commit is contained in:
Elnath 2020-12-28 23:27:28 +01:00
parent cd87a5cc72
commit 8b19a73fb2
4 changed files with 225 additions and 113 deletions

View File

@ -0,0 +1,160 @@
from pathlib import Path
from typing import Union, Dict, List
import logging
import json
import atexit
import discord
import utils
logger = logging.getLogger(__name__)
class ChannelsConfigFile:
"""
Wrapper for the channels configuration file, telling which channels to watch and which channels were created by the bot.
"""
def __init__(self, config_file_path: Union[str, Path]):
self.config_file = None # File descriptor to the on-disk config file
self.config = None # In-memory configuration
# Loading configuration from disk
logger.debug("Loading channels configuration file")
config_file_path = Path(config_file_path).absolute()
if config_file_path.exists():
if not config_file_path.is_file():
raise ValueError(f"Config file {config_file_path} exists but is not a regular file")
self.config_file = config_file_path.open("r+")
if config_file_path.stat().st_size == 0: # Config file is empty
logger.warning(f"Config file {config_file_path} is empty, using empty config")
self.config = {}
self.save_to_file() # So that the file is proper json
else: # Config file is not empty
self.reload_from_disk()
else: # Config file does not exist
self.config_file = config_file_path.open("w+")
self.config = {}
self.save_to_file() # So that the file is proper json
# Verifying that attributes have been initialised properly
assert self.config_file is not None and self.config is not None
logger.debug("Channels configuration file successfully initialised")
atexit.register(self.save_to_file)
def reload_from_disk(self) -> None:
"""
Reload the configuration from the on-disk file
"""
logger.debug("Loading channels configuration file from disk")
self.config_file.seek(0) # Moving to beginning of file
try:
self.config = json.load(self.config_file)
except json.JSONDecodeError as e:
logger.critical(f"JSON Error when parsing channels config file: {e}")
raise e
logger.debug("Loaded channels configuration file from disk")
def save_to_file(self, indent = 2) -> None:
"""
Save the configuration to the on-fisk file
:param indent: Indentation to use for pretty-printing json
"""
logger.debug("Writing channels configuration file to disk")
self.config_file.seek(0)
self.config_file.truncate()
json.dump(self.config, self.config_file, indent = indent)
self.config_file.flush()
logger.debug("Written channels configuration file to disk")
def _get_guild_configuration(self, guild: discord.Guild) -> Dict:
str_guild_id = str(guild.id)
if str_guild_id not in self.config:
self.config[str_guild_id] = {
"_name": guild.name,
"watched_channels": [],
"created_channels": [],
}
return self.config[str_guild_id]
def get_watched_channels_ids(self, guild: discord.Guild) -> List[int]:
"""
:return: The list of IDs of watched channels for the given guild. Beware that the returned list is not a copy, it should not be modified.
"""
guild_configuration = self._get_guild_configuration(guild)
if "watched_channels" not in guild_configuration:
guild_configuration["watched_channels"] = []
return guild_configuration["watched_channels"]
def add_channel_to_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
"""
Add a voice channel to the list of channels that the bot should watch
:return: True if the channel has been added, False if it was already present
"""
watched_channels = self.get_watched_channels_ids(guild)
if channel.id in watched_channels:
return False
else:
watched_channels.append(channel.id)
self.save_to_file()
return True
def remove_channel_from_watched(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]) -> bool:
"""
Attempt to remove a voice channel from the list of channels that the bot should watch
:return: True if the channel was in the list and has been removed, False if it was not in the list
"""
watched_channels = self.get_watched_channels_ids(guild)
size_before = len(watched_channels)
utils.list_remove_all_occurrences(watched_channels, channel.id) # Removing all occurrences of the channel, just in case
if len(watched_channels) != size_before: # Size differs: something must have been removed
self.save_to_file()
return True
else: # Same size: list has not been modified
return False
def clear_watched_channels(self, guild: discord.Guild) -> None:
"""
Remove all channels from the watched channels list
"""
self.get_watched_channels_ids(guild).clear()
self.save_to_file()
def is_watched(self, guild: discord.Guild, channel: discord.VoiceChannel) -> bool:
"""
:return: Whether the given channel is watched
"""
return channel.id in self.get_watched_channels_ids(guild)
def _get_created_channels(self, guild: discord.Guild) -> List[int]:
guild_configuration = self._get_guild_configuration(guild)
if "created_channels" not in guild_configuration:
guild_configuration["created_channels"] = []
return guild_configuration["created_channels"]
def add_channel_to_created(self, guild: discord.Guild, channel: discord.VoiceChannel) -> None:
"""
Add a voice channel to the list of channels that the bot has created and manages.
"""
self._get_created_channels(guild).append(channel.id)
self.save_to_file()
def remove_channel_from_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]):
"""
Remove a voice channel from the list of channels that have been created by the bot
"""
created_channels = self._get_created_channels(guild)
utils.list_remove_all_occurrences(created_channels, channel.id)
# We could update the on-disk file, but we avoid it to limit the number of writes to disk.
# It is not critical if this information is lost, and it will probably be updated properly anyway the next time the file is written
# self.save_to_file()
pass
def is_created(self, guild: discord.Guild, channel: Union[discord.VoiceChannel, discord.Object]):
"""
:return: Whether the given channel is one of the channels that have been created by the bot
"""
return channel.id in self._get_created_channels(guild)

View File

@ -0,0 +1 @@
from .ChannelsConfigFile import ChannelsConfigFile

View File

@ -2,29 +2,25 @@
import argparse import argparse
import logging import logging
import sys import sys
from pathlib import Path
import json
from typing import TextIO, Any, List, Dict, Union
import textwrap import textwrap
from pathlib import Path
from typing import Any, List, Dict, Union
import discord import discord
import discord.utils import discord.utils
from ChannelsConfigFile import ChannelsConfigFile
logger = logging.getLogger("VocalMaisBot") logger = logging.getLogger("VocalMaisBot")
class VocalMaisBot(discord.Client): class VocalMaisBot(discord.Client):
def __init__(self, watched_channels_file: TextIO): def __init__(self, channels_config: ChannelsConfigFile):
super().__init__( super().__init__(
intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True) intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True)
) )
self.watched_channels_file = watched_channels_file self.channels_config = channels_config
try:
self.watched_channels = json.load(watched_channels_file)
except json.JSONDecodeError as e:
logger.critical(f"Impossible to parse watched channels file, JSON error: {e}")
sys.exit(1)
self.owner = None # Fetched by on_ready self.owner = None # Fetched by on_ready
async def _check_administrator_for_command(self, message: discord.Message) -> bool: async def _check_administrator_for_command(self, message: discord.Message) -> bool:
@ -34,14 +30,6 @@ class VocalMaisBot(discord.Client):
await message.channel.send(f":dragon_face: Only an administrator can control me!") await message.channel.send(f":dragon_face: Only an administrator can control me!")
return False return False
def write_watched_channels_file(self):
logger.debug("Writing watched channels information to file")
self.watched_channels_file.seek(0)
self.watched_channels_file.truncate()
json.dump(self.watched_channels, self.watched_channels_file, indent = 2)
self.watched_channels_file.flush()
logger.debug("Written watched channels information to file")
async def sorry_do_not_understand(self, message: discord.Message): async def sorry_do_not_understand(self, message: discord.Message):
await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help") await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help")
@ -100,24 +88,15 @@ class VocalMaisBot(discord.Client):
# Retrieving the channel # Retrieving the channel
channel = self.get_channel(channel_id) channel = self.get_channel(channel_id)
if channel is None: if channel is None or channel.guild != message.guild:
return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:") return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:")
# Adding the channel to the list of watched channels # Adding the channel to the list of watched channels (if needed)
guild_id = str(message.guild.id) has_been_added = self.channels_config.add_channel_to_watched(message.guild, channel)
if guild_id not in self.watched_channels: if has_been_added:
self.watched_channels[guild_id] = { await message.channel.send(f":white_check_mark: I am now watching {channel.name}")
"_name": message.guild.name,
"watched_channels": [],
}
watched_channels_for_this_guild = self.watched_channels[guild_id]["watched_channels"]
if channel_id in watched_channels_for_this_guild:
return await message.channel.send(":thumbsup: I was already watching this channel")
else: else:
watched_channels_for_this_guild.append(channel_id) await message.channel.send(":thumbsup: I was already watching this channel")
self.loop.call_soon(self.write_watched_channels_file)
return await message.channel.send(f":white_check_mark: I am now watching {channel.name}")
async def forget_channel(self, message: discord.Message): async def forget_channel(self, message: discord.Message):
channel_id = await self._get_channel_id_for_command(message) channel_id = await self._get_channel_id_for_command(message)
@ -126,27 +105,15 @@ class VocalMaisBot(discord.Client):
if not await self._check_administrator_for_command(message): if not await self._check_administrator_for_command(message):
return return
did_something = False
guild_id = str(message.guild.id)
if guild_id in self.watched_channels:
guild_info = self.watched_channels[guild_id]
if "watched_channels" in guild_info:
watched_channels_ids = guild_info["watched_channels"]
try:
watched_channels_ids.remove(channel_id)
did_something = True
except ValueError:
pass # The channel already was not in the list
# Getting the channel name (if it exists), for pretty printing
channel = self.get_channel(channel_id) channel = self.get_channel(channel_id)
if channel is None: # Channel does not exist if channel is None: # The channel does not exist (maybe it has been deleted)
channel = discord.Object(channel_id)
channel_name = str(channel_id) channel_name = str(channel_id)
else: else:
channel_name = f"{channel.name} ({channel_id})" channel_name = f"{channel.name} ({channel_id})"
if did_something: has_been_removed = self.channels_config.remove_channel_from_watched(message.guild, channel)
self.write_watched_channels_file() if has_been_removed:
await message.channel.send(f":white_check_mark: I am no longer watching {channel_name}") await message.channel.send(f":white_check_mark: I am no longer watching {channel_name}")
else: else:
await message.channel.send(f":thumbsup: I already was not watching {channel_name}") await message.channel.send(f":thumbsup: I already was not watching {channel_name}")
@ -167,22 +134,16 @@ class VocalMaisBot(discord.Client):
return None return None
async def list_watched_channels(self, message: discord.Message): async def list_watched_channels(self, message: discord.Message):
guild_id = str(message.guild.id) watched_channels_ids = self.channels_config.get_watched_channels_ids(message.guild)
if guild_id in self.watched_channels: if len(watched_channels_ids) > 0:
guild_info = self.watched_channels[guild_id]
if "watched_channels" in guild_info:
watched_channels_id = guild_info["watched_channels"]
else:
watched_channels_id = []
else:
watched_channels_id = []
if len(watched_channels_id) > 0:
answer_lines = [] answer_lines = []
for channel_id in watched_channels_id: for channel_id in watched_channels_ids:
channel = self.get_channel(channel_id) channel = self.get_channel(channel_id)
if channel is not None: # Does the channel still exist? if channel is not None: # Does the channel still exist?
answer_lines.append(f":eyes: {channel.name} ({channel_id})") answer_lines.append(f":eyes: {channel.name} ({channel_id})")
else:
# We just ignore non-existing channels
pass
await message.channel.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines))) await message.channel.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))
else: else:
await message.channel.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.user.display_name} register channel_id`") await message.channel.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.user.display_name} register channel_id`")
@ -191,50 +152,40 @@ class VocalMaisBot(discord.Client):
if not await self._check_administrator_for_command(message): if not await self._check_administrator_for_command(message):
return return
guild_id = str(message.guild.id) self.channels_config.clear_watched_channels(message.guild)
if guild_id in self.watched_channels:
self.watched_channels[guild_id]["watched_channels"] = []
self.write_watched_channels_file()
await message.channel.send(":white_check_mark: I am no longer watching any channel") await message.channel.send(":white_check_mark: I am no longer watching any channel")
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
if member.bot:
return
if before.channel is not None: # They left a channel if before.channel is not None: # They left a channel
await self.handle_user_left_voice(before.channel) await self.handle_user_left_voice(before.channel)
if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another
await self.handle_user_connected_to_voice(member, after.channel) await self.handle_user_connected_to_voice(member, after.channel)
async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel): async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
guild_id = str(channel.guild.id) if user.bot:
if guild_id not in self.watched_channels: return # We ignore bots connecting to channels, we do not want them asking us to create voice channels
return # We do not have an entry for this guild, so we surely do not have anything to do here if not self.channels_config.is_watched(channel.guild, channel):
if channel.id not in self.watched_channels[guild_id]["watched_channels"]: return # It's not one of our special watched join-to-create channels
return # It's not one of our special watched join-to-create channel
# The user connected to one of our special watched channels # The user is not a bot and connected to one of our special watched channels
# We create a channel for them and move them into it
category = channel.category category = channel.category
user_name = user.display_name user_name = user.display_name
user_channel = await category.create_voice_channel(f"{user_name}'s channel", overwrites = {user: discord.PermissionOverwrite(manage_channels = True)}) user_channel = await category.create_voice_channel(f"{user_name}'s channel", overwrites = {user: discord.PermissionOverwrite(manage_channels = True)})
await user.move_to(user_channel) await user.move_to(user_channel)
logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})") logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})")
# Updating channels information # Saving the channel in the configuration
guild_info = self.watched_channels[guild_id] self.channels_config.add_channel_to_created(user_channel.guild, user_channel)
if "created_channels" not in guild_info:
guild_info["created_channels"] = []
guild_info["created_channels"].append(user_channel.id)
self.loop.call_soon(self.write_watched_channels_file)
async def handle_user_left_voice(self, channel: discord.VoiceChannel): async def handle_user_left_voice(self, channel: discord.VoiceChannel):
guild_id = str(channel.guild.id) # We do not verify if the user is a bot, because a bot could be the last one to leave a channel (e.g. music bot)
if guild_id not in self.watched_channels:
return # We do not have an entry for this guild, so we surely do not have anything to do here
guild_info = self.watched_channels[guild_id]
if "created_channels" not in guild_info:
return # We did not create any channel in this guild, so we do not have anything to do when a user disconnects
if channel.id in guild_info["created_channels"]: # This is a temporary channel that we created if not self.channels_config.is_created(channel.guild, channel):
return # It is not a channel that we manage, nothing to do
logger.debug(f"User left temporary channel {channel.name} ({channel.id})")
# It is a channel that we manage
if len(channel.members) > 0: if len(channel.members) > 0:
return # There are still people inside it, nothing to do return # There are still people inside it, nothing to do
try: try:
@ -242,15 +193,8 @@ class VocalMaisBot(discord.Client):
except discord.NotFound: except discord.NotFound:
pass # The channel already does not exist, that's not a problem, that's what we want pass # The channel already does not exist, that's not a problem, that's what we want
logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty") logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty")
try: # updating the configuration
guild_info["created_channels"].remove(channel.id) self.channels_config.remove_channel_from_created(channel.guild, channel)
except ValueError:
pass # For some reason the value was not in the list, that's not a problem, that's what we want
# We could update the json file, but we avoid it to limit the number of writes to disk.
# It is not critical if this information is lost, and it will probably be update properly anyway the next time the file is written
pass
else:
return
def _check_list_element(l: List, index: int, expected_value: Any) -> bool: def _check_list_element(l: List, index: int, expected_value: Any) -> bool:
@ -289,14 +233,6 @@ if __name__ == '__main__':
with token_file_path.open("r") as token_file: with token_file_path.open("r") as token_file:
token = token_file.readline().strip() token = token_file.readline().strip()
watched_channels_file_path = Path(ARGS.watched_channels_file).absolute() channels_config_file = ChannelsConfigFile(ARGS.watched_channels_file)
if not watched_channels_file_path.exists(): bot = VocalMaisBot(channels_config_file)
logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...")
with watched_channels_file_path.open("w") as watched_channels_file:
json.dump({}, watched_channels_file)
with watched_channels_file_path.open("r+") as watched_channels_file:
bot = VocalMaisBot(watched_channels_file)
bot.run(token) bot.run(token)
bot.write_watched_channels_file()
watched_channels_file.flush()

15
utils/__init__.py Normal file
View File

@ -0,0 +1,15 @@
from typing import List, Any
def list_remove_all_occurrences(l: List[Any], value: Any) -> None:
"""
Remove all occurrences of a given value from a list **in place**
See https://stackoverflow.com/a/48253792
"""
i = -1
# Shift elements that should not be removed
for i, v in enumerate(filter(value.__ne__, l)):
l[i] = v
# Remove the tail
del l[i + 1: len(l)]