From 093e3bd32e03b594212bd0f7f81403640cb3b620 Mon Sep 17 00:00:00 2001 From: Elnath Date: Sat, 26 Dec 2020 17:42:35 +0100 Subject: [PATCH] Bot now responds to commands and channels can be added to list of watched channels --- .gitignore | 1 + VocalMaisBot.py | 112 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 108 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index f1c5b59..7ff52d1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ __pycache__/ .token +channels.json diff --git a/VocalMaisBot.py b/VocalMaisBot.py index 5f439dc..a9af3eb 100755 --- a/VocalMaisBot.py +++ b/VocalMaisBot.py @@ -3,6 +3,9 @@ import argparse import logging import sys from pathlib import Path +import json +from typing import TextIO, Any, List, Dict +import textwrap import discord import discord.utils @@ -12,9 +15,16 @@ logger = logging.getLogger("VocalMaisBot") class VocalMaisBot(discord.Client): - def __init__(self, token: str): - super().__init__() - self.run(token) + def __init__(self, watched_channels_file: TextIO): + super().__init__( + intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True) + ) + self.watched_channels_file = watched_channels_file + try: + self.watched_channels = json.load(watched_channels_file) + except json.JSONDecodeError as e: + logger.critical(f"Impossible to parse watched channels file, JSON error: {e}") + sys.exit(1) async def on_ready(self): logger.info("Connected and ready!") @@ -26,11 +36,92 @@ class VocalMaisBot(discord.Client): ) logger.info(f"You can invite the bot to your server using the following link: {oauth_url}") + async def on_message(self, message: discord.Message): + if self.user not in message.mentions: # We only ever react to messages that mention us + return + if message.author.bot or message.is_system(): # We ignore messages from automated sources + return + + contents = message.content.split() + if _check_list_element(contents, 1, "help"): + return await self.print_help(message.channel) + elif _check_list_element(contents, 1, "ping"): + return await message.channel.send(":ping_pong:") + elif _check_list_element(contents, 1, "register"): + return await self.register_channel(message) + else: + return self.sorry_do_not_understand(message) + + async def print_help(self, channel: discord.TextChannel): + me = self.user.display_name + message = f""" + **·** `@{me}register channel_id` : make me watch the voice channel with id `channel_id` + **·** `@{me} help` : print this help + **·** `@{me} ping` : pong + """ + await channel.send(embed = discord.Embed(title = "Available commands", description = textwrap.dedent(message))) + + async def register_channel(self, message: discord.Message): + message_elements = message.content.split() + if len(message_elements) < 3: + return await self.sorry_do_not_understand(message) + channel_id = message_elements[2] + try: + channel_id = int(channel_id) + except ValueError: + return await message.channel.send(f":x: {channel_id} can not be converted to number") + + # Retrieving the channel + channel = self.get_channel(channel_id) + if channel is None: + return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:") + + # Adding the channel to the list of watched channels + guild_id = str(message.guild.id) + if guild_id not in self.watched_channels: + self.watched_channels[guild_id] = { + "_name": message.guild.name, + "watched_channels": [], + } + watched_channel_for_this_guild = self.watched_channels[guild_id]["watched_channels"] + + if channel_id in watched_channel_for_this_guild: + return await message.channel.send(":thumbsup: I was already watching this channel") + else: + watched_channel_for_this_guild.append(channel_id) + self.loop.call_soon(self.write_watched_channels_file) + return await message.channel.send(f":white_check_mark: I am now watching {channel.name}") + + async def sorry_do_not_understand(self, message: discord.Message): + await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help") + + def write_watched_channels_file(self): + logger.debug("Writing watched channels information to file") + self.watched_channels_file.seek(0) + self.watched_channels_file.truncate() + json.dump(self.watched_channels, self.watched_channels_file, indent = 2) + self.watched_channels_file.flush() + logger.debug("Written watched channels information to file") + + +def _check_list_element(l: List, index: int, expected_value: Any) -> bool: + try: + return l[index] == expected_value + except IndexError: + return False + + +def _check_dict_element(d: Dict, key: Any, expected_value: Any) -> bool: + try: + return d[key] == expected_value + except KeyError: + return False + if __name__ == '__main__': argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter) argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored") - # argparser.add_argument("-c", "--config-file", default = "config.json", help = "Bot config file") + argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches") argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages") ARGS = argparser.parse_args() @@ -40,6 +131,8 @@ if __name__ == '__main__': format = "%(asctime)s %(name)s [%(levelname)s] %(message)s'" ) + logger.info(f"Using discord.py version {discord.__version__}") + token_file_path = Path(ARGS.token_file).absolute() if not token_file_path.is_file(): logger.error(f"Token file {token_file_path} does not exist") @@ -47,4 +140,13 @@ if __name__ == '__main__': with token_file_path.open("r") as token_file: token = token_file.readline().strip() - VocalMaisBot(token) + watched_channels_file_path = Path(ARGS.watched_channels_file).absolute() + if not watched_channels_file_path.exists(): + logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...") + with watched_channels_file_path.open("w") as watched_channels_file: + json.dump({}, watched_channels_file) + + with watched_channels_file_path.open("r+") as watched_channels_file: + bot = VocalMaisBot(watched_channels_file) + bot.run(token) + watched_channels_file.flush()