#!/usr/bin/env python3
"""Discord bot that creates temporary voice channels.

When a user connects to one of the "watched" voice channels, the bot creates a
new voice channel for them, moves them into it, and deletes that channel once
it becomes empty.
"""
import argparse
import logging
import sys
from pathlib import Path

import discord
import discord.utils
from discord.ext import commands

from ChannelsConfigFile import ChannelsConfigFile
from utils import discord_utils

logger = logging.getLogger("VocalMaisBot")


class VocalMaisBot(commands.Cog):
    """Cog that owns its own :class:`commands.Bot` and implements every command and listener.

    The bot only answers when mentioned (``commands.when_mentioned``). Watched
    channels, created channels and per-user default names are all delegated to
    the :class:`ChannelsConfigFile` given at construction.
    """

    def __init__(self, channels_config: ChannelsConfigFile):
        """
        :param channels_config: Storage for watched/created channels and user channel names.
        """
        super().__init__()
        self.bot = commands.Bot(
            commands.when_mentioned,
            intents=discord.Intents(voice_states=True, guild_messages=True, guilds=True),
            help_command=commands.MinimalHelpCommand(),
        )
        self.bot.add_cog(self)
        self.channels_config = channels_config
        self.owner = None  # Fetched by on_ready

    def run(self, token):
        """Start the bot with the given token (blocking call delegated to ``commands.Bot.run``)."""
        self.bot.run(token)

    @commands.Cog.listener()
    async def on_ready(self):
        """Log the connection, fetch the application owner and log an invitation link."""
        logger.info("Connected and ready!")
        logger.info(f"Logged in as {discord_utils.to_string(self.bot.user)}")

        self.owner = (await self.bot.application_info()).owner

        # Permissions requested by the invitation link
        invite_permissions = discord.Permissions(
            manage_channels=True,
            read_messages=True,
            send_messages=True,
            move_members=True,
        )
        oauth_url = discord.utils.oauth_url(self.bot.user.id, invite_permissions)
        logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")

    async def check_is_administrator_or_owner(self, ctx: commands.Context):
        """
        Verify if the user has administrator rights or if it is the bot's owner

        :return: True when the author is allowed to run admin commands.
        :raise discord_utils.CheckFailDoNotNotify: If the user does not have the correct rights.
            The user is notified about it before the exception is raised.
        """
        if ctx.author.guild_permissions.administrator or ctx.author == self.owner:
            return True

        await ctx.send(":dragon_face: Only an administrator can control me!")
        raise discord_utils.CheckFailDoNotNotify

    def get_command_usage_message(self, command: commands.Command, prepend="Usage: ") -> str:
        """
        Build a usage string for a command, including its parent groups and its signature.

        :param command: The command to describe.
        :param prepend: Text placed before the usage itself.
        :return: ``prepend`` followed by the mention-prefixed command line, wrapped in backticks.
        """
        command_elements = [str(cmd) for cmd in command.parents] + [command.name]
        return f"{prepend}`@{self.bot.user.display_name} {' '.join(command_elements)} {command.signature}`"

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """
        Translate command errors into a message for the user.

        Known user errors get a specific message. ``CheckFailDoNotNotify`` is silently ignored,
        because the code that raised it has already notified the user (or chose not to).
        Anything else is logged with its traceback and reported generically.
        """
        # NOTE: the usage message is computed inside each branch on purpose:
        # ctx.command may not be set for CommandNotFound.
        if isinstance(error, commands.CommandNotFound):
            return await ctx.send(":x: The requested command does not exist")

        if isinstance(error, commands.MissingRequiredArgument):
            return await ctx.send(
                f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}"
            )

        if isinstance(error, commands.TooManyArguments):
            return await ctx.send(
                f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}"
            )

        if isinstance(error, commands.BadArgument):
            if hasattr(error, "argument"):
                return await ctx.send(
                    f":x: I could not correctly understand argument {error.argument}. "
                    f"{self.get_command_usage_message(ctx.command)}"
                )
            return await ctx.send(
                f":x: I could not correctly understand one of the arguments. "
                f"{self.get_command_usage_message(ctx.command)}"
            )

        if isinstance(error, commands.ArgumentParsingError):
            return await ctx.send(f":x: Error when parsing the command: {error}")

        if isinstance(error, commands.CheckFailure):
            if isinstance(error, discord_utils.CheckFailDoNotNotify):
                # A check failed, but the check function asked that we do not notify the user
                return None
            return await ctx.send(f":x: this command can not be run: {error}")

        # Unexpected error: keep the traceback in the logs, stay vague with the user
        logger.error(
            f"Error when running command '{ctx.message.content}' by {discord_utils.to_string(ctx.author)} "
            f"in {discord_utils.to_string(ctx.guild)}",
            exc_info=error,
        )
        if isinstance(error, commands.CommandInvokeError):
            await ctx.send(":x: There was an error during the command execution :dizzy_face:")
        else:
            await ctx.send(":x: There was an error with the command :dizzy_face:")
        return None

    def cog_check(self, ctx: commands.Context):
        """
        Refuse commands coming from bots or from system messages.

        :raise discord_utils.CheckFailDoNotNotify: When the message must be ignored.
        """
        # We silently ignore messages from bots, since we do not want bots to command us
        if ctx.author.bot or ctx.message.is_system():
            logger.info(
                f"[{discord_utils.to_string(ctx.guild)}] Ignored command message from bot or system: "
                f"{ctx.message.content}"
            )
            raise discord_utils.CheckFailDoNotNotify
        return True

    async def subcommand_invoked_without_command(self, ctx: commands.Context):
        """
        This function is a possible implementation for a command group created with invoke_without_command = True.
        It assumes that the group was called without a subcommand or with a non-existing subcommand.
        This function notifies the user, then raises a discord_utils.CheckFailDoNotNotify exception.

        :raise discord_utils.CheckFailDoNotNotify: Always, after the user has been notified.
        """
        command_elements = [str(cmd) for cmd in ctx.command.parents] + [ctx.command.name]  # In case of nested groups
        help_message = f"Type `@{self.bot.user.display_name} help {' '.join(command_elements)}` for help"

        if ctx.subcommand_passed is None:  # The user did not pass a subcommand
            await ctx.send(f":x: Subcommand expected. {help_message}")
        else:
            await ctx.send(f":x: The requested subcommand does not exist. {help_message}")

        raise discord_utils.CheckFailDoNotNotify()

    @staticmethod
    def _belongs_to_guild(channel, guild) -> bool:
        """Tell whether ``channel`` exists and belongs to ``guild`` (private channels have no guild)."""
        return channel is not None and getattr(channel, "guild", None) == guild

    @commands.command(help="See if I am alive")
    async def ping(self, ctx: commands.Context):
        """Answer with a ping-pong emoji."""
        await ctx.send(":ping_pong:")

    @commands.group(
        "guild",
        help="Commands for server admins",
        invoke_without_command=True,
    )
    async def guild_subcommands(self, ctx: commands.Context):
        """Group entry point: only reached when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @guild_subcommands.command("register", help="Make me watch a voice channel")
    async def register_channel(self, ctx: commands.Context, channel_id: int):
        """
        Add a channel of the current guild to the watched channels (admins and owner only).

        :param channel_id: Identifier of the channel to watch.
        """
        await self.check_is_administrator_or_owner(ctx)

        # Retrieving the channel. It must exist and belong to the guild the command was sent in.
        channel = self.bot.get_channel(channel_id)
        if not self._belongs_to_guild(channel, ctx.guild):
            return await ctx.send(f"I could not find a channel with id {channel_id} :cry:")

        # Adding the channel to the list of watched channels (if needed)
        # NOTE(review): unlike the `name` commands, there is no explicit save_to_file() here;
        # presumably add_channel_to_watched persists by itself - confirm in ChannelsConfigFile.
        has_been_added = self.channels_config.add_channel_to_watched(ctx.guild, channel)
        if has_been_added:
            await ctx.send(f":white_check_mark: I am now watching {channel.name}")
        else:
            await ctx.send(":thumbsup: I was already watching this channel")
        return None

    @guild_subcommands.command("forget", help="Make me stop watching a voice channel")
    async def forget_channel(self, ctx: commands.Context, channel_id: int):
        """
        Remove a channel from the watched channels of the current guild (admins and owner only).

        :param channel_id: Identifier of the channel to forget. It does not need to exist anymore.
        """
        await self.check_is_administrator_or_owner(ctx)

        channel = self.bot.get_channel(channel_id)
        if self._belongs_to_guild(channel, ctx.guild):
            channel_name = f"{channel.name} ({channel_id})"
        else:
            # The channel does not exist (maybe it has been deleted) or is not part of this guild.
            # In both cases we only work with the raw id, so that we never display the name of a
            # channel that belongs to another server.
            channel = discord.Object(channel_id)
            channel_name = str(channel_id)

        has_been_removed = self.channels_config.remove_channel_from_watched(ctx.guild, channel)
        if has_been_removed:
            await ctx.send(f":white_check_mark: I am no longer watching {channel_name}")
        else:
            await ctx.send(f":thumbsup: I already was not watching {channel_name}")

    @guild_subcommands.command("list", help="List watched channels")
    async def list_watched_channels(self, ctx: commands.Context):
        """Send the list of watched channels of the current guild, flagging the ones that no longer exist."""
        watched_channels_ids = self.channels_config.get_watched_channels_ids(ctx.guild)

        if len(watched_channels_ids) == 0:
            await ctx.send(
                f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running "
                f"{self.get_command_usage_message(self.register_channel, '')}"
            )
            return

        answer_lines = []
        for channel_id in watched_channels_ids:
            channel = self.bot.get_channel(channel_id)
            if channel is not None:  # Does the channel still exist?
                answer_lines.append(f":eyes: {channel.name} ({channel_id})")
            else:
                answer_lines.append(f":ninja: ({channel_id})")

        embed = discord.Embed(
            title="I am watching the following channels",
            description="\n".join(answer_lines),
        )
        await ctx.send(embed=embed)

    @guild_subcommands.command("clear", help="Make me stop watching all channels")
    async def clear_watched_channels(self, ctx: commands.Context):
        """Forget every watched channel of the current guild (admins and owner only)."""
        await self.check_is_administrator_or_owner(ctx)

        self.channels_config.clear_watched_channels(ctx.guild)
        await ctx.send(":white_check_mark: I am no longer watching any channel")

    @commands.group(
        "name",
        help="Commands to set a default name for your channels!",
        invoke_without_command=True,
    )
    async def default_channel_name_subcommands(self, ctx: commands.Context):
        """Group entry point: only reached when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @default_channel_name_subcommands.command(
        "set",
        help="Set the default name for your voice channels. If you put {user} in the name, it will be replaced by your nickname",
        usage="",
    )
    async def set_user_default_channel_name(self, ctx: commands.Context, *name_elements: str):
        """
        Store the default channel name of the author for the current guild.

        :param name_elements: Words of the name; they are joined with single spaces.
        :raise discord_utils.CheckFailDoNotNotify: When no name was given (the user is notified first).
        """
        # Verify that the given name does not mention anyone by checking the message mentions.
        # The bot itself is always mentioned, since that is how commands are invoked.
        if len(set(ctx.message.mentions) - {self.bot.user}) > 0:
            await ctx.send(":no_entry: User mentions are not allowed in channel names. Use {user} if you want to include your name.")
            return

        name = " ".join(name_elements)
        if len(name) == 0:
            await ctx.send(f":x: Missing channel name! {self.get_command_usage_message(ctx.command)}")
            raise discord_utils.CheckFailDoNotNotify

        self.channels_config.set_user_channel_name(ctx.guild, ctx.author, discord_utils.voice_channel_safe_name(name))
        self.channels_config.save_to_file()

        # We escape the markdown from the channel name when sending the message because for now discord does not
        # support markdown in channel names, so we do not want the users to get their hopes up by seeing markdown
        # working in the message
        preview = discord_utils.voice_channel_safe_name(
            name,
            user_name_replacement=ctx.author.display_name,
            escape_markdown=True,
        )
        await ctx.send(
            f":white_check_mark: The default name for your channels has been set! "
            f"If you create one now, it will appear as {preview}"
        )

    @default_channel_name_subcommands.command("get", help="See what the default name for your voice channels is")
    async def get_user_default_channel_name(self, ctx: commands.Context):
        """Tell the author how their next channel would be named, if they configured a name."""
        channel_name = self.channels_config.get_user_channel_name(ctx.guild, ctx.author)
        if channel_name is None:
            await ctx.send(":person_shrugging: You have not set a special name for your voice channels")
            return

        channel_name = discord_utils.voice_channel_safe_name(
            channel_name,
            user_name_replacement=ctx.author.display_name,
            escape_markdown=True,
        )
        await ctx.send(f":memo: If you create a voice channel right now, it will be named like this: {channel_name}")

    @default_channel_name_subcommands.command("clear", help="Do not use a special name for your voice channels")
    async def clear_user_default_channel_name(self, ctx: commands.Context):
        """Remove the default channel name of the author for the current guild."""
        self.channels_config.clear_user_channel_name(ctx.guild, ctx.author)
        self.channels_config.save_to_file()
        await ctx.send(":white_check_mark: Your voice channels will use the default naming convention")

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """
        Dispatch voice state changes to the "left" and "connected" handlers.

        This event is also sent when a member mutes/deafens themselves without changing channel.
        Those updates are ignored: otherwise a member toggling their microphone while sitting in a
        watched channel would be given a new channel each time.
        """
        if before.channel == after.channel:
            return  # Not a channel change (mute, deafen, stream...)

        if before.channel is not None:
            # They left a channel
            await self.handle_user_left_voice(before.channel)

        if after.channel is not None:
            # They joined a channel. Note that there is an if and not an elif, because they could leave a channel
            # to join another
            await self.handle_user_connected_to_voice(member, after.channel)

    def _build_channel_overwrites(self, channel: discord.VoiceChannel, user: discord.Member) -> dict:
        """Compute the permission overwrites of a new channel created for ``user`` from ``channel``."""
        channel_permissions = {}

        category = channel.category
        if category is not None:  # A watched channel can live outside of any category
            # Creating the permissions of the channel, based on the watched channel's category permissions.
            # However, tests showed that discord forbids the bot from creating overwrites on roles that are
            # higher than the bot in the list of roles
            bot_highest_role_position = channel.guild.get_member(self.bot.user.id).top_role.position
            for subject, overwrite in category.overwrites.items():
                # Overwrites can contain role overwrites and member overwrites
                if isinstance(subject, discord.Role) and subject.position >= bot_highest_role_position:
                    continue  # We ignore roles higher than the bot
                channel_permissions[subject] = overwrite

        # We allow the user for which we created the channel to change the channel's name
        if user not in channel_permissions:
            channel_permissions[user] = discord.PermissionOverwrite()
        channel_permissions[user].manage_channels = True

        return channel_permissions

    async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
        """
        Create a temporary channel for a user who joined a watched channel and move them into it.

        Nothing happens for bots or for channels that are not watched. The new channel is placed in the
        category of the watched channel (or outside of any category if the watched channel has none).

        :param user: The member who connected.
        :param channel: The channel they connected to.
        """
        if user.bot:
            return  # We ignore bots connecting to channels, we do not want them asking us to create voice channels

        if not self.channels_config.is_watched(channel.guild, channel):
            return  # It's not one of our special watched join-to-create channels

        # The user is not a bot and connected to one of our special watched channels
        guild_str = discord_utils.to_string(channel.guild)
        logger.debug(
            f"[{guild_str}] User {discord_utils.to_string(user)} connected to watched channel "
            f"{discord_utils.to_string(channel)}"
        )

        channel_permissions = self._build_channel_overwrites(channel, user)

        # Computing the channel name
        user_default_channel_name = self.channels_config.get_user_channel_name(channel.guild, user)
        if user_default_channel_name is None:
            user_default_channel_name = "{user}'s channel"
        channel_name = discord_utils.voice_channel_safe_name(
            user_default_channel_name,
            user_name_replacement=user.display_name,
        )

        # Creating the channel. Going through the guild (rather than the category) also works when
        # the watched channel has no category.
        logger.debug(f"[{guild_str}] Creating channel {channel_name} for {discord_utils.to_string(user)}")
        user_channel = await channel.guild.create_voice_channel(
            channel_name,
            overwrites=channel_permissions,
            category=channel.category,
        )

        # Saving the channel in the configuration *before* moving the user, so that the channel is
        # tracked (and can be cleaned up) even if the move fails.
        self.channels_config.add_channel_to_created(user_channel.guild, user_channel)

        try:
            await user.move_to(user_channel)
        except discord.HTTPException:
            # Typically the user disconnected in the meantime. Nobody will ever "leave" the new channel,
            # so we remove it right away through the regular empty-channel cleanup.
            logger.warning(
                f"[{guild_str}] Could not move {discord_utils.to_string(user)} to "
                f"{discord_utils.to_string(user_channel)}, removing the channel",
                exc_info=True,
            )
            await self.handle_user_left_voice(user_channel)
            return

        logger.info(
            f"[{guild_str}] Created channel {discord_utils.to_string(user_channel)} for "
            f"{discord_utils.to_string(user)}"
        )

    async def handle_user_left_voice(self, channel: discord.VoiceChannel):
        """
        Delete a channel created by the bot once it is empty, and forget it in the configuration.

        :param channel: The channel that somebody just left.
        """
        # We do not verify if the user is a bot, because a bot could be the last one to leave a channel
        # (e.g. music bot)
        if not self.channels_config.is_created(channel.guild, channel):
            return  # It is not a channel that we manage, nothing to do

        guild_str = discord_utils.to_string(channel.guild)
        channel_str = discord_utils.to_string(channel)
        logger.debug(f"[{guild_str}] User left temporary channel {channel_str}")

        # It is a channel that we manage
        if len(channel.members) > 0:
            return  # There are still people inside it, nothing to do

        logger.debug(f"[{guild_str}] Temporary channel {channel_str} is now empty")
        try:
            await channel.delete()
        except discord.NotFound:
            pass  # The channel already does not exist, that's not a problem, that's what we want
        logger.info(f"[{guild_str}] Deleted channel {channel_str} because it was empty")

        # updating the configuration
        self.channels_config.remove_channel_from_created(channel.guild, channel)


def main():
    """Parse the command line, configure logging, read the token and run the bot."""
    argparser = argparse.ArgumentParser(
        description="Discord bot to automatically create temporary voice channels for users when they connect to a special channel",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    argparser.add_argument("-t", "--token-file", default=".token", help="File where the discord bot token is stored")
    argparser.add_argument("-w", "--watched-channels-file", default="channels.json", help="Used to store the list of channels that the bot watches")
    argparser.add_argument("-v", "--verbose", action="store_true", help="Print debug messages")
    args = argparser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
    )

    logger.info(f"Using discord.py version {discord.__version__}")

    token_file_path = Path(args.token_file).absolute()
    if not token_file_path.is_file():
        logger.error(f"Token file {token_file_path} does not exist")
        sys.exit(1)

    # Only the first line of the file is used as the token
    with token_file_path.open("r") as token_file:
        token = token_file.readline().strip()
    if not token:
        logger.error(f"Token file {token_file_path} does not contain a token on its first line")
        sys.exit(1)

    channels_config_file = ChannelsConfigFile(args.watched_channels_file)

    bot = VocalMaisBot(channels_config_file)
    bot.run(token)


if __name__ == '__main__':
    main()