refactor(utils): split into separate files
This commit is contained in:
		
							
								
								
									
										18
									
								
								state.py
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								state.py
									
									
									
									
									
								
							@@ -1,24 +1,8 @@
 | 
			
		||||
import time
 | 
			
		||||
from collections import OrderedDict
 | 
			
		||||
 | 
			
		||||
import disnake
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LimitedSizeDict(OrderedDict):
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        self.size_limit = kwargs.pop("size_limit", 1000)
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
        self._check_size_limit()
 | 
			
		||||
 | 
			
		||||
    def __setitem__(self, key, value):
 | 
			
		||||
        super().__setitem__(key, value)
 | 
			
		||||
        self._check_size_limit()
 | 
			
		||||
 | 
			
		||||
    def _check_size_limit(self):
 | 
			
		||||
        if self.size_limit is not None:
 | 
			
		||||
            while len(self) > self.size_limit:
 | 
			
		||||
                self.popitem(last=False)
 | 
			
		||||
 | 
			
		||||
from utils import LimitedSizeDict
 | 
			
		||||
 | 
			
		||||
intents = disnake.Intents.default()
 | 
			
		||||
intents.message_content = True
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										27
									
								
								utils/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								utils/__init__.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,27 @@
 | 
			
		||||
from .common import LimitedSizeDict, filter_secrets, format_duration
 | 
			
		||||
from .discord import (
 | 
			
		||||
    ChannelResponseWrapper,
 | 
			
		||||
    MessageInteractionWrapper,
 | 
			
		||||
    add_check_reaction,
 | 
			
		||||
    channel_send,
 | 
			
		||||
    cooldown,
 | 
			
		||||
    invalid_user_handler,
 | 
			
		||||
    load_opus,
 | 
			
		||||
    parse_snowflake,
 | 
			
		||||
    reply,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
__all__ = [
 | 
			
		||||
    "add_check_reaction",
 | 
			
		||||
    "channel_send",
 | 
			
		||||
    "ChannelResponseWrapper",
 | 
			
		||||
    "cooldown",
 | 
			
		||||
    "filter_secrets",
 | 
			
		||||
    "format_duration",
 | 
			
		||||
    "invalid_user_handler",
 | 
			
		||||
    "LimitedSizeDict",
 | 
			
		||||
    "load_opus",
 | 
			
		||||
    "MessageInteractionWrapper",
 | 
			
		||||
    "parse_snowflake",
 | 
			
		||||
    "reply",
 | 
			
		||||
]
 | 
			
		||||
							
								
								
									
										60
									
								
								utils/common.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								utils/common.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,60 @@
 | 
			
		||||
from collections import OrderedDict
 | 
			
		||||
 | 
			
		||||
from constants import SECRETS
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
 | 
			
		||||
    def format_plural(noun, count):
 | 
			
		||||
        if short:
 | 
			
		||||
            return noun[0]
 | 
			
		||||
        return " " + (noun if count == 1 else noun + "s")
 | 
			
		||||
 | 
			
		||||
    segments = []
 | 
			
		||||
 | 
			
		||||
    weeks, duration = divmod(duration, 604800)
 | 
			
		||||
    if weeks > 0:
 | 
			
		||||
        segments.append(f"{weeks}{format_plural('week', weeks)}")
 | 
			
		||||
 | 
			
		||||
    days, duration = divmod(duration, 86400)
 | 
			
		||||
    if days > 0:
 | 
			
		||||
        segments.append(f"{days}{format_plural('day', days)}")
 | 
			
		||||
 | 
			
		||||
    hours, duration = divmod(duration, 3600)
 | 
			
		||||
    if hours > 0:
 | 
			
		||||
        segments.append(f"{hours}{format_plural('hour', hours)}")
 | 
			
		||||
 | 
			
		||||
    minutes, duration = divmod(duration, 60)
 | 
			
		||||
    if minutes > 0:
 | 
			
		||||
        segments.append(f"{minutes}{format_plural('minute', minutes)}")
 | 
			
		||||
 | 
			
		||||
    if duration > 0:
 | 
			
		||||
        segments.append(f"{duration}{format_plural('second', duration)}")
 | 
			
		||||
 | 
			
		||||
    separator = " " if short else ", "
 | 
			
		||||
    if not natural or len(segments) <= 1:
 | 
			
		||||
        return separator.join(segments)
 | 
			
		||||
    return separator.join(segments[:-1]) + f" and {segments[-1]}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def filter_secrets(text: str, secrets=SECRETS) -> str:
 | 
			
		||||
    for secret_name, secret in secrets.items():
 | 
			
		||||
        if not secret:
 | 
			
		||||
            continue
 | 
			
		||||
        text = text.replace(secret, f"<{secret_name}>")
 | 
			
		||||
    return text
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LimitedSizeDict(OrderedDict):
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        self.size_limit = kwargs.pop("size_limit", 1000)
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
        self._check_size_limit()
 | 
			
		||||
 | 
			
		||||
    def __setitem__(self, key, value):
 | 
			
		||||
        super().__setitem__(key, value)
 | 
			
		||||
        self._check_size_limit()
 | 
			
		||||
 | 
			
		||||
    def _check_size_limit(self):
 | 
			
		||||
        if self.size_limit is not None:
 | 
			
		||||
            while len(self) > self.size_limit:
 | 
			
		||||
                self.popitem(last=False)
 | 
			
		||||
@@ -5,10 +5,73 @@ from logging import error, info
 | 
			
		||||
import disnake
 | 
			
		||||
 | 
			
		||||
import commands
 | 
			
		||||
import constants
 | 
			
		||||
from state import command_cooldowns, message_responses
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def cooldown(message, cooldown_time: int):
 | 
			
		||||
    possible_commands = commands.match(message.content)
 | 
			
		||||
    if not possible_commands or len(possible_commands) > 1:
 | 
			
		||||
        return
 | 
			
		||||
    command = possible_commands[0]
 | 
			
		||||
 | 
			
		||||
    end_time = time.time() + cooldown_time
 | 
			
		||||
    if message.author.id in command_cooldowns:
 | 
			
		||||
        command_cooldowns[message.author.id][command] = end_time
 | 
			
		||||
    else:
 | 
			
		||||
        command_cooldowns[message.author.id] = {command: end_time}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def reply(message, *args, **kwargs):
 | 
			
		||||
    if message.id in message_responses:
 | 
			
		||||
        if len(args) == 0:
 | 
			
		||||
            kwargs["content"] = None
 | 
			
		||||
        elif len(kwargs) == 0:
 | 
			
		||||
            kwargs["embeds"] = []
 | 
			
		||||
        await message_responses[message.id].edit(
 | 
			
		||||
            *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
        )
 | 
			
		||||
    else:
 | 
			
		||||
        response = await message.reply(
 | 
			
		||||
            *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
        )
 | 
			
		||||
        message_responses[message.id] = response
 | 
			
		||||
    return message_responses[message.id]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def channel_send(message, *args, **kwargs):
 | 
			
		||||
    await message.channel.send(
 | 
			
		||||
        *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_opus():
 | 
			
		||||
    for path in filter(
 | 
			
		||||
        lambda p: os.path.exists(p),
 | 
			
		||||
        ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
 | 
			
		||||
    ):
 | 
			
		||||
        try:
 | 
			
		||||
            disnake.opus.load_opus(path)
 | 
			
		||||
            info(f"successfully loaded opus from {path}")
 | 
			
		||||
            return
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            error(f"failed to load opus from {path}: {e}")
 | 
			
		||||
    raise Exception("could not locate working opus library")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def parse_snowflake(id):
 | 
			
		||||
    return round(((id >> 22) + 1420070400000) / 1000)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def add_check_reaction(message):
 | 
			
		||||
    await message.add_reaction("✅")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def invalid_user_handler(interaction):
 | 
			
		||||
    await interaction.response.send_message(
 | 
			
		||||
        "you are not the intended receiver of this message!", ephemeral=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ChannelResponseWrapper:
 | 
			
		||||
    def __init__(self, message):
 | 
			
		||||
        self.message = message
 | 
			
		||||
@@ -35,108 +98,3 @@ class MessageInteractionWrapper:
 | 
			
		||||
 | 
			
		||||
    async def edit_original_message(self, content=None, embed=None, view=None):
 | 
			
		||||
        await self.response.edit_message(content=content, embed=embed, view=view)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def cooldown(message, cooldown_time: int):
 | 
			
		||||
    possible_commands = commands.match(message.content)
 | 
			
		||||
    if not possible_commands or len(possible_commands) > 1:
 | 
			
		||||
        return
 | 
			
		||||
    command = possible_commands[0]
 | 
			
		||||
 | 
			
		||||
    end_time = time.time() + cooldown_time
 | 
			
		||||
    if message.author.id in command_cooldowns:
 | 
			
		||||
        command_cooldowns[message.author.id][command] = end_time
 | 
			
		||||
    else:
 | 
			
		||||
        command_cooldowns[message.author.id] = {command: end_time}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
 | 
			
		||||
    def format_plural(noun, count):
 | 
			
		||||
        if short:
 | 
			
		||||
            return noun[0]
 | 
			
		||||
        return " " + (noun if count == 1 else noun + "s")
 | 
			
		||||
 | 
			
		||||
    segments = []
 | 
			
		||||
 | 
			
		||||
    weeks, duration = divmod(duration, 604800)
 | 
			
		||||
    if weeks > 0:
 | 
			
		||||
        segments.append(f"{weeks}{format_plural('week', weeks)}")
 | 
			
		||||
 | 
			
		||||
    days, duration = divmod(duration, 86400)
 | 
			
		||||
    if days > 0:
 | 
			
		||||
        segments.append(f"{days}{format_plural('day', days)}")
 | 
			
		||||
 | 
			
		||||
    hours, duration = divmod(duration, 3600)
 | 
			
		||||
    if hours > 0:
 | 
			
		||||
        segments.append(f"{hours}{format_plural('hour', hours)}")
 | 
			
		||||
 | 
			
		||||
    minutes, duration = divmod(duration, 60)
 | 
			
		||||
    if minutes > 0:
 | 
			
		||||
        segments.append(f"{minutes}{format_plural('minute', minutes)}")
 | 
			
		||||
 | 
			
		||||
    if duration > 0:
 | 
			
		||||
        segments.append(f"{duration}{format_plural('second', duration)}")
 | 
			
		||||
 | 
			
		||||
    separator = " " if short else ", "
 | 
			
		||||
    if not natural or len(segments) <= 1:
 | 
			
		||||
        return separator.join(segments)
 | 
			
		||||
    return separator.join(segments[:-1]) + f" and {segments[-1]}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def parse_snowflake(id):
 | 
			
		||||
    return round(((id >> 22) + 1420070400000) / 1000)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def add_check_reaction(message):
 | 
			
		||||
    await message.add_reaction("✅")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def reply(message, *args, **kwargs):
 | 
			
		||||
    if message.id in message_responses:
 | 
			
		||||
        if len(args) == 0:
 | 
			
		||||
            kwargs["content"] = None
 | 
			
		||||
        elif len(kwargs) == 0:
 | 
			
		||||
            kwargs["embeds"] = []
 | 
			
		||||
        await message_responses[message.id].edit(
 | 
			
		||||
            *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
        )
 | 
			
		||||
    else:
 | 
			
		||||
        response = await message.reply(
 | 
			
		||||
            *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
        )
 | 
			
		||||
        message_responses[message.id] = response
 | 
			
		||||
    return message_responses[message.id]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def channel_send(message, *args, **kwargs):
 | 
			
		||||
    await message.channel.send(
 | 
			
		||||
        *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def invalid_user_handler(interaction):
 | 
			
		||||
    await interaction.response.send_message(
 | 
			
		||||
        "you are not the intended receiver of this message!", ephemeral=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def filter_secrets(text: str, secrets=constants.SECRETS) -> str:
 | 
			
		||||
    for secret_name, secret in secrets.items():
 | 
			
		||||
        if not secret:
 | 
			
		||||
            continue
 | 
			
		||||
        text = text.replace(secret, f"<{secret_name}>")
 | 
			
		||||
    return text
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_opus():
 | 
			
		||||
    for path in filter(
 | 
			
		||||
        lambda p: os.path.exists(p),
 | 
			
		||||
        ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
 | 
			
		||||
    ):
 | 
			
		||||
        try:
 | 
			
		||||
            disnake.opus.load_opus(path)
 | 
			
		||||
            info(f"successfully loaded opus from {path}")
 | 
			
		||||
            return
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            error(f"failed to load opus from {path}: {e}")
 | 
			
		||||
    raise Exception("could not locate working opus library")
 | 
			
		||||
		Reference in New Issue
	
	Block a user