Compare commits
No commits in common. "7f07e1bd71b6cdbee7f937c7495b3c3248d8dfa3" and "c420f3de6bd85efc8a8a76f335e2e97e542ca501" have entirely different histories.
7f07e1bd71
...
c420f3de6b
@ -1,6 +1,6 @@
|
||||
FROM python:3.13-alpine
|
||||
|
||||
RUN apk --no-cache add ffmpeg gcc linux-headers musl-dev opus python3-dev
|
||||
RUN apk --no-cache add ffmpeg opus
|
||||
|
||||
WORKDIR /bot
|
||||
COPY . .
|
||||
|
@ -4,7 +4,6 @@ import time
|
||||
|
||||
import disnake
|
||||
import psutil
|
||||
from yt_dlp import version
|
||||
|
||||
import arguments
|
||||
import commands
|
||||
@ -28,7 +27,7 @@ async def status(message):
|
||||
value=f"```{round(client.latency * 1000, 1)} ms```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Memory",
|
||||
name="RSS",
|
||||
value=f"```{round(memory_usage, 1)} MiB```",
|
||||
)
|
||||
embed.add_field(
|
||||
@ -48,12 +47,12 @@ async def status(message):
|
||||
value=f"```{channel_count}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Disnake",
|
||||
value=f"```{disnake.__version__}```",
|
||||
name="Commands",
|
||||
value=f"```{len(commands.Command.__members__)}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="yt-dlp",
|
||||
value=f"```{version.__version__}```",
|
||||
name="Disnake",
|
||||
value=f"```{disnake.__version__}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Uptime",
|
||||
|
@ -67,7 +67,7 @@ async def lookup(message):
|
||||
)
|
||||
embed.add_field(
|
||||
name="Creation Time",
|
||||
value=f"<t:{utils.snowflake_timestamp(int(response['id']))}:R>",
|
||||
value=f"<t:{utils.parse_snowflake(int(response['id']))}:R>",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Default Invite URL",
|
||||
@ -135,7 +135,7 @@ async def lookup(message):
|
||||
)
|
||||
embed.add_field(
|
||||
name="Creation Time",
|
||||
value=f"<t:{utils.snowflake_timestamp(int(user.id))}:R>",
|
||||
value=f"<t:{utils.parse_snowflake(int(user.id))}:R>",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Public Flags",
|
||||
|
@ -1,5 +1,3 @@
|
||||
import disnake
|
||||
|
||||
import utils
|
||||
|
||||
from .utils import command_allowed
|
||||
@ -8,14 +6,8 @@ from .utils import command_allowed
|
||||
async def join(message):
|
||||
if message.guild.voice_client:
|
||||
return await message.guild.voice_client.move_to(message.channel)
|
||||
elif message.author.voice:
|
||||
await message.author.voice.channel.connect()
|
||||
elif isinstance(message.channel, disnake.VoiceChannel):
|
||||
await message.channel.connect()
|
||||
else:
|
||||
await utils.reply(message, "you are not connected to a voice channel!")
|
||||
return
|
||||
|
||||
await message.channel.connect()
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
|
@ -60,12 +60,10 @@ async def ensure_joined(message):
|
||||
|
||||
def command_allowed(message, immutable=False):
|
||||
if not message.guild.voice_client:
|
||||
return False
|
||||
|
||||
return
|
||||
if immutable:
|
||||
return True
|
||||
|
||||
if not message.author.voice:
|
||||
return False
|
||||
|
||||
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
||||
return message.channel.id == message.guild.voice_client.channel.id
|
||||
else:
|
||||
if not message.author.voice:
|
||||
return False
|
||||
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
||||
|
@ -1,7 +1,6 @@
|
||||
audioop-lts
|
||||
disnake
|
||||
disnake_paginator
|
||||
psutil
|
||||
PyNaCl
|
||||
youtube_transcript_api
|
||||
yt-dlp
|
||||
|
18
state.py
18
state.py
@ -1,8 +1,24 @@
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
|
||||
import disnake
|
||||
|
||||
from utils import LimitedSizeDict
|
||||
|
||||
class LimitedSizeDict(OrderedDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.size_limit = kwargs.pop("size_limit", 1000)
|
||||
super().__init__(*args, **kwargs)
|
||||
self._check_size_limit()
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
super().__setitem__(key, value)
|
||||
self._check_size_limit()
|
||||
|
||||
def _check_size_limit(self):
|
||||
if self.size_limit is not None:
|
||||
while len(self) > self.size_limit:
|
||||
self.popitem(last=False)
|
||||
|
||||
|
||||
intents = disnake.Intents.default()
|
||||
intents.message_content = True
|
||||
|
@ -5,73 +5,10 @@ from logging import error, info
|
||||
import disnake
|
||||
|
||||
import commands
|
||||
import constants
|
||||
from state import command_cooldowns, message_responses
|
||||
|
||||
|
||||
def cooldown(message, cooldown_time: int):
|
||||
possible_commands = commands.match(message.content)
|
||||
if not possible_commands or len(possible_commands) > 1:
|
||||
return
|
||||
command = possible_commands[0]
|
||||
|
||||
end_time = time.time() + cooldown_time
|
||||
if message.author.id in command_cooldowns:
|
||||
command_cooldowns[message.author.id][command] = end_time
|
||||
else:
|
||||
command_cooldowns[message.author.id] = {command: end_time}
|
||||
|
||||
|
||||
async def reply(message, *args, **kwargs):
|
||||
if message.id in message_responses:
|
||||
if len(args) == 0:
|
||||
kwargs["content"] = None
|
||||
elif len(kwargs) == 0:
|
||||
kwargs["embeds"] = []
|
||||
await message_responses[message.id].edit(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
else:
|
||||
response = await message.reply(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
message_responses[message.id] = response
|
||||
return message_responses[message.id]
|
||||
|
||||
|
||||
async def channel_send(message, *args, **kwargs):
|
||||
await message.channel.send(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
|
||||
|
||||
def load_opus():
|
||||
for path in filter(
|
||||
lambda p: os.path.exists(p),
|
||||
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
|
||||
):
|
||||
try:
|
||||
disnake.opus.load_opus(path)
|
||||
info(f"successfully loaded opus from {path}")
|
||||
return
|
||||
except Exception as e:
|
||||
error(f"failed to load opus from {path}: {e}")
|
||||
raise Exception("could not locate working opus library")
|
||||
|
||||
|
||||
def snowflake_timestamp(id):
|
||||
return round(((id >> 22) + 1420070400000) / 1000)
|
||||
|
||||
|
||||
async def add_check_reaction(message):
|
||||
await message.add_reaction("✅")
|
||||
|
||||
|
||||
async def invalid_user_handler(interaction):
|
||||
await interaction.response.send_message(
|
||||
"you are not the intended receiver of this message!", ephemeral=True
|
||||
)
|
||||
|
||||
|
||||
class ChannelResponseWrapper:
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
@ -98,3 +35,108 @@ class MessageInteractionWrapper:
|
||||
|
||||
async def edit_original_message(self, content=None, embed=None, view=None):
|
||||
await self.response.edit_message(content=content, embed=embed, view=view)
|
||||
|
||||
|
||||
def cooldown(message, cooldown_time: int):
|
||||
possible_commands = commands.match(message.content)
|
||||
if not possible_commands or len(possible_commands) > 1:
|
||||
return
|
||||
command = possible_commands[0]
|
||||
|
||||
end_time = time.time() + cooldown_time
|
||||
if message.author.id in command_cooldowns:
|
||||
command_cooldowns[message.author.id][command] = end_time
|
||||
else:
|
||||
command_cooldowns[message.author.id] = {command: end_time}
|
||||
|
||||
|
||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
|
||||
def format_plural(noun, count):
|
||||
if short:
|
||||
return noun[0]
|
||||
return " " + (noun if count == 1 else noun + "s")
|
||||
|
||||
segments = []
|
||||
|
||||
weeks, duration = divmod(duration, 604800)
|
||||
if weeks > 0:
|
||||
segments.append(f"{weeks}{format_plural('week', weeks)}")
|
||||
|
||||
days, duration = divmod(duration, 86400)
|
||||
if days > 0:
|
||||
segments.append(f"{days}{format_plural('day', days)}")
|
||||
|
||||
hours, duration = divmod(duration, 3600)
|
||||
if hours > 0:
|
||||
segments.append(f"{hours}{format_plural('hour', hours)}")
|
||||
|
||||
minutes, duration = divmod(duration, 60)
|
||||
if minutes > 0:
|
||||
segments.append(f"{minutes}{format_plural('minute', minutes)}")
|
||||
|
||||
if duration > 0:
|
||||
segments.append(f"{duration}{format_plural('second', duration)}")
|
||||
|
||||
separator = " " if short else ", "
|
||||
if not natural or len(segments) <= 1:
|
||||
return separator.join(segments)
|
||||
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
||||
|
||||
|
||||
def parse_snowflake(id):
|
||||
return round(((id >> 22) + 1420070400000) / 1000)
|
||||
|
||||
|
||||
async def add_check_reaction(message):
|
||||
await message.add_reaction("✅")
|
||||
|
||||
|
||||
async def reply(message, *args, **kwargs):
|
||||
if message.id in message_responses:
|
||||
if len(args) == 0:
|
||||
kwargs["content"] = None
|
||||
elif len(kwargs) == 0:
|
||||
kwargs["embeds"] = []
|
||||
await message_responses[message.id].edit(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
else:
|
||||
response = await message.reply(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
message_responses[message.id] = response
|
||||
return message_responses[message.id]
|
||||
|
||||
|
||||
async def channel_send(message, *args, **kwargs):
|
||||
await message.channel.send(
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
|
||||
|
||||
async def invalid_user_handler(interaction):
|
||||
await interaction.response.send_message(
|
||||
"you are not the intended receiver of this message!", ephemeral=True
|
||||
)
|
||||
|
||||
|
||||
def filter_secrets(text: str, secrets=constants.SECRETS) -> str:
|
||||
for secret_name, secret in secrets.items():
|
||||
if not secret:
|
||||
continue
|
||||
text = text.replace(secret, f"<{secret_name}>")
|
||||
return text
|
||||
|
||||
|
||||
def load_opus():
|
||||
for path in filter(
|
||||
lambda p: os.path.exists(p),
|
||||
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
|
||||
):
|
||||
try:
|
||||
disnake.opus.load_opus(path)
|
||||
info(f"successfully loaded opus from {path}")
|
||||
return
|
||||
except Exception as e:
|
||||
error(f"failed to load opus from {path}: {e}")
|
||||
raise Exception("could not locate working opus library")
|
@ -1,27 +0,0 @@
|
||||
from .common import LimitedSizeDict, filter_secrets, format_duration
|
||||
from .discord import (
|
||||
ChannelResponseWrapper,
|
||||
MessageInteractionWrapper,
|
||||
add_check_reaction,
|
||||
channel_send,
|
||||
cooldown,
|
||||
invalid_user_handler,
|
||||
load_opus,
|
||||
reply,
|
||||
snowflake_timestamp,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"add_check_reaction",
|
||||
"channel_send",
|
||||
"ChannelResponseWrapper",
|
||||
"cooldown",
|
||||
"filter_secrets",
|
||||
"format_duration",
|
||||
"invalid_user_handler",
|
||||
"LimitedSizeDict",
|
||||
"load_opus",
|
||||
"MessageInteractionWrapper",
|
||||
"reply",
|
||||
"snowflake_timestamp",
|
||||
]
|
@ -1,60 +0,0 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
from constants import SECRETS
|
||||
|
||||
|
||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
|
||||
def format_plural(noun, count):
|
||||
if short:
|
||||
return noun[0]
|
||||
return " " + (noun if count == 1 else noun + "s")
|
||||
|
||||
segments = []
|
||||
|
||||
weeks, duration = divmod(duration, 604800)
|
||||
if weeks > 0:
|
||||
segments.append(f"{weeks}{format_plural('week', weeks)}")
|
||||
|
||||
days, duration = divmod(duration, 86400)
|
||||
if days > 0:
|
||||
segments.append(f"{days}{format_plural('day', days)}")
|
||||
|
||||
hours, duration = divmod(duration, 3600)
|
||||
if hours > 0:
|
||||
segments.append(f"{hours}{format_plural('hour', hours)}")
|
||||
|
||||
minutes, duration = divmod(duration, 60)
|
||||
if minutes > 0:
|
||||
segments.append(f"{minutes}{format_plural('minute', minutes)}")
|
||||
|
||||
if duration > 0:
|
||||
segments.append(f"{duration}{format_plural('second', duration)}")
|
||||
|
||||
separator = " " if short else ", "
|
||||
if not natural or len(segments) <= 1:
|
||||
return separator.join(segments)
|
||||
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
||||
|
||||
|
||||
def filter_secrets(text: str, secrets=SECRETS) -> str:
|
||||
for secret_name, secret in secrets.items():
|
||||
if not secret:
|
||||
continue
|
||||
text = text.replace(secret, f"<{secret_name}>")
|
||||
return text
|
||||
|
||||
|
||||
class LimitedSizeDict(OrderedDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.size_limit = kwargs.pop("size_limit", 1000)
|
||||
super().__init__(*args, **kwargs)
|
||||
self._check_size_limit()
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
super().__setitem__(key, value)
|
||||
self._check_size_limit()
|
||||
|
||||
def _check_size_limit(self):
|
||||
if self.size_limit is not None:
|
||||
while len(self) > self.size_limit:
|
||||
self.popitem(last=False)
|
Loading…
x
Reference in New Issue
Block a user