diff --git a/state.py b/state.py index adeef32..52dd17c 100644 --- a/state.py +++ b/state.py @@ -1,24 +1,8 @@ import time -from collections import OrderedDict import disnake - -class LimitedSizeDict(OrderedDict): - def __init__(self, *args, **kwargs): - self.size_limit = kwargs.pop("size_limit", 1000) - super().__init__(*args, **kwargs) - self._check_size_limit() - - def __setitem__(self, key, value): - super().__setitem__(key, value) - self._check_size_limit() - - def _check_size_limit(self): - if self.size_limit is not None: - while len(self) > self.size_limit: - self.popitem(last=False) - +from utils import LimitedSizeDict intents = disnake.Intents.default() intents.message_content = True diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..5840d8a --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,27 @@ +from .common import LimitedSizeDict, filter_secrets, format_duration +from .discord import ( + ChannelResponseWrapper, + MessageInteractionWrapper, + add_check_reaction, + channel_send, + cooldown, + invalid_user_handler, + load_opus, + parse_snowflake, + reply, +) + +__all__ = [ + "add_check_reaction", + "channel_send", + "ChannelResponseWrapper", + "cooldown", + "filter_secrets", + "format_duration", + "invalid_user_handler", + "LimitedSizeDict", + "load_opus", + "MessageInteractionWrapper", + "parse_snowflake", + "reply", +] diff --git a/utils/common.py b/utils/common.py new file mode 100644 index 0000000..d652c6b --- /dev/null +++ b/utils/common.py @@ -0,0 +1,60 @@ +from collections import OrderedDict + +from constants import SECRETS + + +def format_duration(duration: int, natural: bool = False, short: bool = False): + def format_plural(noun, count): + if short: + return noun[0] + return " " + (noun if count == 1 else noun + "s") + + segments = [] + + weeks, duration = divmod(duration, 604800) + if weeks > 0: + segments.append(f"{weeks}{format_plural('week', weeks)}") + + days, duration = divmod(duration, 86400) + if days > 0: + segments.append(f"{days}{format_plural('day', days)}") + + hours, duration = divmod(duration, 3600) + if hours > 0: + segments.append(f"{hours}{format_plural('hour', hours)}") + + minutes, duration = divmod(duration, 60) + if minutes > 0: + segments.append(f"{minutes}{format_plural('minute', minutes)}") + + if duration > 0: + segments.append(f"{duration}{format_plural('second', duration)}") + + separator = " " if short else ", " + if not natural or len(segments) <= 1: + return separator.join(segments) + return separator.join(segments[:-1]) + f" and {segments[-1]}" + + +def filter_secrets(text: str, secrets=SECRETS) -> str: + for secret_name, secret in secrets.items(): + if not secret: + continue + text = text.replace(secret, f"<{secret_name}>") + return text + + +class LimitedSizeDict(OrderedDict): + def __init__(self, *args, **kwargs): + self.size_limit = kwargs.pop("size_limit", 1000) + super().__init__(*args, **kwargs) + self._check_size_limit() + + def __setitem__(self, key, value): + super().__setitem__(key, value) + self._check_size_limit() + + def _check_size_limit(self): + if self.size_limit is not None: + while len(self) > self.size_limit: + self.popitem(last=False) diff --git a/utils.py b/utils/discord.py similarity index 70% rename from utils.py rename to utils/discord.py index dc98779..df5a2c9 100644 --- a/utils.py +++ b/utils/discord.py @@ -5,10 +5,73 @@ from logging import error, info import disnake import commands -import constants from state import command_cooldowns, message_responses +def cooldown(message, cooldown_time: int): + possible_commands = commands.match(message.content) + if not possible_commands or len(possible_commands) > 1: + return + command = possible_commands[0] + + end_time = time.time() + cooldown_time + if message.author.id in command_cooldowns: + command_cooldowns[message.author.id][command] = end_time + else: + command_cooldowns[message.author.id] = {command: end_time} + + +async def reply(message, *args, **kwargs): + if message.id in message_responses: + if len(args) == 0: + kwargs["content"] = None + elif len(kwargs) == 0: + kwargs["embeds"] = [] + await message_responses[message.id].edit( + *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() + ) + else: + response = await message.reply( + *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() + ) + message_responses[message.id] = response + return message_responses[message.id] + + +async def channel_send(message, *args, **kwargs): + await message.channel.send( + *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() + ) + + +def load_opus(): + for path in filter( + lambda p: os.path.exists(p), + ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"], + ): + try: + disnake.opus.load_opus(path) + info(f"successfully loaded opus from {path}") + return + except Exception as e: + error(f"failed to load opus from {path}: {e}") + raise Exception("could not locate working opus library") + + +def parse_snowflake(id): + return round(((id >> 22) + 1420070400000) / 1000) + + +async def add_check_reaction(message): + await message.add_reaction("✅") + + +async def invalid_user_handler(interaction): + await interaction.response.send_message( + "you are not the intended receiver of this message!", ephemeral=True + ) + + class ChannelResponseWrapper: def __init__(self, message): self.message = message @@ -35,108 +98,3 @@ class MessageInteractionWrapper: async def edit_original_message(self, content=None, embed=None, view=None): await self.response.edit_message(content=content, embed=embed, view=view) - - -def cooldown(message, cooldown_time: int): - possible_commands = commands.match(message.content) - if not possible_commands or len(possible_commands) > 1: - return - command = possible_commands[0] - - end_time = time.time() + cooldown_time - if message.author.id in command_cooldowns: - command_cooldowns[message.author.id][command] = end_time - else: - command_cooldowns[message.author.id] = {command: end_time} - - -def format_duration(duration: int, natural: bool = False, short: bool = False): - def format_plural(noun, count): - if short: - return noun[0] - return " " + (noun if count == 1 else noun + "s") - - segments = [] - - weeks, duration = divmod(duration, 604800) - if weeks > 0: - segments.append(f"{weeks}{format_plural('week', weeks)}") - - days, duration = divmod(duration, 86400) - if days > 0: - segments.append(f"{days}{format_plural('day', days)}") - - hours, duration = divmod(duration, 3600) - if hours > 0: - segments.append(f"{hours}{format_plural('hour', hours)}") - - minutes, duration = divmod(duration, 60) - if minutes > 0: - segments.append(f"{minutes}{format_plural('minute', minutes)}") - - if duration > 0: - segments.append(f"{duration}{format_plural('second', duration)}") - - separator = " " if short else ", " - if not natural or len(segments) <= 1: - return separator.join(segments) - return separator.join(segments[:-1]) + f" and {segments[-1]}" - - -def parse_snowflake(id): - return round(((id >> 22) + 1420070400000) / 1000) - - -async def add_check_reaction(message): - await message.add_reaction("✅") - - -async def reply(message, *args, **kwargs): - if message.id in message_responses: - if len(args) == 0: - kwargs["content"] = None - elif len(kwargs) == 0: - kwargs["embeds"] = [] - await message_responses[message.id].edit( - *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() - ) - else: - response = await message.reply( - *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() - ) - message_responses[message.id] = response - return message_responses[message.id] - - -async def channel_send(message, *args, **kwargs): - await message.channel.send( - *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() - ) - - -async def invalid_user_handler(interaction): - await interaction.response.send_message( - "you are not the intended receiver of this message!", ephemeral=True - ) - - -def filter_secrets(text: str, secrets=constants.SECRETS) -> str: - for secret_name, secret in secrets.items(): - if not secret: - continue - text = text.replace(secret, f"<{secret_name}>") - return text - - -def load_opus(): - for path in filter( - lambda p: os.path.exists(p), - ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"], - ): - try: - disnake.opus.load_opus(path) - info(f"successfully loaded opus from {path}") - return - except Exception as e: - error(f"failed to load opus from {path}: {e}") - raise Exception("could not locate working opus library")