feat: add (incomplete) nix flake

dave.py isn't packaged yet. Will try to do this myself but dealing with
vcpkg is a bit annoying.
This commit is contained in:
2026-03-22 17:59:19 -04:00
parent 3232e797c8
commit 4185723b8d
40 changed files with 494 additions and 66 deletions

View File

@@ -0,0 +1,13 @@
from . import bot, tools, utils, voice
from .utils import Command, match, match_token, tokenize
__all__ = [
"bot",
"Command",
"match",
"match_token",
"tokenize",
"tools",
"utils",
"voice",
]

102
errornocord/commands/bot.py Normal file
View File

@@ -0,0 +1,102 @@
import os
import threading
import time
import disnake
import psutil
from yt_dlp import version
from .. import arguments, commands
from ..constants import EMBED_COLOR
from ..state import client, start_time
from ..utils import format_duration, reply, surround
async def status(message):
member_count = 0
channel_count = 0
for guild in client.guilds:
member_count += len(guild.members)
channel_count += len(guild.channels)
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / 1048576
embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field(
name="Latency",
value=surround(f"{round(client.latency * 1000, 1)} ms"),
)
embed.add_field(
name="Memory",
value=surround(f"{round(memory_usage, 1)} MiB"),
)
embed.add_field(
name="Threads",
value=surround(threading.active_count()),
)
embed.add_field(
name="Guilds",
value=surround(len(client.guilds)),
)
embed.add_field(
name="Members",
value=surround(member_count),
)
embed.add_field(
name="Channels",
value=surround(channel_count),
)
embed.add_field(
name="Disnake",
value=surround(disnake.__version__),
)
embed.add_field(
name="yt-dlp",
value=surround(version.__version__),
)
embed.add_field(
name="Uptime",
value=surround(format_duration(int(time.time() - start_time), short=True)),
)
await reply(message, embed=embed)
async def uptime(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"print bot uptime",
)
parser.add_argument(
"-s",
"--since",
action="store_true",
help="bot up since",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.since:
await reply(message, f"{round(start_time)}")
else:
await reply(message, f"up {format_duration(int(time.time() - start_time))}")
async def ping(message):
await reply(
message,
embed=disnake.Embed(
title="Pong :ping_pong:",
description=f"Latency: **{round(client.latency * 1000, 1)} ms**",
color=EMBED_COLOR,
),
)
async def help(message):
await reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()],
),
)

View File

@@ -0,0 +1,277 @@
import re
import aiohttp
import disnake
from .. import arguments, commands, utils
from ..constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
from ..state import client
async def lookup(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"look up a discord user or application by ID",
)
parser.add_argument(
"-a",
"--application",
action="store_true",
help="look up applications instead of users",
)
parser.add_argument(
"id",
type=int,
help="the ID to perform a search for",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.application:
session = aiohttp.ClientSession()
response = await (
await session.get(f"https://discord.com/api/v9/applications/{args.id}/rpc")
).json()
if "code" in response.keys():
await utils.reply(message, "application not found!")
return
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
)
embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
embed.add_field(
name="Public Bot",
value=f"{'`' + str(response['bot_public']) + '`' if 'bot_public' in response else 'No bot'}",
)
embed.add_field(name="Public Flags", value="`" + str(response["flags"]) + "`")
embed.add_field(
name="Terms of Service",
value=(
"None"
if "terms_of_service_url" not in response.keys()
else f"[Link]({response['terms_of_service_url']})"
),
)
embed.add_field(
name="Privacy Policy",
value=(
"None"
if "privacy_policy_url" not in response.keys()
else f"[Link]({response['privacy_policy_url']})"
),
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(response['id']))}:R>",
)
embed.add_field(
name="Default Invite URL",
value=(
"None"
if "install_params" not in response.keys()
else f"[Link](https://discord.com/oauth2/authorize?client_id={response['id']}&permissions={response['install_params']['permissions']}&scope={'%20'.join(response['install_params']['scopes'])})"
),
)
embed.add_field(
name="Custom Invite URL",
value=(
"None"
if "custom_install_url" not in response.keys()
else f"[Link]({response['custom_install_url']})"
),
)
bot_intents = []
for application_flag, intent_name in APPLICATION_FLAGS.items():
if response["flags"] & application_flag == application_flag:
if intent_name.replace(" (unverified)", "") not in bot_intents:
bot_intents.append(intent_name)
embed.add_field(
name="Application Flags",
value=", ".join(bot_intents) if bot_intents else "None",
)
bot_tags = ""
if "tags" in response.keys():
for tag in response["tags"]:
bot_tags += tag + ", "
embed.add_field(
name="Tags",
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
)
else:
try:
user = await client.fetch_user(args.id)
except Exception:
await utils.reply(message, "user not found!")
return
badges = ""
for flag, flag_name in PUBLIC_FLAGS.items():
if user.public_flags.value & flag == flag:
if flag_name != "None":
try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception as e:
raise Exception(
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id)
accent_color = 0x000000
if user_object.accent_color is not None:
accent_color = user_object.accent_color
embed = disnake.Embed(color=accent_color)
embed.add_field(
name="User ID",
value=f"`{user.id}`",
)
embed.add_field(
name="Discriminator",
value=f"`{user.name}#{user.discriminator}`",
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(user.id))}:R>",
)
embed.add_field(
name="Public Flags",
value=f"`{user.public_flags.value}` {badges}",
)
embed.add_field(
name="Bot User",
value=f"`{user.bot}`",
)
embed.add_field(
name="System User",
value=f"`{user.system}`",
)
embed.set_thumbnail(url=user.avatar if user.avatar else user.default_avatar)
if user_object.banner:
embed.set_image(url=user_object.banner)
await utils.reply(message, embed=embed)
async def clear(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"bulk delete messages in the current channel matching specified criteria",
)
parser.add_argument(
"count",
type=lambda c: arguments.range_type(c, lower=1, upper=1000),
help="amount of messages to delete",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-r",
"--regex",
required=False,
help="delete messages with content matching this regex",
)
group.add_argument(
"-c",
"--contains",
required=False,
help="delete messages with content containing this substring",
)
parser.add_argument(
"-i",
"--case-insensitive",
action="store_true",
help="ignore case sensitivity when deleting messages",
)
parser.add_argument(
"-a",
"--author-id",
type=int,
action="append",
help="delete messages whose author matches this id",
)
parser.add_argument(
"-o",
"--oldest-first",
action="store_true",
help="delete oldest messages first",
)
parser.add_argument(
"-R",
"--reactions",
action="store_true",
help="delete messages with reactions",
)
parser.add_argument(
"-A",
"--attachments",
action="store_true",
help="delete messages with attachments",
)
parser.add_argument(
"-d",
"--delete-command",
action="store_true",
help="delete the command message as well",
)
parser.add_argument(
"-I",
"--ignore-ids",
type=int,
action="append",
help="ignore messages with this id",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.delete_command:
try:
await message.delete()
except Exception:
pass
regex = None
if r := args.regex:
regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0)
def check(m):
if (ids := args.ignore_ids) and m.id in ids:
return False
c = []
if regex:
c.append(regex.search(m.content))
if s := args.contains:
if args.case_insensitive:
c.append(s.lower() in m.content.lower())
else:
c.append(s in m.content)
if i := args.author_id:
c.append(m.author.id in i)
if args.reactions:
c.append(len(m.reactions) > 0)
if args.attachments:
c.append(len(m.attachments) > 0)
return all(c)
messages = len(
await message.channel.purge(
limit=args.count,
check=check,
oldest_first=args.oldest_first,
),
)
if not args.delete_command:
try:
await utils.reply(
message,
f"purged **{messages}/{args.count} {'message' if args.count == 1 else 'messages'}**",
)
except Exception:
pass

View File

@@ -0,0 +1,95 @@
from enum import Enum
from functools import lru_cache
from .. import constants
class Command(Enum):
CLEAR = "clear"
CURRENT = "current"
EXECUTE = "execute"
FAST_FORWARD = "ff"
HELP = "help"
JOIN = "join"
LEAVE = "leave"
LOOKUP = "lookup"
PAUSE = "pause"
PING = "ping"
PLAY = "play"
PLAYING = "playing"
PURGE = "purge"
QUEUE = "queue"
RELOAD = "reload"
RESUME = "resume"
SKIP = "skip"
SPONSORBLOCK = "sponsorblock"
STATUS = "status"
UPTIME = "uptime"
VOLUME = "volume"
@lru_cache
def match_token(token: str) -> list[Command]:
match token.lower():
case "r":
return [Command.RELOAD]
case "s":
return [Command.SKIP]
case "c":
return [Command.CURRENT]
if exact_match := list(
filter(
lambda command: command.value == token.lower(),
Command.__members__.values(),
),
):
return exact_match
return list(
filter(
lambda command: command.value.startswith(token.lower()),
Command.__members__.values(),
),
)
@lru_cache
def match(command: str) -> list[Command] | None:
if tokens := tokenize(command):
return match_token(tokens[0])
@lru_cache
def tokenize(string: str, remove_prefix: bool = True) -> list[str]:
tokens = []
token = ""
in_quotes = False
quote_char = None
escape = False
if remove_prefix:
string = string[len(constants.PREFIX) :]
for char in string:
if escape:
token += char
escape = False
elif char == "\\":
escape = True
elif char in ('"', "'") and not in_quotes:
in_quotes = True
quote_char = char
elif char == quote_char and in_quotes:
in_quotes = False
quote_char = None
elif char.isspace() and not in_quotes:
if token:
tokens.append(token)
token = ""
else:
token += char
if token:
tokens.append(token)
return tokens

View File

@@ -0,0 +1,20 @@
from .channel import join, leave
from .playback import fast_forward, pause, playing, resume, volume
from .queue import queue_or_play, skip
from .sponsorblock import sponsorblock_command
from .utils import remove_queued
__all__ = [
"fast_forward",
"join",
"leave",
"pause",
"playing",
"queue_or_play",
"remove_queued",
"resume",
"skip",
"skip",
"sponsorblock_command",
"volume",
]

View File

@@ -0,0 +1,23 @@
from ... import utils
from .utils import command_allowed
async def join(message):
if message.author.voice:
if message.guild.voice_client:
await message.guild.voice_client.move_to(message.channel)
else:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
return
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)

View File

@@ -0,0 +1,166 @@
import disnake_paginator
from ... import arguments, commands, sponsorblock, utils
from ...constants import EMBED_COLOR
from ...state import players
from .utils import command_allowed
async def playing(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"get information about the currently playing song",
)
parser.add_argument(
"-d",
"--description",
action="store_true",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
await utils.reply(
message,
embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused(),
),
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "skip the current sponsorblock segment"
)
parser.add_argument(
"-s",
"--seconds",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=300),
help="the number of seconds to fast forward instead",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
seconds = args.seconds
if not seconds:
video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id,
)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
progress = message.guild.voice_client.source.original.progress
for segment in video["segments"]:
begin, end = map(float, segment["segment"])
if progress >= begin and progress < end:
seconds = end - message.guild.voice_client.source.original.progress
if not seconds:
await utils.reply(message, "no sponsorblock segment is currently playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def volume(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)

View File

@@ -0,0 +1,267 @@
import itertools
import disnake
import disnake_paginator
from ... import arguments, audio, commands, utils
from ...constants import EMBED_COLOR
from ...state import client, players, trusted_users
from .playback import resume
from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"queue a song, list the queue, or resume playback",
)
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150) for the specified song",
)
parser.add_argument(
"-i",
"--remove-index",
type=int,
nargs="*",
help="remove queued songs by index",
)
parser.add_argument(
"-m",
"--match-multiple",
action="store_true",
help="continue removing queued after finding a match",
)
parser.add_argument(
"-c",
"--clear",
action="store_true",
help="remove all queued songs",
)
parser.add_argument(
"--now",
action="store_true",
help="play the specified song immediately",
)
parser.add_argument(
"--next",
action="store_true",
help="play the specified song next",
)
parser.add_argument(
"-t",
"--remove-title",
help="remove queued songs by title",
)
parser.add_argument(
"-q",
"--remove-queuer",
type=int,
help="remove queued songs by queuer",
)
if not (args := await parser.parse_args(message, tokens)):
return
await ensure_joined(message)
if len(tokens) == 1 and tokens[0].lower() != "play":
if not command_allowed(message, immutable=True):
return
elif not command_allowed(message):
return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited:
found = next(
filter(
lambda queued: queued.trigger_message.id == message.id,
players[message.guild.id].queue,
),
None,
)
if found:
players[message.guild.id].queue.remove(found)
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
elif indices := args.remove_index:
targets = []
for i in indices:
if i <= 0 or i > len(players[message.guild.id].queue):
await utils.reply(message, f"invalid index `{i}`!")
return
targets.append(players[message.guild.id].queue[i - 1])
for target in targets:
if target in players[message.guild.id].queue:
players[message.guild.id].queue.remove(target)
if len(targets) == 1:
await utils.reply(message, f"**removed** {targets[0].format()}")
else:
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif args.remove_title or args.remove_queuer:
targets = set()
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.add(queued)
if q := args.remove_queuer:
if q == queued.trigger_message.author.id:
targets.add(queued)
targets = list(targets)
if not args.match_multiple:
targets = targets[:1]
for target in targets:
players[message.guild.id].queue.remove(target)
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif query := args.query:
if (
not message.channel.permissions_for(message.author).manage_channels
and len(
list(
filter(
lambda queued: (
queued.trigger_message.author.id == message.author.id
),
players[message.guild.id].queue,
),
),
)
>= 5
and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
):
await utils.reply(
message,
"you can only queue **5 items** without the manage channels permission!",
)
return
try:
async with message.channel.typing():
player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query),
loop=client.loop,
stream=True,
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(message, f"failed to queue: `{e}`")
return
queued = audio.queue.Song(player, message)
if args.now or args.next:
players[message.guild.id].queue_push_front(queued)
else:
players[message.guild.id].queue_push(queued)
if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!")
return
elif not message.guild.voice_client.source:
play_next(message, first=True)
elif args.now:
message.guild.voice_client.stop()
else:
await utils.reply(
message,
f"**{1 if args.next else len(players[message.guild.id].queue)}.** {queued.format()}",
)
utils.cooldown(message, 2)
elif tokens[0].lower() == "play":
await resume(message)
else:
if players[message.guild.id].queue:
formatted_duration = utils.format_duration(
sum(
[
queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue
],
),
natural=True,
)
def embed(description):
e = disnake.Embed(
description=description,
color=EMBED_COLOR,
)
if formatted_duration and len(players[message.guild.id].queue) > 1:
e.set_footer(text=f"{formatted_duration} in total")
return e
await disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
segments=list(
map(
embed,
[
"\n\n".join(
[
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch
],
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue),
10,
)
],
),
),
).start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
"nothing is queued!",
)
async def skip(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
parser.add_argument(
"-n",
"--next",
action="store_true",
help="skip the next song",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not players[message.guild.id] and not players[message.guild.id].queue:
message.guild.voice_client.stop()
await utils.reply(
message,
"the queue is empty now!",
)
elif args.next:
next = players[message.guild.id].queue.pop()
await utils.reply(message, f"**skipped** {next.format()}")
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
if not message.guild.voice_client.source:
play_next(message)

View File

@@ -0,0 +1,44 @@
import disnake
from ... import audio, sponsorblock, utils
from ...constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES
from ...state import players
from .utils import command_allowed
async def sponsorblock_command(message):
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
progress = message.guild.voice_client.source.original.progress
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
text = []
for segment in video["segments"]:
begin, end = map(int, segment["segment"])
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES:
category = SPONSORBLOCK_CATEGORY_NAMES[category]
current = "**" if progress >= begin and progress < end else ""
text.append(
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
)
await utils.reply(
message,
embed=disnake.Embed(
title="Sponsorblock segments",
description="\n".join(text),
color=EMBED_COLOR,
),
)

View File

@@ -0,0 +1,72 @@
from logging import error
import disnake
from ... import utils
from ...state import client, players
def play_after_callback(e, message, once):
if e:
error(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
if not message.guild.voice_client:
return
message.guild.voice_client.stop()
if not disnake.opus.is_loaded():
utils.load_opus()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play(
queued.player,
after=lambda e: play_after_callback(e, message, once),
)
embed = queued.embed()
if first and len(players[message.guild.id].queue) == 0:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
def remove_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return False
if immutable:
return True
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id