Compare commits

...

17 Commits

Author SHA1 Message Date
019e60450f refactor: tweak descriptions 2025-06-10 20:59:11 -04:00
7672107c68 chore(requirements): use latest yt-dlp from github 2025-06-10 20:30:41 -04:00
ee6ea4eed4 refactor(audio/queue): save uploader field value 2025-06-08 17:29:36 -04:00
fed280e6c5 fix(audio/queue): check for uploader name 2025-06-08 13:46:57 -04:00
1a8f84b333 feat: reload when SIGUSR1 is received 2025-06-08 13:11:50 -04:00
94bdb91eb0 feat: add trusted user list 2025-06-08 13:11:50 -04:00
5c030a0557 refactor(commands): tweak descriptions 2025-06-08 12:57:34 -04:00
5344e89c26 feat(audio/queue): add timestamps 2025-06-08 12:26:30 -04:00
80e6d422e5 fix: add missing character in format string 2025-05-29 11:45:18 -04:00
71fad98d3d refactor(cleanup): reduce interval 2025-05-02 18:19:16 -04:00
83d784c917 refactor: reduce LimitedSizeDict size 2025-05-02 18:18:34 -04:00
f4b7e0f5ce refactor(commands/voice): clean up some code 2025-05-01 18:29:26 -04:00
1316fb593c refactor(commands/voice/queue): delay player creation 2025-05-01 18:29:26 -04:00
b6d105a519 refactor(sponsorblock): hashPrefix -> hash_prefix 2025-04-25 21:27:04 -04:00
ec31250153 refactor: follow more guidelines 2025-04-03 17:53:46 -04:00
f360566824 refactor: minor changes 2025-03-28 21:22:19 -04:00
b0c96a11cd feat(fun): add more reactions 2025-03-28 21:22:19 -04:00
26 changed files with 200 additions and 139 deletions

View File

@@ -8,7 +8,9 @@ import utils
class ArgumentParser: class ArgumentParser:
def __init__(self, command, description): def __init__(self, command, description):
self.parser = argparse.ArgumentParser( self.parser = argparse.ArgumentParser(
command, description=description, exit_on_error=False command,
description=description,
exit_on_error=False,
) )
def print_help(self): def print_help(self):
@@ -26,21 +28,20 @@ class ArgumentParser:
async def parse_args(self, message, tokens) -> argparse.Namespace | None: async def parse_args(self, message, tokens) -> argparse.Namespace | None:
try: try:
with contextlib.redirect_stdout(io.StringIO()): with contextlib.redirect_stdout(io.StringIO()):
args = self.parser.parse_args(tokens[1:]) return self.parser.parse_args(tokens[1:])
return args
except SystemExit: except SystemExit:
await utils.reply(message, f"```\n{self.print_help()}```") await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e: except Exception as e:
await utils.reply(message, f"`{e}`") await utils.reply(message, f"`{e}`")
def range_type(string: str, min=0, max=100): def range_type(string: str, lower=0, upper=100) -> int:
try: try:
value = int(string) value = int(string)
except ValueError: except ValueError as e:
raise argparse.ArgumentTypeError("value is not a valid integer") raise argparse.ArgumentTypeError("value is not a valid integer") from e
if min <= value <= max: if lower <= value <= upper:
return value return value
else:
raise argparse.ArgumentTypeError(f"value is not in range {min}-{max}") raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}")

View File

@@ -1,3 +1,8 @@
from . import discord, queue, utils, youtubedl from . import discord, queue, utils, youtubedl
__all__ = ["utils", "queue", "youtubedl", "discord"] __all__ = [
"discord",
"queue",
"utils",
"youtubedl",
]

View File

@@ -26,7 +26,7 @@ class TrackedAudioSource(disnake.AudioSource):
class PCMVolumeTransformer(disnake.AudioSource): class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None: def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus(): if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded.") raise disnake.ClientException("AudioSource must not be Opus encoded")
self.original = original self.original = original
self.volume = volume self.volume = volume

View File

@@ -1,6 +1,6 @@
import collections import collections
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import ClassVar, Optional
import disnake import disnake
@@ -16,20 +16,22 @@ class Song:
trigger_message: disnake.Message trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str: def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
duration = (
format_duration(self.player.duration) if self.player.duration else "stream"
)
if multiline: if multiline:
return ( queue_time = (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[stream]'}" self.trigger_message.edited_at or self.trigger_message.created_at
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
) )
else: return f"{title}\n**duration:** {duration}" + (
return ( f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'stream'}**]" if show_queuer
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "") else ""
) )
return f"{title} [**{duration}**]" + (
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
)
def embed(self, is_paused=False): def embed(self, is_paused=False):
progress = 0 progress = 0
@@ -49,18 +51,20 @@ class Song:
else "[**stream**]" else "[**stream**]"
) )
), ),
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
) )
uploader_value = None
if self.player.uploader_url: if self.player.uploader_url:
embed.add_field( if self.player.uploader:
name="Uploader", uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
value=f"[{self.player.uploader}]({self.player.uploader_url})", else:
) uploader_value = self.player.uploader_url
elif self.player.uploader: elif self.player.uploader:
embed.add_field( uploader_value = self.player.uploader
name="Uploader",
value=self.player.uploader, if uploader_value:
) embed.add_field(name="Uploader", value=uploader_value)
if self.player.like_count: if self.player.like_count:
embed.add_field(name="Likes", value=f"{self.player.like_count:,}") embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
if self.player.view_count: if self.player.view_count:
@@ -74,7 +78,7 @@ class Song:
embed.set_image(self.player.thumbnail_url) embed.set_image(self.player.thumbnail_url)
embed.set_footer( embed.set_footer(
text=f"queued by {self.trigger_message.author.name}", text=f"Queued by {self.trigger_message.author.name}",
icon_url=( icon_url=(
self.trigger_message.author.avatar.url self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar if self.trigger_message.author.avatar
@@ -90,7 +94,7 @@ class Song:
@dataclass @dataclass
class Player: class Player:
queue = collections.deque() queue: ClassVar = collections.deque()
current: Optional[Song] = None current: Optional[Song] = None
def queue_pop(self): def queue_pop(self):

View File

@@ -13,7 +13,11 @@ ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer): class YTDLSource(PCMVolumeTransformer):
def __init__( def __init__(
self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5 self,
source: TrackedAudioSource,
*,
data: dict[str, Any],
volume: float = 0.5,
): ):
super().__init__(source, volume) super().__init__(source, volume)
@@ -39,7 +43,8 @@ class YTDLSource(PCMVolumeTransformer):
): ):
loop = loop or asyncio.get_event_loop() loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor( data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream) None,
lambda: ytdl.extract_info(url, download=not stream),
) )
if "entries" in data: if "entries" in data:
@@ -54,7 +59,7 @@ class YTDLSource(PCMVolumeTransformer):
disnake.FFmpegPCMAudio( disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data), data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1", before_options="-vn -reconnect 1",
) ),
), ),
data=data, data=data,
) )

View File

@@ -3,11 +3,11 @@ from .utils import Command, match, match_token, tokenize
__all__ = [ __all__ = [
"bot", "bot",
"tools",
"utils",
"voice",
"Command", "Command",
"match", "match",
"match_token", "match_token",
"tokenize", "tokenize",
"tools",
"utils",
"voice",
] ]

View File

@@ -98,6 +98,6 @@ async def help(message):
await reply( await reply(
message, message,
", ".join( ", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()] [f"`{command.value}`" for command in commands.Command.__members__.values()],
), ),
) )

View File

@@ -14,13 +14,13 @@ async def lookup(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0],
"look up a user or application on discord by their ID", "look up a discord user or application by ID",
) )
parser.add_argument( parser.add_argument(
"-a", "-a",
"--application", "--application",
action="store_true", action="store_true",
help="search for applications instead of users", help="look up applications instead of users",
) )
parser.add_argument( parser.add_argument(
"id", "id",
@@ -41,7 +41,7 @@ async def lookup(message):
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR) embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail( embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp" url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
) )
embed.add_field(name="Application Name", value=response["name"]) embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`") embed.add_field(name="Application ID", value="`" + response["id"] + "`")
@@ -102,7 +102,9 @@ async def lookup(message):
for tag in response["tags"]: for tag in response["tags"]:
bot_tags += tag + ", " bot_tags += tag + ", "
embed.add_field( embed.add_field(
name="Tags", value="None" if bot_tags == "" else bot_tags[:-2], inline=False name="Tags",
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
) )
else: else:
try: try:
@@ -117,8 +119,10 @@ async def lookup(message):
if flag_name != "None": if flag_name != "None":
try: try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]] badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception: except Exception as e:
raise Exception(f"unable to find badge: {PUBLIC_FLAGS[flag]}") raise Exception(
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id) user_object = await client.fetch_user(user.id)
accent_color = 0x000000 accent_color = 0x000000
@@ -161,11 +165,11 @@ async def clear(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0],
"bulk delete messages in the current channel matching certain criteria", "bulk delete messages in the current channel matching specified criteria",
) )
parser.add_argument( parser.add_argument(
"count", "count",
type=lambda c: arguments.range_type(c, min=1, max=1000), type=lambda c: arguments.range_type(c, lower=1, upper=1000),
help="amount of messages to delete", help="amount of messages to delete",
) )
group = parser.add_mutually_exclusive_group() group = parser.add_mutually_exclusive_group()
@@ -259,8 +263,10 @@ async def clear(message):
messages = len( messages = len(
await message.channel.purge( await message.channel.purge(
limit=args.count, check=check, oldest_first=args.oldest_first limit=args.count,
) check=check,
oldest_first=args.oldest_first,
),
) )
if not args.delete_command: if not args.delete_command:

View File

@@ -42,7 +42,7 @@ def match_token(token: str) -> list[Command]:
filter( filter(
lambda command: command.value == token.lower(), lambda command: command.value == token.lower(),
Command.__members__.values(), Command.__members__.values(),
) ),
): ):
return exact_match return exact_match
@@ -50,7 +50,7 @@ def match_token(token: str) -> list[Command]:
filter( filter(
lambda command: command.value.startswith(token.lower()), lambda command: command.value.startswith(token.lower()),
Command.__members__.values(), Command.__members__.values(),
) ),
) )

View File

@@ -1,11 +1,11 @@
import arguments
import disnake_paginator import disnake_paginator
from constants import EMBED_COLOR
from state import players
import arguments
import commands import commands
import sponsorblock import sponsorblock
import utils import utils
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed from .utils import command_allowed
@@ -13,7 +13,8 @@ from .utils import command_allowed
async def playing(message): async def playing(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], "get information about the currently playing song" tokens[0],
"get information about the currently playing song",
) )
parser.add_argument( parser.add_argument(
"-d", "-d",
@@ -49,7 +50,7 @@ async def playing(message):
await utils.reply( await utils.reply(
message, message,
embed=players[message.guild.id].current.embed( embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused() is_paused=message.guild.voice_client.is_paused(),
), ),
) )
else: else:
@@ -89,13 +90,13 @@ async def pause(message):
async def fast_forward(message): async def fast_forward(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip current sponsorblock segment") parser = arguments.ArgumentParser(tokens[0], "skip the current sponsorblock segment")
parser.add_argument( parser.add_argument(
"-s", "-s",
"--seconds", "--seconds",
nargs="?", nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=300), type=lambda v: arguments.range_type(v, lower=0, upper=300),
help="the amount of seconds to fast forward instead", help="the number of seconds to fast forward instead",
) )
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):
return return
@@ -110,11 +111,12 @@ async def fast_forward(message):
seconds = args.seconds seconds = args.seconds
if not seconds: if not seconds:
video = await sponsorblock.get_segments( video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id players[message.guild.id].current.player.id,
) )
if not video: if not video:
await utils.reply( await utils.reply(
message, "no sponsorblock segments were found for this video!" message,
"no sponsorblock segments were found for this video!",
) )
return return
@@ -140,7 +142,7 @@ async def volume(message):
parser.add_argument( parser.add_argument(
"volume", "volume",
nargs="?", nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150), type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150)", help="the volume level (0 - 150)",
) )
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):

View File

@@ -8,26 +8,24 @@ import audio
import commands import commands
import utils import utils
from constants import EMBED_COLOR from constants import EMBED_COLOR
from state import client, players from state import client, players, trusted_users
from .playback import resume from .playback import resume
from .utils import command_allowed, ensure_joined, play_next from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False): async def queue_or_play(message, edited=False):
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], "queue a song, list the queue, or resume playback" tokens[0],
"queue a song, list the queue, or resume playback",
) )
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song") parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument( parser.add_argument(
"-v", "-v",
"--volume", "--volume",
default=50, default=50,
type=lambda v: arguments.range_type(v, min=0, max=150), type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150) for the specified song", help="the volume level (0 - 150) for the specified song",
) )
parser.add_argument( parser.add_argument(
@@ -80,19 +78,23 @@ async def queue_or_play(message, edited=False):
elif not command_allowed(message): elif not command_allowed(message):
return return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited: if edited:
found = None found = next(
for queued in players[message.guild.id].queue: filter(
if queued.trigger_message.id == message.id: lambda queued: queued.trigger_message.id == message.id,
found = queued players[message.guild.id].queue,
break ),
None,
)
if found: if found:
players[message.guild.id].queue.remove(found) players[message.guild.id].queue.remove(found)
if args.clear: if args.clear:
players[message.guild.id].queue.clear() players[message.guild.id].queue.clear()
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
return
elif indices := args.remove_index: elif indices := args.remove_index:
targets = [] targets = []
for i in indices: for i in indices:
@@ -113,15 +115,15 @@ async def queue_or_play(message, edited=False):
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}", f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
) )
elif args.remove_title or args.remove_queuer: elif args.remove_title or args.remove_queuer:
targets = [] targets = set()
for queued in players[message.guild.id].queue: for queued in players[message.guild.id].queue:
if t := args.remove_title: if t := args.remove_title:
if t in queued.player.title: if t in queued.player.title:
targets.append(queued) targets.add(queued)
continue
if q := args.remove_queuer: if q := args.remove_queuer:
if q == queued.trigger_message.author.id: if q == queued.trigger_message.author.id:
targets.append(queued) targets.add(queued)
targets = list(targets)
if not args.match_multiple: if not args.match_multiple:
targets = targets[:1] targets = targets[:1]
@@ -140,11 +142,12 @@ async def queue_or_play(message, edited=False):
lambda queued: queued.trigger_message.author.id lambda queued: queued.trigger_message.author.id
== message.author.id, == message.author.id,
players[message.guild.id].queue, players[message.guild.id].queue,
) ),
) ),
) )
>= 5 >= 5
and not len(message.guild.voice_client.channel.members) == 2 and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
): ):
await utils.reply( await utils.reply(
message, message,
@@ -155,7 +158,9 @@ async def queue_or_play(message, edited=False):
try: try:
async with message.channel.typing(): async with message.channel.typing():
player = await audio.youtubedl.YTDLSource.from_url( player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query), loop=client.loop, stream=True " ".join(query),
loop=client.loop,
stream=True,
) )
player.volume = float(args.volume) / 100.0 player.volume = float(args.volume) / 100.0
except Exception as e: except Exception as e:
@@ -192,7 +197,7 @@ async def queue_or_play(message, edited=False):
[ [
queued.player.duration if queued.player.duration else 0 queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue for queued in players[message.guild.id].queue
] ],
), ),
natural=True, natural=True,
) )
@@ -217,13 +222,14 @@ async def queue_or_play(message, edited=False):
[ [
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}" f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch for i, queued in batch
] ],
) )
for batch in itertools.batched( for batch in itertools.batched(
enumerate(players[message.guild.id].queue), 10 enumerate(players[message.guild.id].queue),
10,
) )
], ],
) ),
), ),
).start(utils.MessageInteractionWrapper(message)) ).start(utils.MessageInteractionWrapper(message))
else: else:
@@ -235,7 +241,7 @@ async def queue_or_play(message, edited=False):
async def skip(message): async def skip(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song") parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
parser.add_argument( parser.add_argument(
"-n", "-n",
"--next", "--next",

View File

@@ -21,7 +21,8 @@ async def sponsorblock_command(message):
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id) video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video: if not video:
await utils.reply( await utils.reply(
message, "no sponsorblock segments were found for this video!" message,
"no sponsorblock segments were found for this video!",
) )
return return
@@ -33,7 +34,7 @@ async def sponsorblock_command(message):
current = "**" if progress >= begin and progress < end else "" current = "**" if progress >= begin and progress < end else ""
text.append( text.append(
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}" f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
) )
await utils.reply( await utils.reply(

View File

@@ -24,7 +24,8 @@ def play_next(message, once=False, first=False):
if message.guild.id in players and players[message.guild.id].queue: if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop() queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play( message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once) queued.player,
after=lambda e: play_after_callback(e, message, once),
) )
embed = queued.embed() embed = queued.embed()

View File

@@ -24,6 +24,12 @@ SPONSORBLOCK_CATEGORY_NAMES = {
"selfpromo": "self promotion", "selfpromo": "self promotion",
"sponsor": "sponsored", "sponsor": "sponsored",
} }
REACTIONS = {
"cat": ["🐈"],
"dog": ["🐕"],
"gn": ["💤", "😪", "😴", "🛌"],
"pizza": ["🍕"],
}
RELOADABLE_MODULES = [ RELOADABLE_MODULES = [
"arguments", "arguments",
"audio", "audio",

36
core.py
View File

@@ -3,6 +3,7 @@ import contextlib
import importlib import importlib
import inspect import inspect
import io import io
import signal
import textwrap import textwrap
import time import time
import traceback import traceback
@@ -59,20 +60,11 @@ async def on_message(message, edited=False):
match matched: match matched:
case C.RELOAD if message.author.id in OWNERS: case C.RELOAD if message.author.id in OWNERS:
reloaded_modules = set()
start = time.time() start = time.time()
reloaded_modules = reload()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
end = time.time() end = time.time()
debug( debug(
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s" f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s",
) )
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
@@ -166,6 +158,7 @@ async def on_voice_state_update(_, before, after):
channel = before.channel channel = before.channel
elif is_empty(after.channel): elif is_empty(after.channel):
channel = after.channel channel = after.channel
if channel: if channel:
await channel.guild.voice_client.disconnect() await channel.guild.voice_client.disconnect()
@@ -174,9 +167,9 @@ def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__) reloaded_modules.add(module.__name__)
for submodule in filter( for submodule in filter(
lambda v: inspect.ismodule(v) lambda sm: inspect.ismodule(sm)
and v.__name__ in RELOADABLE_MODULES and sm.__name__ in RELOADABLE_MODULES
and v.__name__ not in reloaded_modules, and sm.__name__ not in reloaded_modules,
vars(module).values(), vars(module).values(),
): ):
rreload(reloaded_modules, submodule) rreload(reloaded_modules, submodule)
@@ -185,3 +178,18 @@ def rreload(reloaded_modules, module):
if "__reload_module__" in dir(module): if "__reload_module__" in dir(module):
module.__reload_module__() module.__reload_module__()
def reload(*_):
reloaded_modules = set()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
return reloaded_modules
signal.signal(signal.SIGUSR1, reload)

View File

@@ -8,11 +8,15 @@ from state import client, kill, players
async def transcript( async def transcript(
message, languages=["en"], max_messages=6, min_messages=3, upper=True message,
languages=["en"],
max_messages=6,
min_messages=3,
upper=True,
): ):
initial_id = message.guild.voice_client.source.id initial_id = message.guild.voice_client.source.id
transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts( transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
initial_id initial_id,
) )
try: try:
transcript = transcript_list.find_manually_created_transcript(languages).fetch() transcript = transcript_list.find_manually_created_transcript(languages).fetch()
@@ -44,7 +48,7 @@ async def transcript(
await messages.pop().delete() await messages.pop().delete()
else: else:
await message.channel.delete_messages( await message.channel.delete_messages(
[messages.pop() for _ in range(count)] [messages.pop() for _ in range(count)],
) )
except Exception: except Exception:
pass pass
@@ -77,19 +81,20 @@ def messages_per_second(limit=500):
average = 1 average = 1
print( print(
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** " f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**" f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**",
) )
async def auto_count(channel_id: int): async def auto_count(channel_id: int):
if (channel := await client.fetch_channel(channel_id)) and isinstance( if (channel := await client.fetch_channel(channel_id)) and isinstance(
channel, disnake.TextChannel channel,
disnake.TextChannel,
): ):
last_message = (await channel.history(limit=1).flatten())[0] last_message = (await channel.history(limit=1).flatten())[0]
try: try:
result = str( result = str(
int("".join(filter(lambda d: d in string.digits, last_message.content))) int("".join(filter(lambda d: d in string.digits, last_message.content)))
+ 1 + 1,
) )
except Exception: except Exception:
result = "where number" result = "where number"

11
fun.py
View File

@@ -1,10 +1,13 @@
import random import random
import commands import commands
from constants import REACTIONS
async def on_message(message): async def on_message(message):
if random.random() < 0.01 and "gn" in commands.tokenize( if random.random() < 0.01:
message.content, remove_prefix=False tokens = commands.tokenize(message.content, remove_prefix=False)
): for keyword, options in REACTIONS.items():
await message.add_reaction(random.choice(["💤", "😪", "😴", "🛌"])) if keyword in tokens:
await message.add_reaction(random.choice(options))
break

View File

@@ -7,7 +7,7 @@ from state import client
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig( logging.basicConfig(
format=( format=(
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s" "%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s"
if __debug__ if __debug__
else "%(asctime)s %(levelname)s %(message)s" else "%(asctime)s %(levelname)s %(message)s"
), ),

View File

@@ -5,4 +5,4 @@ disnake_paginator
psutil psutil
PyNaCl PyNaCl
youtube_transcript_api youtube_transcript_api
yt-dlp yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz

View File

@@ -14,7 +14,7 @@ categories = json.dumps(
"preview", "preview",
"selfpromo", "selfpromo",
"sponsor", "sponsor",
] ],
) )
@@ -22,15 +22,15 @@ async def get_segments(video_id: str):
if video_id in sponsorblock_cache: if video_id in sponsorblock_cache:
return sponsorblock_cache[video_id] return sponsorblock_cache[video_id]
hashPrefix = hashlib.sha256(video_id.encode()).hexdigest()[:4] hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
session = aiohttp.ClientSession() session = aiohttp.ClientSession()
response = await session.get( response = await session.get(
f"https://sponsor.ajay.app/api/skipSegments/{hashPrefix}", f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}",
params={"categories": categories}, params={"categories": categories},
) )
if response.status == 200 and ( if response.status == 200 and (
results := list( results := list(
filter(lambda v: video_id == v["videoID"], await response.json()) filter(lambda v: video_id == v["videoID"], await response.json()),
) )
): ):
sponsorblock_cache[video_id] = results[0] sponsorblock_cache[video_id] = results[0]

View File

@@ -15,5 +15,6 @@ idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False} kill = {"transcript": False}
message_responses = LimitedSizeDict() message_responses = LimitedSizeDict()
players = {} players = {}
sponsorblock_cache = LimitedSizeDict(size_limit=100) sponsorblock_cache = LimitedSizeDict()
start_time = time.time() start_time = time.time()
trusted_users = []

View File

@@ -11,7 +11,7 @@ async def cleanup():
debug("spawned cleanup thread") debug("spawned cleanup thread")
while True: while True:
await asyncio.sleep(3600 * 12) await asyncio.sleep(3600)
targets = [] targets = []
for guild_id, player in players.items(): for guild_id, player in players.items():
@@ -19,7 +19,7 @@ async def cleanup():
targets.append(guild_id) targets.append(guild_id)
for target in targets: for target in targets:
del players[target] del players[target]
debug(f"cleanup removed {len(targets)} empty players") debug(f"cleanup thread removed {len(targets)} empty players")
if ( if (
not idle_tracker["is_idle"] not idle_tracker["is_idle"]

View File

@@ -1,3 +1,3 @@
from . import test_filter_secrets, test_format_duration from . import test_filter_secrets, test_format_duration
__all__ = ["test_format_duration", "test_filter_secrets"] __all__ = ["test_filter_secrets", "test_format_duration"]

View File

@@ -7,15 +7,15 @@ class TestFilterSecrets(unittest.TestCase):
def test_filter_secrets(self): def test_filter_secrets(self):
secret = "PLACEHOLDER_TOKEN" secret = "PLACEHOLDER_TOKEN"
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}) secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}),
) )
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret})) self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}) secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}),
) )
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}) secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}),
) )
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}) secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}),
) )

View File

@@ -3,11 +3,11 @@ from collections import OrderedDict
from constants import SECRETS from constants import SECRETS
def surround(inner, outer="```"): def surround(inner: str, outer="```") -> str:
return outer + str(inner) + outer return outer + str(inner) + outer
def format_duration(duration: int, natural: bool = False, short: bool = False): def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
def format_plural(noun, count): def format_plural(noun, count):
if short: if short:
return noun[0] return noun[0]
@@ -50,7 +50,7 @@ def filter_secrets(text: str, secrets=SECRETS) -> str:
class LimitedSizeDict(OrderedDict): class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 1000) self.size_limit = kwargs.pop("size_limit", 100)
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._check_size_limit() self._check_size_limit()

View File

@@ -1,6 +1,6 @@
import os
import time import time
from logging import error, info from logging import error, info
from pathlib import Path
import disnake import disnake
@@ -34,7 +34,9 @@ async def reply(message, *args, **kwargs):
try: try:
await message_responses[message.id].edit( await message_responses[message.id].edit(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() *args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
return return
except Exception: except Exception:
@@ -42,7 +44,9 @@ async def reply(message, *args, **kwargs):
try: try:
response = await message.reply( response = await message.reply(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() *args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
except Exception: except Exception:
response = await channel_send(message, *args, **kwargs) response = await channel_send(message, *args, **kwargs)
@@ -52,13 +56,15 @@ async def reply(message, *args, **kwargs):
async def channel_send(message, *args, **kwargs): async def channel_send(message, *args, **kwargs):
await message.channel.send( await message.channel.send(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none() *args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
def load_opus(): def load_opus():
for path in filter( for path in filter(
lambda p: os.path.exists(p), lambda p: Path(p).exists(),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"], ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
): ):
try: try:
@@ -70,8 +76,8 @@ def load_opus():
raise Exception("could not locate working opus library") raise Exception("could not locate working opus library")
def snowflake_timestamp(id): def snowflake_timestamp(snowflake) -> int:
return round(((id >> 22) + 1420070400000) / 1000) return round(((snowflake >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message): async def add_check_reaction(message):
@@ -80,7 +86,8 @@ async def add_check_reaction(message):
async def invalid_user_handler(interaction): async def invalid_user_handler(interaction):
await interaction.response.send_message( await interaction.response.send_message(
"you are not the intended receiver of this message!", ephemeral=True "you are not the intended receiver of this message!",
ephemeral=True,
) )