Compare commits

..

1 Commits

Author SHA1 Message Date
b71331a102 refactor: fix casing andn add type hints 2025-02-05 17:24:15 -05:00
46 changed files with 519 additions and 1056 deletions

6
.envrc
View File

@@ -1,6 +0,0 @@
dotenv
export VIRTUAL_ENV=."venv"
layout python
use flake

2
.gitignore vendored
View File

@@ -1,4 +1,2 @@
.direnv
.env .env
.venv
__pycache__ __pycache__

View File

@@ -7,4 +7,4 @@ COPY . .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
CMD ["python", "-m", "errornocord.main"] CMD ["python", "-OO", "main.py"]

View File

@@ -1,3 +1,3 @@
# ErrorNoCord # ErrorNoCord
Hot-reloadable Discord music bot Discord music bot for testing purposes, with live reloading support

View File

@@ -2,15 +2,13 @@ import argparse
import contextlib import contextlib
import io import io
from . import utils import utils
class ArgumentParser: class ArgumentParser:
def __init__(self, command, description): def __init__(self, command, description):
self.parser = argparse.ArgumentParser( self.parser = argparse.ArgumentParser(
command, command, description=description, exit_on_error=False
description=description,
exit_on_error=False,
) )
def print_help(self): def print_help(self):
@@ -28,20 +26,21 @@ class ArgumentParser:
async def parse_args(self, message, tokens) -> argparse.Namespace | None: async def parse_args(self, message, tokens) -> argparse.Namespace | None:
try: try:
with contextlib.redirect_stdout(io.StringIO()): with contextlib.redirect_stdout(io.StringIO()):
return self.parser.parse_args(tokens[1:]) args = self.parser.parse_args(tokens[1:])
return args
except SystemExit: except SystemExit:
await utils.reply(message, f"```\n{self.print_help()}```") await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e: except Exception as e:
await utils.reply(message, f"`{e}`") await utils.reply(message, f"`{e}`")
def range_type(string: str, lower=0, upper=100) -> int: def range_type(string: str, min=0, max=100):
try: try:
value = int(string) value = int(string)
except ValueError as e: except ValueError:
raise argparse.ArgumentTypeError("value is not a valid integer") from e raise argparse.ArgumentTypeError("value is not a valid integer")
if lower <= value <= upper: if min <= value <= max:
return value return value
else:
raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}") raise argparse.ArgumentTypeError(f"value is not in range {min}-{max}")

View File

@@ -3,11 +3,11 @@ from .utils import Command, match, match_token, tokenize
__all__ = [ __all__ = [
"bot", "bot",
"tools",
"utils",
"voice",
"Command", "Command",
"match", "match",
"match_token", "match_token",
"tokenize", "tokenize",
"tools",
"utils",
"voice",
] ]

View File

@@ -6,10 +6,11 @@ import disnake
import psutil import psutil
from yt_dlp import version from yt_dlp import version
from .. import arguments, commands import arguments
from ..constants import EMBED_COLOR import commands
from ..state import client, start_time import utils
from ..utils import format_duration, reply, surround from constants import EMBED_COLOR
from state import client, start_time
async def status(message): async def status(message):
@@ -24,41 +25,41 @@ async def status(message):
embed = disnake.Embed(color=EMBED_COLOR) embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field( embed.add_field(
name="Latency", name="Latency",
value=surround(f"{round(client.latency * 1000, 1)} ms"), value=f"```{round(client.latency * 1000, 1)} ms```",
) )
embed.add_field( embed.add_field(
name="Memory", name="Memory",
value=surround(f"{round(memory_usage, 1)} MiB"), value=f"```{round(memory_usage, 1)} MiB```",
) )
embed.add_field( embed.add_field(
name="Threads", name="Threads",
value=surround(threading.active_count()), value=f"```{threading.active_count()}```",
) )
embed.add_field( embed.add_field(
name="Guilds", name="Guilds",
value=surround(len(client.guilds)), value=f"```{len(client.guilds)}```",
) )
embed.add_field( embed.add_field(
name="Members", name="Members",
value=surround(member_count), value=f"```{member_count}```",
) )
embed.add_field( embed.add_field(
name="Channels", name="Channels",
value=surround(channel_count), value=f"```{channel_count}```",
) )
embed.add_field( embed.add_field(
name="Disnake", name="Disnake",
value=surround(disnake.__version__), value=f"```{disnake.__version__}```",
) )
embed.add_field( embed.add_field(
name="yt-dlp", name="yt-dlp",
value=surround(version.__version__), value=f"```{version.__version__}```",
) )
embed.add_field( embed.add_field(
name="Uptime", name="Uptime",
value=surround(format_duration(int(time.time() - start_time), short=True)), value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
) )
await reply(message, embed=embed) await utils.reply(message, embed=embed)
async def uptime(message): async def uptime(message):
@@ -77,13 +78,15 @@ async def uptime(message):
return return
if args.since: if args.since:
await reply(message, f"{round(start_time)}") await utils.reply(message, f"{round(start_time)}")
else: else:
await reply(message, f"up {format_duration(int(time.time() - start_time))}") await utils.reply(
message, f"up {utils.format_duration(int(time.time() - start_time))}"
)
async def ping(message): async def ping(message):
await reply( await utils.reply(
message, message,
embed=disnake.Embed( embed=disnake.Embed(
title="Pong :ping_pong:", title="Pong :ping_pong:",
@@ -94,9 +97,9 @@ async def ping(message):
async def help(message): async def help(message):
await reply( await utils.reply(
message, message,
", ".join( ", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()], [f"`{command.value}`" for command in commands.Command.__members__.values()]
), ),
) )

View File

@@ -3,22 +3,24 @@ import re
import aiohttp import aiohttp
import disnake import disnake
from .. import arguments, commands, utils import arguments
from ..constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS import commands
from ..state import client import utils
from constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
from state import client
async def lookup(message): async def lookup(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0],
"look up a discord user or application by ID", "look up a user or application on discord by their ID",
) )
parser.add_argument( parser.add_argument(
"-a", "-a",
"--application", "--application",
action="store_true", action="store_true",
help="look up applications instead of users", help="search for applications instead of users",
) )
parser.add_argument( parser.add_argument(
"id", "id",
@@ -39,7 +41,7 @@ async def lookup(message):
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR) embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail( embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp", url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp"
) )
embed.add_field(name="Application Name", value=response["name"]) embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`") embed.add_field(name="Application ID", value="`" + response["id"] + "`")
@@ -100,9 +102,7 @@ async def lookup(message):
for tag in response["tags"]: for tag in response["tags"]:
bot_tags += tag + ", " bot_tags += tag + ", "
embed.add_field( embed.add_field(
name="Tags", name="Tags", value="None" if bot_tags == "" else bot_tags[:-2], inline=False
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
) )
else: else:
try: try:
@@ -117,10 +117,8 @@ async def lookup(message):
if flag_name != "None": if flag_name != "None":
try: try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]] badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception as e: except Exception:
raise Exception( raise Exception(f"unable to find badge: {PUBLIC_FLAGS[flag]}")
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id) user_object = await client.fetch_user(user.id)
accent_color = 0x000000 accent_color = 0x000000
@@ -163,11 +161,11 @@ async def clear(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0],
"bulk delete messages in the current channel matching specified criteria", "bulk delete messages in the current channel matching certain criteria",
) )
parser.add_argument( parser.add_argument(
"count", "count",
type=lambda c: arguments.range_type(c, lower=1, upper=1000), type=lambda c: arguments.range_type(c, min=1, max=1000),
help="amount of messages to delete", help="amount of messages to delete",
) )
group = parser.add_mutually_exclusive_group() group = parser.add_mutually_exclusive_group()
@@ -261,10 +259,8 @@ async def clear(message):
messages = len( messages = len(
await message.channel.purge( await message.channel.purge(
limit=args.count, limit=args.count, check=check, oldest_first=args.oldest_first
check=check, )
oldest_first=args.oldest_first,
),
) )
if not args.delete_command: if not args.delete_command:

View File

@@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
from functools import lru_cache from functools import lru_cache
from .. import constants import constants
class Command(Enum): class Command(Enum):
@@ -30,19 +30,16 @@ class Command(Enum):
@lru_cache @lru_cache
def match_token(token: str) -> list[Command]: def match_token(token: str) -> list[Command]:
match token.lower(): if token.lower() == "r":
case "r":
return [Command.RELOAD] return [Command.RELOAD]
case "s": elif token.lower() == "s":
return [Command.SKIP] return [Command.SKIP]
case "c":
return [Command.CURRENT]
if exact_match := list( if exact_match := list(
filter( filter(
lambda command: command.value == token.lower(), lambda command: command.value == token.lower(),
Command.__members__.values(), Command.__members__.values(),
), )
): ):
return exact_match return exact_match
@@ -50,7 +47,7 @@ def match_token(token: str) -> list[Command]:
filter( filter(
lambda command: command.value.startswith(token.lower()), lambda command: command.value.startswith(token.lower()),
Command.__members__.values(), Command.__members__.values(),
), )
) )

View File

@@ -1,14 +1,17 @@
from ... import utils import disnake
from ...state import players
import utils
from .utils import command_allowed from .utils import command_allowed
async def join(message): async def join(message):
if message.author.voice:
if message.guild.voice_client: if message.guild.voice_client:
await message.guild.voice_client.move_to(message.channel) return await message.guild.voice_client.move_to(message.channel)
else: elif message.author.voice:
await message.author.voice.channel.connect() await message.author.voice.channel.connect()
elif isinstance(message.channel, disnake.VoiceChannel):
await message.channel.connect()
else: else:
await utils.reply(message, "you are not connected to a voice channel!") await utils.reply(message, "you are not connected to a voice channel!")
return return
@@ -20,8 +23,5 @@ async def leave(message):
if not command_allowed(message): if not command_allowed(message):
return return
if message.guild.id in players:
del players[message.guild.id]
await message.guild.voice_client.disconnect() await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message) await utils.add_check_reaction(message)

View File

@@ -1,16 +1,19 @@
import arguments
import disnake_paginator import disnake_paginator
from constants import EMBED_COLOR
from state import players
import commands
import sponsorblock
import utils
from ... import arguments, commands, sponsorblock, utils
from ...constants import EMBED_COLOR
from ...state import players
from .utils import command_allowed from .utils import command_allowed
async def playing(message): async def playing(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0], "get information about the currently playing song"
"get information about the currently playing song",
) )
parser.add_argument( parser.add_argument(
"-d", "-d",
@@ -46,7 +49,7 @@ async def playing(message):
await utils.reply( await utils.reply(
message, message,
embed=players[message.guild.id].current.embed( embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused(), is_paused=message.guild.voice_client.is_paused()
), ),
) )
else: else:
@@ -86,15 +89,13 @@ async def pause(message):
async def fast_forward(message): async def fast_forward(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(tokens[0], "skip current sponsorblock segment")
tokens[0], "skip the current sponsorblock segment"
)
parser.add_argument( parser.add_argument(
"-s", "-s",
"--seconds", "--seconds",
nargs="?", nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=300), type=lambda v: arguments.range_type(v, min=0, max=300),
help="the number of seconds to fast forward instead", help="the amount of seconds to fast forward instead",
) )
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):
return return
@@ -109,12 +110,11 @@ async def fast_forward(message):
seconds = args.seconds seconds = args.seconds
if not seconds: if not seconds:
video = await sponsorblock.get_segments( video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id, players[message.guild.id].current.player.id
) )
if not video: if not video:
await utils.reply( await utils.reply(
message, message, "no sponsorblock segments were found for this video!"
"no sponsorblock segments were found for this video!",
) )
return return
@@ -140,7 +140,7 @@ async def volume(message):
parser.add_argument( parser.add_argument(
"volume", "volume",
nargs="?", nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=150), type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)", help="the volume level (0 - 150)",
) )
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):

View File

@@ -3,25 +3,31 @@ import itertools
import disnake import disnake
import disnake_paginator import disnake_paginator
from ... import arguments, audio, commands, utils import arguments
from ...constants import EMBED_COLOR import commands
from ...state import client, players, trusted_users import utils
import youtubedl
from constants import EMBED_COLOR
from state import client, players
from .playback import resume from .playback import resume
from .utils import command_allowed, ensure_joined, play_next from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False): async def queue_or_play(message, edited=False):
if message.guild.id not in players:
players[message.guild.id] = youtubedl.QueuedPlayer()
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0], "queue a song, list the queue, or resume playback"
"queue a song, list the queue, or resume playback",
) )
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song") parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument( parser.add_argument(
"-v", "-v",
"--volume", "--volume",
default=50, default=50,
type=lambda v: arguments.range_type(v, lower=0, upper=150), type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150) for the specified song", help="the volume level (0 - 150) for the specified song",
) )
parser.add_argument( parser.add_argument(
@@ -74,23 +80,19 @@ async def queue_or_play(message, edited=False):
elif not command_allowed(message): elif not command_allowed(message):
return return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited: if edited:
found = next( found = None
filter( for queued in players[message.guild.id].queue:
lambda queued: queued.trigger_message.id == message.id, if queued.trigger_message.id == message.id:
players[message.guild.id].queue, found = queued
), break
None,
)
if found: if found:
players[message.guild.id].queue.remove(found) players[message.guild.id].queue.remove(found)
if args.clear: if args.clear:
players[message.guild.id].queue.clear() players[message.guild.id].queue.clear()
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
return
elif indices := args.remove_index: elif indices := args.remove_index:
targets = [] targets = []
for i in indices: for i in indices:
@@ -111,15 +113,15 @@ async def queue_or_play(message, edited=False):
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}", f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
) )
elif args.remove_title or args.remove_queuer: elif args.remove_title or args.remove_queuer:
targets = set() targets = []
for queued in players[message.guild.id].queue: for queued in players[message.guild.id].queue:
if t := args.remove_title: if t := args.remove_title:
if t in queued.player.title: if t in queued.player.title:
targets.add(queued) targets.append(queued)
continue
if q := args.remove_queuer: if q := args.remove_queuer:
if q == queued.trigger_message.author.id: if q == queued.trigger_message.author.id:
targets.add(queued) targets.append(queued)
targets = list(targets)
if not args.match_multiple: if not args.match_multiple:
targets = targets[:1] targets = targets[:1]
@@ -135,16 +137,14 @@ async def queue_or_play(message, edited=False):
and len( and len(
list( list(
filter( filter(
lambda queued: ( lambda queued: queued.trigger_message.author.id
queued.trigger_message.author.id == message.author.id == message.author.id,
),
players[message.guild.id].queue, players[message.guild.id].queue,
), )
), )
) )
>= 5 >= 5
and not len(message.guild.voice_client.channel.members) == 2 and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
): ):
await utils.reply( await utils.reply(
message, message,
@@ -154,22 +154,22 @@ async def queue_or_play(message, edited=False):
try: try:
async with message.channel.typing(): async with message.channel.typing():
player = await audio.youtubedl.YTDLSource.from_url( player = await youtubedl.YTDLSource.from_url(
" ".join(query), " ".join(query), loop=client.loop, stream=True
loop=client.loop,
stream=True,
) )
player.volume = float(args.volume) / 100.0 player.volume = float(args.volume) / 100.0
except Exception as e: except Exception as e:
await utils.reply(message, f"failed to queue: `{e}`") await utils.reply(
message, f"**failed to queue:** `{e}`", suppress_embeds=True
)
return return
queued = audio.queue.Song(player, message) queued = youtubedl.QueuedSong(player, message)
if args.now or args.next: if args.now or args.next:
players[message.guild.id].queue_push_front(queued) players[message.guild.id].queue_add_front(queued)
else: else:
players[message.guild.id].queue_push(queued) players[message.guild.id].queue_add(queued)
if not message.guild.voice_client: if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!") await utils.reply(message, "unexpected disconnect from voice channel!")
@@ -194,7 +194,7 @@ async def queue_or_play(message, edited=False):
[ [
queued.player.duration if queued.player.duration else 0 queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue for queued in players[message.guild.id].queue
], ]
), ),
natural=True, natural=True,
) )
@@ -219,14 +219,13 @@ async def queue_or_play(message, edited=False):
[ [
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}" f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch for i, queued in batch
], ]
) )
for batch in itertools.batched( for batch in itertools.batched(
enumerate(players[message.guild.id].queue), enumerate(players[message.guild.id].queue), 10
10,
) )
], ],
), )
), ),
).start(utils.MessageInteractionWrapper(message)) ).start(utils.MessageInteractionWrapper(message))
else: else:
@@ -238,7 +237,7 @@ async def queue_or_play(message, edited=False):
async def skip(message): async def skip(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing") parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
parser.add_argument( parser.add_argument(
"-n", "-n",
"--next", "--next",
@@ -251,8 +250,7 @@ async def skip(message):
if not command_allowed(message): if not command_allowed(message):
return return
if players[message.guild.id] and not players[message.guild.id].queue: if not players[message.guild.id].queue:
del players[message.guild.id]
message.guild.voice_client.stop() message.guild.voice_client.stop()
await utils.reply( await utils.reply(
message, message,

View File

@@ -1,8 +1,11 @@
import disnake import disnake
from ... import audio, sponsorblock, utils import sponsorblock
from ...constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES import utils
from ...state import players import youtubedl
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed from .utils import command_allowed
@@ -18,20 +21,18 @@ async def sponsorblock_command(message):
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id) video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video: if not video:
await utils.reply( await utils.reply(
message, message, "no sponsorblock segments were found for this video!"
"no sponsorblock segments were found for this video!",
) )
return return
text = [] text = []
for segment in video["segments"]: for segment in video["segments"]:
begin, end = map(int, segment["segment"]) begin, end = map(int, segment["segment"])
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES: category_name = sponsorblock.CATEGORY_NAMES.get(segment["category"])
category = SPONSORBLOCK_CATEGORY_NAMES[category]
current = "**" if progress >= begin and progress < end else "" current = "**" if progress >= begin and progress < end else ""
text.append( text.append(
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}", f"{current}`{youtubedl.format_duration(begin)}` - `{youtubedl.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
) )
await utils.reply( await utils.reply(

View File

@@ -2,8 +2,8 @@ from logging import error
import disnake import disnake
from ... import utils import utils
from ...state import client, players from state import client, players
def play_after_callback(e, message, once): def play_after_callback(e, message, once):
@@ -24,8 +24,7 @@ def play_next(message, once=False, first=False):
if message.guild.id in players and players[message.guild.id].queue: if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop() queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play( message.guild.voice_client.play(
queued.player, queued.player, after=lambda e: play_after_callback(e, message, once)
after=lambda e: play_after_callback(e, message, once),
) )
embed = queued.embed() embed = queued.embed()

View File

@@ -19,65 +19,48 @@ BAR_LENGTH = 35
EMBED_COLOR = 0xFF6600 EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712] OWNERS = [531392146767347712]
PREFIX = "%" PREFIX = "%"
SPONSORBLOCK_CATEGORY_NAMES = {
"music_offtopic": "non-music",
"selfpromo": "self promotion",
"sponsor": "sponsored",
}
REACTIONS = {
"cat": ["🐈"],
"dog": ["🐕"],
"gn": ["💤", "😪", "😴", "🛌"],
"pizza": ["🍕"],
}
RELOADABLE_MODULES = [ RELOADABLE_MODULES = [
"errornocord.arguments", "arguments",
"errornocord.audio", "commands",
"errornocord.audio.discord", "commands.bot",
"errornocord.audio.queue", "commands.tools",
"errornocord.audio.utils", "commands.utils",
"errornocord.audio.youtubedl", "commands.voice",
"errornocord.commands", "commands.voice.channel",
"errornocord.commands.bot", "commands.voice.playback",
"errornocord.commands.tools", "commands.voice.playing",
"errornocord.commands.utils", "commands.voice.queue",
"errornocord.commands.voice", "commands.voice.sponsorblock",
"errornocord.commands.voice.channel", "commands.voice.utils",
"errornocord.commands.voice.playback", "constants",
"errornocord.commands.voice.playing", "core",
"errornocord.commands.voice.queue", "events",
"errornocord.commands.voice.sponsorblock", "extra",
"errornocord.commands.voice.utils", "fun",
"errornocord.constants", "sponsorblock",
"errornocord.core", "tasks",
"errornocord.events", "utils",
"errornocord.extra", "voice",
"errornocord.fun", "youtubedl",
"errornocord.sponsorblock",
"errornocord.tasks",
"errornocord.utils",
"errornocord.utils.common",
"errornocord.utils.discord",
"errornocord.voice",
"yt_dlp", "yt_dlp",
"yt_dlp.version", "yt_dlp.version",
] ]
PUBLIC_FLAGS = { PUBLIC_FLAGS = {
1 << 0: "Discord Employee", 1: "Discord Employee",
1 << 1: "Discord Partner", 2: "Discord Partner",
1 << 2: "HypeSquad Events", 4: "HypeSquad Events",
1 << 3: "Bug Hunter Level 1", 8: "Bug Hunter Level 1",
1 << 6: "HypeSquad Bravery", 64: "HypeSquad Bravery",
1 << 7: "HypeSquad Brilliance", 128: "HypeSquad Brilliance",
1 << 8: "HypeSquad Balance", 256: "HypeSquad Balance",
1 << 9: "Early Supporter", 512: "Early Supporter",
1 << 10: "Team User", 1024: "Team User",
1 << 14: "Bug Hunter Level 2", 16384: "Bug Hunter Level 2",
1 << 16: "Verified Bot", 65536: "Verified Bot",
1 << 17: "Verified Bot Developer", 131072: "Verified Bot Developer",
1 << 18: "Discord Certified Moderator", 262144: "Discord Certified Moderator",
1 << 19: "HTTP Interactions Only", 524288: "HTTP Interactions Only",
1 << 22: "Active Developer", 4194304: "Active Developer",
} }
BADGE_EMOJIS = { BADGE_EMOJIS = {
"Discord Employee": "<:DiscordStaff:879666899980546068>", "Discord Employee": "<:DiscordStaff:879666899980546068>",

View File

@@ -3,7 +3,6 @@ import contextlib
import importlib import importlib
import inspect import inspect
import io import io
import signal
import textwrap import textwrap
import time import time
import traceback import traceback
@@ -12,10 +11,11 @@ from logging import debug
import disnake import disnake
import disnake_paginator import disnake_paginator
from . import commands, utils import commands
from .commands import Command as C import utils
from .constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES from commands import Command as C
from .state import client, command_cooldowns, command_locks, idle_tracker, players from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
from state import client, command_cooldowns, command_locks, idle_tracker
async def on_message(message, edited=False): async def on_message(message, edited=False):
@@ -48,22 +48,32 @@ async def on_message(message, edited=False):
try: try:
if (cooldowns := command_cooldowns.get(message.author.id)) and not edited: if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
if (end_time := cooldowns.get(matched)) and ( if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
remaining_time := round(end_time - time.time()) > 0 end_time
): ):
await utils.reply( await utils.reply(
message, message,
f"please wait **{utils.format_duration(remaining_time, natural=True)}** before using this command again!", f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
) )
return return
match matched: match matched:
case C.RELOAD if message.author.id in OWNERS: case C.RELOAD if message.author.id in OWNERS:
reloaded_modules = set()
start = time.time() start = time.time()
reloaded_modules = reload()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
end = time.time() end = time.time()
if __debug__:
debug( debug(
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s", f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s"
) )
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
@@ -148,31 +158,26 @@ async def on_message(message, edited=False):
command_locks[(message.guild.id, message.author.id)].release() command_locks[(message.guild.id, message.author.id)].release()
async def on_voice_state_update(member, before, after): async def on_voice_state_update(_, before, after):
def is_alone(channel): def is_empty(channel):
return [m.id for m in (channel.members if channel else [])] == [client.user.id] return [m.id for m in (channel.members if channel else [])] == [client.user.id]
if member.id == client.user.id and is_alone(after.channel): channel = None
if before.channel.guild.id in players: if is_empty(before.channel):
del players[before.channel.guild.id] channel = before.channel
await after.channel.guild.voice_client.disconnect() elif is_empty(after.channel):
return channel = after.channel
if channel:
if is_alone(before.channel): await channel.guild.voice_client.disconnect()
if before.channel.guild.id in players:
del players[before.channel.guild.id]
await before.channel.guild.voice_client.disconnect()
def rreload(reloaded_modules, module): def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__) reloaded_modules.add(module.__name__)
for submodule in filter( for submodule in filter(
lambda sm: ( lambda v: inspect.ismodule(v)
inspect.ismodule(sm) and v.__name__ in RELOADABLE_MODULES
and sm.__name__ in RELOADABLE_MODULES and v.__name__ not in reloaded_modules,
and sm.__name__ not in reloaded_modules
),
vars(module).values(), vars(module).values(),
): ):
rreload(reloaded_modules, submodule) rreload(reloaded_modules, submodule)
@@ -181,18 +186,3 @@ def rreload(reloaded_modules, module):
if "__reload_module__" in dir(module): if "__reload_module__" in dir(module):
module.__reload_module__() module.__reload_module__()
def reload(*_):
reloaded_modules = set()
rreload(reloaded_modules, __import__("errornocord.core"))
rreload(reloaded_modules, __import__("errornocord.extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
return reloaded_modules
signal.signal(signal.SIGUSR1, reload)

View File

@@ -1,59 +0,0 @@
{
lib,
python3Packages,
self,
...
}:
let
disnake = python3Packages.disnake.overrideAttrs (old: {
src = self.pins.disnake;
propagatedBuildInputs =
with python3Packages;
old.propagatedBuildInputs
++ [
typing-extensions
versioningit
];
nativeBuildInputs = old.nativeBuildInputs ++ [ python3Packages.hatchling ];
});
disnake_paginator = python3Packages.buildPythonPackage {
pname = "disnake-paginator";
version = "1.0.8";
src = self.pins.disnake-paginator;
pyproject = true;
build-system = [ python3Packages.setuptools ];
propagatedBuildInputs = [
disnake
];
doCheck = false;
};
in
python3Packages.buildPythonApplication {
pname = "errornocord";
version = "0.1.0";
src = lib.cleanSource ./.;
pyproject = true;
build-system = [ python3Packages.setuptools ];
propagatedBuildInputs = with python3Packages; [
aiohttp
audioop-lts
disnake
disnake_paginator
psutil
typing-extensions
youtube-transcript-api
yt-dlp
];
doCheck = false;
}

View File

@@ -1,8 +0,0 @@
from . import discord, queue, utils, youtubedl
__all__ = [
"discord",
"queue",
"utils",
"youtubedl",
]

View File

@@ -1,39 +0,0 @@
import audioop
import disnake
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)

View File

@@ -1,111 +0,0 @@
import collections
from dataclasses import dataclass
from typing import ClassVar, Optional
import disnake
from ..constants import BAR_LENGTH, EMBED_COLOR
from .utils import format_duration
from .youtubedl import YTDLSource
@dataclass
class Song:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
duration = (
format_duration(self.player.duration) if self.player.duration else "stream"
)
if multiline:
queue_time = (
self.trigger_message.edited_at or self.trigger_message.created_at
)
return f"{title}\n**duration:** {duration}" + (
f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
if show_queuer
else ""
)
return f"{title} [**{duration}**]" + (
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**stream**]"
)
),
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
)
uploader_value = None
if self.player.uploader_url:
if self.player.uploader:
uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
else:
uploader_value = self.player.uploader_url
elif self.player.uploader:
uploader_value = self.player.uploader
if uploader_value:
embed.add_field(name="Uploader", value=uploader_value)
if self.player.like_count:
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
if self.player.view_count:
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
if self.player.timestamp:
embed.add_field(name="Published", value=f"<t:{int(self.player.timestamp)}>")
if self.player.volume:
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
if self.player.thumbnail_url:
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"Queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class Player:
queue: ClassVar = collections.deque()
current: Optional[Song] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_push(self, item):
self.queue.append(item)
def queue_push_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()

View File

@@ -1,7 +0,0 @@
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"

View File

@@ -1,75 +0,0 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
from ..constants import YTDL_OPTIONS
from .discord import PCMVolumeTransformer, TrackedAudioSource
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self,
source: TrackedAudioSource,
*,
data: dict[str, Any],
volume: float = 0.5,
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None,
lambda: ytdl.extract_info(url, download=not stream),
)
if "entries" in data:
if not data["entries"]:
raise Exception("no results found!")
data = data["entries"][0]
if "url" not in data:
raise Exception("no url returned!")
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
),
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url={self.original_url} duration={self.duration}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)

View File

@@ -1,13 +0,0 @@
import random
from . import commands
from .constants import REACTIONS
async def on_message(message):
if random.random() < 0.01:
tokens = commands.tokenize(message.content, remove_prefix=False)
for keyword, options in REACTIONS.items():
if keyword in tokens:
await message.add_reaction(random.choice(options))
break

View File

@@ -1,3 +0,0 @@
from . import test_filter_secrets, test_format_duration
__all__ = ["test_filter_secrets", "test_format_duration"]

View File

@@ -2,8 +2,11 @@ import asyncio
import threading import threading
from logging import debug, info, warning from logging import debug, info, warning
from . import commands, core, fun, tasks import commands
from .state import client import core
import fun
import tasks
from state import client
def prepare(): def prepare():

View File

@@ -2,21 +2,18 @@ import asyncio
import string import string
import disnake import disnake
from youtube_transcript_api._api import YouTubeTranscriptApi import youtube_transcript_api
from .state import client, kill, players from state import client, kill, players
async def transcript( async def transcript(
message, message, languages=["en"], max_messages=6, min_messages=3, upper=True
languages=["en"],
max_messages=6,
min_messages=3,
upper=True,
): ):
initial_id = message.guild.voice_client.source.id initial_id = message.guild.voice_client.source.id
transcript_list = YouTubeTranscriptApi().list(initial_id) transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
initial_id
)
try: try:
transcript = transcript_list.find_manually_created_transcript(languages).fetch() transcript = transcript_list.find_manually_created_transcript(languages).fetch()
except Exception: except Exception:
@@ -24,19 +21,21 @@ async def transcript(
await message.channel.send("(autogenerated)") await message.channel.send("(autogenerated)")
messages = [] messages = []
for line in transcript.snippets: for line in transcript:
if ( if (
players[message.guild.id].current.player.original.progress players[message.guild.id].current.player.original.progress
>= line.start + line.duration >= line["start"] + line["duration"]
): ):
continue continue
while players[message.guild.id].current.player.original.progress < line.start: while (
players[message.guild.id].current.player.original.progress < line["start"]
):
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
messages.insert( messages.insert(
0, 0,
await message.channel.send(line.text.upper() if upper else line.text), await message.channel.send(line["text"].upper() if upper else line["text"]),
) )
if len(messages) > max_messages: if len(messages) > max_messages:
try: try:
@@ -45,7 +44,7 @@ async def transcript(
await messages.pop().delete() await messages.pop().delete()
else: else:
await message.channel.delete_messages( await message.channel.delete_messages(
[messages.pop() for _ in range(count)], [messages.pop() for _ in range(count)]
) )
except Exception: except Exception:
pass pass
@@ -78,20 +77,19 @@ def messages_per_second(limit=500):
average = 1 average = 1
print( print(
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** " f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**", f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**"
) )
async def auto_count(channel_id: int): async def auto_count(channel_id: int):
if (channel := await client.fetch_channel(channel_id)) and isinstance( if (channel := await client.fetch_channel(channel_id)) and isinstance(
channel, channel, disnake.TextChannel
disnake.TextChannel,
): ):
last_message = (await channel.history(limit=1).flatten())[0] last_message = (await channel.history(limit=1).flatten())[0]
try: try:
result = str( result = str(
int("".join(filter(lambda d: d in string.digits, last_message.content))) int("".join(filter(lambda d: d in string.digits, last_message.content)))
+ 1, + 1
) )
except Exception: except Exception:
result = "where number" result = "where number"

61
flake.lock generated
View File

@@ -1,61 +0,0 @@
{
"nodes": {
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1772408722,
"narHash": "sha256-rHuJtdcOjK7rAHpHphUb1iCvgkU3GpfvicLMwwnfMT0=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "f20dc5d9b8027381c474144ecabc9034d6a839a3",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1773821835,
"narHash": "sha256-TJ3lSQtW0E2JrznGVm8hOQGVpXjJyXY2guAxku2O9A4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b40629efe5d6ec48dd1efba650c797ddbd39ace0",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"lastModified": 1772328832,
"narHash": "sha256-e+/T/pmEkLP6BHhYjx6GmwP5ivonQQn0bJdH9YrRB+Q=",
"owner": "nix-community",
"repo": "nixpkgs.lib",
"rev": "c185c7a5e5dd8f9add5b2f8ebeff00888b070742",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nixpkgs.lib",
"type": "github"
}
},
"root": {
"inputs": {
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

View File

@@ -1,42 +0,0 @@
{
inputs = {
flake-parts.url = "github:hercules-ci/flake-parts";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs =
{
flake-parts,
self,
...
}@inputs:
flake-parts.lib.mkFlake { inherit inputs; } {
systems = [
"aarch64-linux"
"x86_64-linux"
];
perSystem =
{ pkgs, self', ... }:
{
devShells.default = pkgs.mkShell {
name = "errornocord";
buildInputs = with pkgs; [
ffmpeg
self'.packages.default
];
};
packages = rec {
errornocord = pkgs.callPackage ./. { inherit self; };
default = errornocord;
};
};
flake.pins = import ./npins;
};
description = "Hot-reloadable Discord music bot";
}

10
fun.py Normal file
View File

@@ -0,0 +1,10 @@
import random
import commands
async def on_message(message):
if random.random() < 0.01 and "gn" in commands.tokenize(
message.content, remove_prefix=False
):
await message.add_reaction(random.choice(["💤", "😪", "😴", "🛌"]))

View File

@@ -1,13 +1,13 @@
import logging import logging
from . import constants, events import constants
from .state import client import events
from state import client
if __name__ == "__main__":
def main():
logging.basicConfig( logging.basicConfig(
format=( format=(
"%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s" "%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
if __debug__ if __debug__
else "%(asctime)s %(levelname)s %(message)s" else "%(asctime)s %(levelname)s %(message)s"
), ),
@@ -18,7 +18,3 @@ def main():
events.prepare() events.prepare()
client.run(constants.SECRETS["TOKEN"]) client.run(constants.SECRETS["TOKEN"])
if __name__ == "__main__":
main()

View File

@@ -1,249 +0,0 @@
/*
This file is provided under the MIT licence:
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
# Generated by npins. Do not modify; will be overwritten regularly
let
# Backwards-compatibly make something that previously didn't take any arguments take some
# The function must return an attrset, and will unfortunately be eagerly evaluated
# Same thing, but it catches eval errors on the default argument so that one may still call it with other arguments
mkFunctor =
fn:
let
e = builtins.tryEval (fn { });
in
(if e.success then e.value else { error = fn { }; }) // { __functor = _self: fn; };
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/lists.nix#L295
range =
first: last: if first > last then [ ] else builtins.genList (n: first + n) (last - first + 1);
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L257
stringToCharacters = s: map (p: builtins.substring p 1 s) (range 0 (builtins.stringLength s - 1));
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L269
stringAsChars = f: s: concatStrings (map f (stringToCharacters s));
concatStrings = builtins.concatStringsSep "";
# If the environment variable NPINS_OVERRIDE_${name} is set, then use
# the path directly as opposed to the fetched source.
# (Taken from Niv for compatibility)
mayOverride =
name: path:
let
envVarName = "NPINS_OVERRIDE_${saneName}";
saneName = stringAsChars (c: if (builtins.match "[a-zA-Z0-9]" c) == null then "_" else c) name;
ersatz = builtins.getEnv envVarName;
in
if ersatz == "" then
path
else
# this turns the string into an actual Nix path (for both absolute and
# relative paths)
builtins.trace "Overriding path of \"${name}\" with \"${ersatz}\" due to set \"${envVarName}\"" (
if builtins.substring 0 1 ersatz == "/" then
/. + ersatz
else
/. + builtins.getEnv "PWD" + "/${ersatz}"
);
mkSource =
name: spec:
{
pkgs ? null,
}:
assert spec ? type;
let
# Unify across builtin and pkgs fetchers.
# `fetchGit` requires a wrapper because of slight API differences.
fetchers =
if pkgs == null then
{
inherit (builtins) fetchTarball fetchurl;
# For some fucking reason, fetchGit has a different signature than the other builtin fetchers …
fetchGit = args: (builtins.fetchGit args).outPath;
}
else
{
fetchTarball =
{
url,
sha256,
}:
pkgs.fetchzip {
inherit url sha256;
extension = "tar";
};
inherit (pkgs) fetchurl;
fetchGit =
{
url,
submodules,
rev,
name,
narHash,
}:
pkgs.fetchgit {
inherit url rev name;
fetchSubmodules = submodules;
hash = narHash;
};
};
# Dispatch to the correct code path based on the type
path =
if spec.type == "Git" then
mkGitSource fetchers spec
else if spec.type == "GitRelease" then
mkGitSource fetchers spec
else if spec.type == "PyPi" then
mkPyPiSource fetchers spec
else if spec.type == "Channel" then
mkChannelSource fetchers spec
else if spec.type == "Tarball" then
mkTarballSource fetchers spec
else if spec.type == "Container" then
mkContainerSource pkgs spec
else
builtins.throw "Unknown source type ${spec.type}";
in
spec // { outPath = mayOverride name path; };
mkGitSource =
{
fetchTarball,
fetchGit,
...
}:
{
repository,
revision,
url ? null,
submodules,
hash,
...
}:
assert repository ? type;
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
# In the latter case, there we will always be an url to the tarball
if url != null && !submodules then
fetchTarball {
inherit url;
sha256 = hash;
}
else
let
url =
if repository.type == "Git" then
repository.url
else if repository.type == "GitHub" then
"https://github.com/${repository.owner}/${repository.repo}.git"
else if repository.type == "GitLab" then
"${repository.server}/${repository.repo_path}.git"
else if repository.type == "Forgejo" then
"${repository.server}/${repository.owner}/${repository.repo}.git"
else
throw "Unrecognized repository type ${repository.type}";
urlToName =
url: rev:
let
matched = builtins.match "^.*/([^/]*)(\\.git)?$" url;
short = builtins.substring 0 7 rev;
appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else "";
in
"${if matched == null then "source" else builtins.head matched}${appendShort}";
name = urlToName url revision;
in
fetchGit {
rev = revision;
narHash = hash;
inherit name submodules url;
};
mkPyPiSource =
{ fetchurl, ... }:
{
url,
hash,
...
}:
fetchurl {
inherit url;
sha256 = hash;
};
mkChannelSource =
{ fetchTarball, ... }:
{
url,
hash,
...
}:
fetchTarball {
inherit url;
sha256 = hash;
};
mkTarballSource =
{ fetchTarball, ... }:
{
url,
locked_url ? url,
hash,
...
}:
fetchTarball {
url = locked_url;
sha256 = hash;
};
mkContainerSource =
pkgs:
{
image_name,
image_tag,
image_digest,
...
}:
if pkgs == null then
builtins.throw "container sources require passing in a Nixpkgs value: https://github.com/andir/npins/blob/master/README.md#using-the-nixpkgs-fetchers"
else
pkgs.dockerTools.pullImage {
imageName = image_name;
imageDigest = image_digest;
finalImageTag = image_tag;
};
in
mkFunctor (
{
input ? ./sources.json,
}:
let
data =
if builtins.isPath input then
# while `readFile` will throw an error anyways if the path doesn't exist,
# we still need to check beforehand because *our* error can be caught but not the one from the builtin
# *piegames sighs*
if builtins.pathExists input then
builtins.fromJSON (builtins.readFile input)
else
throw "Input path ${toString input} does not exist"
else if builtins.isAttrs input then
input
else
throw "Unsupported input type ${builtins.typeOf input}, must be a path or an attrset";
version = data.version;
in
if version == 7 then
builtins.mapAttrs (name: spec: mkFunctor (mkSource name spec)) data.pins
else
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"
)

View File

@@ -1,26 +0,0 @@
{
"pins": {
"disnake": {
"type": "Git",
"repository": {
"type": "GitHub",
"owner": "DisnakeDev",
"repo": "disnake"
},
"branch": "master",
"submodules": false,
"revision": "79afc2d8673299f43636730d4a1518e7901f95fa",
"url": "https://github.com/DisnakeDev/disnake/archive/79afc2d8673299f43636730d4a1518e7901f95fa.tar.gz",
"hash": "sha256-7gsHFnqTyANdV+eosLI4MlzsMyebLdYZShEVObJPhl8="
},
"disnake-paginator": {
"type": "PyPi",
"name": "disnake-paginator",
"version_upper_bound": null,
"version": "1.0.8",
"url": "https://files.pythonhosted.org/packages/8c/db/3a86b247c7653a3f1676d16a316daf400edcf76295098ab60544f2a8f8b7/disnake_paginator-1.0.8.tar.gz",
"hash": "sha256-Sn0qbKNkJIdX8GCRRRPxwWutBtG9c1Zt7JnQAsl5I4s="
}
},
"version": 7
}

View File

@@ -1,13 +0,0 @@
[project]
name = "errornocord"
version = "0.1.0"
[project.scripts]
errornocord = "errornocord.main:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["errornocord*"]
[tool.basedpyright]
typeCheckingMode = "basic"

View File

@@ -1,6 +1,8 @@
aiohttp aiohttp
disnake[voice] @ https://github.com/DisnakeDev/disnake/archive/master.tar.gz audioop-lts
disnake
disnake_paginator disnake_paginator
psutil psutil
PyNaCl
youtube_transcript_api youtube_transcript_api
yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz yt-dlp

View File

@@ -1,36 +1,28 @@
import hashlib import hashlib
import json
import aiohttp import aiohttp
from .state import sponsorblock_cache from state import sponsorblock_cache
categories = json.dumps( CATEGORY_NAMES = {
[ "music_offtopic": "non-music",
"interaction", "sponsor": "sponsored",
"intro", }
"music_offtopic",
"outro",
"preview",
"selfpromo",
"sponsor",
],
)
async def get_segments(video_id: str): async def get_segments(video_id: str):
if video_id in sponsorblock_cache: if video_id in sponsorblock_cache:
return sponsorblock_cache[video_id] return sponsorblock_cache[video_id]
hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4] hashPrefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
session = aiohttp.ClientSession() session = aiohttp.ClientSession()
response = await session.get( response = await session.get(
f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}", f"https://sponsor.ajay.app/api/skipSegments/{hashPrefix}",
params={"categories": categories}, params={"categories": '["sponsor", "music_offtopic"]'},
) )
if response.status == 200 and ( if response.status == 200 and (
results := list( results := list(
filter(lambda v: video_id == v["videoID"], await response.json()), filter(lambda v: video_id == v["videoID"], await response.json())
) )
): ):
sponsorblock_cache[video_id] = results[0] sponsorblock_cache[video_id] = results[0]

View File

@@ -2,7 +2,7 @@ import time
import disnake import disnake
from .utils import LimitedSizeDict from utils import LimitedSizeDict
intents = disnake.Intents.default() intents = disnake.Intents.default()
intents.message_content = True intents.message_content = True
@@ -15,6 +15,5 @@ idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False} kill = {"transcript": False}
message_responses = LimitedSizeDict() message_responses = LimitedSizeDict()
players = {} players = {}
sponsorblock_cache = LimitedSizeDict() sponsorblock_cache = LimitedSizeDict(size_limit=100)
start_time = time.time() start_time = time.time()
trusted_users = []

View File

@@ -4,23 +4,23 @@ from logging import debug, error
import disnake import disnake
from .state import client, idle_tracker, players from state import client, idle_tracker, players
async def cleanup(): async def cleanup():
debug("spawned cleanup thread") debug("spawned cleanup thread")
while True: while True:
await asyncio.sleep(3600) await asyncio.sleep(3600 * 12)
targets = [] targets = []
for guild_id, player in players.items(): for guild_id, player in players.items():
if len(player.queue) == 0 and not player.current: if len(player.queue) == 0:
targets.append(guild_id) targets.append(guild_id)
for target in targets: for target in targets:
del players[target] del players[target]
if len(targets): if __debug__:
debug(f"cleanup thread removed {len(targets)} empty players") debug(f"cleanup removed {len(targets)} empty players")
if ( if (
not idle_tracker["is_idle"] not idle_tracker["is_idle"]

3
tests/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from . import test_filter_secrets, test_format_duration
__all__ = ["test_format_duration", "test_filter_secrets"]

View File

@@ -7,15 +7,15 @@ class TestFilterSecrets(unittest.TestCase):
def test_filter_secrets(self): def test_filter_secrets(self):
secret = "PLACEHOLDER_TOKEN" secret = "PLACEHOLDER_TOKEN"
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}), secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret})
) )
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret})) self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}), secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret})
) )
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}), secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret})
) )
self.assertFalse( self.assertFalse(
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}), secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret})
) )

View File

@@ -1,13 +1,13 @@
import unittest import unittest
import audio
import utils import utils
import youtubedl
class TestFormatDuration(unittest.TestCase): class TestFormatDuration(unittest.TestCase):
def test_audio(self): def test_youtubedl(self):
def f(s): def f(s):
return audio.utils.format_duration(s) return youtubedl.format_duration(s)
self.assertEqual(f(0), "00:00") self.assertEqual(f(0), "00:00")
self.assertEqual(f(0.5), "00:00") self.assertEqual(f(0.5), "00:00")

View File

@@ -1,4 +1,4 @@
from .common import LimitedSizeDict, filter_secrets, format_duration, surround from .common import LimitedSizeDict, filter_secrets, format_duration
from .discord import ( from .discord import (
ChannelResponseWrapper, ChannelResponseWrapper,
MessageInteractionWrapper, MessageInteractionWrapper,
@@ -24,5 +24,4 @@ __all__ = [
"MessageInteractionWrapper", "MessageInteractionWrapper",
"reply", "reply",
"snowflake_timestamp", "snowflake_timestamp",
"surround",
] ]

View File

@@ -1,13 +1,9 @@
from collections import OrderedDict from collections import OrderedDict
from ..constants import SECRETS from constants import SECRETS
def surround(inner: str, outer="```") -> str: def format_duration(duration: int, natural: bool = False, short: bool = False):
return outer + str(inner) + outer
def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
def format_plural(noun, count): def format_plural(noun, count):
if short: if short:
return noun[0] return noun[0]
@@ -50,7 +46,7 @@ def filter_secrets(text: str, secrets=SECRETS) -> str:
class LimitedSizeDict(OrderedDict): class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 100) self.size_limit = kwargs.pop("size_limit", 1000)
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._check_size_limit() self._check_size_limit()

View File

@@ -1,18 +1,14 @@
import ctypes import os
import time import time
from logging import debug, error from logging import error, info
import disnake import disnake
from .. import commands import commands
from ..constants import OWNERS from state import command_cooldowns, message_responses
from ..state import command_cooldowns, message_responses
def cooldown(message, cooldown_time: int): def cooldown(message, cooldown_time: int):
if message.author.id in OWNERS:
return
possible_commands = commands.match(message.content) possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1: if not possible_commands or len(possible_commands) > 1:
return return
@@ -34,9 +30,7 @@ async def reply(message, *args, **kwargs):
try: try:
await message_responses[message.id].edit( await message_responses[message.id].edit(
*args, *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
return return
except Exception: except Exception:
@@ -44,9 +38,7 @@ async def reply(message, *args, **kwargs):
try: try:
response = await message.reply( response = await message.reply(
*args, *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
except Exception: except Exception:
response = await channel_send(message, *args, **kwargs) response = await channel_send(message, *args, **kwargs)
@@ -56,25 +48,26 @@ async def reply(message, *args, **kwargs):
async def channel_send(message, *args, **kwargs): async def channel_send(message, *args, **kwargs):
await message.channel.send( await message.channel.send(
*args, *args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
) )
def load_opus(): def load_opus():
path = ctypes.util._findLib_ld("opus") for path in filter(
lambda p: os.path.exists(p),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try: try:
disnake.opus.load_opus(path) disnake.opus.load_opus(path)
debug(f"successfully loaded opus from {path}") info(f"successfully loaded opus from {path}")
return return
except Exception as e: except Exception as e:
error(f"failed to load opus from {path}: {e}") error(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library") raise Exception("could not locate working opus library")
def snowflake_timestamp(snowflake) -> int: def snowflake_timestamp(id):
return round(((snowflake >> 22) + 1420070400000) / 1000) return round(((id >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message): async def add_check_reaction(message):
@@ -83,8 +76,7 @@ async def add_check_reaction(message):
async def invalid_user_handler(interaction): async def invalid_user_handler(interaction):
await interaction.response.send_message( await interaction.response.send_message(
"you are not the intended receiver of this message!", "you are not the intended receiver of this message!", ephemeral=True
ephemeral=True,
) )
@@ -94,7 +86,8 @@ class ChannelResponseWrapper:
self.sent_message = None self.sent_message = None
async def send_message(self, **kwargs): async def send_message(self, **kwargs):
kwargs.pop("ephemeral", None) if "ephemeral" in kwargs:
del kwargs["ephemeral"]
self.sent_message = await reply(self.message, **kwargs) self.sent_message = await reply(self.message, **kwargs)
async def edit_message(self, content=None, embed=None, view=None): async def edit_message(self, content=None, embed=None, view=None):

220
youtubedl.py Normal file
View File

@@ -0,0 +1,220 @@
import asyncio
import audioop
import collections
from dataclasses import dataclass
from typing import Any, Optional
import disnake
import yt_dlp
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: disnake.AudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded.")
self.original = original
self.volume = volume
@property
def volume(self) -> float:
return self._volume
@volume.setter
def volume(self, value: float) -> None:
self._volume = max(value, 0.0)
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self._volume)
class CustomAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class YTDLSource(PCMVolumeTransformer):
def __init__(
self, source: CustomAudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
if not data["entries"]:
raise Exception("no entries provided by yt-dlp!")
data = data["entries"][0]
return cls(
CustomAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
)
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
@dataclass
class QueuedSong:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
if multiline:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
)
else:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**live**]"
)
),
)
if self.player.uploader_url:
embed.add_field(
name="Uploader",
value=f"[{self.player.uploader}]({self.player.uploader_url})",
)
else:
embed.add_field(
name="Uploader",
value=self.player.uploader,
)
embed.add_field(
name="Likes",
value=f"{self.player.like_count:,}"
if self.player.like_count
else "Unknown",
)
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class QueuedPlayer:
queue = collections.deque()
current: Optional[QueuedSong] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_add(self, item):
self.queue.append(item)
def queue_add_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)