273 lines
14 KiB
Python
Executable File
273 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
import discord
|
|
import discord.utils
|
|
from discord.ext import commands
|
|
|
|
from ChannelsConfigFile import ChannelsConfigFile
|
|
from utils import discord_utils
|
|
|
|
# Module-level logger, used by both the VocalMaisBot class and the script entry point of this file
logger = logging.getLogger("VocalMaisBot")
|
|
|
|
|
|
class VocalMaisBot(commands.Cog):
    """
    Cog implementing "join to create" temporary voice channels.

    When a non-bot member connects to a watched voice channel, a voice channel is created for them
    and they are moved into it. Channels created this way are deleted as soon as they are empty.
    The cog builds its own commands.Bot (triggered by mentioning the bot) and registers itself on it.

    NOTE(review): set_name and clear_name explicitly call channels_config.save_to_file(), whereas
    register/forget/clear and the voice handlers do not. Presumably the corresponding
    ChannelsConfigFile methods persist their changes themselves - confirm against ChannelsConfigFile.
    """

    def __init__(self, channels_config: ChannelsConfigFile):
        """
        Create the underlying bot and attach this cog to it.

        :param channels_config: Configuration holding the watched channels, the created channels and the users' channel names
        """
        super().__init__()
        self.bot = commands.Bot(
            commands.when_mentioned,
            intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True),
            help_command = commands.MinimalHelpCommand(),
        )
        self.bot.add_cog(self)
        self.channels_config = channels_config
        self.owner = None # Fetched by on_ready

    def run(self, token):
        """
        Start the bot. This call is delegated to commands.Bot.run.

        :param token: The discord bot token
        """
        self.bot.run(token)

    @commands.Cog.listener()
    async def on_ready(self):
        """
        Log the connection, fetch the application's owner and log an invitation link for the bot.
        """
        logger.info("Connected and ready!")
        logger.info(f"Logged in as {self.bot.user}")

        self.owner = (await self.bot.application_info()).owner

        # Permissions requested by the invitation link
        oauth_url = discord.utils.oauth_url(
            self.bot.user.id,
            discord.Permissions(manage_channels = True, read_messages = True, send_messages = True, move_members = True)
        )
        logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")

    async def check_is_administrator_or_owner(self, ctx: commands.Context):
        """
        Verify if the user has administrator rights or if it is the bot's owner

        :return: True if the user is allowed to control the bot
        :raise discord_utils.CheckFailDoNotNotify: If the user does not have the correct rights. The user is notified before the exception is raised.
        """
        if ctx.author.guild_permissions.administrator or ctx.author == self.owner:
            return True

        await ctx.send(":dragon_face: Only an administrator can control me!")
        raise discord_utils.CheckFailDoNotNotify

    def get_command_usage_message(self, command: commands.Command) -> str:
        """
        Build the usage line of a command, as the user should type it (bot mention, parent commands, name and signature).

        :param command: The command to describe
        :return: The usage message
        """
        command_elements = [str(cmd) for cmd in command.parents] + [command.name]
        return f"Usage: `@{self.bot.user.display_name} {' '.join(command_elements)} {command.signature}`"

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """
        Answer the user with a message adapted to the kind of error raised by a command.

        Errors that are not explicitly handled are logged with their traceback and the user only gets a generic message.
        """
        if isinstance(error, commands.CommandNotFound):
            return await ctx.send(f":x: The requested command does not exist")
        elif isinstance(error, commands.MissingRequiredArgument):
            return await ctx.send(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.TooManyArguments):
            return await ctx.send(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.BadArgument):
            # Not every BadArgument carries the offending argument
            if hasattr(error, "argument"):
                return await ctx.send(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
            else:
                return await ctx.send(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")
        elif isinstance(error, commands.ArgumentParsingError):
            return await ctx.send(f":x: Error when parsing the command: {error}")
        elif isinstance(error, commands.CheckFailure):
            if isinstance(error, discord_utils.CheckFailDoNotNotify): # A check failed, but the check function asked that we do not notify the user
                return
            else:
                return await ctx.send(f":x: this command can not be run: {error}")
        else:
            logger.error(f"Error when running command '{ctx.message.content}' by {ctx.author.name}#{ctx.author.discriminator} in {ctx.guild.name}", exc_info = error)
            if isinstance(error, commands.CommandInvokeError):
                await ctx.send(f":x: There was an error during the command execution :dizzy_face:")
            else:
                await ctx.send(f":x: There was an error with the command :dizzy_face:")

    def cog_check(self, ctx: commands.Context):
        """
        Check run before the commands of this cog: refuse commands sent by bots or by system messages.

        :raise discord_utils.CheckFailDoNotNotify: If the message comes from a bot or is a system message (on_command_error stays silent for this exception)
        """
        # We silently ignore messages from bots, since we do not want bots to command us
        if ctx.author.bot or ctx.message.is_system():
            logger.info(f"[{ctx.guild.name}({ctx.guild.id})] Ignored command message from bot or system {ctx.author.name}{ctx.author.discriminator}({ctx.author.id}): {ctx.message.content}")
            raise discord_utils.CheckFailDoNotNotify
        return True

    @commands.command(help = "See if I am alive")
    async def ping(self, ctx: commands.Context):
        """Answer with a ping-pong emoji."""
        await ctx.send(":ping_pong:")

    @commands.command("register", help = "Make me watch a voice channel")
    async def register_channel(self, ctx: commands.Context, channel_id: int):
        """
        Add a channel of the current guild to the watched channels. Reserved to administrators and to the bot's owner.

        :param channel_id: The id of the channel to watch
        """
        await self.check_is_administrator_or_owner(ctx)

        # Retrieving the channel
        channel = self.bot.get_channel(channel_id)
        # getattr because get_channel may return a private channel, which has no guild
        if channel is None or getattr(channel, "guild", None) != ctx.guild:
            return await ctx.send(f"I could not find a channel with id {channel_id} :cry:")

        # Adding the channel to the list of watched channels (if needed)
        has_been_added = self.channels_config.add_channel_to_watched(ctx.guild, channel)
        if has_been_added:
            await ctx.send(f":white_check_mark: I am now watching {channel.name}")
        else:
            await ctx.send(":thumbsup: I was already watching this channel")

    @commands.command("forget", help = "Make me stop watching a voice channel")
    async def forget_channel(self, ctx: commands.Context, channel_id: int):
        """
        Remove a channel from the watched channels of the current guild. Reserved to administrators and to the bot's owner.

        :param channel_id: The id of the channel to forget. It does not need to exist anymore.
        """
        await self.check_is_administrator_or_owner(ctx)

        channel = self.bot.get_channel(channel_id)
        if channel is None or getattr(channel, "guild", None) != ctx.guild:
            # The channel does not exist (maybe it has been deleted), or it belongs to another guild.
            # In both cases we only work with the id, so that we never reveal the name of a channel of another guild.
            channel = discord.Object(channel_id)
            channel_name = str(channel_id)
        else:
            channel_name = f"{channel.name} ({channel_id})"

        has_been_removed = self.channels_config.remove_channel_from_watched(ctx.guild, channel)
        if has_been_removed:
            await ctx.send(f":white_check_mark: I am no longer watching {channel_name}")
        else:
            await ctx.send(f":thumbsup: I already was not watching {channel_name}")

    @commands.command("list", help = "List watched channels")
    async def list_watched_channels(self, ctx: commands.Context):
        """List the watched channels of the current guild, including the ones that do not exist anymore."""
        watched_channels_ids = self.channels_config.get_watched_channels_ids(ctx.guild)
        if len(watched_channels_ids) == 0:
            await ctx.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.bot.user.display_name} {self.register_channel.name} {self.register_channel.signature}`")
            return

        answer_lines = []
        for channel_id in watched_channels_ids:
            channel = self.bot.get_channel(channel_id)
            if channel is not None: # Does the channel still exist?
                answer_lines.append(f":eyes: {channel.name} ({channel_id})")
            else:
                answer_lines.append(f":ninja: <deleted channel> ({channel_id})")
        await ctx.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))

    @commands.command("clear", help = "Make me stop watching all channels")
    async def clear_watched_channels(self, ctx: commands.Context):
        """Stop watching every channel of the current guild. Reserved to administrators and to the bot's owner."""
        await self.check_is_administrator_or_owner(ctx)

        self.channels_config.clear_watched_channels(ctx.guild)
        await ctx.send(":white_check_mark: I am no longer watching any channel")

    @commands.command(
        "set_name",
        brief = "Set the default name for your voice channels",
        help = "Set the default name for your voice channels. If you put {user} in the name, it will be replaced by your nickname",
        usage = "<name>"
    )
    async def set_user_default_channel_name(self, ctx: commands.Context, *name_elements: str):
        """
        Store the name that will be given to the channels created for the author of the command.

        :param name_elements: The words of the name, they are joined with spaces
        :raise discord_utils.CheckFailDoNotNotify: If no name is given. The user is notified before the exception is raised.
        """
        # Verify that the given name does not mention anyone by checking the message mentions
        # The bot itself is always mentioned, since commands are triggered by mentioning it
        if len(set(ctx.message.mentions) - {self.bot.user}) > 0:
            await ctx.send(":no_entry: User mentions are not allowed in channel names. Use {user} if you want to include your name.")
            return

        name = " ".join(name_elements)
        if not name:
            await ctx.send(f":x: Missing channel name! {self.get_command_usage_message(ctx.command)}")
            raise discord_utils.CheckFailDoNotNotify

        self.channels_config.set_user_channel_name(ctx.guild, ctx.author, discord_utils.voice_channel_safe_name(name))
        self.channels_config.save_to_file()
        # We escape the markdown from the channel name when sending the message because for now discord does not support markdown in channel names,
        # so we do not want the users to get their hopes up by seeing markdown working in the message
        await ctx.send(f":white_check_mark: The default name for your channels has been set! If you create one now, it will appear as {discord_utils.voice_channel_safe_name(name, user_name_replacement = ctx.author.display_name, escape_markdown = True)}")

    @commands.command("get_name", help = "Get the default name for your voice channels")
    async def get_user_default_channel_name(self, ctx: commands.Context):
        """Tell the author of the command which name their next voice channel would get, if they have set one."""
        channel_name = self.channels_config.get_user_channel_name(ctx.guild, ctx.author)
        if channel_name is None:
            await ctx.send(":person_shrugging: You have not set a special name for your voice channels")
        else:
            channel_name = discord_utils.voice_channel_safe_name(channel_name, user_name_replacement = ctx.author.display_name, escape_markdown = True)
            await ctx.send(f":memo: If you create a voice channel right now, it will be named like this: {channel_name}")

    @commands.command("clear_name", help = "Do not use a special name for your voice channels")
    async def clear_user_default_channel_name(self, ctx: commands.Context):
        """Forget the channel name stored for the author of the command."""
        self.channels_config.clear_user_channel_name(ctx.guild, ctx.author)
        self.channels_config.save_to_file()
        await ctx.send(":white_check_mark: Your voice channels will use the default naming convention")

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """
        Dispatch the voice channel changes of a member to the "left" and "connected" handlers.
        """
        if before.channel == after.channel:
            # This event is also sent when a member is muted, deafened, starts streaming... without changing channel.
            # Nobody left or joined anything in that case, and we must not create a second channel for a member
            # that is still sitting in a watched channel.
            return

        if before.channel is not None: # They left a channel
            await self.handle_user_left_voice(before.channel)
        if after.channel is not None: # They joined a channel. Note that there is an if and not an elif, because they could leave a channel to join another
            await self.handle_user_connected_to_voice(member, after.channel)

    async def handle_user_connected_to_voice(self, user: discord.Member, channel: discord.VoiceChannel):
        """
        Create a voice channel for a user that connected to a watched channel, and move them into it.

        Nothing is done for bots, nor for channels that are not watched.

        :param user: The member that connected
        :param channel: The channel they connected to
        """
        if user.bot:
            return # We ignore bots connecting to channels, we do not want them asking us to create voice channels
        if not self.channels_config.is_watched(channel.guild, channel):
            return # It's not one of our special watched join-to-create channels

        # The user is not a bot and connected to one of our special watched channels
        # We create a channel for them and move them into it
        category = channel.category # None when the watched channel is not inside a category

        # Creating the permissions of the channel
        # We base them on the category's permissions (on a copy, since we modify them), or start from nothing without category
        channel_permissions = dict(category.overwrites) if category is not None else {}
        if user not in channel_permissions:
            channel_permissions[user] = discord.PermissionOverwrite()
        channel_permissions[user].manage_channels = True # We allow the user for which we created the channel to change the channel's name

        # Computing the channel name
        user_default_channel_name = self.channels_config.get_user_channel_name(channel.guild, user)
        if user_default_channel_name is None:
            user_default_channel_name = "{user}'s channel"
        channel_name = discord_utils.voice_channel_safe_name(user_default_channel_name, user_name_replacement = user.display_name)

        # Creating the channel next to the watched one: in the same category, or at the root of the guild
        channel_parent = category if category is not None else channel.guild
        user_channel = await channel_parent.create_voice_channel(channel_name, overwrites = channel_permissions)

        # Saving the channel in the configuration BEFORE moving the user, so that the channel is
        # already known as one of ours if the user leaves it right after having been moved
        self.channels_config.add_channel_to_created(user_channel.guild, user_channel)

        try:
            await user.move_to(user_channel)
        except discord.HTTPException:
            # The user could not be moved (e.g. they disconnected in the meantime, or we lack the permission)
            # We do not want to leave an empty channel behind us
            logger.warning(f"[{channel.guild.name}({channel.guild.id})] Could not move {user.name}#{user.discriminator}({user.id}) to channel {user_channel.name}({user_channel.id})", exc_info = True)
            await self.handle_user_left_voice(user_channel)
            return

        logger.info(f"[{channel.guild.name}({channel.guild.id})] Created channel {user_channel.name}({user_channel.id}) for {user.name}#{user.discriminator}({user.id})")

    async def handle_user_left_voice(self, channel: discord.VoiceChannel):
        """
        Delete a channel that we created if nobody is connected to it anymore.

        :param channel: The channel that a user just left
        """
        # We do not verify if the user is a bot, because a bot could be the last one to leave a channel (e.g. music bot)

        if not self.channels_config.is_created(channel.guild, channel):
            return # It is not a channel that we manage, nothing to do

        logger.debug(f"[{channel.guild.name}({channel.guild.id})] User left temporary channel {channel.name} ({channel.id})")
        # It is a channel that we manage
        if channel.members:
            return # There are still people inside it, nothing to do
        try:
            await channel.delete()
        except discord.NotFound:
            pass # The channel already does not exist, that's not a problem, that's what we want
        logger.info(f"[{channel.guild.name}({channel.guild.id})] Deleted channel {channel.name}({channel.id}) because it was empty")
        # updating the configuration
        self.channels_config.remove_channel_from_created(channel.guild, channel)
|
|
|
|
|
|
if __name__ == '__main__':
    # Command line interface of the bot
    argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter)
    argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored")
    argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches")
    argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages")

    ARGS = argparser.parse_args()

    logging.basicConfig(
        level = logging.DEBUG if ARGS.verbose else logging.INFO,
        format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    )

    logger.info(f"Using discord.py version {discord.__version__}")

    # Reading the token: it is expected on the first line of the token file
    token_file_path = Path(ARGS.token_file).absolute()
    if not token_file_path.is_file():
        logger.error(f"Token file {token_file_path} does not exist")
        sys.exit(1)
    with token_file_path.open("r") as token_file:
        token = token_file.readline().strip()
    if not token:
        # Fail here with a clear message rather than letting the login fail later with an empty token
        logger.error(f"Token file {token_file_path} does not contain a token on its first line")
        sys.exit(1)

    # Starting the bot (blocks until the bot stops)
    channels_config_file = ChannelsConfigFile(ARGS.watched_channels_file)
    bot = VocalMaisBot(channels_config_file)
    bot.run(token)
|