Compare commits
1 Commits
main
...
b71331a102
| Author | SHA1 | Date | |
|---|---|---|---|
|
b71331a102
|
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,4 +1,2 @@
|
||||
.direnv
|
||||
.env
|
||||
.venv
|
||||
__pycache__
|
||||
|
||||
@@ -7,4 +7,4 @@ COPY . .
|
||||
|
||||
RUN pip install -r requirements.txt
|
||||
|
||||
CMD ["python", "-m", "errornocord.main"]
|
||||
CMD ["python", "-OO", "main.py"]
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
# ErrorNoCord
|
||||
|
||||
Hot-reloadable Discord music bot
|
||||
Discord music bot for testing purposes, with live reloading support
|
||||
|
||||
@@ -2,15 +2,13 @@ import argparse
|
||||
import contextlib
|
||||
import io
|
||||
|
||||
from . import utils
|
||||
import utils
|
||||
|
||||
|
||||
class ArgumentParser:
|
||||
def __init__(self, command, description):
|
||||
self.parser = argparse.ArgumentParser(
|
||||
command,
|
||||
description=description,
|
||||
exit_on_error=False,
|
||||
command, description=description, exit_on_error=False
|
||||
)
|
||||
|
||||
def print_help(self):
|
||||
@@ -28,20 +26,21 @@ class ArgumentParser:
|
||||
async def parse_args(self, message, tokens) -> argparse.Namespace | None:
|
||||
try:
|
||||
with contextlib.redirect_stdout(io.StringIO()):
|
||||
return self.parser.parse_args(tokens[1:])
|
||||
args = self.parser.parse_args(tokens[1:])
|
||||
return args
|
||||
except SystemExit:
|
||||
await utils.reply(message, f"```\n{self.print_help()}```")
|
||||
except Exception as e:
|
||||
await utils.reply(message, f"`{e}`")
|
||||
|
||||
|
||||
def range_type(string: str, lower=0, upper=100) -> int:
|
||||
def range_type(string: str, min=0, max=100):
|
||||
try:
|
||||
value = int(string)
|
||||
except ValueError as e:
|
||||
raise argparse.ArgumentTypeError("value is not a valid integer") from e
|
||||
except ValueError:
|
||||
raise argparse.ArgumentTypeError("value is not a valid integer")
|
||||
|
||||
if lower <= value <= upper:
|
||||
if min <= value <= max:
|
||||
return value
|
||||
|
||||
raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}")
|
||||
else:
|
||||
raise argparse.ArgumentTypeError(f"value is not in range {min}-{max}")
|
||||
@@ -3,11 +3,11 @@ from .utils import Command, match, match_token, tokenize
|
||||
|
||||
__all__ = [
|
||||
"bot",
|
||||
"tools",
|
||||
"utils",
|
||||
"voice",
|
||||
"Command",
|
||||
"match",
|
||||
"match_token",
|
||||
"tokenize",
|
||||
"tools",
|
||||
"utils",
|
||||
"voice",
|
||||
]
|
||||
@@ -6,10 +6,11 @@ import disnake
|
||||
import psutil
|
||||
from yt_dlp import version
|
||||
|
||||
from .. import arguments, commands
|
||||
from ..constants import EMBED_COLOR
|
||||
from ..state import client, start_time
|
||||
from ..utils import format_duration, reply, surround
|
||||
import arguments
|
||||
import commands
|
||||
import utils
|
||||
from constants import EMBED_COLOR
|
||||
from state import client, start_time
|
||||
|
||||
|
||||
async def status(message):
|
||||
@@ -24,41 +25,41 @@ async def status(message):
|
||||
embed = disnake.Embed(color=EMBED_COLOR)
|
||||
embed.add_field(
|
||||
name="Latency",
|
||||
value=surround(f"{round(client.latency * 1000, 1)} ms"),
|
||||
value=f"```{round(client.latency * 1000, 1)} ms```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Memory",
|
||||
value=surround(f"{round(memory_usage, 1)} MiB"),
|
||||
value=f"```{round(memory_usage, 1)} MiB```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Threads",
|
||||
value=surround(threading.active_count()),
|
||||
value=f"```{threading.active_count()}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Guilds",
|
||||
value=surround(len(client.guilds)),
|
||||
value=f"```{len(client.guilds)}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Members",
|
||||
value=surround(member_count),
|
||||
value=f"```{member_count}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Channels",
|
||||
value=surround(channel_count),
|
||||
value=f"```{channel_count}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Disnake",
|
||||
value=surround(disnake.__version__),
|
||||
value=f"```{disnake.__version__}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="yt-dlp",
|
||||
value=surround(version.__version__),
|
||||
value=f"```{version.__version__}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Uptime",
|
||||
value=surround(format_duration(int(time.time() - start_time), short=True)),
|
||||
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
|
||||
)
|
||||
await reply(message, embed=embed)
|
||||
await utils.reply(message, embed=embed)
|
||||
|
||||
|
||||
async def uptime(message):
|
||||
@@ -77,13 +78,15 @@ async def uptime(message):
|
||||
return
|
||||
|
||||
if args.since:
|
||||
await reply(message, f"{round(start_time)}")
|
||||
await utils.reply(message, f"{round(start_time)}")
|
||||
else:
|
||||
await reply(message, f"up {format_duration(int(time.time() - start_time))}")
|
||||
await utils.reply(
|
||||
message, f"up {utils.format_duration(int(time.time() - start_time))}"
|
||||
)
|
||||
|
||||
|
||||
async def ping(message):
|
||||
await reply(
|
||||
await utils.reply(
|
||||
message,
|
||||
embed=disnake.Embed(
|
||||
title="Pong :ping_pong:",
|
||||
@@ -94,9 +97,9 @@ async def ping(message):
|
||||
|
||||
|
||||
async def help(message):
|
||||
await reply(
|
||||
await utils.reply(
|
||||
message,
|
||||
", ".join(
|
||||
[f"`{command.value}`" for command in commands.Command.__members__.values()],
|
||||
[f"`{command.value}`" for command in commands.Command.__members__.values()]
|
||||
),
|
||||
)
|
||||
@@ -3,22 +3,24 @@ import re
|
||||
import aiohttp
|
||||
import disnake
|
||||
|
||||
from .. import arguments, commands, utils
|
||||
from ..constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
|
||||
from ..state import client
|
||||
import arguments
|
||||
import commands
|
||||
import utils
|
||||
from constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
|
||||
from state import client
|
||||
|
||||
|
||||
async def lookup(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0],
|
||||
"look up a discord user or application by ID",
|
||||
"look up a user or application on discord by their ID",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-a",
|
||||
"--application",
|
||||
action="store_true",
|
||||
help="look up applications instead of users",
|
||||
help="search for applications instead of users",
|
||||
)
|
||||
parser.add_argument(
|
||||
"id",
|
||||
@@ -39,7 +41,7 @@ async def lookup(message):
|
||||
|
||||
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
|
||||
embed.set_thumbnail(
|
||||
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
|
||||
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp"
|
||||
)
|
||||
embed.add_field(name="Application Name", value=response["name"])
|
||||
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
|
||||
@@ -100,9 +102,7 @@ async def lookup(message):
|
||||
for tag in response["tags"]:
|
||||
bot_tags += tag + ", "
|
||||
embed.add_field(
|
||||
name="Tags",
|
||||
value="None" if bot_tags == "" else bot_tags[:-2],
|
||||
inline=False,
|
||||
name="Tags", value="None" if bot_tags == "" else bot_tags[:-2], inline=False
|
||||
)
|
||||
else:
|
||||
try:
|
||||
@@ -117,10 +117,8 @@ async def lookup(message):
|
||||
if flag_name != "None":
|
||||
try:
|
||||
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
|
||||
except Exception as e:
|
||||
raise Exception(
|
||||
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
|
||||
) from e
|
||||
except Exception:
|
||||
raise Exception(f"unable to find badge: {PUBLIC_FLAGS[flag]}")
|
||||
|
||||
user_object = await client.fetch_user(user.id)
|
||||
accent_color = 0x000000
|
||||
@@ -163,11 +161,11 @@ async def clear(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0],
|
||||
"bulk delete messages in the current channel matching specified criteria",
|
||||
"bulk delete messages in the current channel matching certain criteria",
|
||||
)
|
||||
parser.add_argument(
|
||||
"count",
|
||||
type=lambda c: arguments.range_type(c, lower=1, upper=1000),
|
||||
type=lambda c: arguments.range_type(c, min=1, max=1000),
|
||||
help="amount of messages to delete",
|
||||
)
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
@@ -261,10 +259,8 @@ async def clear(message):
|
||||
|
||||
messages = len(
|
||||
await message.channel.purge(
|
||||
limit=args.count,
|
||||
check=check,
|
||||
oldest_first=args.oldest_first,
|
||||
),
|
||||
limit=args.count, check=check, oldest_first=args.oldest_first
|
||||
)
|
||||
)
|
||||
|
||||
if not args.delete_command:
|
||||
@@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
from functools import lru_cache
|
||||
|
||||
from .. import constants
|
||||
import constants
|
||||
|
||||
|
||||
class Command(Enum):
|
||||
@@ -30,19 +30,16 @@ class Command(Enum):
|
||||
|
||||
@lru_cache
|
||||
def match_token(token: str) -> list[Command]:
|
||||
match token.lower():
|
||||
case "r":
|
||||
if token.lower() == "r":
|
||||
return [Command.RELOAD]
|
||||
case "s":
|
||||
elif token.lower() == "s":
|
||||
return [Command.SKIP]
|
||||
case "c":
|
||||
return [Command.CURRENT]
|
||||
|
||||
if exact_match := list(
|
||||
filter(
|
||||
lambda command: command.value == token.lower(),
|
||||
Command.__members__.values(),
|
||||
),
|
||||
)
|
||||
):
|
||||
return exact_match
|
||||
|
||||
@@ -50,7 +47,7 @@ def match_token(token: str) -> list[Command]:
|
||||
filter(
|
||||
lambda command: command.value.startswith(token.lower()),
|
||||
Command.__members__.values(),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
from ... import utils
|
||||
from ...state import players
|
||||
import disnake
|
||||
|
||||
import utils
|
||||
|
||||
from .utils import command_allowed
|
||||
|
||||
|
||||
async def join(message):
|
||||
if message.author.voice:
|
||||
if message.guild.voice_client:
|
||||
await message.guild.voice_client.move_to(message.channel)
|
||||
else:
|
||||
return await message.guild.voice_client.move_to(message.channel)
|
||||
elif message.author.voice:
|
||||
await message.author.voice.channel.connect()
|
||||
elif isinstance(message.channel, disnake.VoiceChannel):
|
||||
await message.channel.connect()
|
||||
else:
|
||||
await utils.reply(message, "you are not connected to a voice channel!")
|
||||
return
|
||||
@@ -20,8 +23,5 @@ async def leave(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.id in players:
|
||||
del players[message.guild.id]
|
||||
await message.guild.voice_client.disconnect()
|
||||
|
||||
await utils.add_check_reaction(message)
|
||||
@@ -1,16 +1,19 @@
|
||||
import arguments
|
||||
import disnake_paginator
|
||||
from constants import EMBED_COLOR
|
||||
from state import players
|
||||
|
||||
import commands
|
||||
import sponsorblock
|
||||
import utils
|
||||
|
||||
from ... import arguments, commands, sponsorblock, utils
|
||||
from ...constants import EMBED_COLOR
|
||||
from ...state import players
|
||||
from .utils import command_allowed
|
||||
|
||||
|
||||
async def playing(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0],
|
||||
"get information about the currently playing song",
|
||||
tokens[0], "get information about the currently playing song"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
@@ -46,7 +49,7 @@ async def playing(message):
|
||||
await utils.reply(
|
||||
message,
|
||||
embed=players[message.guild.id].current.embed(
|
||||
is_paused=message.guild.voice_client.is_paused(),
|
||||
is_paused=message.guild.voice_client.is_paused()
|
||||
),
|
||||
)
|
||||
else:
|
||||
@@ -86,15 +89,13 @@ async def pause(message):
|
||||
|
||||
async def fast_forward(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0], "skip the current sponsorblock segment"
|
||||
)
|
||||
parser = arguments.ArgumentParser(tokens[0], "skip current sponsorblock segment")
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--seconds",
|
||||
nargs="?",
|
||||
type=lambda v: arguments.range_type(v, lower=0, upper=300),
|
||||
help="the number of seconds to fast forward instead",
|
||||
type=lambda v: arguments.range_type(v, min=0, max=300),
|
||||
help="the amount of seconds to fast forward instead",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
@@ -109,12 +110,11 @@ async def fast_forward(message):
|
||||
seconds = args.seconds
|
||||
if not seconds:
|
||||
video = await sponsorblock.get_segments(
|
||||
players[message.guild.id].current.player.id,
|
||||
players[message.guild.id].current.player.id
|
||||
)
|
||||
if not video:
|
||||
await utils.reply(
|
||||
message,
|
||||
"no sponsorblock segments were found for this video!",
|
||||
message, "no sponsorblock segments were found for this video!"
|
||||
)
|
||||
return
|
||||
|
||||
@@ -140,7 +140,7 @@ async def volume(message):
|
||||
parser.add_argument(
|
||||
"volume",
|
||||
nargs="?",
|
||||
type=lambda v: arguments.range_type(v, lower=0, upper=150),
|
||||
type=lambda v: arguments.range_type(v, min=0, max=150),
|
||||
help="the volume level (0 - 150)",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
@@ -3,25 +3,31 @@ import itertools
|
||||
import disnake
|
||||
import disnake_paginator
|
||||
|
||||
from ... import arguments, audio, commands, utils
|
||||
from ...constants import EMBED_COLOR
|
||||
from ...state import client, players, trusted_users
|
||||
import arguments
|
||||
import commands
|
||||
import utils
|
||||
import youtubedl
|
||||
from constants import EMBED_COLOR
|
||||
from state import client, players
|
||||
|
||||
from .playback import resume
|
||||
from .utils import command_allowed, ensure_joined, play_next
|
||||
|
||||
|
||||
async def queue_or_play(message, edited=False):
|
||||
if message.guild.id not in players:
|
||||
players[message.guild.id] = youtubedl.QueuedPlayer()
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0],
|
||||
"queue a song, list the queue, or resume playback",
|
||||
tokens[0], "queue a song, list the queue, or resume playback"
|
||||
)
|
||||
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
"--volume",
|
||||
default=50,
|
||||
type=lambda v: arguments.range_type(v, lower=0, upper=150),
|
||||
type=lambda v: arguments.range_type(v, min=0, max=150),
|
||||
help="the volume level (0 - 150) for the specified song",
|
||||
)
|
||||
parser.add_argument(
|
||||
@@ -74,23 +80,19 @@ async def queue_or_play(message, edited=False):
|
||||
elif not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.id not in players:
|
||||
players[message.guild.id] = audio.queue.Player()
|
||||
|
||||
if edited:
|
||||
found = next(
|
||||
filter(
|
||||
lambda queued: queued.trigger_message.id == message.id,
|
||||
players[message.guild.id].queue,
|
||||
),
|
||||
None,
|
||||
)
|
||||
found = None
|
||||
for queued in players[message.guild.id].queue:
|
||||
if queued.trigger_message.id == message.id:
|
||||
found = queued
|
||||
break
|
||||
if found:
|
||||
players[message.guild.id].queue.remove(found)
|
||||
|
||||
if args.clear:
|
||||
players[message.guild.id].queue.clear()
|
||||
await utils.add_check_reaction(message)
|
||||
return
|
||||
elif indices := args.remove_index:
|
||||
targets = []
|
||||
for i in indices:
|
||||
@@ -111,15 +113,15 @@ async def queue_or_play(message, edited=False):
|
||||
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
|
||||
)
|
||||
elif args.remove_title or args.remove_queuer:
|
||||
targets = set()
|
||||
targets = []
|
||||
for queued in players[message.guild.id].queue:
|
||||
if t := args.remove_title:
|
||||
if t in queued.player.title:
|
||||
targets.add(queued)
|
||||
targets.append(queued)
|
||||
continue
|
||||
if q := args.remove_queuer:
|
||||
if q == queued.trigger_message.author.id:
|
||||
targets.add(queued)
|
||||
targets = list(targets)
|
||||
targets.append(queued)
|
||||
if not args.match_multiple:
|
||||
targets = targets[:1]
|
||||
|
||||
@@ -135,16 +137,14 @@ async def queue_or_play(message, edited=False):
|
||||
and len(
|
||||
list(
|
||||
filter(
|
||||
lambda queued: (
|
||||
queued.trigger_message.author.id == message.author.id
|
||||
),
|
||||
lambda queued: queued.trigger_message.author.id
|
||||
== message.author.id,
|
||||
players[message.guild.id].queue,
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
)
|
||||
>= 5
|
||||
and not len(message.guild.voice_client.channel.members) == 2
|
||||
and message.author.id not in trusted_users
|
||||
):
|
||||
await utils.reply(
|
||||
message,
|
||||
@@ -154,22 +154,22 @@ async def queue_or_play(message, edited=False):
|
||||
|
||||
try:
|
||||
async with message.channel.typing():
|
||||
player = await audio.youtubedl.YTDLSource.from_url(
|
||||
" ".join(query),
|
||||
loop=client.loop,
|
||||
stream=True,
|
||||
player = await youtubedl.YTDLSource.from_url(
|
||||
" ".join(query), loop=client.loop, stream=True
|
||||
)
|
||||
player.volume = float(args.volume) / 100.0
|
||||
except Exception as e:
|
||||
await utils.reply(message, f"failed to queue: `{e}`")
|
||||
await utils.reply(
|
||||
message, f"**failed to queue:** `{e}`", suppress_embeds=True
|
||||
)
|
||||
return
|
||||
|
||||
queued = audio.queue.Song(player, message)
|
||||
queued = youtubedl.QueuedSong(player, message)
|
||||
|
||||
if args.now or args.next:
|
||||
players[message.guild.id].queue_push_front(queued)
|
||||
players[message.guild.id].queue_add_front(queued)
|
||||
else:
|
||||
players[message.guild.id].queue_push(queued)
|
||||
players[message.guild.id].queue_add(queued)
|
||||
|
||||
if not message.guild.voice_client:
|
||||
await utils.reply(message, "unexpected disconnect from voice channel!")
|
||||
@@ -194,7 +194,7 @@ async def queue_or_play(message, edited=False):
|
||||
[
|
||||
queued.player.duration if queued.player.duration else 0
|
||||
for queued in players[message.guild.id].queue
|
||||
],
|
||||
]
|
||||
),
|
||||
natural=True,
|
||||
)
|
||||
@@ -219,14 +219,13 @@ async def queue_or_play(message, edited=False):
|
||||
[
|
||||
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
|
||||
for i, queued in batch
|
||||
],
|
||||
]
|
||||
)
|
||||
for batch in itertools.batched(
|
||||
enumerate(players[message.guild.id].queue),
|
||||
10,
|
||||
enumerate(players[message.guild.id].queue), 10
|
||||
)
|
||||
],
|
||||
),
|
||||
)
|
||||
),
|
||||
).start(utils.MessageInteractionWrapper(message))
|
||||
else:
|
||||
@@ -238,7 +237,7 @@ async def queue_or_play(message, edited=False):
|
||||
|
||||
async def skip(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
|
||||
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
|
||||
parser.add_argument(
|
||||
"-n",
|
||||
"--next",
|
||||
@@ -251,8 +250,7 @@ async def skip(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if players[message.guild.id] and not players[message.guild.id].queue:
|
||||
del players[message.guild.id]
|
||||
if not players[message.guild.id].queue:
|
||||
message.guild.voice_client.stop()
|
||||
await utils.reply(
|
||||
message,
|
||||
@@ -1,8 +1,11 @@
|
||||
import disnake
|
||||
|
||||
from ... import audio, sponsorblock, utils
|
||||
from ...constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES
|
||||
from ...state import players
|
||||
import sponsorblock
|
||||
import utils
|
||||
import youtubedl
|
||||
from constants import EMBED_COLOR
|
||||
from state import players
|
||||
|
||||
from .utils import command_allowed
|
||||
|
||||
|
||||
@@ -18,20 +21,18 @@ async def sponsorblock_command(message):
|
||||
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
|
||||
if not video:
|
||||
await utils.reply(
|
||||
message,
|
||||
"no sponsorblock segments were found for this video!",
|
||||
message, "no sponsorblock segments were found for this video!"
|
||||
)
|
||||
return
|
||||
|
||||
text = []
|
||||
for segment in video["segments"]:
|
||||
begin, end = map(int, segment["segment"])
|
||||
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES:
|
||||
category = SPONSORBLOCK_CATEGORY_NAMES[category]
|
||||
category_name = sponsorblock.CATEGORY_NAMES.get(segment["category"])
|
||||
|
||||
current = "**" if progress >= begin and progress < end else ""
|
||||
text.append(
|
||||
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
|
||||
f"{current}`{youtubedl.format_duration(begin)}` - `{youtubedl.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
|
||||
)
|
||||
|
||||
await utils.reply(
|
||||
@@ -2,8 +2,8 @@ from logging import error
|
||||
|
||||
import disnake
|
||||
|
||||
from ... import utils
|
||||
from ...state import client, players
|
||||
import utils
|
||||
from state import client, players
|
||||
|
||||
|
||||
def play_after_callback(e, message, once):
|
||||
@@ -24,8 +24,7 @@ def play_next(message, once=False, first=False):
|
||||
if message.guild.id in players and players[message.guild.id].queue:
|
||||
queued = players[message.guild.id].queue_pop()
|
||||
message.guild.voice_client.play(
|
||||
queued.player,
|
||||
after=lambda e: play_after_callback(e, message, once),
|
||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||
)
|
||||
|
||||
embed = queued.embed()
|
||||
@@ -19,65 +19,48 @@ BAR_LENGTH = 35
|
||||
EMBED_COLOR = 0xFF6600
|
||||
OWNERS = [531392146767347712]
|
||||
PREFIX = "%"
|
||||
SPONSORBLOCK_CATEGORY_NAMES = {
|
||||
"music_offtopic": "non-music",
|
||||
"selfpromo": "self promotion",
|
||||
"sponsor": "sponsored",
|
||||
}
|
||||
REACTIONS = {
|
||||
"cat": ["🐈"],
|
||||
"dog": ["🐕"],
|
||||
"gn": ["💤", "😪", "😴", "🛌"],
|
||||
"pizza": ["🍕"],
|
||||
}
|
||||
RELOADABLE_MODULES = [
|
||||
"errornocord.arguments",
|
||||
"errornocord.audio",
|
||||
"errornocord.audio.discord",
|
||||
"errornocord.audio.queue",
|
||||
"errornocord.audio.utils",
|
||||
"errornocord.audio.youtubedl",
|
||||
"errornocord.commands",
|
||||
"errornocord.commands.bot",
|
||||
"errornocord.commands.tools",
|
||||
"errornocord.commands.utils",
|
||||
"errornocord.commands.voice",
|
||||
"errornocord.commands.voice.channel",
|
||||
"errornocord.commands.voice.playback",
|
||||
"errornocord.commands.voice.playing",
|
||||
"errornocord.commands.voice.queue",
|
||||
"errornocord.commands.voice.sponsorblock",
|
||||
"errornocord.commands.voice.utils",
|
||||
"errornocord.constants",
|
||||
"errornocord.core",
|
||||
"errornocord.events",
|
||||
"errornocord.extra",
|
||||
"errornocord.fun",
|
||||
"errornocord.sponsorblock",
|
||||
"errornocord.tasks",
|
||||
"errornocord.utils",
|
||||
"errornocord.utils.common",
|
||||
"errornocord.utils.discord",
|
||||
"errornocord.voice",
|
||||
"arguments",
|
||||
"commands",
|
||||
"commands.bot",
|
||||
"commands.tools",
|
||||
"commands.utils",
|
||||
"commands.voice",
|
||||
"commands.voice.channel",
|
||||
"commands.voice.playback",
|
||||
"commands.voice.playing",
|
||||
"commands.voice.queue",
|
||||
"commands.voice.sponsorblock",
|
||||
"commands.voice.utils",
|
||||
"constants",
|
||||
"core",
|
||||
"events",
|
||||
"extra",
|
||||
"fun",
|
||||
"sponsorblock",
|
||||
"tasks",
|
||||
"utils",
|
||||
"voice",
|
||||
"youtubedl",
|
||||
"yt_dlp",
|
||||
"yt_dlp.version",
|
||||
]
|
||||
PUBLIC_FLAGS = {
|
||||
1 << 0: "Discord Employee",
|
||||
1 << 1: "Discord Partner",
|
||||
1 << 2: "HypeSquad Events",
|
||||
1 << 3: "Bug Hunter Level 1",
|
||||
1 << 6: "HypeSquad Bravery",
|
||||
1 << 7: "HypeSquad Brilliance",
|
||||
1 << 8: "HypeSquad Balance",
|
||||
1 << 9: "Early Supporter",
|
||||
1 << 10: "Team User",
|
||||
1 << 14: "Bug Hunter Level 2",
|
||||
1 << 16: "Verified Bot",
|
||||
1 << 17: "Verified Bot Developer",
|
||||
1 << 18: "Discord Certified Moderator",
|
||||
1 << 19: "HTTP Interactions Only",
|
||||
1 << 22: "Active Developer",
|
||||
1: "Discord Employee",
|
||||
2: "Discord Partner",
|
||||
4: "HypeSquad Events",
|
||||
8: "Bug Hunter Level 1",
|
||||
64: "HypeSquad Bravery",
|
||||
128: "HypeSquad Brilliance",
|
||||
256: "HypeSquad Balance",
|
||||
512: "Early Supporter",
|
||||
1024: "Team User",
|
||||
16384: "Bug Hunter Level 2",
|
||||
65536: "Verified Bot",
|
||||
131072: "Verified Bot Developer",
|
||||
262144: "Discord Certified Moderator",
|
||||
524288: "HTTP Interactions Only",
|
||||
4194304: "Active Developer",
|
||||
}
|
||||
BADGE_EMOJIS = {
|
||||
"Discord Employee": "<:DiscordStaff:879666899980546068>",
|
||||
@@ -3,7 +3,6 @@ import contextlib
|
||||
import importlib
|
||||
import inspect
|
||||
import io
|
||||
import signal
|
||||
import textwrap
|
||||
import time
|
||||
import traceback
|
||||
@@ -12,10 +11,11 @@ from logging import debug
|
||||
import disnake
|
||||
import disnake_paginator
|
||||
|
||||
from . import commands, utils
|
||||
from .commands import Command as C
|
||||
from .constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
|
||||
from .state import client, command_cooldowns, command_locks, idle_tracker, players
|
||||
import commands
|
||||
import utils
|
||||
from commands import Command as C
|
||||
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
|
||||
from state import client, command_cooldowns, command_locks, idle_tracker
|
||||
|
||||
|
||||
async def on_message(message, edited=False):
|
||||
@@ -48,22 +48,32 @@ async def on_message(message, edited=False):
|
||||
|
||||
try:
|
||||
if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
|
||||
if (end_time := cooldowns.get(matched)) and (
|
||||
remaining_time := round(end_time - time.time()) > 0
|
||||
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
|
||||
end_time
|
||||
):
|
||||
await utils.reply(
|
||||
message,
|
||||
f"please wait **{utils.format_duration(remaining_time, natural=True)}** before using this command again!",
|
||||
f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
|
||||
)
|
||||
return
|
||||
|
||||
match matched:
|
||||
case C.RELOAD if message.author.id in OWNERS:
|
||||
reloaded_modules = set()
|
||||
start = time.time()
|
||||
reloaded_modules = reload()
|
||||
|
||||
rreload(reloaded_modules, __import__("core"))
|
||||
rreload(reloaded_modules, __import__("extra"))
|
||||
for module in filter(
|
||||
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
|
||||
globals().values(),
|
||||
):
|
||||
rreload(reloaded_modules, module)
|
||||
|
||||
end = time.time()
|
||||
if __debug__:
|
||||
debug(
|
||||
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s",
|
||||
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s"
|
||||
)
|
||||
|
||||
await utils.add_check_reaction(message)
|
||||
@@ -148,31 +158,26 @@ async def on_message(message, edited=False):
|
||||
command_locks[(message.guild.id, message.author.id)].release()
|
||||
|
||||
|
||||
async def on_voice_state_update(member, before, after):
|
||||
def is_alone(channel):
|
||||
async def on_voice_state_update(_, before, after):
|
||||
def is_empty(channel):
|
||||
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
|
||||
|
||||
if member.id == client.user.id and is_alone(after.channel):
|
||||
if before.channel.guild.id in players:
|
||||
del players[before.channel.guild.id]
|
||||
await after.channel.guild.voice_client.disconnect()
|
||||
return
|
||||
|
||||
if is_alone(before.channel):
|
||||
if before.channel.guild.id in players:
|
||||
del players[before.channel.guild.id]
|
||||
await before.channel.guild.voice_client.disconnect()
|
||||
channel = None
|
||||
if is_empty(before.channel):
|
||||
channel = before.channel
|
||||
elif is_empty(after.channel):
|
||||
channel = after.channel
|
||||
if channel:
|
||||
await channel.guild.voice_client.disconnect()
|
||||
|
||||
|
||||
def rreload(reloaded_modules, module):
|
||||
reloaded_modules.add(module.__name__)
|
||||
|
||||
for submodule in filter(
|
||||
lambda sm: (
|
||||
inspect.ismodule(sm)
|
||||
and sm.__name__ in RELOADABLE_MODULES
|
||||
and sm.__name__ not in reloaded_modules
|
||||
),
|
||||
lambda v: inspect.ismodule(v)
|
||||
and v.__name__ in RELOADABLE_MODULES
|
||||
and v.__name__ not in reloaded_modules,
|
||||
vars(module).values(),
|
||||
):
|
||||
rreload(reloaded_modules, submodule)
|
||||
@@ -181,18 +186,3 @@ def rreload(reloaded_modules, module):
|
||||
|
||||
if "__reload_module__" in dir(module):
|
||||
module.__reload_module__()
|
||||
|
||||
|
||||
def reload(*_):
|
||||
reloaded_modules = set()
|
||||
rreload(reloaded_modules, __import__("errornocord.core"))
|
||||
rreload(reloaded_modules, __import__("errornocord.extra"))
|
||||
for module in filter(
|
||||
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
|
||||
globals().values(),
|
||||
):
|
||||
rreload(reloaded_modules, module)
|
||||
return reloaded_modules
|
||||
|
||||
|
||||
signal.signal(signal.SIGUSR1, reload)
|
||||
59
default.nix
59
default.nix
@@ -1,59 +0,0 @@
|
||||
{
|
||||
lib,
|
||||
python3Packages,
|
||||
self,
|
||||
...
|
||||
}:
|
||||
let
|
||||
disnake = python3Packages.disnake.overrideAttrs (old: {
|
||||
src = self.pins.disnake;
|
||||
|
||||
propagatedBuildInputs =
|
||||
with python3Packages;
|
||||
old.propagatedBuildInputs
|
||||
++ [
|
||||
typing-extensions
|
||||
versioningit
|
||||
];
|
||||
|
||||
nativeBuildInputs = old.nativeBuildInputs ++ [ python3Packages.hatchling ];
|
||||
});
|
||||
|
||||
disnake_paginator = python3Packages.buildPythonPackage {
|
||||
pname = "disnake-paginator";
|
||||
version = "1.0.8";
|
||||
|
||||
src = self.pins.disnake-paginator;
|
||||
|
||||
pyproject = true;
|
||||
build-system = [ python3Packages.setuptools ];
|
||||
|
||||
propagatedBuildInputs = [
|
||||
disnake
|
||||
];
|
||||
|
||||
doCheck = false;
|
||||
};
|
||||
in
|
||||
python3Packages.buildPythonApplication {
|
||||
pname = "errornocord";
|
||||
version = "0.1.0";
|
||||
|
||||
src = lib.cleanSource ./.;
|
||||
|
||||
pyproject = true;
|
||||
build-system = [ python3Packages.setuptools ];
|
||||
|
||||
propagatedBuildInputs = with python3Packages; [
|
||||
aiohttp
|
||||
audioop-lts
|
||||
disnake
|
||||
disnake_paginator
|
||||
psutil
|
||||
typing-extensions
|
||||
youtube-transcript-api
|
||||
yt-dlp
|
||||
];
|
||||
|
||||
doCheck = false;
|
||||
}
|
||||
@@ -1,8 +0,0 @@
|
||||
from . import discord, queue, utils, youtubedl
|
||||
|
||||
__all__ = [
|
||||
"discord",
|
||||
"queue",
|
||||
"utils",
|
||||
"youtubedl",
|
||||
]
|
||||
@@ -1,39 +0,0 @@
|
||||
import audioop
|
||||
|
||||
import disnake
|
||||
|
||||
|
||||
class TrackedAudioSource(disnake.AudioSource):
|
||||
def __init__(self, source):
|
||||
self._source = source
|
||||
self.read_count = 0
|
||||
|
||||
def read(self) -> bytes:
|
||||
data = self._source.read()
|
||||
if data:
|
||||
self.read_count += 1
|
||||
return data
|
||||
|
||||
def fast_forward(self, seconds: int):
|
||||
for _ in range(int(seconds / 0.02)):
|
||||
self.read()
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return self.read_count * 0.02
|
||||
|
||||
|
||||
class PCMVolumeTransformer(disnake.AudioSource):
|
||||
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
|
||||
if original.is_opus():
|
||||
raise disnake.ClientException("AudioSource must not be Opus encoded")
|
||||
|
||||
self.original = original
|
||||
self.volume = volume
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.original.cleanup()
|
||||
|
||||
def read(self) -> bytes:
|
||||
ret = self.original.read()
|
||||
return audioop.mul(ret, 2, self.volume)
|
||||
@@ -1,111 +0,0 @@
|
||||
import collections
|
||||
from dataclasses import dataclass
|
||||
from typing import ClassVar, Optional
|
||||
|
||||
import disnake
|
||||
|
||||
from ..constants import BAR_LENGTH, EMBED_COLOR
|
||||
from .utils import format_duration
|
||||
from .youtubedl import YTDLSource
|
||||
|
||||
|
||||
@dataclass
|
||||
class Song:
|
||||
player: YTDLSource
|
||||
trigger_message: disnake.Message
|
||||
|
||||
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
|
||||
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
|
||||
duration = (
|
||||
format_duration(self.player.duration) if self.player.duration else "stream"
|
||||
)
|
||||
if multiline:
|
||||
queue_time = (
|
||||
self.trigger_message.edited_at or self.trigger_message.created_at
|
||||
)
|
||||
return f"{title}\n**duration:** {duration}" + (
|
||||
f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
|
||||
if show_queuer
|
||||
else ""
|
||||
)
|
||||
return f"{title} [**{duration}**]" + (
|
||||
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
|
||||
)
|
||||
|
||||
def embed(self, is_paused=False):
|
||||
progress = 0
|
||||
if self.player.duration:
|
||||
progress = self.player.original.progress / self.player.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=EMBED_COLOR,
|
||||
title=self.player.title,
|
||||
url=self.player.original_url,
|
||||
description=(
|
||||
f"{'⏸️ ' if is_paused else ''}"
|
||||
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
|
||||
+ (
|
||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||
if self.player.duration
|
||||
else "[**stream**]"
|
||||
)
|
||||
),
|
||||
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
|
||||
)
|
||||
|
||||
uploader_value = None
|
||||
if self.player.uploader_url:
|
||||
if self.player.uploader:
|
||||
uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
|
||||
else:
|
||||
uploader_value = self.player.uploader_url
|
||||
elif self.player.uploader:
|
||||
uploader_value = self.player.uploader
|
||||
|
||||
if uploader_value:
|
||||
embed.add_field(name="Uploader", value=uploader_value)
|
||||
if self.player.like_count:
|
||||
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
|
||||
if self.player.view_count:
|
||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||
if self.player.timestamp:
|
||||
embed.add_field(name="Published", value=f"<t:{int(self.player.timestamp)}>")
|
||||
if self.player.volume:
|
||||
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
|
||||
|
||||
if self.player.thumbnail_url:
|
||||
embed.set_image(self.player.thumbnail_url)
|
||||
|
||||
embed.set_footer(
|
||||
text=f"Queued by {self.trigger_message.author.name}",
|
||||
icon_url=(
|
||||
self.trigger_message.author.avatar.url
|
||||
if self.trigger_message.author.avatar
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Player:
|
||||
queue: ClassVar = collections.deque()
|
||||
current: Optional[Song] = None
|
||||
|
||||
def queue_pop(self):
|
||||
popped = self.queue.popleft()
|
||||
self.current = popped
|
||||
return popped
|
||||
|
||||
def queue_push(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def queue_push_front(self, item):
|
||||
self.queue.appendleft(item)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
@@ -1,7 +0,0 @@
|
||||
def format_duration(duration: int | float) -> str:
|
||||
hours, duration = divmod(int(duration), 3600)
|
||||
minutes, duration = divmod(duration, 60)
|
||||
segments = [hours, minutes, duration]
|
||||
if len(segments) == 3 and segments[0] == 0:
|
||||
del segments[0]
|
||||
return f"{':'.join(f'{s:0>2}' for s in segments)}"
|
||||
@@ -1,75 +0,0 @@
|
||||
import asyncio
|
||||
from typing import Any, Optional
|
||||
|
||||
import disnake
|
||||
import yt_dlp
|
||||
|
||||
from ..constants import YTDL_OPTIONS
|
||||
from .discord import PCMVolumeTransformer, TrackedAudioSource
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
|
||||
|
||||
class YTDLSource(PCMVolumeTransformer):
|
||||
def __init__(
|
||||
self,
|
||||
source: TrackedAudioSource,
|
||||
*,
|
||||
data: dict[str, Any],
|
||||
volume: float = 0.5,
|
||||
):
|
||||
super().__init__(source, volume)
|
||||
|
||||
self.description = data.get("description")
|
||||
self.duration = data.get("duration")
|
||||
self.id = data.get("id")
|
||||
self.like_count = data.get("like_count")
|
||||
self.original_url = data.get("original_url")
|
||||
self.thumbnail_url = data.get("thumbnail")
|
||||
self.timestamp = data.get("timestamp")
|
||||
self.title = data.get("title")
|
||||
self.uploader = data.get("uploader")
|
||||
self.uploader_url = data.get("uploader_url")
|
||||
self.view_count = data.get("view_count")
|
||||
|
||||
@classmethod
|
||||
async def from_url(
|
||||
cls,
|
||||
url,
|
||||
*,
|
||||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||
stream: bool = False,
|
||||
):
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
data: Any = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: ytdl.extract_info(url, download=not stream),
|
||||
)
|
||||
|
||||
if "entries" in data:
|
||||
if not data["entries"]:
|
||||
raise Exception("no results found!")
|
||||
data = data["entries"][0]
|
||||
if "url" not in data:
|
||||
raise Exception("no url returned!")
|
||||
|
||||
return cls(
|
||||
TrackedAudioSource(
|
||||
disnake.FFmpegPCMAudio(
|
||||
data["url"] if stream else ytdl.prepare_filename(data),
|
||||
before_options="-vn -reconnect 1",
|
||||
),
|
||||
),
|
||||
data=data,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YTDLSource title={self.title} original_url={self.original_url} duration={self.duration}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
def __reload_module__():
|
||||
global ytdl
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
@@ -1,13 +0,0 @@
|
||||
import random
|
||||
|
||||
from . import commands
|
||||
from .constants import REACTIONS
|
||||
|
||||
|
||||
async def on_message(message):
|
||||
if random.random() < 0.01:
|
||||
tokens = commands.tokenize(message.content, remove_prefix=False)
|
||||
for keyword, options in REACTIONS.items():
|
||||
if keyword in tokens:
|
||||
await message.add_reaction(random.choice(options))
|
||||
break
|
||||
@@ -1,3 +0,0 @@
|
||||
from . import test_filter_secrets, test_format_duration
|
||||
|
||||
__all__ = ["test_filter_secrets", "test_format_duration"]
|
||||
@@ -2,8 +2,11 @@ import asyncio
|
||||
import threading
|
||||
from logging import debug, info, warning
|
||||
|
||||
from . import commands, core, fun, tasks
|
||||
from .state import client
|
||||
import commands
|
||||
import core
|
||||
import fun
|
||||
import tasks
|
||||
from state import client
|
||||
|
||||
|
||||
def prepare():
|
||||
@@ -2,21 +2,18 @@ import asyncio
|
||||
import string
|
||||
|
||||
import disnake
|
||||
from youtube_transcript_api._api import YouTubeTranscriptApi
|
||||
import youtube_transcript_api
|
||||
|
||||
from .state import client, kill, players
|
||||
from state import client, kill, players
|
||||
|
||||
|
||||
async def transcript(
|
||||
message,
|
||||
languages=["en"],
|
||||
max_messages=6,
|
||||
min_messages=3,
|
||||
upper=True,
|
||||
message, languages=["en"], max_messages=6, min_messages=3, upper=True
|
||||
):
|
||||
initial_id = message.guild.voice_client.source.id
|
||||
transcript_list = YouTubeTranscriptApi().list(initial_id)
|
||||
|
||||
transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
|
||||
initial_id
|
||||
)
|
||||
try:
|
||||
transcript = transcript_list.find_manually_created_transcript(languages).fetch()
|
||||
except Exception:
|
||||
@@ -24,19 +21,21 @@ async def transcript(
|
||||
await message.channel.send("(autogenerated)")
|
||||
|
||||
messages = []
|
||||
for line in transcript.snippets:
|
||||
for line in transcript:
|
||||
if (
|
||||
players[message.guild.id].current.player.original.progress
|
||||
>= line.start + line.duration
|
||||
>= line["start"] + line["duration"]
|
||||
):
|
||||
continue
|
||||
|
||||
while players[message.guild.id].current.player.original.progress < line.start:
|
||||
while (
|
||||
players[message.guild.id].current.player.original.progress < line["start"]
|
||||
):
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
messages.insert(
|
||||
0,
|
||||
await message.channel.send(line.text.upper() if upper else line.text),
|
||||
await message.channel.send(line["text"].upper() if upper else line["text"]),
|
||||
)
|
||||
if len(messages) > max_messages:
|
||||
try:
|
||||
@@ -45,7 +44,7 @@ async def transcript(
|
||||
await messages.pop().delete()
|
||||
else:
|
||||
await message.channel.delete_messages(
|
||||
[messages.pop() for _ in range(count)],
|
||||
[messages.pop() for _ in range(count)]
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
@@ -78,20 +77,19 @@ def messages_per_second(limit=500):
|
||||
average = 1
|
||||
print(
|
||||
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
|
||||
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**",
|
||||
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**"
|
||||
)
|
||||
|
||||
|
||||
async def auto_count(channel_id: int):
|
||||
if (channel := await client.fetch_channel(channel_id)) and isinstance(
|
||||
channel,
|
||||
disnake.TextChannel,
|
||||
channel, disnake.TextChannel
|
||||
):
|
||||
last_message = (await channel.history(limit=1).flatten())[0]
|
||||
try:
|
||||
result = str(
|
||||
int("".join(filter(lambda d: d in string.digits, last_message.content)))
|
||||
+ 1,
|
||||
+ 1
|
||||
)
|
||||
except Exception:
|
||||
result = "where number"
|
||||
61
flake.lock
generated
61
flake.lock
generated
@@ -1,61 +0,0 @@
|
||||
{
|
||||
"nodes": {
|
||||
"flake-parts": {
|
||||
"inputs": {
|
||||
"nixpkgs-lib": "nixpkgs-lib"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1772408722,
|
||||
"narHash": "sha256-rHuJtdcOjK7rAHpHphUb1iCvgkU3GpfvicLMwwnfMT0=",
|
||||
"owner": "hercules-ci",
|
||||
"repo": "flake-parts",
|
||||
"rev": "f20dc5d9b8027381c474144ecabc9034d6a839a3",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "hercules-ci",
|
||||
"repo": "flake-parts",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1773821835,
|
||||
"narHash": "sha256-TJ3lSQtW0E2JrznGVm8hOQGVpXjJyXY2guAxku2O9A4=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "b40629efe5d6ec48dd1efba650c797ddbd39ace0",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs-lib": {
|
||||
"locked": {
|
||||
"lastModified": 1772328832,
|
||||
"narHash": "sha256-e+/T/pmEkLP6BHhYjx6GmwP5ivonQQn0bJdH9YrRB+Q=",
|
||||
"owner": "nix-community",
|
||||
"repo": "nixpkgs.lib",
|
||||
"rev": "c185c7a5e5dd8f9add5b2f8ebeff00888b070742",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-community",
|
||||
"repo": "nixpkgs.lib",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"flake-parts": "flake-parts",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
||||
42
flake.nix
42
flake.nix
@@ -1,42 +0,0 @@
|
||||
{
|
||||
inputs = {
|
||||
flake-parts.url = "github:hercules-ci/flake-parts";
|
||||
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
|
||||
};
|
||||
|
||||
outputs =
|
||||
{
|
||||
flake-parts,
|
||||
self,
|
||||
...
|
||||
}@inputs:
|
||||
flake-parts.lib.mkFlake { inherit inputs; } {
|
||||
systems = [
|
||||
"aarch64-linux"
|
||||
"x86_64-linux"
|
||||
];
|
||||
|
||||
perSystem =
|
||||
{ pkgs, self', ... }:
|
||||
{
|
||||
devShells.default = pkgs.mkShell {
|
||||
name = "errornocord";
|
||||
|
||||
buildInputs = with pkgs; [
|
||||
ffmpeg
|
||||
self'.packages.default
|
||||
];
|
||||
};
|
||||
|
||||
packages = rec {
|
||||
errornocord = pkgs.callPackage ./. { inherit self; };
|
||||
default = errornocord;
|
||||
};
|
||||
};
|
||||
|
||||
flake.pins = import ./npins;
|
||||
};
|
||||
|
||||
description = "Hot-reloadable Discord music bot";
|
||||
}
|
||||
10
fun.py
Normal file
10
fun.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import random
|
||||
|
||||
import commands
|
||||
|
||||
|
||||
async def on_message(message):
|
||||
if random.random() < 0.01 and "gn" in commands.tokenize(
|
||||
message.content, remove_prefix=False
|
||||
):
|
||||
await message.add_reaction(random.choice(["💤", "😪", "😴", "🛌"]))
|
||||
@@ -1,13 +1,13 @@
|
||||
import logging
|
||||
|
||||
from . import constants, events
|
||||
from .state import client
|
||||
import constants
|
||||
import events
|
||||
from state import client
|
||||
|
||||
|
||||
def main():
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(
|
||||
format=(
|
||||
"%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s"
|
||||
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
|
||||
if __debug__
|
||||
else "%(asctime)s %(levelname)s %(message)s"
|
||||
),
|
||||
@@ -18,7 +18,3 @@ def main():
|
||||
|
||||
events.prepare()
|
||||
client.run(constants.SECRETS["TOKEN"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,249 +0,0 @@
|
||||
/*
|
||||
This file is provided under the MIT licence:
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
# Generated by npins. Do not modify; will be overwritten regularly
|
||||
let
|
||||
# Backwards-compatibly make something that previously didn't take any arguments take some
|
||||
# The function must return an attrset, and will unfortunately be eagerly evaluated
|
||||
# Same thing, but it catches eval errors on the default argument so that one may still call it with other arguments
|
||||
mkFunctor =
|
||||
fn:
|
||||
let
|
||||
e = builtins.tryEval (fn { });
|
||||
in
|
||||
(if e.success then e.value else { error = fn { }; }) // { __functor = _self: fn; };
|
||||
|
||||
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/lists.nix#L295
|
||||
range =
|
||||
first: last: if first > last then [ ] else builtins.genList (n: first + n) (last - first + 1);
|
||||
|
||||
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L257
|
||||
stringToCharacters = s: map (p: builtins.substring p 1 s) (range 0 (builtins.stringLength s - 1));
|
||||
|
||||
# https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L269
|
||||
stringAsChars = f: s: concatStrings (map f (stringToCharacters s));
|
||||
concatStrings = builtins.concatStringsSep "";
|
||||
|
||||
# If the environment variable NPINS_OVERRIDE_${name} is set, then use
|
||||
# the path directly as opposed to the fetched source.
|
||||
# (Taken from Niv for compatibility)
|
||||
mayOverride =
|
||||
name: path:
|
||||
let
|
||||
envVarName = "NPINS_OVERRIDE_${saneName}";
|
||||
saneName = stringAsChars (c: if (builtins.match "[a-zA-Z0-9]" c) == null then "_" else c) name;
|
||||
ersatz = builtins.getEnv envVarName;
|
||||
in
|
||||
if ersatz == "" then
|
||||
path
|
||||
else
|
||||
# this turns the string into an actual Nix path (for both absolute and
|
||||
# relative paths)
|
||||
builtins.trace "Overriding path of \"${name}\" with \"${ersatz}\" due to set \"${envVarName}\"" (
|
||||
if builtins.substring 0 1 ersatz == "/" then
|
||||
/. + ersatz
|
||||
else
|
||||
/. + builtins.getEnv "PWD" + "/${ersatz}"
|
||||
);
|
||||
|
||||
mkSource =
|
||||
name: spec:
|
||||
{
|
||||
pkgs ? null,
|
||||
}:
|
||||
assert spec ? type;
|
||||
let
|
||||
# Unify across builtin and pkgs fetchers.
|
||||
# `fetchGit` requires a wrapper because of slight API differences.
|
||||
fetchers =
|
||||
if pkgs == null then
|
||||
{
|
||||
inherit (builtins) fetchTarball fetchurl;
|
||||
# For some fucking reason, fetchGit has a different signature than the other builtin fetchers …
|
||||
fetchGit = args: (builtins.fetchGit args).outPath;
|
||||
}
|
||||
else
|
||||
{
|
||||
fetchTarball =
|
||||
{
|
||||
url,
|
||||
sha256,
|
||||
}:
|
||||
pkgs.fetchzip {
|
||||
inherit url sha256;
|
||||
extension = "tar";
|
||||
};
|
||||
inherit (pkgs) fetchurl;
|
||||
fetchGit =
|
||||
{
|
||||
url,
|
||||
submodules,
|
||||
rev,
|
||||
name,
|
||||
narHash,
|
||||
}:
|
||||
pkgs.fetchgit {
|
||||
inherit url rev name;
|
||||
fetchSubmodules = submodules;
|
||||
hash = narHash;
|
||||
};
|
||||
};
|
||||
|
||||
# Dispatch to the correct code path based on the type
|
||||
path =
|
||||
if spec.type == "Git" then
|
||||
mkGitSource fetchers spec
|
||||
else if spec.type == "GitRelease" then
|
||||
mkGitSource fetchers spec
|
||||
else if spec.type == "PyPi" then
|
||||
mkPyPiSource fetchers spec
|
||||
else if spec.type == "Channel" then
|
||||
mkChannelSource fetchers spec
|
||||
else if spec.type == "Tarball" then
|
||||
mkTarballSource fetchers spec
|
||||
else if spec.type == "Container" then
|
||||
mkContainerSource pkgs spec
|
||||
else
|
||||
builtins.throw "Unknown source type ${spec.type}";
|
||||
in
|
||||
spec // { outPath = mayOverride name path; };
|
||||
|
||||
mkGitSource =
|
||||
{
|
||||
fetchTarball,
|
||||
fetchGit,
|
||||
...
|
||||
}:
|
||||
{
|
||||
repository,
|
||||
revision,
|
||||
url ? null,
|
||||
submodules,
|
||||
hash,
|
||||
...
|
||||
}:
|
||||
assert repository ? type;
|
||||
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
|
||||
# In the latter case, there we will always be an url to the tarball
|
||||
if url != null && !submodules then
|
||||
fetchTarball {
|
||||
inherit url;
|
||||
sha256 = hash;
|
||||
}
|
||||
else
|
||||
let
|
||||
url =
|
||||
if repository.type == "Git" then
|
||||
repository.url
|
||||
else if repository.type == "GitHub" then
|
||||
"https://github.com/${repository.owner}/${repository.repo}.git"
|
||||
else if repository.type == "GitLab" then
|
||||
"${repository.server}/${repository.repo_path}.git"
|
||||
else if repository.type == "Forgejo" then
|
||||
"${repository.server}/${repository.owner}/${repository.repo}.git"
|
||||
else
|
||||
throw "Unrecognized repository type ${repository.type}";
|
||||
urlToName =
|
||||
url: rev:
|
||||
let
|
||||
matched = builtins.match "^.*/([^/]*)(\\.git)?$" url;
|
||||
|
||||
short = builtins.substring 0 7 rev;
|
||||
|
||||
appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else "";
|
||||
in
|
||||
"${if matched == null then "source" else builtins.head matched}${appendShort}";
|
||||
name = urlToName url revision;
|
||||
in
|
||||
fetchGit {
|
||||
rev = revision;
|
||||
narHash = hash;
|
||||
|
||||
inherit name submodules url;
|
||||
};
|
||||
|
||||
mkPyPiSource =
|
||||
{ fetchurl, ... }:
|
||||
{
|
||||
url,
|
||||
hash,
|
||||
...
|
||||
}:
|
||||
fetchurl {
|
||||
inherit url;
|
||||
sha256 = hash;
|
||||
};
|
||||
|
||||
mkChannelSource =
|
||||
{ fetchTarball, ... }:
|
||||
{
|
||||
url,
|
||||
hash,
|
||||
...
|
||||
}:
|
||||
fetchTarball {
|
||||
inherit url;
|
||||
sha256 = hash;
|
||||
};
|
||||
|
||||
mkTarballSource =
|
||||
{ fetchTarball, ... }:
|
||||
{
|
||||
url,
|
||||
locked_url ? url,
|
||||
hash,
|
||||
...
|
||||
}:
|
||||
fetchTarball {
|
||||
url = locked_url;
|
||||
sha256 = hash;
|
||||
};
|
||||
|
||||
mkContainerSource =
|
||||
pkgs:
|
||||
{
|
||||
image_name,
|
||||
image_tag,
|
||||
image_digest,
|
||||
...
|
||||
}:
|
||||
if pkgs == null then
|
||||
builtins.throw "container sources require passing in a Nixpkgs value: https://github.com/andir/npins/blob/master/README.md#using-the-nixpkgs-fetchers"
|
||||
else
|
||||
pkgs.dockerTools.pullImage {
|
||||
imageName = image_name;
|
||||
imageDigest = image_digest;
|
||||
finalImageTag = image_tag;
|
||||
};
|
||||
in
|
||||
mkFunctor (
|
||||
{
|
||||
input ? ./sources.json,
|
||||
}:
|
||||
let
|
||||
data =
|
||||
if builtins.isPath input then
|
||||
# while `readFile` will throw an error anyways if the path doesn't exist,
|
||||
# we still need to check beforehand because *our* error can be caught but not the one from the builtin
|
||||
# *piegames sighs*
|
||||
if builtins.pathExists input then
|
||||
builtins.fromJSON (builtins.readFile input)
|
||||
else
|
||||
throw "Input path ${toString input} does not exist"
|
||||
else if builtins.isAttrs input then
|
||||
input
|
||||
else
|
||||
throw "Unsupported input type ${builtins.typeOf input}, must be a path or an attrset";
|
||||
version = data.version;
|
||||
in
|
||||
if version == 7 then
|
||||
builtins.mapAttrs (name: spec: mkFunctor (mkSource name spec)) data.pins
|
||||
else
|
||||
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"
|
||||
)
|
||||
@@ -1,26 +0,0 @@
|
||||
{
|
||||
"pins": {
|
||||
"disnake": {
|
||||
"type": "Git",
|
||||
"repository": {
|
||||
"type": "GitHub",
|
||||
"owner": "DisnakeDev",
|
||||
"repo": "disnake"
|
||||
},
|
||||
"branch": "master",
|
||||
"submodules": false,
|
||||
"revision": "79afc2d8673299f43636730d4a1518e7901f95fa",
|
||||
"url": "https://github.com/DisnakeDev/disnake/archive/79afc2d8673299f43636730d4a1518e7901f95fa.tar.gz",
|
||||
"hash": "sha256-7gsHFnqTyANdV+eosLI4MlzsMyebLdYZShEVObJPhl8="
|
||||
},
|
||||
"disnake-paginator": {
|
||||
"type": "PyPi",
|
||||
"name": "disnake-paginator",
|
||||
"version_upper_bound": null,
|
||||
"version": "1.0.8",
|
||||
"url": "https://files.pythonhosted.org/packages/8c/db/3a86b247c7653a3f1676d16a316daf400edcf76295098ab60544f2a8f8b7/disnake_paginator-1.0.8.tar.gz",
|
||||
"hash": "sha256-Sn0qbKNkJIdX8GCRRRPxwWutBtG9c1Zt7JnQAsl5I4s="
|
||||
}
|
||||
},
|
||||
"version": 7
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
[project]
|
||||
name = "errornocord"
|
||||
version = "0.1.0"
|
||||
|
||||
[project.scripts]
|
||||
errornocord = "errornocord.main:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
include = ["errornocord*"]
|
||||
|
||||
[tool.basedpyright]
|
||||
typeCheckingMode = "basic"
|
||||
@@ -1,6 +1,8 @@
|
||||
aiohttp
|
||||
disnake[voice] @ https://github.com/DisnakeDev/disnake/archive/master.tar.gz
|
||||
audioop-lts
|
||||
disnake
|
||||
disnake_paginator
|
||||
psutil
|
||||
PyNaCl
|
||||
youtube_transcript_api
|
||||
yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
|
||||
yt-dlp
|
||||
|
||||
@@ -1,36 +1,28 @@
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
import aiohttp
|
||||
|
||||
from .state import sponsorblock_cache
|
||||
from state import sponsorblock_cache
|
||||
|
||||
categories = json.dumps(
|
||||
[
|
||||
"interaction",
|
||||
"intro",
|
||||
"music_offtopic",
|
||||
"outro",
|
||||
"preview",
|
||||
"selfpromo",
|
||||
"sponsor",
|
||||
],
|
||||
)
|
||||
CATEGORY_NAMES = {
|
||||
"music_offtopic": "non-music",
|
||||
"sponsor": "sponsored",
|
||||
}
|
||||
|
||||
|
||||
async def get_segments(video_id: str):
|
||||
if video_id in sponsorblock_cache:
|
||||
return sponsorblock_cache[video_id]
|
||||
|
||||
hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
|
||||
hashPrefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
|
||||
session = aiohttp.ClientSession()
|
||||
response = await session.get(
|
||||
f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}",
|
||||
params={"categories": categories},
|
||||
f"https://sponsor.ajay.app/api/skipSegments/{hashPrefix}",
|
||||
params={"categories": '["sponsor", "music_offtopic"]'},
|
||||
)
|
||||
if response.status == 200 and (
|
||||
results := list(
|
||||
filter(lambda v: video_id == v["videoID"], await response.json()),
|
||||
filter(lambda v: video_id == v["videoID"], await response.json())
|
||||
)
|
||||
):
|
||||
sponsorblock_cache[video_id] = results[0]
|
||||
@@ -2,7 +2,7 @@ import time
|
||||
|
||||
import disnake
|
||||
|
||||
from .utils import LimitedSizeDict
|
||||
from utils import LimitedSizeDict
|
||||
|
||||
intents = disnake.Intents.default()
|
||||
intents.message_content = True
|
||||
@@ -15,6 +15,5 @@ idle_tracker = {"is_idle": False, "last_used": time.time()}
|
||||
kill = {"transcript": False}
|
||||
message_responses = LimitedSizeDict()
|
||||
players = {}
|
||||
sponsorblock_cache = LimitedSizeDict()
|
||||
sponsorblock_cache = LimitedSizeDict(size_limit=100)
|
||||
start_time = time.time()
|
||||
trusted_users = []
|
||||
@@ -4,23 +4,23 @@ from logging import debug, error
|
||||
|
||||
import disnake
|
||||
|
||||
from .state import client, idle_tracker, players
|
||||
from state import client, idle_tracker, players
|
||||
|
||||
|
||||
async def cleanup():
|
||||
debug("spawned cleanup thread")
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(3600)
|
||||
await asyncio.sleep(3600 * 12)
|
||||
|
||||
targets = []
|
||||
for guild_id, player in players.items():
|
||||
if len(player.queue) == 0 and not player.current:
|
||||
if len(player.queue) == 0:
|
||||
targets.append(guild_id)
|
||||
for target in targets:
|
||||
del players[target]
|
||||
if len(targets):
|
||||
debug(f"cleanup thread removed {len(targets)} empty players")
|
||||
if __debug__:
|
||||
debug(f"cleanup removed {len(targets)} empty players")
|
||||
|
||||
if (
|
||||
not idle_tracker["is_idle"]
|
||||
3
tests/__init__.py
Normal file
3
tests/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from . import test_filter_secrets, test_format_duration
|
||||
|
||||
__all__ = ["test_format_duration", "test_filter_secrets"]
|
||||
@@ -7,15 +7,15 @@ class TestFilterSecrets(unittest.TestCase):
|
||||
def test_filter_secrets(self):
|
||||
secret = "PLACEHOLDER_TOKEN"
|
||||
self.assertFalse(
|
||||
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}),
|
||||
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret})
|
||||
)
|
||||
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
|
||||
self.assertFalse(
|
||||
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}),
|
||||
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret})
|
||||
)
|
||||
self.assertFalse(
|
||||
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}),
|
||||
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret})
|
||||
)
|
||||
self.assertFalse(
|
||||
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}),
|
||||
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret})
|
||||
)
|
||||
@@ -1,13 +1,13 @@
|
||||
import unittest
|
||||
|
||||
import audio
|
||||
import utils
|
||||
import youtubedl
|
||||
|
||||
|
||||
class TestFormatDuration(unittest.TestCase):
|
||||
def test_audio(self):
|
||||
def test_youtubedl(self):
|
||||
def f(s):
|
||||
return audio.utils.format_duration(s)
|
||||
return youtubedl.format_duration(s)
|
||||
|
||||
self.assertEqual(f(0), "00:00")
|
||||
self.assertEqual(f(0.5), "00:00")
|
||||
@@ -1,4 +1,4 @@
|
||||
from .common import LimitedSizeDict, filter_secrets, format_duration, surround
|
||||
from .common import LimitedSizeDict, filter_secrets, format_duration
|
||||
from .discord import (
|
||||
ChannelResponseWrapper,
|
||||
MessageInteractionWrapper,
|
||||
@@ -24,5 +24,4 @@ __all__ = [
|
||||
"MessageInteractionWrapper",
|
||||
"reply",
|
||||
"snowflake_timestamp",
|
||||
"surround",
|
||||
]
|
||||
@@ -1,13 +1,9 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
from ..constants import SECRETS
|
||||
from constants import SECRETS
|
||||
|
||||
|
||||
def surround(inner: str, outer="```") -> str:
|
||||
return outer + str(inner) + outer
|
||||
|
||||
|
||||
def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
|
||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
|
||||
def format_plural(noun, count):
|
||||
if short:
|
||||
return noun[0]
|
||||
@@ -50,7 +46,7 @@ def filter_secrets(text: str, secrets=SECRETS) -> str:
|
||||
|
||||
class LimitedSizeDict(OrderedDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.size_limit = kwargs.pop("size_limit", 100)
|
||||
self.size_limit = kwargs.pop("size_limit", 1000)
|
||||
super().__init__(*args, **kwargs)
|
||||
self._check_size_limit()
|
||||
|
||||
@@ -1,18 +1,14 @@
|
||||
import ctypes
|
||||
import os
|
||||
import time
|
||||
from logging import debug, error
|
||||
from logging import error, info
|
||||
|
||||
import disnake
|
||||
|
||||
from .. import commands
|
||||
from ..constants import OWNERS
|
||||
from ..state import command_cooldowns, message_responses
|
||||
import commands
|
||||
from state import command_cooldowns, message_responses
|
||||
|
||||
|
||||
def cooldown(message, cooldown_time: int):
|
||||
if message.author.id in OWNERS:
|
||||
return
|
||||
|
||||
possible_commands = commands.match(message.content)
|
||||
if not possible_commands or len(possible_commands) > 1:
|
||||
return
|
||||
@@ -34,9 +30,7 @@ async def reply(message, *args, **kwargs):
|
||||
|
||||
try:
|
||||
await message_responses[message.id].edit(
|
||||
*args,
|
||||
**kwargs,
|
||||
allowed_mentions=disnake.AllowedMentions.none(),
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
return
|
||||
except Exception:
|
||||
@@ -44,9 +38,7 @@ async def reply(message, *args, **kwargs):
|
||||
|
||||
try:
|
||||
response = await message.reply(
|
||||
*args,
|
||||
**kwargs,
|
||||
allowed_mentions=disnake.AllowedMentions.none(),
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
except Exception:
|
||||
response = await channel_send(message, *args, **kwargs)
|
||||
@@ -56,25 +48,26 @@ async def reply(message, *args, **kwargs):
|
||||
|
||||
async def channel_send(message, *args, **kwargs):
|
||||
await message.channel.send(
|
||||
*args,
|
||||
**kwargs,
|
||||
allowed_mentions=disnake.AllowedMentions.none(),
|
||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||
)
|
||||
|
||||
|
||||
def load_opus():
|
||||
path = ctypes.util._findLib_ld("opus")
|
||||
for path in filter(
|
||||
lambda p: os.path.exists(p),
|
||||
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
|
||||
):
|
||||
try:
|
||||
disnake.opus.load_opus(path)
|
||||
debug(f"successfully loaded opus from {path}")
|
||||
info(f"successfully loaded opus from {path}")
|
||||
return
|
||||
except Exception as e:
|
||||
error(f"failed to load opus from {path}: {e}")
|
||||
raise Exception("could not locate working opus library")
|
||||
|
||||
|
||||
def snowflake_timestamp(snowflake) -> int:
|
||||
return round(((snowflake >> 22) + 1420070400000) / 1000)
|
||||
def snowflake_timestamp(id):
|
||||
return round(((id >> 22) + 1420070400000) / 1000)
|
||||
|
||||
|
||||
async def add_check_reaction(message):
|
||||
@@ -83,8 +76,7 @@ async def add_check_reaction(message):
|
||||
|
||||
async def invalid_user_handler(interaction):
|
||||
await interaction.response.send_message(
|
||||
"you are not the intended receiver of this message!",
|
||||
ephemeral=True,
|
||||
"you are not the intended receiver of this message!", ephemeral=True
|
||||
)
|
||||
|
||||
|
||||
@@ -94,7 +86,8 @@ class ChannelResponseWrapper:
|
||||
self.sent_message = None
|
||||
|
||||
async def send_message(self, **kwargs):
|
||||
kwargs.pop("ephemeral", None)
|
||||
if "ephemeral" in kwargs:
|
||||
del kwargs["ephemeral"]
|
||||
self.sent_message = await reply(self.message, **kwargs)
|
||||
|
||||
async def edit_message(self, content=None, embed=None, view=None):
|
||||
220
youtubedl.py
Normal file
220
youtubedl.py
Normal file
@@ -0,0 +1,220 @@
|
||||
import asyncio
|
||||
import audioop
|
||||
import collections
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
import disnake
|
||||
import yt_dlp
|
||||
|
||||
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
|
||||
|
||||
class PCMVolumeTransformer(disnake.AudioSource):
|
||||
def __init__(self, original: disnake.AudioSource, volume: float = 1.0) -> None:
|
||||
if original.is_opus():
|
||||
raise disnake.ClientException("AudioSource must not be Opus encoded.")
|
||||
|
||||
self.original = original
|
||||
self.volume = volume
|
||||
|
||||
@property
|
||||
def volume(self) -> float:
|
||||
return self._volume
|
||||
|
||||
@volume.setter
|
||||
def volume(self, value: float) -> None:
|
||||
self._volume = max(value, 0.0)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.original.cleanup()
|
||||
|
||||
def read(self) -> bytes:
|
||||
ret = self.original.read()
|
||||
return audioop.mul(ret, 2, self._volume)
|
||||
|
||||
|
||||
class CustomAudioSource(disnake.AudioSource):
|
||||
def __init__(self, source):
|
||||
self._source = source
|
||||
self.read_count = 0
|
||||
|
||||
def read(self) -> bytes:
|
||||
data = self._source.read()
|
||||
if data:
|
||||
self.read_count += 1
|
||||
return data
|
||||
|
||||
def fast_forward(self, seconds: int):
|
||||
for _ in range(int(seconds / 0.02)):
|
||||
self.read()
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return self.read_count * 0.02
|
||||
|
||||
|
||||
class YTDLSource(PCMVolumeTransformer):
|
||||
def __init__(
|
||||
self, source: CustomAudioSource, *, data: dict[str, Any], volume: float = 0.5
|
||||
):
|
||||
super().__init__(source, volume)
|
||||
|
||||
self.description = data.get("description")
|
||||
self.duration = data.get("duration")
|
||||
self.id = data.get("id")
|
||||
self.like_count = data.get("like_count")
|
||||
self.original_url = data.get("original_url")
|
||||
self.thumbnail_url = data.get("thumbnail")
|
||||
self.timestamp = data.get("timestamp")
|
||||
self.title = data.get("title")
|
||||
self.uploader = data.get("uploader")
|
||||
self.uploader_url = data.get("uploader_url")
|
||||
self.view_count = data.get("view_count")
|
||||
|
||||
@classmethod
|
||||
async def from_url(
|
||||
cls,
|
||||
url,
|
||||
*,
|
||||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||
stream: bool = False,
|
||||
):
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
data: Any = await loop.run_in_executor(
|
||||
None, lambda: ytdl.extract_info(url, download=not stream)
|
||||
)
|
||||
|
||||
if "entries" in data:
|
||||
if not data["entries"]:
|
||||
raise Exception("no entries provided by yt-dlp!")
|
||||
data = data["entries"][0]
|
||||
|
||||
return cls(
|
||||
CustomAudioSource(
|
||||
disnake.FFmpegPCMAudio(
|
||||
data["url"] if stream else ytdl.prepare_filename(data),
|
||||
before_options="-vn -reconnect 1",
|
||||
)
|
||||
),
|
||||
data=data,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueuedSong:
|
||||
player: YTDLSource
|
||||
trigger_message: disnake.Message
|
||||
|
||||
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
|
||||
if multiline:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
|
||||
+ (
|
||||
f", **queued by:** <@{self.trigger_message.author.id}>"
|
||||
if show_queuer
|
||||
else ""
|
||||
)
|
||||
)
|
||||
else:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
|
||||
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
||||
)
|
||||
|
||||
def embed(self, is_paused=False):
|
||||
progress = 0
|
||||
if self.player.duration:
|
||||
progress = self.player.original.progress / self.player.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=EMBED_COLOR,
|
||||
title=self.player.title,
|
||||
url=self.player.original_url,
|
||||
description=(
|
||||
f"{'⏸️ ' if is_paused else ''}"
|
||||
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
|
||||
+ (
|
||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||
if self.player.duration
|
||||
else "[**live**]"
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
if self.player.uploader_url:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
||||
)
|
||||
else:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=self.player.uploader,
|
||||
)
|
||||
embed.add_field(
|
||||
name="Likes",
|
||||
value=f"{self.player.like_count:,}"
|
||||
if self.player.like_count
|
||||
else "Unknown",
|
||||
)
|
||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
||||
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
|
||||
|
||||
embed.set_image(self.player.thumbnail_url)
|
||||
embed.set_footer(
|
||||
text=f"queued by {self.trigger_message.author.name}",
|
||||
icon_url=(
|
||||
self.trigger_message.author.avatar.url
|
||||
if self.trigger_message.author.avatar
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueuedPlayer:
|
||||
queue = collections.deque()
|
||||
current: Optional[QueuedSong] = None
|
||||
|
||||
def queue_pop(self):
|
||||
popped = self.queue.popleft()
|
||||
self.current = popped
|
||||
return popped
|
||||
|
||||
def queue_add(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def queue_add_front(self, item):
|
||||
self.queue.appendleft(item)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
def format_duration(duration: int | float) -> str:
|
||||
hours, duration = divmod(int(duration), 3600)
|
||||
minutes, duration = divmod(duration, 60)
|
||||
segments = [hours, minutes, duration]
|
||||
if len(segments) == 3 and segments[0] == 0:
|
||||
del segments[0]
|
||||
return f"{':'.join(f'{s:0>2}' for s in segments)}"
|
||||
|
||||
|
||||
def __reload_module__():
|
||||
global ytdl
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
Reference in New Issue
Block a user