#!/usr/bin/env python3
"""Discord bot that creates temporary voice channels.

When a (non-bot) user joins one of the "watched" voice channels, the bot creates a
dedicated voice channel for them, moves them into it, and deletes that channel once
it is empty again.
"""
import argparse
import logging
import sys
from pathlib import Path
from typing import List

import discord
import discord.utils
from discord.ext import commands

from ChannelsConfigFile import ChannelsConfigFile
from utils import discord_utils

logger = logging.getLogger("VocalMaisBot")


class VocalMaisBot(commands.Cog):
    """Cog holding every command and event listener of the bot.

    The cog owns its own ``commands.Bot`` instance (commands are triggered by mentioning the bot)
    and registers itself on it.
    """

    def __init__(self, channels_config: ChannelsConfigFile):
        """
        :param channels_config: Storage for the watched channels, the created channels and the users' default channel names
        """
        super().__init__()
        self.bot = commands.Bot(
            commands.when_mentioned,
            intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True),
            help_command = commands.MinimalHelpCommand(),
        )
        # NOTE(review): add_cog is called synchronously here, which matches discord.py 1.x.
        # Newer discord.py versions turn it into a coroutine - confirm the targeted version before upgrading.
        self.bot.add_cog(self)
        self.channels_config = channels_config
        self.owner = None  # Fetched by on_ready

    def run(self, token: str):
        """Start the bot with the given token (delegates to ``commands.Bot.run``)."""
        self.bot.run(token)

    @commands.Cog.listener()
    async def on_ready(self):
        """Log the connection, fetch the bot's owner and log an invitation link."""
        logger.info("Connected and ready!")
        logger.info(f"Logged in as {self.bot.user}")

        self.owner = (await self.bot.application_info()).owner

        # The permissions are passed by keyword so that the call is valid whether or not
        # the library makes this parameter keyword-only
        oauth_url = discord.utils.oauth_url(
            self.bot.user.id,
            permissions = discord.Permissions(manage_channels = True, read_messages = True, send_messages = True, move_members = True),
        )
        logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")

    async def check_is_administrator_or_owner(self, ctx: commands.Context):
        """
        Verify that the user has administrator rights or is the bot's owner

        :return: True if the user is allowed to run administration commands
        :raise discord_utils.CheckFailDoNotNotify: If the user does not have the required rights. The user is notified before the exception is raised.
        """
        if ctx.author.guild_permissions.administrator or ctx.author == self.owner:
            return True

        await ctx.send(":dragon_face: Only an administrator can control me!")
        raise discord_utils.CheckFailDoNotNotify

    def get_command_usage_message(self, command: commands.Command) -> str:
        """Build the "Usage: ..." hint of a command, including its parent groups and its signature."""
        command_elements = [str(cmd) for cmd in command.parents] + [command.name]
        return f"Usage: `@{self.bot.user.display_name} {' '.join(command_elements)} {command.signature}`"

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """Translate command errors into a message for the user. Unexpected errors are also logged."""
        if isinstance(error, commands.CommandNotFound):
            return await ctx.send(":x: The requested command does not exist")

        if isinstance(error, commands.MissingRequiredArgument):
            return await ctx.send(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")

        if isinstance(error, commands.TooManyArguments):
            return await ctx.send(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")

        if isinstance(error, commands.BadArgument):
            if hasattr(error, "argument"):
                return await ctx.send(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
            return await ctx.send(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")

        if isinstance(error, commands.ArgumentParsingError):
            return await ctx.send(f":x: Error when parsing the command: {error}")

        if isinstance(error, commands.CheckFailure):
            if isinstance(error, discord_utils.CheckFailDoNotNotify):
                # A check failed, but the check function asked that we do not notify the user
                return
            return await ctx.send(f":x: this command can not be run: {error}")

        # Anything else is unexpected: keep a trace of it in the logs
        logger.error(f"Error when running command '{ctx.message.content}' by {ctx.author.name}#{ctx.author.discriminator} in {ctx.guild.name}", exc_info = error)
        if isinstance(error, commands.CommandInvokeError):
            await ctx.send(":x: There was an error during the command execution :dizzy_face:")
        else:
            await ctx.send(":x: There was an error with the command :dizzy_face:")

    def cog_check(self, ctx: commands.Context):
        """Check applied to every command of this cog: refuse commands coming from bots or system messages.

        :raise discord_utils.CheckFailDoNotNotify: If the message comes from a bot or is a system message
        """
        # We silently ignore messages from bots, since we do not want bots to command us
        if ctx.author.bot or ctx.message.is_system():
            logger.info(f"[{ctx.guild.name}({ctx.guild.id})] Ignored command message from bot or system {ctx.author.name}#{ctx.author.discriminator}({ctx.author.id}): {ctx.message.content}")
            raise discord_utils.CheckFailDoNotNotify
        return True

    async def subcommand_invoked_without_command(self, ctx: commands.Context):
        """
        This function is a possible implementation for a command group created with invoke_without_command = True.
        It assumes that the group was called without a subcommand or with a non-existing subcommand.
        This function notifies the user, then raises a discord_utils.CheckFailDoNotNotify exception.

        :raise discord_utils.CheckFailDoNotNotify: Always, once the user has been notified
        """
        command_elements = [str(cmd) for cmd in ctx.command.parents] + [ctx.command.name]  # In case of nested groups
        help_message = f"Type `@{self.bot.user.display_name} help {' '.join(command_elements)}` for help"

        if ctx.subcommand_passed is None:  # The user did not pass a subcommand
            await ctx.send(f":x: Subcommand expected. {help_message}")
        else:
            await ctx.send(f":x: The requested subcommand does not exist. {help_message}")

        raise discord_utils.CheckFailDoNotNotify()

    @commands.command(help = "See if I am alive")
    async def ping(self, ctx: commands.Context):
        """Answer with a ping-pong emoji."""
        await ctx.send(":ping_pong:")

    @commands.group(
        "guild",
        help = "Commands for server admins",
        invoke_without_command = True,
    )
    async def guild_subcommands(self, ctx: commands.Context):
        """Root of the ``guild`` command group; only reached when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @guild_subcommands.command("register", help = "Make me watch a voice channel")
    async def register_channel(self, ctx: commands.Context, channel_id: int):
        """Add a channel of the current guild to the watched channels (administrators and owner only)."""
        await self.check_is_administrator_or_owner(ctx)

        # Retrieving the channel
        channel = self.bot.get_channel(channel_id)
        if channel is None or channel.guild != ctx.guild:
            return await ctx.send(f"I could not find a channel with id {channel_id} :cry:")

        # Adding the channel to the list of watched channels (if needed)
        has_been_added = self.channels_config.add_channel_to_watched(ctx.guild, channel)
        if has_been_added:
            await ctx.send(f":white_check_mark: I am now watching {channel.name}")
        else:
            await ctx.send(":thumbsup: I was already watching this channel")

    @guild_subcommands.command("forget", help = "Make me stop watching a voice channel")
    async def forget_channel(self, ctx: commands.Context, channel_id: int):
        """Remove a channel from the watched channels of the current guild (administrators and owner only)."""
        await self.check_is_administrator_or_owner(ctx)

        channel = self.bot.get_channel(channel_id)
        if channel is None:
            # The channel does not exist (maybe it has been deleted)
            channel = discord.Object(channel_id)
            channel_name = str(channel_id)
        else:
            channel_name = f"{channel.name} ({channel_id})"

        has_been_removed = self.channels_config.remove_channel_from_watched(ctx.guild, channel)
        if has_been_removed:
            await ctx.send(f":white_check_mark: I am no longer watching {channel_name}")
        else:
            await ctx.send(f":thumbsup: I already was not watching {channel_name}")

    @guild_subcommands.command("list", help = "List watched channels")
    async def list_watched_channels(self, ctx: commands.Context):
        """Send the list of the watched channels of the current guild."""
        watched_channels_ids = self.channels_config.get_watched_channels_ids(ctx.guild)

        if len(watched_channels_ids) > 0:
            answer_lines = []
            for channel_id in watched_channels_ids:
                channel = self.bot.get_channel(channel_id)
                if channel is not None:  # Does the channel still exist?
                    answer_lines.append(f":eyes: {channel.name} ({channel_id})")
                else:
                    answer_lines.append(f":ninja: ({channel_id})")
            await ctx.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))
        else:
            # qualified_name includes the parent group ("guild register"), so that the hint can be used as-is
            await ctx.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.bot.user.display_name} {self.register_channel.qualified_name} {self.register_channel.signature}`")

    @guild_subcommands.command("clear", help = "Make me stop watching all channels")
    async def clear_watched_channels(self, ctx: commands.Context):
        """Remove every watched channel of the current guild (administrators and owner only)."""
        await self.check_is_administrator_or_owner(ctx)
        self.channels_config.clear_watched_channels(ctx.guild)
        await ctx.send(":white_check_mark: I am no longer watching any channel")

    @commands.group(
        "name",
        help = "Commands to set a default name for your channels!",
        invoke_without_command = True,
    )
    async def default_channel_name_subcommands(self, ctx: commands.Context):
        """Root of the ``name`` command group; only reached when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @default_channel_name_subcommands.command(
        "set",
        help = "Set the default name for your voice channels. If you put {user} in the name, it will be replaced by your nickname",
        # The usage overrides the generated signature. An empty string made the usage hint show no argument at all.
        usage = "<name>"
    )
    async def set_user_default_channel_name(self, ctx: commands.Context, *name_elements: str):
        """Store the default channel name of the author; the name is made of all the given words joined by spaces.

        :raise discord_utils.CheckFailDoNotNotify: If no name was given. The user is notified before the exception is raised.
        """
        # Verify that the given name does not mention anyone by checking the message mentions
        if len(set(ctx.message.mentions) - {self.bot.user}) > 0:
            await ctx.send(":no_entry: User mentions are not allowed in channel names. Use {user} if you want to include your name.")
            return

        name = " ".join(name_elements)
        if len(name) == 0:
            await ctx.send(f":x: Missing channel name! {self.get_command_usage_message(ctx.command)}")
            raise discord_utils.CheckFailDoNotNotify

        self.channels_config.set_user_channel_name(ctx.guild, ctx.author, discord_utils.voice_channel_safe_name(name))
        self.channels_config.save_to_file()

        # We escape the markdown from the channel name when sending the message because for now discord does not support markdown in channel names,
        # so we do not want the users to get their hopes up by seeing markdown working in the message
        await ctx.send(f":white_check_mark: The default name for your channels has been set! If you create one now, it will appear as {discord_utils.voice_channel_safe_name(name, user_name_replacement = ctx.author.display_name, escape_markdown = True)}")

    @default_channel_name_subcommands.command("get", help = "See what the default name for your voice channels is")
    async def get_user_default_channel_name(self, ctx: commands.Context):
        """Tell the author how their next voice channel would be named."""
        channel_name = self.channels_config.get_user_channel_name(ctx.guild, ctx.author)
        if channel_name is None:
            await ctx.send(":person_shrugging: You have not set a special name for your voice channels")
        else:
            channel_name = discord_utils.voice_channel_safe_name(channel_name, user_name_replacement = ctx.author.display_name, escape_markdown = True)
            await ctx.send(f":memo: If you create a voice channel right now, it will be named like this: {channel_name}")

    @default_channel_name_subcommands.command("clear", help = "Do not use a special name for your voice channels")
    async def clear_user_default_channel_name(self, ctx: commands.Context):
        """Forget the default channel name of the author."""
        self.channels_config.clear_user_channel_name(ctx.guild, ctx.author)
        self.channels_config.save_to_file()
        await ctx.send(":white_check_mark: Your voice channels will use the default naming convention")

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """Dispatch voice state changes to the "left a channel" and "joined a channel" handlers."""
        if before.channel == after.channel:
            # This event is also fired when the member is (un)muted, (un)deafened, etc.
            # They neither left nor joined a channel, so there is nothing to do
            return

        if before.channel is not None:
            # They left a channel
            await self.handle_user_left_voice(before.channel)

        if after.channel is not None:
            # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another
            await self.handle_user_connected_to_voice(member, after.channel)

    async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
        """Create a temporary voice channel for a user who joined a watched channel, and move them into it.

        Nothing happens if the user is a bot or if the channel is not watched.
        """
        if user.bot:
            return  # We ignore bots connecting to channels, we do not want them asking us to create voice channels

        if not self.channels_config.is_watched(channel.guild, channel):
            return  # It's not one of our special watched join-to-create channels

        # The user is not a bot and connected to one of our special watched channels
        # We create a channel for them and move them into it
        category = channel.category  # None when the watched channel is not inside a category

        # Creating the permissions of the channel
        # We base them on the category's permissions (if there is a category)
        channel_permissions = category.overwrites if category is not None else {}
        if user not in channel_permissions:
            channel_permissions[user] = discord.PermissionOverwrite()
        channel_permissions[user].manage_channels = True  # We allow the user for which we created the channel to change the channel's name

        # Computing the channel name
        user_default_channel_name = self.channels_config.get_user_channel_name(channel.guild, user)
        if user_default_channel_name is None:
            user_default_channel_name = "{user}'s channel"
        channel_name = discord_utils.voice_channel_safe_name(user_default_channel_name, user_name_replacement = user.display_name)

        # Creating the channel next to the watched one (same category, or no category at all)
        user_channel = await channel.guild.create_voice_channel(channel_name, overwrites = channel_permissions, category = category)

        # Saving the channel in the configuration before moving the user,
        # so that the channel is tracked (and can be cleaned up) even if the move fails
        self.channels_config.add_channel_to_created(user_channel.guild, user_channel)

        try:
            await user.move_to(user_channel)
        except discord.HTTPException:
            # Typically the user disconnected in the meantime, or we are not allowed to move them
            logger.warning(f"[{channel.guild.name}({channel.guild.id})] Could not move {user.name}#{user.discriminator}({user.id}) to channel {user_channel.name}({user_channel.id})", exc_info = True)
            await self.handle_user_left_voice(user_channel)  # Deletes the channel since nobody is inside it
            return

        logger.info(f"[{channel.guild.name}({channel.guild.id})] Created channel {user_channel.name}({user_channel.id}) for {user.name}#{user.discriminator}({user.id})")

    async def handle_user_left_voice(self, channel: discord.VoiceChannel):
        """Delete the given channel if it is one of the channels we created and it is now empty."""
        # We do not verify if the user is a bot, because a bot could be the last one to leave a channel (e.g. music bot)
        if not self.channels_config.is_created(channel.guild, channel):
            return  # It is not a channel that we manage, nothing to do

        logger.debug(f"[{channel.guild.name}({channel.guild.id})] User left temporary channel {channel.name} ({channel.id})")

        # It is a channel that we manage
        if len(channel.members) > 0:
            return  # There are still people inside it, nothing to do

        try:
            await channel.delete()
        except discord.NotFound:
            pass  # The channel already does not exist, that's not a problem, that's what we want
        logger.info(f"[{channel.guild.name}({channel.guild.id})] Deleted channel {channel.name}({channel.id}) because it was empty")

        # updating the configuration
        self.channels_config.remove_channel_from_created(channel.guild, channel)


if __name__ == '__main__':
    argparser = argparse.ArgumentParser(
        description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel",
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
    )
    argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored")
    argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches")
    argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages")
    ARGS = argparser.parse_args()

    logging.basicConfig(
        level = logging.DEBUG if ARGS.verbose else logging.INFO,
        format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    )

    logger.info(f"Using discord.py version {discord.__version__}")

    # Reading the token (first line of the token file)
    token_file_path = Path(ARGS.token_file).absolute()
    if not token_file_path.is_file():
        logger.error(f"Token file {token_file_path} does not exist")
        sys.exit(1)

    with token_file_path.open("r") as token_file:
        token = token_file.readline().strip()

    if not token:
        # Fail early with an explicit message instead of trying to log in with an empty token
        logger.error(f"Token file {token_file_path} is empty")
        sys.exit(1)

    channels_config_file = ChannelsConfigFile(ARGS.watched_channels_file)
    bot = VocalMaisBot(channels_config_file)
    bot.run(token)