Compare commits

...

52 Commits

Author SHA1 Message Date
019e60450f refactor: tweak descriptions 2025-06-10 20:59:11 -04:00
7672107c68 chore(requirements): use latest yt-dlp from github 2025-06-10 20:30:41 -04:00
ee6ea4eed4 refactor(audio/queue): save uploader field value 2025-06-08 17:29:36 -04:00
fed280e6c5 fix(audio/queue): check for uploader name 2025-06-08 13:46:57 -04:00
1a8f84b333 feat: reload when SIGUSR1 is received 2025-06-08 13:11:50 -04:00
94bdb91eb0 feat: add trusted user list 2025-06-08 13:11:50 -04:00
5c030a0557 refactor(commands): tweak descriptions 2025-06-08 12:57:34 -04:00
5344e89c26 feat(audio/queue): add timestamps 2025-06-08 12:26:30 -04:00
80e6d422e5 fix: add missing character in format string 2025-05-29 11:45:18 -04:00
71fad98d3d refactor(cleanup): reduce interval 2025-05-02 18:19:16 -04:00
83d784c917 refactor: reduce LimitedSizeDict size 2025-05-02 18:18:34 -04:00
f4b7e0f5ce refactor(commands/voice): clean up some code 2025-05-01 18:29:26 -04:00
1316fb593c refactor(commands/voice/queue): delay player creation 2025-05-01 18:29:26 -04:00
b6d105a519 refactor(sponsorblock): hashPrefix -> hash_prefix 2025-04-25 21:27:04 -04:00
ec31250153 refactor: follow more guidelines 2025-04-03 17:53:46 -04:00
f360566824 refactor: minor changes 2025-03-28 21:22:19 -04:00
b0c96a11cd feat(fun): add more reactions 2025-03-28 21:22:19 -04:00
062676df26 fix: handle missing sponsorblock category names 2025-02-26 16:21:42 -05:00
5430f7c632 refactor(utils): add surround function 2025-02-25 17:55:21 -05:00
0a8482c030 refactor(constants): use bit-shifts for public flags 2025-02-25 17:49:45 -05:00
87c88f796d refactor(audio/youtubedl): remove redundant <> from repr 2025-02-25 17:47:38 -05:00
d08744ebb2 refactor(sponsorblock): clean up categories 2025-02-25 17:46:42 -05:00
0b3425a658 fix(commands/voice/join): check message author voice channel before joining 2025-02-25 17:42:44 -05:00
e7105f1828 chore(docker): run with debug output 2025-02-13 20:32:56 -05:00
4f7bd903b8 fix(youtubedl): handle error when no url 2025-02-13 18:59:27 -05:00
22249ecf7a refactor: remove useless debug checks
Debug messages shouldn't be printed in the first place if debug isn't on.
2025-02-13 16:31:52 -05:00
5610fc7acd refactor: minor cleanups 2025-02-13 16:31:30 -05:00
c73260badb feat(commands): add c as alias for current 2025-02-13 16:28:51 -05:00
2645f33940 fix: reload all util modules 2025-02-12 19:04:16 -05:00
8d76a107c5 refactor(utils/cooldown): ignore owners 2025-02-12 19:02:50 -05:00
8ee7693b91 refactor(audio/youtubedl): improve empty entries error message 2025-02-12 15:29:55 -05:00
0f5532a14a refactor(commands/voice/queue): remove extra bold from queue error 2025-02-12 15:29:11 -05:00
c8c4756cc3 fix(commands/voice/join): only join when author is in voice channel 2025-02-12 11:56:30 -05:00
ea09f291e5 refactor(core): tweak cooldown calculation
Should prevent issues where cooldown is 0 when formatted.
2025-02-10 18:08:06 -05:00
c7658f84dc chore: ignore .venv directory 2025-02-09 22:20:42 -05:00
b562ea4ac5 audio/queue: display player metadata correctly 2025-02-09 17:44:50 -05:00
623de96463 fix(audio/queue): correctly import format_duration 2025-02-09 03:39:17 -05:00
97f4787b39 audio/queue/player: rename queue_add to queue_push 2025-02-09 03:35:41 -05:00
69f4d6967f audio: split into more modules 2025-02-09 03:25:05 -05:00
af0896a6a0 audio: rename from youtubedl 2025-02-09 03:04:44 -05:00
9a58bc964d refactor(commands/voice/queue): remove redundant suppress_embeds 2025-02-06 19:59:30 -05:00
65168d38f9 refactor(youtubedl): completely remove limits from PCMVolumeTransformer 2025-02-05 21:55:13 -05:00
70ed37737c refactor(youtubedl): CustomAudioSource -> TrackedAudioSource 2025-02-05 21:54:42 -05:00
3719fc69b5 refactor(youtubedl): use CustomAudioSource as hint for PCMVolumeTransformer 2025-02-05 21:54:42 -05:00
94837f0e77 refactor: fix casing and add type hints 2025-02-05 21:42:21 -05:00
d63155d0fb chore(requirements): add aiohttp 2025-02-04 21:27:47 -05:00
40cd8238dd refactor: log messages on gateway connection state change 2025-02-04 17:04:36 -05:00
81e30c7e70 refactor(commands/utils): add override for skip 2025-02-03 16:40:01 -05:00
a1d63f1bb1 refactor(youtubedl): raise exception if no entries 2025-02-03 16:26:23 -05:00
1a24754549 refactor(commands/voice/queue): check for voice_client 2025-02-03 16:24:47 -05:00
2c6d05b33d fix(youtubedl): show placeholder if no likes 2025-02-01 18:44:28 -05:00
117438be76 refactor(fun): simplify if statement 2025-02-01 18:43:56 -05:00
34 changed files with 524 additions and 410 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.env
.venv
__pycache__

View File

@@ -7,4 +7,4 @@ COPY . .
RUN pip install -r requirements.txt
CMD ["python", "-OO", "main.py"]
CMD ["python", "main.py"]

View File

@@ -8,7 +8,9 @@ import utils
class ArgumentParser:
def __init__(self, command, description):
self.parser = argparse.ArgumentParser(
command, description=description, exit_on_error=False
command,
description=description,
exit_on_error=False,
)
def print_help(self):
@@ -26,21 +28,20 @@ class ArgumentParser:
async def parse_args(self, message, tokens) -> argparse.Namespace | None:
try:
with contextlib.redirect_stdout(io.StringIO()):
args = self.parser.parse_args(tokens[1:])
return args
return self.parser.parse_args(tokens[1:])
except SystemExit:
await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e:
await utils.reply(message, f"`{e}`")
def range_type(string, min=0, max=100):
def range_type(string: str, lower=0, upper=100) -> int:
try:
value = int(string)
except ValueError:
raise argparse.ArgumentTypeError("value is not a valid integer")
except ValueError as e:
raise argparse.ArgumentTypeError("value is not a valid integer") from e
if min <= value <= max:
if lower <= value <= upper:
return value
else:
raise argparse.ArgumentTypeError(f"value is not in range {min}-{max}")
raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}")

8
audio/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from . import discord, queue, utils, youtubedl
__all__ = [
"discord",
"queue",
"utils",
"youtubedl",
]

39
audio/discord.py Normal file
View File

@@ -0,0 +1,39 @@
import audioop
import disnake
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)

112
audio/queue.py Normal file
View File

@@ -0,0 +1,112 @@
import collections
from dataclasses import dataclass
from typing import ClassVar, Optional
import disnake
from constants import BAR_LENGTH, EMBED_COLOR
from .utils import format_duration
from .youtubedl import YTDLSource
@dataclass
class Song:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
duration = (
format_duration(self.player.duration) if self.player.duration else "stream"
)
if multiline:
queue_time = (
self.trigger_message.edited_at or self.trigger_message.created_at
)
return f"{title}\n**duration:** {duration}" + (
f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
if show_queuer
else ""
)
return f"{title} [**{duration}**]" + (
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**stream**]"
)
),
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
)
uploader_value = None
if self.player.uploader_url:
if self.player.uploader:
uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
else:
uploader_value = self.player.uploader_url
elif self.player.uploader:
uploader_value = self.player.uploader
if uploader_value:
embed.add_field(name="Uploader", value=uploader_value)
if self.player.like_count:
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
if self.player.view_count:
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
if self.player.timestamp:
embed.add_field(name="Published", value=f"<t:{int(self.player.timestamp)}>")
if self.player.volume:
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
if self.player.thumbnail_url:
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"Queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class Player:
queue: ClassVar = collections.deque()
current: Optional[Song] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_push(self, item):
self.queue.append(item)
def queue_push_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()

7
audio/utils.py Normal file
View File

@@ -0,0 +1,7 @@
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"

76
audio/youtubedl.py Normal file
View File

@@ -0,0 +1,76 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
from constants import YTDL_OPTIONS
from .discord import PCMVolumeTransformer, TrackedAudioSource
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self,
source: TrackedAudioSource,
*,
data: dict[str, Any],
volume: float = 0.5,
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None,
lambda: ytdl.extract_info(url, download=not stream),
)
if "entries" in data:
if not data["entries"]:
raise Exception("no results found!")
data = data["entries"][0]
if "url" not in data:
raise Exception("no url returned!")
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
),
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url={self.original_url} duration={self.duration}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)

View File

@@ -3,11 +3,11 @@ from .utils import Command, match, match_token, tokenize
__all__ = [
"bot",
"tools",
"utils",
"voice",
"Command",
"match",
"match_token",
"tokenize",
"tools",
"utils",
"voice",
]

View File

@@ -8,9 +8,9 @@ from yt_dlp import version
import arguments
import commands
import utils
from constants import EMBED_COLOR
from state import client, start_time
from utils import format_duration, reply, surround
async def status(message):
@@ -25,41 +25,41 @@ async def status(message):
embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field(
name="Latency",
value=f"```{round(client.latency * 1000, 1)} ms```",
value=surround(f"{round(client.latency * 1000, 1)} ms"),
)
embed.add_field(
name="Memory",
value=f"```{round(memory_usage, 1)} MiB```",
value=surround(f"{round(memory_usage, 1)} MiB"),
)
embed.add_field(
name="Threads",
value=f"```{threading.active_count()}```",
value=surround(threading.active_count()),
)
embed.add_field(
name="Guilds",
value=f"```{len(client.guilds)}```",
value=surround(len(client.guilds)),
)
embed.add_field(
name="Members",
value=f"```{member_count}```",
value=surround(member_count),
)
embed.add_field(
name="Channels",
value=f"```{channel_count}```",
value=surround(channel_count),
)
embed.add_field(
name="Disnake",
value=f"```{disnake.__version__}```",
value=surround(disnake.__version__),
)
embed.add_field(
name="yt-dlp",
value=f"```{version.__version__}```",
value=surround(version.__version__),
)
embed.add_field(
name="Uptime",
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
value=surround(format_duration(int(time.time() - start_time), short=True)),
)
await utils.reply(message, embed=embed)
await reply(message, embed=embed)
async def uptime(message):
@@ -78,15 +78,13 @@ async def uptime(message):
return
if args.since:
await utils.reply(message, f"{round(start_time)}")
await reply(message, f"{round(start_time)}")
else:
await utils.reply(
message, f"up {utils.format_duration(int(time.time() - start_time))}"
)
await reply(message, f"up {format_duration(int(time.time() - start_time))}")
async def ping(message):
await utils.reply(
await reply(
message,
embed=disnake.Embed(
title="Pong :ping_pong:",
@@ -97,9 +95,9 @@ async def ping(message):
async def help(message):
await utils.reply(
await reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()]
[f"`{command.value}`" for command in commands.Command.__members__.values()],
),
)

View File

@@ -14,13 +14,13 @@ async def lookup(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"look up a user or application on discord by their ID",
"look up a discord user or application by ID",
)
parser.add_argument(
"-a",
"--application",
action="store_true",
help="search for applications instead of users",
help="look up applications instead of users",
)
parser.add_argument(
"id",
@@ -41,7 +41,7 @@ async def lookup(message):
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp"
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
)
embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
@@ -102,7 +102,9 @@ async def lookup(message):
for tag in response["tags"]:
bot_tags += tag + ", "
embed.add_field(
name="Tags", value="None" if bot_tags == "" else bot_tags[:-2], inline=False
name="Tags",
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
)
else:
try:
@@ -117,8 +119,10 @@ async def lookup(message):
if flag_name != "None":
try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception:
raise Exception(f"unable to find badge: {PUBLIC_FLAGS[flag]}")
except Exception as e:
raise Exception(
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id)
accent_color = 0x000000
@@ -161,11 +165,11 @@ async def clear(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"bulk delete messages in the current channel matching certain criteria",
"bulk delete messages in the current channel matching specified criteria",
)
parser.add_argument(
"count",
type=lambda c: arguments.range_type(c, min=1, max=1000),
type=lambda c: arguments.range_type(c, lower=1, upper=1000),
help="amount of messages to delete",
)
group = parser.add_mutually_exclusive_group()
@@ -259,8 +263,10 @@ async def clear(message):
messages = len(
await message.channel.purge(
limit=args.count, check=check, oldest_first=args.oldest_first
)
limit=args.count,
check=check,
oldest_first=args.oldest_first,
),
)
if not args.delete_command:

View File

@@ -30,14 +30,19 @@ class Command(Enum):
@lru_cache
def match_token(token: str) -> list[Command]:
if token.lower() == "r":
match token.lower():
case "r":
return [Command.RELOAD]
case "s":
return [Command.SKIP]
case "c":
return [Command.CURRENT]
if exact_match := list(
filter(
lambda command: command.value == token.lower(),
Command.__members__.values(),
)
),
):
return exact_match
@@ -45,7 +50,7 @@ def match_token(token: str) -> list[Command]:
filter(
lambda command: command.value.startswith(token.lower()),
Command.__members__.values(),
)
),
)

View File

@@ -1,17 +1,14 @@
import disnake
import utils
from .utils import command_allowed
async def join(message):
if message.author.voice:
if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel)
elif message.author.voice:
await message.guild.voice_client.move_to(message.channel)
else:
await message.author.voice.channel.connect()
elif isinstance(message.channel, disnake.VoiceChannel):
await message.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
return

View File

@@ -1,11 +1,11 @@
import arguments
import disnake_paginator
from constants import EMBED_COLOR
from state import players
import arguments
import commands
import sponsorblock
import utils
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed
@@ -13,7 +13,8 @@ from .utils import command_allowed
async def playing(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "get information about the currently playing song"
tokens[0],
"get information about the currently playing song",
)
parser.add_argument(
"-d",
@@ -49,7 +50,7 @@ async def playing(message):
await utils.reply(
message,
embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused()
is_paused=message.guild.voice_client.is_paused(),
),
)
else:
@@ -89,13 +90,13 @@ async def pause(message):
async def fast_forward(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip current sponsorblock segment")
parser = arguments.ArgumentParser(tokens[0], "skip the current sponsorblock segment")
parser.add_argument(
"-s",
"--seconds",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=300),
help="the amount of seconds to fast forward instead",
type=lambda v: arguments.range_type(v, lower=0, upper=300),
help="the number of seconds to fast forward instead",
)
if not (args := await parser.parse_args(message, tokens)):
return
@@ -110,11 +111,12 @@ async def fast_forward(message):
seconds = args.seconds
if not seconds:
video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id
players[message.guild.id].current.player.id,
)
if not video:
await utils.reply(
message, "no sponsorblock segments were found for this video!"
message,
"no sponsorblock segments were found for this video!",
)
return
@@ -140,7 +142,7 @@ async def volume(message):
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150),
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):

View File

@@ -4,30 +4,28 @@ import disnake
import disnake_paginator
import arguments
import audio
import commands
import utils
import youtubedl
from constants import EMBED_COLOR
from state import client, players
from state import client, players, trusted_users
from .playback import resume
from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False):
if message.guild.id not in players:
players[message.guild.id] = youtubedl.QueuedPlayer()
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "queue a song, list the queue, or resume playback"
tokens[0],
"queue a song, list the queue, or resume playback",
)
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, min=0, max=150),
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150) for the specified song",
)
parser.add_argument(
@@ -80,19 +78,23 @@ async def queue_or_play(message, edited=False):
elif not command_allowed(message):
return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited:
found = None
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found = queued
break
found = next(
filter(
lambda queued: queued.trigger_message.id == message.id,
players[message.guild.id].queue,
),
None,
)
if found:
players[message.guild.id].queue.remove(found)
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
return
elif indices := args.remove_index:
targets = []
for i in indices:
@@ -113,15 +115,15 @@ async def queue_or_play(message, edited=False):
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif args.remove_title or args.remove_queuer:
targets = []
targets = set()
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.append(queued)
continue
targets.add(queued)
if q := args.remove_queuer:
if q == queued.trigger_message.author.id:
targets.append(queued)
targets.add(queued)
targets = list(targets)
if not args.match_multiple:
targets = targets[:1]
@@ -140,11 +142,12 @@ async def queue_or_play(message, edited=False):
lambda queued: queued.trigger_message.author.id
== message.author.id,
players[message.guild.id].queue,
)
)
),
),
)
>= 5
and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
):
await utils.reply(
message,
@@ -154,24 +157,27 @@ async def queue_or_play(message, edited=False):
try:
async with message.channel.typing():
player = await youtubedl.YTDLSource.from_url(
" ".join(query), loop=client.loop, stream=True
player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query),
loop=client.loop,
stream=True,
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(
message, f"**failed to queue:** `{e}`", suppress_embeds=True
)
await utils.reply(message, f"failed to queue: `{e}`")
return
queued = youtubedl.QueuedSong(player, message)
queued = audio.queue.Song(player, message)
if args.now or args.next:
players[message.guild.id].queue_add_front(queued)
players[message.guild.id].queue_push_front(queued)
else:
players[message.guild.id].queue_add(queued)
players[message.guild.id].queue_push(queued)
if not message.guild.voice_client.source:
if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!")
return
elif not message.guild.voice_client.source:
play_next(message, first=True)
elif args.now:
message.guild.voice_client.stop()
@@ -191,7 +197,7 @@ async def queue_or_play(message, edited=False):
[
queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue
]
],
),
natural=True,
)
@@ -216,13 +222,14 @@ async def queue_or_play(message, edited=False):
[
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch
]
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue), 10
)
],
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue),
10,
)
],
),
),
).start(utils.MessageInteractionWrapper(message))
else:
@@ -234,7 +241,7 @@ async def queue_or_play(message, edited=False):
async def skip(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
parser.add_argument(
"-n",
"--next",

View File

@@ -1,9 +1,9 @@
import disnake
import audio
import sponsorblock
import utils
import youtubedl
from constants import EMBED_COLOR
from constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES
from state import players
from .utils import command_allowed
@@ -21,18 +21,20 @@ async def sponsorblock_command(message):
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video:
await utils.reply(
message, "no sponsorblock segments were found for this video!"
message,
"no sponsorblock segments were found for this video!",
)
return
text = []
for segment in video["segments"]:
begin, end = map(int, segment["segment"])
category_name = sponsorblock.CATEGORY_NAMES.get(segment["category"])
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES:
category = SPONSORBLOCK_CATEGORY_NAMES[category]
current = "**" if progress >= begin and progress < end else ""
text.append(
f"{current}`{youtubedl.format_duration(begin)}` - `{youtubedl.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
)
await utils.reply(

View File

@@ -24,7 +24,8 @@ def play_next(message, once=False, first=False):
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
queued.player,
after=lambda e: play_after_callback(e, message, once),
)
embed = queued.embed()

View File

@@ -19,8 +19,24 @@ BAR_LENGTH = 35
EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712]
PREFIX = "%"
SPONSORBLOCK_CATEGORY_NAMES = {
"music_offtopic": "non-music",
"selfpromo": "self promotion",
"sponsor": "sponsored",
}
REACTIONS = {
"cat": ["🐈"],
"dog": ["🐕"],
"gn": ["💤", "😪", "😴", "🛌"],
"pizza": ["🍕"],
}
RELOADABLE_MODULES = [
"arguments",
"audio",
"audio.discord",
"audio.queue",
"audio.utils",
"audio.youtubedl",
"commands",
"commands.bot",
"commands.tools",
@@ -40,27 +56,28 @@ RELOADABLE_MODULES = [
"sponsorblock",
"tasks",
"utils",
"utils.common",
"utils.discord",
"voice",
"youtubedl",
"yt_dlp",
"yt_dlp.version",
]
PUBLIC_FLAGS = {
1: "Discord Employee",
2: "Discord Partner",
4: "HypeSquad Events",
8: "Bug Hunter Level 1",
64: "HypeSquad Bravery",
128: "HypeSquad Brilliance",
256: "HypeSquad Balance",
512: "Early Supporter",
1024: "Team User",
16384: "Bug Hunter Level 2",
65536: "Verified Bot",
131072: "Verified Bot Developer",
262144: "Discord Certified Moderator",
524288: "HTTP Interactions Only",
4194304: "Active Developer",
1 << 0: "Discord Employee",
1 << 1: "Discord Partner",
1 << 2: "HypeSquad Events",
1 << 3: "Bug Hunter Level 1",
1 << 6: "HypeSquad Bravery",
1 << 7: "HypeSquad Brilliance",
1 << 8: "HypeSquad Balance",
1 << 9: "Early Supporter",
1 << 10: "Team User",
1 << 14: "Bug Hunter Level 2",
1 << 16: "Verified Bot",
1 << 17: "Verified Bot Developer",
1 << 18: "Discord Certified Moderator",
1 << 19: "HTTP Interactions Only",
1 << 22: "Active Developer",
}
BADGE_EMOJIS = {
"Discord Employee": "<:DiscordStaff:879666899980546068>",

43
core.py
View File

@@ -3,6 +3,7 @@ import contextlib
import importlib
import inspect
import io
import signal
import textwrap
import time
import traceback
@@ -48,32 +49,22 @@ async def on_message(message, edited=False):
try:
if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
end_time
if (end_time := cooldowns.get(matched)) and (
remaining_time := round(end_time - time.time()) > 0
):
await utils.reply(
message,
f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
f"please wait **{utils.format_duration(remaining_time, natural=True)}** before using this command again!",
)
return
match matched:
case C.RELOAD if message.author.id in OWNERS:
reloaded_modules = set()
start = time.time()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
reloaded_modules = reload()
end = time.time()
if __debug__:
debug(
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s"
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s",
)
await utils.add_check_reaction(message)
@@ -167,6 +158,7 @@ async def on_voice_state_update(_, before, after):
channel = before.channel
elif is_empty(after.channel):
channel = after.channel
if channel:
await channel.guild.voice_client.disconnect()
@@ -175,9 +167,9 @@ def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__)
for submodule in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in RELOADABLE_MODULES
and v.__name__ not in reloaded_modules,
lambda sm: inspect.ismodule(sm)
and sm.__name__ in RELOADABLE_MODULES
and sm.__name__ not in reloaded_modules,
vars(module).values(),
):
rreload(reloaded_modules, submodule)
@@ -186,3 +178,18 @@ def rreload(reloaded_modules, module):
if "__reload_module__" in dir(module):
module.__reload_module__()
def reload(*_):
reloaded_modules = set()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
return reloaded_modules
signal.signal(signal.SIGUSR1, reload)

View File

@@ -1,14 +1,12 @@
import asyncio
import threading
import time
from logging import info
import fun
from logging import debug, info, warning
import commands
import core
import fun
import tasks
from state import client, start_time
from state import client
def prepare():
@@ -42,19 +40,29 @@ async def on_message_edit(before, after):
await core.on_message(after, edited=True)
async def on_ready():
info(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
async def on_voice_state_update(member, before, after):
await core.on_voice_state_update(member, before, after)
async def on_ready():
info(f"logged in as {client.user}")
async def on_connect():
debug("connected to the gateway!")
async def on_disconnect():
warning("disconnected from the gateway!")
for event_type, handlers in client.get_listeners().items():
for handler in handlers:
client.remove_listener(handler, event_type)
client.add_listener(on_bulk_message_delete, "on_bulk_message_delete")
client.add_listener(on_connect, "on_connect")
client.add_listener(on_disconnect, "on_disconnect")
client.add_listener(on_message, "on_message")
client.add_listener(on_message_delete, "on_message_delete")
client.add_listener(on_message_edit, "on_message_edit")

View File

@@ -8,11 +8,15 @@ from state import client, kill, players
async def transcript(
message, languages=["en"], max_messages=6, min_messages=3, upper=True
message,
languages=["en"],
max_messages=6,
min_messages=3,
upper=True,
):
initial_id = message.guild.voice_client.source.id
transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
initial_id
initial_id,
)
try:
transcript = transcript_list.find_manually_created_transcript(languages).fetch()
@@ -44,7 +48,7 @@ async def transcript(
await messages.pop().delete()
else:
await message.channel.delete_messages(
[messages.pop() for _ in range(count)]
[messages.pop() for _ in range(count)],
)
except Exception:
pass
@@ -77,19 +81,20 @@ def messages_per_second(limit=500):
average = 1
print(
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**"
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**",
)
async def auto_count(channel_id):
async def auto_count(channel_id: int):
if (channel := await client.fetch_channel(channel_id)) and isinstance(
channel, disnake.TextChannel
channel,
disnake.TextChannel,
):
last_message = (await channel.history(limit=1).flatten())[0]
try:
result = str(
int("".join(filter(lambda d: d in string.digits, last_message.content)))
+ 1
+ 1,
)
except Exception:
result = "where number"

8
fun.py
View File

@@ -1,9 +1,13 @@
import random
import commands
from constants import REACTIONS
async def on_message(message):
if "gn" in commands.tokenize(message.content, remove_prefix=False):
if random.random() < 0.01:
await message.add_reaction(random.choice(["💤", "😪", "😴", "🛌"]))
tokens = commands.tokenize(message.content, remove_prefix=False)
for keyword, options in REACTIONS.items():
if keyword in tokens:
await message.add_reaction(random.choice(options))
break

View File

@@ -7,7 +7,7 @@ from state import client
if __name__ == "__main__":
logging.basicConfig(
format=(
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
"%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s"
if __debug__
else "%(asctime)s %(levelname)s %(message)s"
),

View File

@@ -1,7 +1,8 @@
aiohttp
audioop-lts
disnake
disnake_paginator
psutil
PyNaCl
youtube_transcript_api
yt-dlp
yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz

View File

@@ -1,29 +1,37 @@
import hashlib
import json
import aiohttp
from state import sponsorblock_cache
CATEGORY_NAMES = {
"music_offtopic": "non-music",
"sponsor": "sponsored",
}
categories = json.dumps(
[
"interaction",
"intro",
"music_offtopic",
"outro",
"preview",
"selfpromo",
"sponsor",
],
)
async def get_segments(videoId: str):
if videoId in sponsorblock_cache:
return sponsorblock_cache[videoId]
async def get_segments(video_id: str):
if video_id in sponsorblock_cache:
return sponsorblock_cache[video_id]
hashPrefix = hashlib.sha256(videoId.encode()).hexdigest()[:4]
hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
session = aiohttp.ClientSession()
response = await session.get(
f"https://sponsor.ajay.app/api/skipSegments/{hashPrefix}",
params={"categories": '["sponsor", "music_offtopic"]'},
f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}",
params={"categories": categories},
)
if response.status == 200 and (
results := list(
filter(lambda v: videoId == v["videoID"], await response.json())
filter(lambda v: video_id == v["videoID"], await response.json()),
)
):
sponsorblock_cache[videoId] = results[0]
sponsorblock_cache[video_id] = results[0]
return results[0]

View File

@@ -15,5 +15,6 @@ idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False}
message_responses = LimitedSizeDict()
players = {}
sponsorblock_cache = LimitedSizeDict(size_limit=100)
sponsorblock_cache = LimitedSizeDict()
start_time = time.time()
trusted_users = []

View File

@@ -11,7 +11,7 @@ async def cleanup():
debug("spawned cleanup thread")
while True:
await asyncio.sleep(3600 * 12)
await asyncio.sleep(3600)
targets = []
for guild_id, player in players.items():
@@ -19,8 +19,7 @@ async def cleanup():
targets.append(guild_id)
for target in targets:
del players[target]
if __debug__:
debug(f"cleanup removed {len(targets)} empty players")
debug(f"cleanup thread removed {len(targets)} empty players")
if (
not idle_tracker["is_idle"]

View File

@@ -1,3 +1,3 @@
from . import test_filter_secrets, test_format_duration
__all__ = ["test_format_duration", "test_filter_secrets"]
__all__ = ["test_filter_secrets", "test_format_duration"]

View File

@@ -7,15 +7,15 @@ class TestFilterSecrets(unittest.TestCase):
def test_filter_secrets(self):
secret = "PLACEHOLDER_TOKEN"
self.assertFalse(
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret})
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}),
)
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
self.assertFalse(
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret})
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret})
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret})
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}),
)

View File

@@ -1,13 +1,13 @@
import unittest
import audio
import utils
import youtubedl
class TestFormatDuration(unittest.TestCase):
def test_youtubedl(self):
def test_audio(self):
def f(s):
return youtubedl.format_duration(s)
return audio.utils.format_duration(s)
self.assertEqual(f(0), "00:00")
self.assertEqual(f(0.5), "00:00")

View File

@@ -1,4 +1,4 @@
from .common import LimitedSizeDict, filter_secrets, format_duration
from .common import LimitedSizeDict, filter_secrets, format_duration, surround
from .discord import (
ChannelResponseWrapper,
MessageInteractionWrapper,
@@ -24,4 +24,5 @@ __all__ = [
"MessageInteractionWrapper",
"reply",
"snowflake_timestamp",
"surround",
]

View File

@@ -3,7 +3,11 @@ from collections import OrderedDict
from constants import SECRETS
def format_duration(duration: int, natural: bool = False, short: bool = False):
def surround(inner: str, outer="```") -> str:
return outer + str(inner) + outer
def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
def format_plural(noun, count):
if short:
return noun[0]
@@ -46,7 +50,7 @@ def filter_secrets(text: str, secrets=SECRETS) -> str:
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 1000)
self.size_limit = kwargs.pop("size_limit", 100)
super().__init__(*args, **kwargs)
self._check_size_limit()

View File

@@ -1,14 +1,18 @@
import os
import time
from logging import error, info
from pathlib import Path
import disnake
import commands
from constants import OWNERS
from state import command_cooldowns, message_responses
def cooldown(message, cooldown_time: int):
if message.author.id in OWNERS:
return
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
@@ -30,7 +34,9 @@ async def reply(message, *args, **kwargs):
try:
await message_responses[message.id].edit(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
return
except Exception:
@@ -38,7 +44,9 @@ async def reply(message, *args, **kwargs):
try:
response = await message.reply(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
except Exception:
response = await channel_send(message, *args, **kwargs)
@@ -48,13 +56,15 @@ async def reply(message, *args, **kwargs):
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
def load_opus():
for path in filter(
lambda p: os.path.exists(p),
lambda p: Path(p).exists(),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try:
@@ -66,8 +76,8 @@ def load_opus():
raise Exception("could not locate working opus library")
def snowflake_timestamp(id):
return round(((id >> 22) + 1420070400000) / 1000)
def snowflake_timestamp(snowflake) -> int:
return round(((snowflake >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message):
@@ -76,7 +86,8 @@ async def add_check_reaction(message):
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!", ephemeral=True
"you are not the intended receiver of this message!",
ephemeral=True,
)
@@ -86,8 +97,7 @@ class ChannelResponseWrapper:
self.sent_message = None
async def send_message(self, **kwargs):
if "ephemeral" in kwargs:
del kwargs["ephemeral"]
kwargs.pop("ephemeral", None)
self.sent_message = await reply(self.message, **kwargs)
async def edit_message(self, content=None, embed=None, view=None):

View File

@@ -1,213 +0,0 @@
import asyncio
import audioop
import collections
from dataclasses import dataclass
from typing import Any, Optional
import disnake
import yt_dlp
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: disnake.AudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded.")
self.original = original
self.volume = volume
@property
def volume(self) -> float:
return self._volume
@volume.setter
def volume(self, value: float) -> None:
self._volume = max(value, 0.0)
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self._volume)
class CustomAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class YTDLSource(PCMVolumeTransformer):
def __init__(
self, source: CustomAudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
data = data["entries"][0]
return cls(
CustomAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
)
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
@dataclass
class QueuedSong:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
if multiline:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
)
else:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**live**]"
)
),
)
if self.player.uploader_url:
embed.add_field(
name="Uploader",
value=f"[{self.player.uploader}]({self.player.uploader_url})",
)
else:
embed.add_field(
name="Uploader",
value=self.player.uploader,
)
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class QueuedPlayer:
queue = collections.deque()
current: Optional[QueuedSong] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_add(self, item):
self.queue.append(item)
def queue_add_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)