Compare commits

..

7 Commits

10 changed files with 179 additions and 138 deletions

View File

@ -1,6 +1,6 @@
FROM python:3.13-alpine FROM python:3.13-alpine
RUN apk --no-cache add ffmpeg opus RUN apk --no-cache add ffmpeg gcc linux-headers musl-dev opus python3-dev
WORKDIR /bot WORKDIR /bot
COPY . . COPY . .

View File

@ -4,6 +4,7 @@ import time
import disnake import disnake
import psutil import psutil
from yt_dlp import version
import arguments import arguments
import commands import commands
@ -27,7 +28,7 @@ async def status(message):
value=f"```{round(client.latency * 1000, 1)} ms```", value=f"```{round(client.latency * 1000, 1)} ms```",
) )
embed.add_field( embed.add_field(
name="RSS", name="Memory",
value=f"```{round(memory_usage, 1)} MiB```", value=f"```{round(memory_usage, 1)} MiB```",
) )
embed.add_field( embed.add_field(
@ -46,14 +47,14 @@ async def status(message):
name="Channels", name="Channels",
value=f"```{channel_count}```", value=f"```{channel_count}```",
) )
embed.add_field(
name="Commands",
value=f"```{len(commands.Command.__members__)}```",
)
embed.add_field( embed.add_field(
name="Disnake", name="Disnake",
value=f"```{disnake.__version__}```", value=f"```{disnake.__version__}```",
) )
embed.add_field(
name="yt-dlp",
value=f"```{version.__version__}```",
)
embed.add_field( embed.add_field(
name="Uptime", name="Uptime",
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```", value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",

View File

@ -67,7 +67,7 @@ async def lookup(message):
) )
embed.add_field( embed.add_field(
name="Creation Time", name="Creation Time",
value=f"<t:{utils.parse_snowflake(int(response['id']))}:R>", value=f"<t:{utils.snowflake_timestamp(int(response['id']))}:R>",
) )
embed.add_field( embed.add_field(
name="Default Invite URL", name="Default Invite URL",
@ -135,7 +135,7 @@ async def lookup(message):
) )
embed.add_field( embed.add_field(
name="Creation Time", name="Creation Time",
value=f"<t:{utils.parse_snowflake(int(user.id))}:R>", value=f"<t:{utils.snowflake_timestamp(int(user.id))}:R>",
) )
embed.add_field( embed.add_field(
name="Public Flags", name="Public Flags",

View File

@ -1,3 +1,5 @@
import disnake
import utils import utils
from .utils import command_allowed from .utils import command_allowed
@ -6,8 +8,14 @@ from .utils import command_allowed
async def join(message): async def join(message):
if message.guild.voice_client: if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel) return await message.guild.voice_client.move_to(message.channel)
elif message.author.voice:
await message.author.voice.channel.connect()
elif isinstance(message.channel, disnake.VoiceChannel):
await message.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
return
await message.channel.connect()
await utils.add_check_reaction(message) await utils.add_check_reaction(message)

View File

@ -60,10 +60,12 @@ async def ensure_joined(message):
def command_allowed(message, immutable=False): def command_allowed(message, immutable=False):
if not message.guild.voice_client: if not message.guild.voice_client:
return return False
if immutable: if immutable:
return message.channel.id == message.guild.voice_client.channel.id return True
else:
if not message.author.voice: if not message.author.voice:
return False return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@ -1,6 +1,7 @@
audioop-lts audioop-lts
disnake disnake
disnake_paginator disnake_paginator
psutil
PyNaCl PyNaCl
youtube_transcript_api youtube_transcript_api
yt-dlp yt-dlp

View File

@ -1,24 +1,8 @@
import time import time
from collections import OrderedDict
import disnake import disnake
from utils import LimitedSizeDict
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 1000)
super().__init__(*args, **kwargs)
self._check_size_limit()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)
intents = disnake.Intents.default() intents = disnake.Intents.default()
intents.message_content = True intents.message_content = True

27
utils/__init__.py Normal file
View File

@ -0,0 +1,27 @@
from .common import LimitedSizeDict, filter_secrets, format_duration
from .discord import (
ChannelResponseWrapper,
MessageInteractionWrapper,
add_check_reaction,
channel_send,
cooldown,
invalid_user_handler,
load_opus,
reply,
snowflake_timestamp,
)
__all__ = [
"add_check_reaction",
"channel_send",
"ChannelResponseWrapper",
"cooldown",
"filter_secrets",
"format_duration",
"invalid_user_handler",
"LimitedSizeDict",
"load_opus",
"MessageInteractionWrapper",
"reply",
"snowflake_timestamp",
]

60
utils/common.py Normal file
View File

@ -0,0 +1,60 @@
from collections import OrderedDict
from constants import SECRETS
def format_duration(duration: int, natural: bool = False, short: bool = False):
def format_plural(noun, count):
if short:
return noun[0]
return " " + (noun if count == 1 else noun + "s")
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks}{format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days}{format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours}{format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes}{format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration}{format_plural('second', duration)}")
separator = " " if short else ", "
if not natural or len(segments) <= 1:
return separator.join(segments)
return separator.join(segments[:-1]) + f" and {segments[-1]}"
def filter_secrets(text: str, secrets=SECRETS) -> str:
for secret_name, secret in secrets.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 1000)
super().__init__(*args, **kwargs)
self._check_size_limit()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)

View File

@ -5,10 +5,73 @@ from logging import error, info
import disnake import disnake
import commands import commands
import constants
from state import command_cooldowns, message_responses from state import command_cooldowns, message_responses
def cooldown(message, cooldown_time: int):
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
command = possible_commands[0]
end_time = time.time() + cooldown_time
if message.author.id in command_cooldowns:
command_cooldowns[message.author.id][command] = end_time
else:
command_cooldowns[message.author.id] = {command: end_time}
async def reply(message, *args, **kwargs):
if message.id in message_responses:
if len(args) == 0:
kwargs["content"] = None
elif len(kwargs) == 0:
kwargs["embeds"] = []
await message_responses[message.id].edit(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
else:
response = await message.reply(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
message_responses[message.id] = response
return message_responses[message.id]
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
def load_opus():
for path in filter(
lambda p: os.path.exists(p),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try:
disnake.opus.load_opus(path)
info(f"successfully loaded opus from {path}")
return
except Exception as e:
error(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")
def snowflake_timestamp(id):
return round(((id >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message):
await message.add_reaction("")
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!", ephemeral=True
)
class ChannelResponseWrapper: class ChannelResponseWrapper:
def __init__(self, message): def __init__(self, message):
self.message = message self.message = message
@ -35,108 +98,3 @@ class MessageInteractionWrapper:
async def edit_original_message(self, content=None, embed=None, view=None): async def edit_original_message(self, content=None, embed=None, view=None):
await self.response.edit_message(content=content, embed=embed, view=view) await self.response.edit_message(content=content, embed=embed, view=view)
def cooldown(message, cooldown_time: int):
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
command = possible_commands[0]
end_time = time.time() + cooldown_time
if message.author.id in command_cooldowns:
command_cooldowns[message.author.id][command] = end_time
else:
command_cooldowns[message.author.id] = {command: end_time}
def format_duration(duration: int, natural: bool = False, short: bool = False):
def format_plural(noun, count):
if short:
return noun[0]
return " " + (noun if count == 1 else noun + "s")
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks}{format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days}{format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours}{format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes}{format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration}{format_plural('second', duration)}")
separator = " " if short else ", "
if not natural or len(segments) <= 1:
return separator.join(segments)
return separator.join(segments[:-1]) + f" and {segments[-1]}"
def parse_snowflake(id):
return round(((id >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message):
await message.add_reaction("")
async def reply(message, *args, **kwargs):
if message.id in message_responses:
if len(args) == 0:
kwargs["content"] = None
elif len(kwargs) == 0:
kwargs["embeds"] = []
await message_responses[message.id].edit(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
else:
response = await message.reply(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
message_responses[message.id] = response
return message_responses[message.id]
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!", ephemeral=True
)
def filter_secrets(text: str, secrets=constants.SECRETS) -> str:
for secret_name, secret in secrets.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text
def load_opus():
for path in filter(
lambda p: os.path.exists(p),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try:
disnake.opus.load_opus(path)
info(f"successfully loaded opus from {path}")
return
except Exception as e:
error(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")