Bot now responds to commands and channels can be added to list of watched channels

This commit is contained in:
Elnath 2020-12-26 17:42:35 +01:00
parent 1f65583175
commit 093e3bd32e
2 changed files with 108 additions and 5 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@
__pycache__/ __pycache__/
.token .token
channels.json

View File

@ -3,6 +3,9 @@ import argparse
import logging import logging
import sys import sys
from pathlib import Path from pathlib import Path
import json
from typing import TextIO, Any, List, Dict
import textwrap
import discord import discord
import discord.utils import discord.utils
@ -12,9 +15,16 @@ logger = logging.getLogger("VocalMaisBot")
class VocalMaisBot(discord.Client): class VocalMaisBot(discord.Client):
def __init__(self, token: str): def __init__(self, watched_channels_file: TextIO):
super().__init__() super().__init__(
self.run(token) intents = discord.Intents(voice_states = True, guild_messages = True, guilds = True)
)
self.watched_channels_file = watched_channels_file
try:
self.watched_channels = json.load(watched_channels_file)
except json.JSONDecodeError as e:
logger.critical(f"Impossible to parse watched channels file, JSON error: {e}")
sys.exit(1)
async def on_ready(self): async def on_ready(self):
logger.info("Connected and ready!") logger.info("Connected and ready!")
@ -26,11 +36,92 @@ class VocalMaisBot(discord.Client):
) )
logger.info(f"You can invite the bot to your server using the following link: {oauth_url}") logger.info(f"You can invite the bot to your server using the following link: {oauth_url}")
async def on_message(self, message: discord.Message):
if self.user not in message.mentions: # We only ever react to messages that mention us
return
if message.author.bot or message.is_system(): # We ignore messages from automated sources
return
contents = message.content.split()
if _check_list_element(contents, 1, "help"):
return await self.print_help(message.channel)
elif _check_list_element(contents, 1, "ping"):
return await message.channel.send(":ping_pong:")
elif _check_list_element(contents, 1, "register"):
return await self.register_channel(message)
else:
return self.sorry_do_not_understand(message)
async def print_help(self, channel: discord.TextChannel):
me = self.user.display_name
message = f"""
**·** `@{me}register channel_id` : make me watch the voice channel with id `channel_id`
**·** `@{me} help` : print this help
**·** `@{me} ping` : pong
"""
await channel.send(embed = discord.Embed(title = "Available commands", description = textwrap.dedent(message)))
async def register_channel(self, message: discord.Message):
message_elements = message.content.split()
if len(message_elements) < 3:
return await self.sorry_do_not_understand(message)
channel_id = message_elements[2]
try:
channel_id = int(channel_id)
except ValueError:
return await message.channel.send(f":x: {channel_id} can not be converted to number")
# Retrieving the channel
channel = self.get_channel(channel_id)
if channel is None:
return await message.channel.send(f"I could not find a channel with id {channel_id} :cry:")
# Adding the channel to the list of watched channels
guild_id = str(message.guild.id)
if guild_id not in self.watched_channels:
self.watched_channels[guild_id] = {
"_name": message.guild.name,
"watched_channels": [],
}
watched_channel_for_this_guild = self.watched_channels[guild_id]["watched_channels"]
if channel_id in watched_channel_for_this_guild:
return await message.channel.send(":thumbsup: I was already watching this channel")
else:
watched_channel_for_this_guild.append(channel_id)
self.loop.call_soon(self.write_watched_channels_file)
return await message.channel.send(f":white_check_mark: I am now watching {channel.name}")
async def sorry_do_not_understand(self, message: discord.Message):
await message.channel.send(f"Sorry I did not understand this message. Try `@{self.user.display_name} help` for help")
def write_watched_channels_file(self):
logger.debug("Writing watched channels information to file")
self.watched_channels_file.seek(0)
self.watched_channels_file.truncate()
json.dump(self.watched_channels, self.watched_channels_file, indent = 2)
self.watched_channels_file.flush()
logger.debug("Written watched channels information to file")
def _check_list_element(l: List, index: int, expected_value: Any) -> bool:
try:
return l[index] == expected_value
except IndexError:
return False
def _check_dict_element(d: Dict, key: Any, expected_value: Any) -> bool:
try:
return d[key] == expected_value
except KeyError:
return False
if __name__ == '__main__': if __name__ == '__main__':
argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter) argparser = argparse.ArgumentParser(description = "Discord bot to automatically create temporary voice channels for users when they connect to a special channel", formatter_class = argparse.ArgumentDefaultsHelpFormatter)
argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored") argparser.add_argument("-t", "--token-file", default = ".token", help = "File where the discord bot token is stored")
# argparser.add_argument("-c", "--config-file", default = "config.json", help = "Bot config file") argparser.add_argument("-w", "--watched-channels-file", default = "channels.json", help = "Used to store the list of channels that the bot watches")
argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages") argparser.add_argument("-v", "--verbose", action = "store_true", help = "Print debug messages")
ARGS = argparser.parse_args() ARGS = argparser.parse_args()
@ -40,6 +131,8 @@ if __name__ == '__main__':
format = "%(asctime)s %(name)s [%(levelname)s] %(message)s'" format = "%(asctime)s %(name)s [%(levelname)s] %(message)s'"
) )
logger.info(f"Using discord.py version {discord.__version__}")
token_file_path = Path(ARGS.token_file).absolute() token_file_path = Path(ARGS.token_file).absolute()
if not token_file_path.is_file(): if not token_file_path.is_file():
logger.error(f"Token file {token_file_path} does not exist") logger.error(f"Token file {token_file_path} does not exist")
@ -47,4 +140,13 @@ if __name__ == '__main__':
with token_file_path.open("r") as token_file: with token_file_path.open("r") as token_file:
token = token_file.readline().strip() token = token_file.readline().strip()
VocalMaisBot(token) watched_channels_file_path = Path(ARGS.watched_channels_file).absolute()
if not watched_channels_file_path.exists():
logger.info(f"Watched channels file {watched_channels_file_path} does not exist, creating it...")
with watched_channels_file_path.open("w") as watched_channels_file:
json.dump({}, watched_channels_file)
with watched_channels_file_path.open("r+") as watched_channels_file:
bot = VocalMaisBot(watched_channels_file)
bot.run(token)
watched_channels_file.flush()