#!/usr/bin/env python3
"""Discord bot that creates temporary voice channels for users.

When a user connects to one of the "watched" voice channels, the bot creates
a dedicated voice channel for them and moves them into it. The temporary
channel is deleted once it is empty again.
"""
import argparse
import logging
import sys
from pathlib import Path
import json
from typing import TextIO, Any, List, Dict, Union
import textwrap

import discord
import discord.utils

logger = logging.getLogger("VocalMaisBot")


class VocalMaisBot(discord.Client):
    """Discord client handling the bot commands and the voice-channel events.

    The bot state lives in ``self.watched_channels``, a JSON-serialisable dict
    keyed by guild id (as a string). Each guild entry may contain:

    * ``"_name"``: the guild name at registration time,
    * ``"watched_channels"``: ids of the join-to-create channels,
    * ``"created_channels"``: ids of the temporary channels the bot created.
    """

    def __init__(self, watched_channels_file: TextIO):
        """Load the watched-channels state from an already opened file.

        :param watched_channels_file: file opened for reading and writing; it
            is kept open and rewritten by :meth:`write_watched_channels_file`.

        Exits the process with status 1 if the file does not contain valid JSON.
        """
        super().__init__(
            intents=discord.Intents(voice_states=True, guild_messages=True, guilds=True)
        )
        self.watched_channels_file = watched_channels_file
        try:
            self.watched_channels = json.load(watched_channels_file)
        except json.JSONDecodeError as e:
            logger.critical(f"Impossible to parse watched channels file, JSON error: {e}")
            sys.exit(1)
        self.owner = None  # Fetched by on_ready

    async def _check_administrator_for_command(self, message: discord.Message) -> bool:
        """Return True if the author may control the bot, else tell them off.

        The author is allowed if they are a guild administrator or the owner
        of the bot application.
        """
        if message.author.guild_permissions.administrator or message.author == self.owner:
            return True
        await message.channel.send(":dragon_face: Only an administrator can control me!")
        return False

    def write_watched_channels_file(self):
        """Overwrite the watched-channels file with the current state."""
        logger.debug("Writing watched channels information to file")
        # The file stays open for the bot lifetime: rewind and truncate it
        # before dumping so that no stale trailing bytes are left behind.
        self.watched_channels_file.seek(0)
        self.watched_channels_file.truncate()
        json.dump(self.watched_channels, self.watched_channels_file, indent=2)
        self.watched_channels_file.flush()
        logger.debug("Written watched channels information to file")

    async def sorry_do_not_understand(self, message: discord.Message):
        """Answer to a command that could not be understood."""
        await message.channel.send(
            f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help"
        )

    async def on_ready(self):
        """Fetch the application owner and log the invitation link."""
        logger.info("Connected and ready!")
        logger.debug(f"Logged as {self.user}")
        self.owner = (await self.application_info()).owner
        # `permissions` is passed by keyword: it is keyword-only in
        # discord.py 2.x, and the keyword form is also valid in 1.x.
        oauth_url = discord.utils.oauth_url(
            self.user.id,
            permissions=discord.Permissions(
                manage_channels=True, read_messages=True, send_messages=True, move_members=True
            ),
        )
        logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")

    async def on_message(self, message: discord.Message):
        """Dispatch a command of the form ``@Bot command [argument]``."""
        if self.user not in message.mentions:
            # We only ever react to messages that mention us
            return
        if message.author.bot or message.is_system():
            # We ignore messages from automated sources
            return

        contents = message.content.split()
        # The command is the second word, the first one being the mention
        command = contents[1] if len(contents) > 1 else None

        if command == "help":
            return await self.print_help(message.channel)
        elif command == "ping":
            return await message.channel.send(":ping_pong:")
        elif command == "register":
            return await self.register_channel(message)
        elif command == "forget":
            return await self.forget_channel(message)
        elif command == "list":
            return await self.list_watched_channels(message)
        elif command == "clear":
            return await self.clear_watched_channels(message)
        else:
            # The coroutine must be awaited, otherwise nothing is ever sent
            return await self.sorry_do_not_understand(message)

    async def print_help(self, channel: discord.TextChannel):
        """Send the list of available commands to ``channel``."""
        me = self.user.display_name
        message = f"""
            **·** `@{me} register channel_id` : make me watch the voice channel with id `channel_id`
            **·** `@{me} forget channel_id` : make me stop watching the voice channel with id `channel_id`
            **·** `@{me} list` : list watched channels
            **·** `@{me} clear` : stop watching all channels
            **·** `@{me} help` : print this help
            **·** `@{me} ping` : pong
        """
        await channel.send(
            embed=discord.Embed(title="Available commands", description=textwrap.dedent(message))
        )

    async def register_channel(self, message: discord.Message):
        """Handle ``register channel_id``: start watching a voice channel."""
        channel_id = await self._get_channel_id_for_command(message)
        if channel_id is None:
            return
        if not await self._check_administrator_for_command(message):
            return

        # Retrieving the channel. The lookup is restricted to the guild the
        # command was sent in, so that a channel of another guild can not be
        # registered under this guild's entry.
        channel = message.guild.get_channel(channel_id)
        if channel is None:
            return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:")
        if not isinstance(channel, discord.VoiceChannel):
            # Only voice channels can be joined, watching anything else is useless
            return await message.channel.send(f":x: {channel.name} is not a voice channel")

        # Adding the channel to the list of watched channels
        guild_info = self.watched_channels.setdefault(
            str(message.guild.id),
            {"_name": message.guild.name, "watched_channels": []},
        )
        watched_channels_for_this_guild = guild_info.setdefault("watched_channels", [])

        if channel_id in watched_channels_for_this_guild:
            return await message.channel.send(":thumbsup: I was already watching this channel")

        watched_channels_for_this_guild.append(channel_id)
        # The write is deferred to the next iteration of the event loop
        self.loop.call_soon(self.write_watched_channels_file)
        return await message.channel.send(f":white_check_mark: I am now watching {channel.name}")

    async def forget_channel(self, message: discord.Message):
        """Handle ``forget channel_id``: stop watching a voice channel."""
        channel_id = await self._get_channel_id_for_command(message)
        if channel_id is None:
            return
        if not await self._check_administrator_for_command(message):
            return

        guild_info = self.watched_channels.get(str(message.guild.id), {})
        try:
            guild_info.get("watched_channels", []).remove(channel_id)
        except ValueError:
            did_something = False  # The channel already was not in the list
        else:
            did_something = True

        # Getting the channel name (if it exists), for pretty printing
        channel = self.get_channel(channel_id)
        if channel is None:
            channel_name = str(channel_id)  # Channel does not exist
        else:
            channel_name = f"{channel.name} ({channel_id})"

        if did_something:
            self.write_watched_channels_file()
            await message.channel.send(f":white_check_mark: I am no longer watching {channel_name}")
        else:
            await message.channel.send(f":thumbsup: I already was not watching {channel_name}")

    async def _get_channel_id_for_command(self, message: discord.Message) -> Union[int, None]:
        """
        Get the channel id from a command message in the form "@Bot command channel_id",
        or answer with an error message and return None if not possible.
        """
        message_elements = message.content.split()
        if len(message_elements) < 3:
            await self.sorry_do_not_understand(message)
            return None

        channel_id = message_elements[2]
        try:
            return int(channel_id)
        except ValueError:
            await message.channel.send(f":x: {channel_id} can not be converted to number")
            return None

    async def list_watched_channels(self, message: discord.Message):
        """Handle ``list``: show the watched channels of the current guild."""
        guild_info = self.watched_channels.get(str(message.guild.id), {})

        answer_lines = []
        for channel_id in guild_info.get("watched_channels", []):
            channel = self.get_channel(channel_id)
            if channel is not None:  # Does the channel still exist?
                answer_lines.append(f":eyes: {channel.name} ({channel_id})")

        if answer_lines:
            await message.channel.send(
                embed=discord.Embed(
                    title="I am watching the following channels",
                    description="\n".join(answer_lines),
                )
            )
        else:
            # Also reached when every watched channel has been deleted since,
            # instead of sending an embed with an empty description.
            await message.channel.send(
                f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.user.display_name} register channel_id`"
            )

    async def clear_watched_channels(self, message: discord.Message):
        """Handle ``clear``: stop watching every channel of the current guild."""
        if not await self._check_administrator_for_command(message):
            return

        guild_id = str(message.guild.id)
        if guild_id in self.watched_channels:
            self.watched_channels[guild_id]["watched_channels"] = []
            self.write_watched_channels_file()
        await message.channel.send(":white_check_mark: I am no longer watching any channel")

    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """React to a (non-bot) member leaving and/or joining a voice channel."""
        if member.bot:
            return
        if before.channel == after.channel:
            # The event is also sent for mute/deafen changes: the member did
            # not move, so there is nothing to create or to clean up.
            return

        if before.channel is not None:
            # They left a channel
            await self.handle_user_left_voice(before.channel)
        if after.channel is not None:
            # They joined a channel. Note that there is an if and not an elif,
            # because they could leave a channel to join another
            await self.handle_user_connected_to_voice(member, after.channel)

    async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
        """Create a temporary channel for ``user`` if ``channel`` is watched."""
        guild_info = self.watched_channels.get(str(channel.guild.id))
        if guild_info is None:
            return  # We do not have an entry for this guild, so we surely do not have anything to do here
        if channel.id not in guild_info.get("watched_channels", []):
            return  # It's not one of our special watched join-to-create channel

        # The user connected to one of our special watched channels
        channel_name = f"{user.display_name}'s channel"
        overwrites = {user: discord.PermissionOverwrite(manage_channels=True)}
        if channel.category is not None:
            user_channel = await channel.category.create_voice_channel(channel_name, overwrites=overwrites)
        else:
            # The watched channel is not inside a category: create the new
            # channel directly in the guild instead of crashing on None
            user_channel = await channel.guild.create_voice_channel(channel_name, overwrites=overwrites)

        try:
            await user.move_to(user_channel)
        except discord.HTTPException as e:
            # The move failed (the user may have disconnected in the meantime).
            # Nobody will ever leave the new channel, so it would never be
            # cleaned up: delete it right away.
            logger.warning(f"Could not move {user.display_name}({user.id}) to their channel: {e}")
            await user_channel.delete()
            return
        logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})")

        # Updating channels information
        guild_info.setdefault("created_channels", []).append(user_channel.id)
        # The write is deferred to the next iteration of the event loop
        self.loop.call_soon(self.write_watched_channels_file)

    async def handle_user_left_voice(self, channel: discord.VoiceChannel):
        """Delete ``channel`` if it is an empty temporary channel we created."""
        guild_info = self.watched_channels.get(str(channel.guild.id))
        if guild_info is None:
            return  # We do not have an entry for this guild, so we surely do not have anything to do here

        created_channels = guild_info.get("created_channels")
        if created_channels is None:
            return  # We did not create any channel in this guild
        if channel.id not in created_channels:
            return  # Not a temporary channel that we created
        if len(channel.members) > 0:
            return  # There are still people inside it, nothing to do

        try:
            await channel.delete()
        except discord.NotFound:
            # The channel already does not exist, that's not a problem, that's what we want
            logger.debug(f"Channel {channel.name}({channel.id}) was already deleted")
        else:
            logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty")

        try:
            created_channels.remove(channel.id)
        except ValueError:
            pass  # For some reason the value was not in the list, that's not a problem, that's what we want
        # We could update the json file, but we avoid it to limit the number of writes to disk.
        # It is not critical if this information is lost, and it will probably be updated
        # properly anyway the next time the file is written.


def _check_list_element(l: List, index: int, expected_value: Any) -> bool:
    """Return True if ``l[index]`` exists and equals ``expected_value``."""
    try:
        return l[index] == expected_value
    except IndexError:
        return False


def _check_dict_element(d: Dict, key: Any, expected_value: Any) -> bool:
    """Return True if ``d[key]`` exists and equals ``expected_value``."""
    try:
        return d[key] == expected_value
    except KeyError:
        return False


def main():
    """Parse the command line, load the token and the state, and run the bot."""
    argparser = argparse.ArgumentParser(
        description="Discord bot to automatically create temporary voice channels for users when they connect to a special channel",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    argparser.add_argument("-t", "--token-file", default=".token", help="File where the discord bot token is stored")
    argparser.add_argument("-w", "--watched-channels-file", default="channels.json", help="Used to store the list of channels that the bot watches")
    argparser.add_argument("-v", "--verbose", action="store_true", help="Print debug messages")
    args = argparser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
    )
    logger.info(f"Using discord.py version {discord.__version__}")

    token_file_path = Path(args.token_file).absolute()
    if not token_file_path.is_file():
        logger.error(f"Token file {token_file_path} does not exist")
        sys.exit(1)
    with token_file_path.open("r") as token_file:
        token = token_file.readline().strip()
    if not token:
        logger.error(f"Token file {token_file_path} is empty")
        sys.exit(1)

    watched_channels_file_path = Path(args.watched_channels_file).absolute()
    if not watched_channels_file_path.exists():
        logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...")
        with watched_channels_file_path.open("w") as watched_channels_file:
            json.dump({}, watched_channels_file)

    with watched_channels_file_path.open("r+") as watched_channels_file:
        bot = VocalMaisBot(watched_channels_file)
        try:
            bot.run(token)
        finally:
            # Persist the latest state (it also flushes the file), even if
            # the bot stopped because of an error.
            bot.write_watched_channels_file()


if __name__ == '__main__':
    main()