Using discord.py commands framework by subclassing Cog
This commit is contained in:
parent
2c86a4a14d
commit
2fe4142cd6
196
VocalMaisBot.py
196
VocalMaisBot.py
|
|
@ -2,159 +2,155 @@
|
|||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Dict, Union
|
||||
|
||||
import discord
|
||||
import discord.utils
|
||||
from discord.ext import commands
|
||||
|
||||
from ChannelsConfigFile import ChannelsConfigFile
|
||||
import utils
|
||||
from utils import discord_utils
|
||||
|
||||
logger = logging.getLogger("VocalMaisBot")
|
||||
|
||||
|
||||
class VocalMaisBot(discord.Client):
|
||||
class VocalMaisBot(commands.Cog):
|
||||
|
||||
def __init__(self, channels_config: ChannelsConfigFile):
|
||||
super().__init__(
|
||||
intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True)
|
||||
super().__init__()
|
||||
self.bot = commands.Bot(
|
||||
commands.when_mentioned,
|
||||
intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True),
|
||||
help_command = commands.MinimalHelpCommand(),
|
||||
)
|
||||
self.bot.add_cog(self)
|
||||
self.channels_config = channels_config
|
||||
self.owner = None # Fetched by on_ready
|
||||
|
||||
async def _check_administrator_for_command(self, message: discord.Message) -> bool:
|
||||
if message.author.guild_permissions.administrator or message.author == self.owner:
|
||||
return True
|
||||
else:
|
||||
await message.channel.send(f":dragon_face: Only an administrator can control me!")
|
||||
return False
|
||||
|
||||
async def sorry_do_not_understand(self, message: discord.Message):
|
||||
await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help")
|
||||
def run(self, token):
|
||||
self.bot.run(token)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_ready(self):
|
||||
logger.info("Connected and ready!")
|
||||
logger.debug(f"Logged as {self.user}")
|
||||
logger.debug(f"Logged as {self.bot.user}")
|
||||
|
||||
self.owner = (await self.application_info()).owner
|
||||
self.owner = (await self.bot.application_info()).owner
|
||||
|
||||
oauth_url = discord.utils.oauth_url(
|
||||
self.user.id,
|
||||
self.bot.user.id,
|
||||
discord.Permissions(manage_channels = True, read_messages = True, send_messages = True, move_members = True)
|
||||
)
|
||||
logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")
|
||||
|
||||
async def on_message(self, message: discord.Message):
|
||||
if self.user not in message.mentions: # We only ever react to messages that mention us
|
||||
return
|
||||
if message.author.bot or message.is_system(): # We ignore messages from automated sources
|
||||
return
|
||||
|
||||
contents = message.content.split()
|
||||
if utils.check_list_element_no_bounds(contents, 1, "help"):
|
||||
return await self.print_help(message.channel)
|
||||
elif utils.check_list_element_no_bounds(contents, 1, "ping"):
|
||||
return await message.channel.send(":ping_pong:")
|
||||
elif utils.check_list_element_no_bounds(contents, 1, "register"):
|
||||
return await self.register_channel(message)
|
||||
elif utils.check_list_element_no_bounds(contents, 1, "forget"):
|
||||
return await self.forget_channel(message)
|
||||
elif utils.check_list_element_no_bounds(contents, 1, "list"):
|
||||
return await self.list_watched_channels(message)
|
||||
elif utils.check_list_element_no_bounds(contents, 1, "clear"):
|
||||
return await self.clear_watched_channels(message)
|
||||
else:
|
||||
return await self.sorry_do_not_understand(message)
|
||||
|
||||
async def print_help(self, channel: discord.TextChannel):
|
||||
me = self.user.display_name
|
||||
message = f"""
|
||||
**·** `@{me} register channel_id` : make me watch the voice channel with id `channel_id`
|
||||
**·** `@{me} forget channel_id` : make me stop watching the voice channel with id `channel_id`
|
||||
**·** `@{me} list` : list watched channels
|
||||
**·** `@{me} clear` : stop watching all channels
|
||||
**·** `@{me} help` : print this help
|
||||
**·** `@{me} ping` : pong
|
||||
async def check_is_administrator_or_owner(self, ctx: commands.Context):
|
||||
"""
|
||||
await channel.send(embed = discord.Embed(title = "Available commands", description = textwrap.dedent(message)))
|
||||
Verify if the user has administrator rights or if it is the bot's owner
|
||||
:raise discord_utils.CheckFailDoNotNotify: If the user has no the correct rights. Also notifies the user about it.
|
||||
"""
|
||||
if ctx.author.guild_permissions.administrator or ctx.author == self.owner:
|
||||
return True
|
||||
else:
|
||||
await ctx.send(f":dragon_face: Only an administrator can control me!")
|
||||
raise discord_utils.CheckFailDoNotNotify
|
||||
|
||||
async def register_channel(self, message: discord.Message):
|
||||
channel_id = await self._get_channel_id_for_command(message)
|
||||
if channel_id is None:
|
||||
return
|
||||
if not await self._check_administrator_for_command(message):
|
||||
return
|
||||
def get_command_usage_message(self, command: commands.Command) -> str:
|
||||
return f"Usage: `@{self.bot.user.display_name} {command.name} {command.signature}`"
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
|
||||
if isinstance(error, commands.CommandNotFound):
|
||||
return await ctx.send(f":x: The requested command does not exist")
|
||||
elif isinstance(error, commands.MissingRequiredArgument):
|
||||
return await ctx.send(f":x: Missing required argument {error.param}. {self.get_command_usage_message(ctx.command)}")
|
||||
elif isinstance(error, commands.TooManyArguments):
|
||||
return await ctx.send(f":x: too many arguments for command {ctx.command}. {self.get_command_usage_message(ctx.command)}")
|
||||
elif isinstance(error, commands.BadArgument):
|
||||
if hasattr(error, "argument"):
|
||||
return await ctx.send(f":x: I could not correctly understand argument {error.argument}. {self.get_command_usage_message(ctx.command)}")
|
||||
else:
|
||||
return await ctx.send(f":x: I could not correctly understand one of the arguments. {self.get_command_usage_message(ctx.command)}")
|
||||
elif isinstance(error, commands.ArgumentParsingError):
|
||||
return await ctx.send(f":x: Error when parsing the command: {error}")
|
||||
elif isinstance(error, commands.CheckFailure):
|
||||
if isinstance(error, discord_utils.CheckFailDoNotNotify): # A check failed, but the check function asked that we do not notify the user
|
||||
return
|
||||
else:
|
||||
return await ctx.send(f":x: this command can not be run: {error}")
|
||||
else:
|
||||
logger.error(f"Error when running command '{ctx.message.content}' by {ctx.author.name}#{ctx.author.discriminator} in {ctx.guild.name}", exc_info = error)
|
||||
if isinstance(error, commands.CommandInvokeError):
|
||||
await ctx.send(f":x: There was an error during the command execution :dizzy_face:")
|
||||
else:
|
||||
await ctx.send(f":x: There was an error with the command :dizzy_face:")
|
||||
|
||||
def cog_check(self, ctx: commands.Context):
|
||||
# We silently ignore messages from bots, since we do not want bots to command us
|
||||
if ctx.author.bot or ctx.message.is_system():
|
||||
logger.info(f"[{ctx.guild.name}({ctx.guild.id})] Ignored command message from bot or system {ctx.author.name}{ctx.author.discriminator}({ctx.author.id}): {ctx.message.content}")
|
||||
raise discord_utils.CheckFailDoNotNotify
|
||||
return True
|
||||
|
||||
@commands.command(help = "See if I am alive")
|
||||
async def ping(self, ctx: commands.Context):
|
||||
await ctx.send(":ping_pong:")
|
||||
|
||||
@commands.command("register", help = "Make me watch a voice channel")
|
||||
async def register_channel(self, ctx: commands.Context, channel_id: int):
|
||||
await self.check_is_administrator_or_owner(ctx)
|
||||
|
||||
# Retrieving the channel
|
||||
channel = self.get_channel(channel_id)
|
||||
if channel is None or channel.guild != message.guild:
|
||||
return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:")
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
if channel is None or channel.guild != ctx.guild:
|
||||
return await ctx.send(f"I could not find a channel with id {channel_id} :cry:")
|
||||
|
||||
# Adding the channel to the list of watched channels (if needed)
|
||||
has_been_added = self.channels_config.add_channel_to_watched(message.guild, channel)
|
||||
has_been_added = self.channels_config.add_channel_to_watched(ctx.guild, channel)
|
||||
if has_been_added:
|
||||
await message.channel.send(f":white_check_mark: I am now watching {channel.name}")
|
||||
await ctx.send(f":white_check_mark: I am now watching {channel.name}")
|
||||
else:
|
||||
await message.channel.send(":thumbsup: I was already watching this channel")
|
||||
await ctx.send(":thumbsup: I was already watching this channel")
|
||||
|
||||
async def forget_channel(self, message: discord.Message):
|
||||
channel_id = await self._get_channel_id_for_command(message)
|
||||
if channel_id is None:
|
||||
return
|
||||
if not await self._check_administrator_for_command(message):
|
||||
return
|
||||
@commands.command("forget", help = "Make me stop watching a voice channel")
|
||||
async def forget_channel(self, ctx: commands.Context, channel_id: int):
|
||||
await self.check_is_administrator_or_owner(ctx)
|
||||
|
||||
channel = self.get_channel(channel_id)
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
if channel is None: # The channel does not exist (maybe it has been deleted)
|
||||
channel = discord.Object(channel_id)
|
||||
channel_name = str(channel_id)
|
||||
else:
|
||||
channel_name = f"{channel.name} ({channel_id})"
|
||||
|
||||
has_been_removed = self.channels_config.remove_channel_from_watched(message.guild, channel)
|
||||
has_been_removed = self.channels_config.remove_channel_from_watched(ctx.guild, channel)
|
||||
if has_been_removed:
|
||||
await message.channel.send(f":white_check_mark: I am no longer watching {channel_name}")
|
||||
await ctx.send(f":white_check_mark: I am no longer watching {channel_name}")
|
||||
else:
|
||||
await message.channel.send(f":thumbsup: I already was not watching {channel_name}")
|
||||
await ctx.send(f":thumbsup: I already was not watching {channel_name}")
|
||||
|
||||
async def _get_channel_id_for_command(self, message: discord.Message) -> Union[int, None]:
|
||||
"""
|
||||
Get the channel id from a command message in the form "@Bot command channel_id", or answer with an error message and return None if not possible.
|
||||
"""
|
||||
message_elements = message.content.split()
|
||||
if len(message_elements) < 3:
|
||||
await self.sorry_do_not_understand(message)
|
||||
return None
|
||||
channel_id = message_elements[2]
|
||||
try:
|
||||
return int(channel_id)
|
||||
except ValueError:
|
||||
await message.channel.send(f":x: {channel_id} can not be converted to number")
|
||||
return None
|
||||
|
||||
async def list_watched_channels(self, message: discord.Message):
|
||||
watched_channels_ids = self.channels_config.get_watched_channels_ids(message.guild)
|
||||
@commands.command("list", help = "List watched channels")
|
||||
async def list_watched_channels(self, ctx: commands.Context):
|
||||
watched_channels_ids = self.channels_config.get_watched_channels_ids(ctx.guild)
|
||||
if len(watched_channels_ids) > 0:
|
||||
answer_lines = []
|
||||
for channel_id in watched_channels_ids:
|
||||
channel = self.get_channel(channel_id)
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
if channel is not None: # Does the channel still exist?
|
||||
answer_lines.append(f":eyes: {channel.name} ({channel_id})")
|
||||
else:
|
||||
answer_lines.append(f":ninja: <deleted channel> ({channel_id})")
|
||||
await message.channel.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))
|
||||
await ctx.send(embed = discord.Embed(title = "I am watching the following channels", description = "\n".join(answer_lines)))
|
||||
else:
|
||||
await message.channel.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.user.display_name} register channel_id`")
|
||||
await ctx.send(f":see_no_evil: I am not watching any voice channel! Do not hesitate to add some by running `@{self.bot.user.display_name} {self.register_channel.name} {self.register_channel.signature}`")
|
||||
|
||||
async def clear_watched_channels(self, message: discord.Message):
|
||||
if not await self._check_administrator_for_command(message):
|
||||
return
|
||||
@commands.command("clear", help = "Make me stop watching all channels")
|
||||
async def clear_watched_channels(self, ctx: commands.Context):
|
||||
await self.check_is_administrator_or_owner(ctx)
|
||||
|
||||
self.channels_config.clear_watched_channels(message.guild)
|
||||
await message.channel.send(":white_check_mark: I am no longer watching any channel")
|
||||
self.channels_config.clear_watched_channels(ctx.guild)
|
||||
await ctx.send(":white_check_mark: I am no longer watching any channel")
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
|
||||
if before.channel is not None: # They left a channel
|
||||
await self.handle_user_left_voice(before.channel)
|
||||
|
|
@ -173,7 +169,7 @@ class VocalMaisBot(discord.Client):
|
|||
user_name = user.display_name
|
||||
user_channel = await category.create_voice_channel(f"{user_name}'s channel", overwrites = {user: discord.PermissionOverwrite(manage_channels = True)})
|
||||
await user.move_to(user_channel)
|
||||
logger.info(f"Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})")
|
||||
logger.info(f"[{channel.guild.name}({channel.guild.id})] Created channel {user_channel.name}({user_channel.id}) for {user.display_name}({user.id})")
|
||||
|
||||
# Saving the channel in the configuration
|
||||
self.channels_config.add_channel_to_created(user_channel.guild, user_channel)
|
||||
|
|
@ -184,7 +180,7 @@ class VocalMaisBot(discord.Client):
|
|||
if not self.channels_config.is_created(channel.guild, channel):
|
||||
return # It is not a channel that we manage, nothing to do
|
||||
|
||||
logger.debug(f"User left temporary channel {channel.name} ({channel.id})")
|
||||
logger.debug(f"[{channel.guild.name}({channel.guild.id})] User left temporary channel {channel.name} ({channel.id})")
|
||||
# It is a channel that we manage
|
||||
if len(channel.members) > 0:
|
||||
return # There are still people inside it, nothing to do
|
||||
|
|
@ -192,7 +188,7 @@ class VocalMaisBot(discord.Client):
|
|||
await channel.delete()
|
||||
except discord.NotFound:
|
||||
pass # The channel already does not exist, that's not a problem, that's what we want
|
||||
logger.info(f"Deleted channel {channel.name}({channel.id}) because it was empty")
|
||||
logger.info(f"[{channel.guild.name}({channel.guild.id})] Deleted channel {channel.name}({channel.id}) because it was empty")
|
||||
# updating the configuration
|
||||
self.channels_config.remove_channel_from_created(channel.guild, channel)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,9 @@
|
|||
import discord.ext.commands as commands
|
||||
|
||||
|
||||
class CheckFailDoNotNotify(commands.CheckFailure):
|
||||
"""
|
||||
A custom exception that we can raise inside command check functions if the check fails, but the global error handling should not notify the user about the error.
|
||||
For example it can be used if the check function already sent the user a customised error message.
|
||||
"""
|
||||
pass
|
||||
Loading…
Reference in New Issue