VocalMaisBot/VocalMaisBot.py

325 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import logging
import sys
from pathlib import Path
import discord
import discord.utils
from discord.ext import commands
from ChannelsConfigFile import ChannelsConfigFile
from utils import discord_utils
# Module-level logger used by every log call in this file.
# Logging itself (level and format) is set up by logging.basicConfig in the __main__ section.
logger = logging.getLogger("VocalMaisBot")
class VocalMaisBot(commands.Cog):
    """
    Discord bot that creates temporary voice channels on demand.

    When a non-bot member joins one of the "watched" voice channels, the bot creates a new voice channel next to it,
    moves the member into it, and deletes that channel once it is empty.
    This class is both the cog holding the commands/listeners and the owner of the underlying commands.Bot.
    """

    def __init__(self, channels_config: ChannelsConfigFile):
        """
        Create the underlying bot (commands are triggered by mentioning it) and register this cog on it.
        :param channels_config: Storage for the watched channels, the created channels and the users' channel names
        """
        super().__init__()
        self.bot = commands.Bot(
            commands.when_mentioned,
            help_command = commands.MinimalHelpCommand(),
            intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True),
            allowed_mentions = discord.AllowedMentions.none(),
        )
        self.bot.add_cog(self)
        self.channels_config = channels_config
        self.owner = None # Fetched by on_ready

    def run(self, token):
        """
        Start the bot with the given token. Delegates to commands.Bot.run.
        :param token: The discord bot token
        """
        self.bot.run(token)

    @commands.Cog.listener()
    async def on_ready(self):
        """
        Log the identity of the bot, fetch the application owner and log an invitation link.
        """
        logger.info("Connected and ready!")
        logger.info(f"Logged in as {discord_utils.to_string(self.bot.user)}")
        self.owner = (await self.bot.application_info()).owner
        # The invitation link requests exactly the permissions the bot needs to manage voice channels and answer commands
        oauth_url = discord.utils.oauth_url(
            self.bot.user.id,
            discord.Permissions(manage_channels = True, read_messages = True, send_messages = True, move_members = True)
        )
        logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")

    async def check_is_administrator_or_owner(self, ctx: commands.Context):
        """
        Verify if the user has administrator rights or if it is the bot's owner
        :return: True if the user is allowed to run administration commands
        :raise discord_utils.CheckFailDoNotNotify: If the user does not have the correct rights. Also notifies the user about it.
        """
        if ctx.author.guild_permissions.administrator or ctx.author == self.owner:
            return True
        await ctx.reply(":dragon_face: Only an administrator can control me!")
        raise discord_utils.CheckFailDoNotNotify

    def get_command_usage_message(self, command: commands.Command, prepend = "Usage: ") -> str:
        """
        Build a message showing how to invoke a command.
        :param command: The command to describe
        :param prepend: Text put before the usage itself
        :return: The prefix followed by the full invocation (bot mention, command path, signature) in backticks
        """
        # qualified_name is the full path of the command (parent groups included, root first),
        # so it stays correct for nested groups. Empty parts are dropped to avoid a trailing space.
        invocation = " ".join(part for part in (command.qualified_name, command.signature) if part)
        return f"{prepend}`@{self.bot.user.display_name} {invocation}`"

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """
        Tell the user what went wrong with their command.
        User errors get a specific answer; unexpected errors are logged with their traceback and get a generic answer.
        :param ctx: The context of the command that failed
        :param error: The error raised while parsing, checking or running the command
        """
        if isinstance(error, commands.CommandNotFound):
            return await ctx.reply(":x: The requested command does not exist")
        elif isinstance(error, commands.MissingRequiredArgument):
            return await ctx.reply(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.TooManyArguments):
            return await ctx.reply(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.BadArgument):
            # Not every BadArgument carries the offending argument
            if hasattr(error, "argument"):
                return await ctx.reply(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
            else:
                return await ctx.reply(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.ArgumentParsingError):
            return await ctx.reply(f":x: Error when parsing the command: {error}")
        elif isinstance(error, commands.CheckFailure):
            if isinstance(error, discord_utils.CheckFailDoNotNotify): # A check failed, but the check function asked that we do not notify the user
                return
            else:
                return await ctx.reply(f":x: this command can not be run: {error}")
        else:
            # Unexpected error: keep the traceback in the logs, and only give a generic message to the user
            logger.error(f"Error when running command '{ctx.message.content}' by {discord_utils.to_string(ctx.author)} in {discord_utils.to_string(ctx.guild)}", exc_info = error)
            if isinstance(error, commands.CommandInvokeError):
                await ctx.reply(":x: There was an error during the command execution :dizzy_face:")
            else:
                await ctx.reply(":x: There was an error with the command :dizzy_face:")

    def cog_check(self, ctx: commands.Context):
        """
        Check applied to the commands of this cog: refuse commands sent by bots or by system messages.
        :return: True if the command may run
        :raise discord_utils.CheckFailDoNotNotify: If the message comes from a bot or is a system message
        """
        # We silently ignore messages from bots, since we do not want bots to command us
        if ctx.author.bot or ctx.message.is_system():
            logger.info(f"[{discord_utils.to_string(ctx.guild)}] Ignored command message from bot or system: {ctx.message.content}")
            raise discord_utils.CheckFailDoNotNotify
        return True

    async def subcommand_invoked_without_command(self, ctx: commands.Context):
        """
        This function is a possible implementation for a command group created with invoke_without_command = True.
        It assumes that the group was called without a subcommand or with a non-existing subcommand.
        This function notifies the user, then raises a discord_utils.CheckFailDoNotNotify exception.
        :raise discord_utils.CheckFailDoNotNotify: Always, after the user has been notified
        """
        # qualified_name contains the parent groups too, so this also works in case of nested groups
        help_message = f"Type `@{self.bot.user.display_name} help {ctx.command.qualified_name}` for help"
        if ctx.subcommand_passed is None: # The user did not pass a subcommand
            await ctx.reply(f":x: Subcommand expected. {help_message}")
        else:
            await ctx.reply(f":x: The requested subcommand does not exist. {help_message}")
        raise discord_utils.CheckFailDoNotNotify()

    @commands.command(help = "See if I am alive")
    async def ping(self, ctx: commands.Context):
        """Answer with a ping-pong emoji."""
        await ctx.reply(":ping_pong:")

    @commands.group(
        "guild",
        help = "Commands for server admins",
        invoke_without_command = True,
    )
    async def guild_subcommands(self, ctx: commands.Context):
        """Group of the administration commands. Called directly only when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @guild_subcommands.command("register", help = "Make me watch a voice channel")
    async def register_channel(self, ctx: commands.Context, channel_id: int):
        """
        Add a channel of the current guild to the watched channels. Reserved to administrators and to the bot's owner.
        :param channel_id: The id of the channel to watch
        """
        await self.check_is_administrator_or_owner(ctx)
        # Retrieving the channel. It must belong to the guild where the command was sent.
        # getattr is used because get_channel may also return channels that have no guild (private channels).
        channel = self.bot.get_channel(channel_id)
        if channel is None or getattr(channel, "guild", None) != ctx.guild:
            return await ctx.reply(f"I could not find a channel with id {channel_id} :cry:")
        # Adding the channel to the list of watched channels (if needed)
        # NOTE(review): unlike the "name" commands, there is no explicit save_to_file() call here;
        # presumably add_channel_to_watched persists the change itself - to be confirmed
        has_been_added = self.channels_config.add_channel_to_watched(ctx.guild, channel)
        if has_been_added:
            await ctx.reply(f":white_check_mark: I am now watching {channel.name}")
        else:
            await ctx.reply(":thumbsup: I was already watching this channel")

    @guild_subcommands.command("forget", help = "Make me stop watching a voice channel")
    async def forget_channel(self, ctx: commands.Context, channel_id: int):
        """
        Remove a channel from the watched channels of the current guild. Reserved to administrators and to the bot's owner.
        :param channel_id: The id of the channel to stop watching. It may be the id of a deleted channel.
        """
        await self.check_is_administrator_or_owner(ctx)
        channel = self.bot.get_channel(channel_id)
        if channel is None or getattr(channel, "guild", None) != ctx.guild:
            # The channel does not exist (maybe it has been deleted), or it belongs to another guild.
            # In both cases we only work with the id, so that we never reveal the name of another guild's channel.
            channel = discord.Object(channel_id)
            channel_name = str(channel_id)
        else:
            channel_name = f"{channel.name} ({channel_id})"
        has_been_removed = self.channels_config.remove_channel_from_watched(ctx.guild, channel)
        if has_been_removed:
            await ctx.reply(f":white_check_mark: I am no longer watching {channel_name}")
        else:
            await ctx.reply(f":thumbsup: I already was not watching {channel_name}")

    @guild_subcommands.command("list", help = "List watched channels")
    async def list_watched_channels(self, ctx: commands.Context):
        """Reply with the list of the channels watched in the current guild. No special right is required."""
        watched_channels_ids = self.channels_config.get_watched_channels_ids(ctx.guild)
        if len(watched_channels_ids) > 0:
            answer_lines = []
            for channel_id in watched_channels_ids:
                channel = self.bot.get_channel(channel_id)
                if channel is not None: # Does the channel still exist?
                    answer_lines.append(f":eyes: {channel.name} ({channel_id})")
                else:
                    answer_lines.append(f":ninja: <deleted channel> ({channel_id})")
            await ctx.reply(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))
        else:
            await ctx.reply(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running {self.get_command_usage_message(self.register_channel, '')}")

    @guild_subcommands.command("clear", help = "Make me stop watching all channels")
    async def clear_watched_channels(self, ctx: commands.Context):
        """Remove all the watched channels of the current guild. Reserved to administrators and to the bot's owner."""
        await self.check_is_administrator_or_owner(ctx)
        self.channels_config.clear_watched_channels(ctx.guild)
        await ctx.reply(":white_check_mark: I am no longer watching any channel")

    @commands.group(
        "name",
        help = "Commands to set a default name for your channels!",
        invoke_without_command = True,
    )
    async def default_channel_name_subcommands(self, ctx: commands.Context):
        """Group of the channel naming commands. Called directly only when no valid subcommand was given."""
        await self.subcommand_invoked_without_command(ctx)

    @default_channel_name_subcommands.command(
        "set",
        help = "Set the default name for your voice channels. If you put {user} in the name, it will be replaced by your nickname",
        usage = "<name>"
    )
    async def set_user_default_channel_name(self, ctx: commands.Context, *name_elements: str):
        """
        Store the name that will be given to the channels created for the author of the command.
        :param name_elements: The words of the name; they are joined with single spaces
        :raise discord_utils.CheckFailDoNotNotify: If no name was given. The user is notified first.
        """
        # Verify that the given name does not mention anyone by checking the message mentions
        # (the bot itself is always mentioned, since that is how commands are triggered)
        if len(set(ctx.message.mentions) - {self.bot.user}) > 0:
            await ctx.reply(":no_entry: User mentions are not allowed in channel names. Use {user} if you want to include your name.")
            return
        name = " ".join(name_elements)
        if len(name) == 0:
            await ctx.reply(f":x: Missing channel name! {self.get_command_usage_message(ctx.command)}")
            raise discord_utils.CheckFailDoNotNotify
        self.channels_config.set_user_channel_name(ctx.guild, ctx.author, discord_utils.voice_channel_safe_name(name))
        self.channels_config.save_to_file()
        # We escape the markdown from the channel name when sending the message because for now discord does not support markdown in channel names,
        # so we do not want the users to get their hopes up by seeing markdown working in the message
        await ctx.reply(f":white_check_mark: The default name for your channels has been set! If you create one now, it will appear as {discord_utils.voice_channel_safe_name(name, user_name_replacement = ctx.author.display_name, escape_markdown = True)}")

    @default_channel_name_subcommands.command("get", help = "See what the default name for your voice channels is")
    async def get_user_default_channel_name(self, ctx: commands.Context):
        """Reply with the name that a channel created right now for the author of the command would get."""
        channel_name = self.channels_config.get_user_channel_name(ctx.guild, ctx.author)
        if channel_name is None:
            await ctx.reply(":person_shrugging: You have not set a special name for your voice channels")
        else:
            channel_name = discord_utils.voice_channel_safe_name(channel_name, user_name_replacement = ctx.author.display_name, escape_markdown = True)
            await ctx.reply(f":memo: If you create a voice channel right now, it will be named like this: {channel_name}")

    @default_channel_name_subcommands.command("clear", help = "Do not use a special name for your voice channels")
    async def clear_user_default_channel_name(self, ctx: commands.Context):
        """Forget the name stored for the author of the command."""
        self.channels_config.clear_user_channel_name(ctx.guild, ctx.author)
        self.channels_config.save_to_file()
        await ctx.reply(":white_check_mark: Your voice channels will use the default naming convention")

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """
        React to members joining and leaving voice channels.
        :param member: The member whose voice state changed
        :param before: The voice state before the change
        :param after: The voice state after the change
        """
        # This event is also sent when a member is muted, deafened, etc. without changing channel.
        # Without this guard, a member still sitting in a watched channel would get a new channel on each of these updates.
        if before.channel == after.channel:
            return
        if before.channel is not None: # They left a channel
            await self.handle_user_left_voice(before.channel)
        if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another
            await self.handle_user_connected_to_voice(member, after.channel)

    async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
        """
        If a non-bot user joined a watched channel, create a temporary voice channel for them and move them into it.
        The new channel is placed in the category of the watched channel (or in no category if it has none).
        :param user: The member who joined the channel
        :param channel: The channel that was joined
        """
        if user.bot:
            return # We ignore bots connecting to channels, we do not want them asking us to create voice channels
        if not self.channels_config.is_watched(channel.guild, channel):
            return # It's not one of our special watched join-to-create channels
        # The user is not a bot and connected to one of our special watched channels
        logger.debug(f"[{discord_utils.to_string(channel.guild)}] User {discord_utils.to_string(user)} connected to watched channel {discord_utils.to_string(channel)}")
        # We create a channel for them and move them into it
        category = channel.category
        # Creating the permissions of the channel, based on the watched channel's category permissions.
        # The bot can not set a permission on a channel if the bot itself does not have the permission
        bot_permissions = {perm for perm, value in channel.guild.get_member(self.bot.user.id).guild_permissions if value} # The permissions that the bot has
        channel_permissions = {}
        # A watched channel is not necessarily inside a category: in that case there is nothing to copy
        category_overwrites = category.overwrites if category is not None else {}
        for subject, overwrite in category_overwrites.items():
            granted_permissions = {perm for perm, value in overwrite if value}
            deny_permissions = {perm for perm, value in overwrite if value is False} # We need to check that value is False, because None means "not specified"
            # We set all the permissions that the bot can set
            channel_permissions[subject] = discord.PermissionOverwrite(
                **{perm: True for perm in granted_permissions if perm in bot_permissions},
                **{perm: False for perm in deny_permissions if perm in bot_permissions}
            )
            # We add a log message if there are permissions that the bot can not set
            permissions_that_bot_does_not_have = (granted_permissions.union(deny_permissions)) - bot_permissions
            if len(permissions_that_bot_does_not_have) > 0:
                logger.warning(f"[{discord_utils.to_string(channel.guild)}] Error: the bot can not set some of the permissions for {discord_utils.to_string(subject)} specified on category {discord_utils.to_string(category)} because it does not have them: {permissions_that_bot_does_not_have}")
        # We allow the user for which we created the channel to change the channel's name
        if user not in channel_permissions:
            channel_permissions[user] = discord.PermissionOverwrite()
        channel_permissions[user].manage_channels = True
        # Computing the channel name
        user_default_channel_name = self.channels_config.get_user_channel_name(channel.guild, user)
        if user_default_channel_name is None:
            user_default_channel_name = "{user}'s channel"
        channel_name = discord_utils.voice_channel_safe_name(user_default_channel_name, user_name_replacement = user.display_name)
        # We set bitrate and user limit for the new channel the same as the channel that has been joined to create
        bitrate = channel.bitrate
        user_limit = channel.user_limit
        # Creating the channel and moving the user into it
        logger.debug(f"[{discord_utils.to_string(channel.guild)}] Creating channel {channel_name} for {discord_utils.to_string(user)}")
        # Creating through the guild (and not through the category) also works when there is no category
        user_channel = await channel.guild.create_voice_channel(channel_name, category = category, overwrites = channel_permissions, bitrate = bitrate, user_limit = user_limit)
        # Saving the channel in the configuration before moving the user,
        # so that the channel is known as temporary (and gets deleted) even if the move fails
        self.channels_config.add_channel_to_created(user_channel.guild, user_channel)
        try:
            await user.move_to(user_channel)
        except discord.HTTPException:
            # Typically the user already left the voice chat, or the bot is not allowed to move them
            logger.warning(f"[{discord_utils.to_string(channel.guild)}] Could not move {discord_utils.to_string(user)} into {discord_utils.to_string(user_channel)}", exc_info = True)
            # Nobody will ever leave this channel, so we run the usual "channel is empty" cleanup ourselves
            await self.handle_user_left_voice(user_channel)
            return
        logger.info(f"[{discord_utils.to_string(channel.guild)}] Created channel {discord_utils.to_string(user_channel)} for {discord_utils.to_string(user)}")

    async def handle_user_left_voice(self, channel: discord.VoiceChannel):
        """
        Delete the given channel if it is one of the temporary channels created by the bot and it is now empty.
        :param channel: The channel that a member has just left
        """
        # We do not verify if the user is a bot, because a bot could be the last one to leave a channel (e.g. music bot)
        if not self.channels_config.is_created(channel.guild, channel):
            return # It is not a channel that we manage, nothing to do
        logger.debug(f"[{discord_utils.to_string(channel.guild)}] User left temporary channel {discord_utils.to_string(channel)}")
        # It is a channel that we manage
        if len(channel.members) > 0:
            return # There are still people inside it, nothing to do
        logger.debug(f"[{discord_utils.to_string(channel.guild)}] Temporary channel {discord_utils.to_string(channel)} is now empty")
        try:
            await channel.delete()
        except discord.NotFound:
            pass # The channel already does not exist, that's not a problem, that's what we want
        logger.info(f"[{discord_utils.to_string(channel.guild)}] Deleted channel {discord_utils.to_string(channel)} because it was empty")
        # updating the configuration
        self.channels_config.remove_channel_from_created(channel.guild, channel)
# Script entry point: parse the command line, configure logging, read the token and run the bot
if __name__ == '__main__':
    argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter)
    argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored")
    argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches")
    argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages")
    ARGS = argparser.parse_args()

    logging.basicConfig(
        level = logging.DEBUG if ARGS.verbose else logging.INFO,
        format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    )
    logger.info(f"Using discord.py version {discord.__version__}")

    # The token is the first line of the token file
    token_file_path = Path(ARGS.token_file).absolute()
    if not token_file_path.is_file():
        logger.error(f"Token file {token_file_path} does not exist")
        sys.exit(1)
    with token_file_path.open("r", encoding = "utf-8") as token_file:
        token = token_file.readline().strip()
    # Fail early with a clear message instead of letting the login fail with an empty token
    if not token:
        logger.error(f"Token file {token_file_path} does not contain a token on its first line")
        sys.exit(1)

    channels_config_file = ChannelsConfigFile(ARGS.watched_channels_file)
    bot = VocalMaisBot(channels_config_file)
    bot.run(token)