#!/usr/bin/env python3 import argparse import logging import sys from pathlib import Path import json from typing import TextIO, Any, List, Dict import textwrap import discord import discord.utils logger = logging.getLogger("VocalMaisBot") class VocalMaisBot(discord.Client): def __init__(self, watched_channels_file: TextIO): super().__init__( intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True) ) self.watched_channels_file = watched_channels_file try: self.watched_channels = json.load(watched_channels_file) except json.JSONDecodeError as e: logger.critical(f"Impossible to parse watched channels file, JSON error: {e}") sys.exit(1) async def on_ready(self): logger.info("Connected and ready!") logger.debug(f"Logged as {self.user}") oauth_url = discord.utils.oauth_url( self.user.id, discord.Permissions(manage_channels = True, read_messages = True, send_messages = True, move_members = True) ) logger.info(f"You can invite the bot to your server using the following link: {oauth_url}") async def on_message(self, message: discord.Message): if self.user not in message.mentions: # We only ever react to messages that mention us return if message.author.bot or message.is_system(): # We ignore messages from automated sources return contents = message.content.split() if _check_list_element(contents, 1, "help"): return await self.print_help(message.channel) elif _check_list_element(contents, 1, "ping"): return await message.channel.send(":ping_pong:") elif _check_list_element(contents, 1, "register"): return await self.register_channel(message) else: return self.sorry_do_not_understand(message) async def sorry_do_not_understand(self, message: discord.Message): await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help") async def print_help(self, channel: discord.TextChannel): me = self.user.display_name message = f""" **·** `@{me}register channel_id` : make me watch the voice channel with id `channel_id` **·** `@{me} help` : print this help **·** `@{me} ping` : pong """ await channel.send(embed = discord.Embed(title = "Available commands", description = textwrap.dedent(message))) async def register_channel(self, message: discord.Message): message_elements = message.content.split() if len(message_elements) < 3: return await self.sorry_do_not_understand(message) channel_id = message_elements[2] try: channel_id = int(channel_id) except ValueError: return await message.channel.send(f":x: {channel_id} can not be converted to number") # Retrieving the channel channel = self.get_channel(channel_id) if channel is None: return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:") # Adding the channel to the list of watched channels guild_id = str(message.guild.id) if guild_id not in self.watched_channels: self.watched_channels[guild_id] = { "_name": message.guild.name, "watched_channels": [], } watched_channels_for_this_guild = self.watched_channels[guild_id]["watched_channels"] if channel_id in watched_channels_for_this_guild: return await message.channel.send(":thumbsup: I was already watching this channel") else: watched_channels_for_this_guild.append(channel_id) self.loop.call_soon(self.write_watched_channels_file) return await message.channel.send(f":white_check_mark: I am now watching {channel.name}") def write_watched_channels_file(self): logger.debug("Writing watched channels information to file") self.watched_channels_file.seek(0) self.watched_channels_file.truncate() json.dump(self.watched_channels, self.watched_channels_file, indent = 2) self.watched_channels_file.flush() logger.debug("Written watched channels information to file") async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): if member.bot: return if before.channel is not None: # They left a channel await self.handle_user_left_voice(before.channel) if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another await self.handle_user_connected_to_voice(member, after.channel) async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel): guild_id = str(channel.guild.id) if guild_id not in self.watched_channels: return # We do not have an entry for this guild, so we surely do not have anything to do here if channel.id not in self.watched_channels[guild_id]["watched_channels"]: return # It's not one of our special watched join-to-create channel # The user connected to one of our special watched channels category = channel.category user_name = user.display_name user_channel = await category.create_voice_channel(f"{user_name}'s channel", overwrites = {user: discord.PermissionOverwrite(manage_channels = True)}) await user.move_to(user_channel) logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})") # Updating channels information guild_info = self.watched_channels[guild_id] if "created_channels" not in guild_info: guild_info["created_channels"] = [] guild_info["created_channels"].append(user_channel.id) self.loop.call_soon(self.write_watched_channels_file) async def handle_user_left_voice(self, channel: discord.VoiceChannel): guild_id = str(channel.guild.id) if guild_id not in self.watched_channels: return # We do not have an entry for this guild, so we surely do not have anything to do here guild_info = self.watched_channels[guild_id] if "created_channels" not in guild_info: return # We did not create any channel in this guild, so we do not have anything to do when a user disconnects if channel.id in guild_info["created_channels"]: # This is a temporary channel that we created if len(channel.members) > 0: return # There are still people inside it, nothing to do try: await channel.delete() except discord.NotFound: pass # The channel already does not exist, that's not a problem, that's what we want logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty") try: guild_info["created_channels"].remove(channel.id) except ValueError: pass # For some reason the value was not in the list, that's not a problem, that's what we want # We could update the json file, but we avoid it to limit the number of writes to disk. # It is not critical if this information is lost, and it will probably be update properly anyway the next time the file is written pass else: return def _check_list_element(l: List, index: int, expected_value: Any) -> bool: try: return l[index] == expected_value except IndexError: return False def _check_dict_element(d: Dict, key: Any, expected_value: Any) -> bool: try: return d[key] == expected_value except KeyError: return False if __name__ == '__main__': argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter) argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored") argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches") argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages") ARGS = argparser.parse_args() logging.basicConfig( level = logging.DEBUG if ARGS.verbose else logging.INFO, format = "%(asctime)s %(name)s [%(levelname)s] %(message)s'" ) logger.info(f"Using discord.py version {discord.__version__}") token_file_path = Path(ARGS.token_file).absolute() if not token_file_path.is_file(): logger.error(f"Token file {token_file_path} does not exist") sys.exit(1) with token_file_path.open("r") as token_file: token = token_file.readline().strip() watched_channels_file_path = Path(ARGS.watched_channels_file).absolute() if not watched_channels_file_path.exists(): logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...") with watched_channels_file_path.open("w") as watched_channels_file: json.dump({}, watched_channels_file) with watched_channels_file_path.open("r+") as watched_channels_file: bot = VocalMaisBot(watched_channels_file) bot.run(token) watched_channels_file.flush()