Compare commits

...

115 Commits

Author SHA1 Message Date
019e60450f refactor: tweak descriptions 2025-06-10 20:59:11 -04:00
7672107c68 chore(requirements): use latest yt-dlp from github 2025-06-10 20:30:41 -04:00
ee6ea4eed4 refactor(audio/queue): save uploader field value 2025-06-08 17:29:36 -04:00
fed280e6c5 fix(audio/queue): check for uploader name 2025-06-08 13:46:57 -04:00
1a8f84b333 feat: reload when SIGUSR1 is received 2025-06-08 13:11:50 -04:00
94bdb91eb0 feat: add trusted user list 2025-06-08 13:11:50 -04:00
5c030a0557 refactor(commands): tweak descriptions 2025-06-08 12:57:34 -04:00
5344e89c26 feat(audio/queue): add timestamps 2025-06-08 12:26:30 -04:00
80e6d422e5 fix: add missing character in format string 2025-05-29 11:45:18 -04:00
71fad98d3d refactor(cleanup): reduce interval 2025-05-02 18:19:16 -04:00
83d784c917 refactor: reduce LimitedSizeDict size 2025-05-02 18:18:34 -04:00
f4b7e0f5ce refactor(commands/voice): clean up some code 2025-05-01 18:29:26 -04:00
1316fb593c refactor(commands/voice/queue): delay player creation 2025-05-01 18:29:26 -04:00
b6d105a519 refactor(sponsorblock): hashPrefix -> hash_prefix 2025-04-25 21:27:04 -04:00
ec31250153 refactor: follow more guidelines 2025-04-03 17:53:46 -04:00
f360566824 refactor: minor changes 2025-03-28 21:22:19 -04:00
b0c96a11cd feat(fun): add more reactions 2025-03-28 21:22:19 -04:00
062676df26 fix: handle missing sponsorblock category names 2025-02-26 16:21:42 -05:00
5430f7c632 refactor(utils): add surround function 2025-02-25 17:55:21 -05:00
0a8482c030 refactor(constants): use bit-shifts for public flags 2025-02-25 17:49:45 -05:00
87c88f796d refactor(audio/youtubedl): remove redundant <> from repr 2025-02-25 17:47:38 -05:00
d08744ebb2 refactor(sponsorblock): clean up categories 2025-02-25 17:46:42 -05:00
0b3425a658 fix(commands/voice/join): check message author voice channel before joining 2025-02-25 17:42:44 -05:00
e7105f1828 chore(docker): run with debug output 2025-02-13 20:32:56 -05:00
4f7bd903b8 fix(youtubedl): handle error when no url 2025-02-13 18:59:27 -05:00
22249ecf7a refactor: remove useless debug checks
Debug messages shouldn't be printed in the first place if debug isn't on.
2025-02-13 16:31:52 -05:00
5610fc7acd refactor: minor cleanups 2025-02-13 16:31:30 -05:00
c73260badb feat(commands): add c as alias for current 2025-02-13 16:28:51 -05:00
2645f33940 fix: reload all util modules 2025-02-12 19:04:16 -05:00
8d76a107c5 refactor(utils/cooldown): ignore owners 2025-02-12 19:02:50 -05:00
8ee7693b91 refactor(audio/youtubedl): improve empty entries error message 2025-02-12 15:29:55 -05:00
0f5532a14a refactor(commands/voice/queue): remove extra bold from queue error 2025-02-12 15:29:11 -05:00
c8c4756cc3 fix(commands/voice/join): only join when author is in voice channel 2025-02-12 11:56:30 -05:00
ea09f291e5 refactor(core): tweak cooldown calculation
Should prevent issues where cooldown is 0 when formatted.
2025-02-10 18:08:06 -05:00
c7658f84dc chore: ignore .venv directory 2025-02-09 22:20:42 -05:00
b562ea4ac5 audio/queue: display player metadata correctly 2025-02-09 17:44:50 -05:00
623de96463 fix(audio/queue): correctly import format_duration 2025-02-09 03:39:17 -05:00
97f4787b39 audio/queue/player: rename queue_add to queue_push 2025-02-09 03:35:41 -05:00
69f4d6967f audio: split into more modules 2025-02-09 03:25:05 -05:00
af0896a6a0 audio: rename from youtubedl 2025-02-09 03:04:44 -05:00
9a58bc964d refactor(commands/voice/queue): remove redundant suppress_embeds 2025-02-06 19:59:30 -05:00
65168d38f9 refactor(youtubedl): completely remove limits from PCMVolumeTransformer 2025-02-05 21:55:13 -05:00
70ed37737c refactor(youtubedl): CustomAudioSource -> TrackedAudioSource 2025-02-05 21:54:42 -05:00
3719fc69b5 refactor(youtubedl): use CustomAudioSource as hint for PCMVolumeTransformer 2025-02-05 21:54:42 -05:00
94837f0e77 refactor: fix casing and add type hints 2025-02-05 21:42:21 -05:00
d63155d0fb chore(requirements): add aiohttp 2025-02-04 21:27:47 -05:00
40cd8238dd refactor: log messages on gateway connection state change 2025-02-04 17:04:36 -05:00
81e30c7e70 refactor(commands/utils): add override for skip 2025-02-03 16:40:01 -05:00
a1d63f1bb1 refactor(youtubedl): raise exception if no entries 2025-02-03 16:26:23 -05:00
1a24754549 refactor(commands/voice/queue): check for voice_client 2025-02-03 16:24:47 -05:00
2c6d05b33d fix(youtubedl): show placeholder if no likes 2025-02-01 18:44:28 -05:00
117438be76 refactor(fun): simplify if statement 2025-02-01 18:43:56 -05:00
fbdd442a8e style(commands/voice/playback): format again 2025-02-01 16:09:55 -05:00
ca9f811e8f fix(commands/voice/sponsorblock): use youtubedl duration and make sponsorblock default 2025-01-25 19:03:26 -05:00
26f81bd58f feat(commands/voice): add sponsorblock command 2025-01-24 17:51:25 -05:00
256156b9d2 fix(extra): reset state.kill 2025-01-24 16:26:31 -05:00
10f7ce991c fix(commands/voice/queue): show correct index for queue --next 2025-01-23 19:25:01 -05:00
fc06b312cd fix(commands/tools/lookup): get accent color properly 2025-01-23 19:24:50 -05:00
98f61c623c feat(youtubedl): add unlimited PCMVolumeTransformer 2025-01-23 19:19:17 -05:00
0e69a039a1 feat(commands/utils/tokenize): add remove_prefix parameter 2025-01-23 14:50:39 -05:00
3930175c79 fix: send another message if reply fails 2025-01-22 23:18:46 -05:00
03a8014d2f refactor(sponsorblock): use params and include sponsor category as well 2025-01-22 14:53:21 -05:00
8ef4c85bd8 refactor: use aiohttp for requests 2025-01-22 14:49:29 -05:00
640e750e3d feat: add sponsorblock integration
Add the -S option to the fast forward command.
2025-01-22 14:47:00 -05:00
3ca58a4c19 refactor(commands/voice/queue): allow query with spaces 2025-01-19 17:53:58 -05:00
a8d69e079d refactor: allow reloading yt_dlp 2025-01-19 04:44:00 -05:00
7f07e1bd71 refactor(commands/bot/status): show yt-dlp version 2025-01-14 23:40:12 -05:00
5bdfddbbbd fix(commands/voice): clean up command_allowed check 2025-01-13 01:20:17 -05:00
d0ddf9ee47 fix(commands/voice/join): check if channels correct before joining 2025-01-13 01:08:55 -05:00
de5d4d2793 refactor: parse_snowflake -> snowflake_timestamp 2025-01-10 23:48:45 -05:00
7fdaf7b379 chore(docker): add psutil build dependencyes 2025-01-09 20:26:59 -05:00
b8b566cb57 chore(requirements): add psutil 2025-01-09 20:23:22 -05:00
234d6b438b refactor(utils): split into separate files 2025-01-09 20:04:40 -05:00
c420f3de6b refactor(core): clean up execute command 2025-01-09 16:57:14 -05:00
c80b926b35 feat: add fun module 2025-01-09 16:57:14 -05:00
23c29e1fa0 refactor(utils): remove opus load warning 2025-01-09 16:53:00 -05:00
8cbd7d6aef style: format with ruff 0.9 2025-01-09 16:31:32 -05:00
57809fe26d refactor(commands/tools): clean up 2025-01-09 16:25:52 -05:00
8a4f12fcce refactor(commands/voice/utils): check if loaded opus manually 2025-01-09 16:20:36 -05:00
1e4271217d refactor(commands): get rid of custom module reloader 2025-01-09 16:20:16 -05:00
c892358fef fix(utils/reply): remove content or embeds when editing message 2025-01-08 16:36:00 -05:00
849af9d394 feat(commands/tools): add lookup 2025-01-08 16:29:10 -05:00
aa4632b4dd feat(commands/bot): add ping 2025-01-08 16:03:24 -05:00
79fd40a8e3 feat(commands/voice/clear): add --attachments 2025-01-08 14:27:40 -05:00
d6150a664f fix(commands/tools/clear): check for ignore_ids 2025-01-08 14:25:58 -05:00
f00ac9c977 refactor(commands/voice/queue): reduce cooldown to 2 seconds 2025-01-08 14:20:51 -05:00
50651db89e test: add secret filtering 2025-01-08 13:52:27 -05:00
bb3c379755 fix(youtubedl): check for uploader_url when creating embed 2025-01-08 13:31:18 -05:00
7a400da1ee feat(commands/tools/clear): add --ignore-ids 2025-01-08 13:20:58 -05:00
78f2d0a568 fix(extra/transcript): calculate count properly 2025-01-08 13:18:56 -05:00
d5d8c56ba1 fix(commands): don't check cooldown for edited messages 2025-01-08 13:08:30 -05:00
93c67f707c refactor: set disnake log level to WARNING 2025-01-08 13:06:21 -05:00
27a460fa6e refactor(commands): use format_duration for cooldown message 2025-01-08 10:28:04 -05:00
ffdd25d849 style: format files 2025-01-08 10:18:57 -05:00
32c7be659b perf(commands/utils): use lru cache for matching 2025-01-08 10:18:46 -05:00
672ae02e16 feat(commands): add cooldown system 2025-01-08 10:17:06 -05:00
7c2e17e0d3 refactor: use from imports for constants 2025-01-08 09:28:01 -05:00
07b3bde22d test: check for short formatting cases 2025-01-08 09:28:00 -05:00
6afbce5d8f feat(utils): add support for short formatting 2025-01-08 09:25:37 -05:00
be77e62e53 feat(commands/bot): add status 2025-01-08 09:25:11 -05:00
e3982c064d feat: add proper logging 2025-01-08 08:58:16 -05:00
d56bac1b2f refactor: improve debgu messages 2025-01-08 08:37:40 -05:00
d6bc67f17a refactor: clean up some import and names 2025-01-08 08:32:59 -05:00
8d0bec4cf2 fix(tasks): add error handling to presence changing 2025-01-08 08:26:36 -05:00
c34bdc2bfd refactor(youtubedl): channel -> uploader and add url 2025-01-07 17:19:17 -05:00
6c5e92aec2 refactor(extra): use delete() if only one message 2025-01-07 16:40:25 -05:00
5e5a91d879 feat(youtubedl): add more fields to embed 2025-01-07 16:31:27 -05:00
41f9beb6e8 refactor(commands/voice): move embed generation into separate file 2025-01-07 16:11:32 -05:00
8ee5d01bf6 refactor(commands/voice): split into separate files 2025-01-07 15:29:28 -05:00
2ea3d74e8a fix(commands/voice): return early if voice_client doesn't exist 2025-01-07 14:49:13 -05:00
c04cf1b05f feat(state): add kill dict 2025-01-07 14:34:23 -05:00
803eae2adc feat(commands/voice/skip): add --next 2025-01-07 12:38:08 -05:00
1b781ac6a0 refactor(commands/voice): check if command allowed after argparse
So the help command always works.
2025-01-07 12:37:56 -05:00
5824fcdf16 refactor(commands/voice): minor description updates 2025-01-07 12:37:33 -05:00
930169346b fix(commands/voice/queue): only allow first when queue is actually empty 2025-01-07 12:37:02 -05:00
37 changed files with 1733 additions and 910 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.env .env
.venv
__pycache__ __pycache__

View File

@@ -1,10 +1,10 @@
FROM python:3.13-alpine FROM python:3.13-alpine
RUN apk --no-cache add ffmpeg opus RUN apk --no-cache add ffmpeg gcc linux-headers musl-dev opus python3-dev
WORKDIR /bot WORKDIR /bot
COPY . . COPY . .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
CMD ["python", "-OO", "main.py"] CMD ["python", "main.py"]

View File

@@ -8,7 +8,9 @@ import utils
class ArgumentParser: class ArgumentParser:
def __init__(self, command, description): def __init__(self, command, description):
self.parser = argparse.ArgumentParser( self.parser = argparse.ArgumentParser(
command, description=description, exit_on_error=False command,
description=description,
exit_on_error=False,
) )
def print_help(self): def print_help(self):
@@ -26,21 +28,20 @@ class ArgumentParser:
async def parse_args(self, message, tokens) -> argparse.Namespace | None: async def parse_args(self, message, tokens) -> argparse.Namespace | None:
try: try:
with contextlib.redirect_stdout(io.StringIO()): with contextlib.redirect_stdout(io.StringIO()):
args = self.parser.parse_args(tokens[1:]) return self.parser.parse_args(tokens[1:])
return args
except SystemExit: except SystemExit:
await utils.reply(message, f"```\n{self.print_help()}```") await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e: except Exception as e:
await utils.reply(message, f"`{e}`") await utils.reply(message, f"`{e}`")
def range_type(string, min=0, max=100): def range_type(string: str, lower=0, upper=100) -> int:
try: try:
value = int(string) value = int(string)
except ValueError: except ValueError as e:
raise argparse.ArgumentTypeError("value is not a valid integer") raise argparse.ArgumentTypeError("value is not a valid integer") from e
if min <= value <= max: if lower <= value <= upper:
return value return value
else:
raise argparse.ArgumentTypeError(f"value is not in range {min}-{max}") raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}")

8
audio/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from . import discord, queue, utils, youtubedl
__all__ = [
"discord",
"queue",
"utils",
"youtubedl",
]

39
audio/discord.py Normal file
View File

@@ -0,0 +1,39 @@
import audioop
import disnake
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)

112
audio/queue.py Normal file
View File

@@ -0,0 +1,112 @@
import collections
from dataclasses import dataclass
from typing import ClassVar, Optional
import disnake
from constants import BAR_LENGTH, EMBED_COLOR
from .utils import format_duration
from .youtubedl import YTDLSource
@dataclass
class Song:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
duration = (
format_duration(self.player.duration) if self.player.duration else "stream"
)
if multiline:
queue_time = (
self.trigger_message.edited_at or self.trigger_message.created_at
)
return f"{title}\n**duration:** {duration}" + (
f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
if show_queuer
else ""
)
return f"{title} [**{duration}**]" + (
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**stream**]"
)
),
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
)
uploader_value = None
if self.player.uploader_url:
if self.player.uploader:
uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
else:
uploader_value = self.player.uploader_url
elif self.player.uploader:
uploader_value = self.player.uploader
if uploader_value:
embed.add_field(name="Uploader", value=uploader_value)
if self.player.like_count:
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
if self.player.view_count:
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
if self.player.timestamp:
embed.add_field(name="Published", value=f"<t:{int(self.player.timestamp)}>")
if self.player.volume:
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
if self.player.thumbnail_url:
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"Queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class Player:
queue: ClassVar = collections.deque()
current: Optional[Song] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_push(self, item):
self.queue.append(item)
def queue_push_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()

7
audio/utils.py Normal file
View File

@@ -0,0 +1,7 @@
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"

76
audio/youtubedl.py Normal file
View File

@@ -0,0 +1,76 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
from constants import YTDL_OPTIONS
from .discord import PCMVolumeTransformer, TrackedAudioSource
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self,
source: TrackedAudioSource,
*,
data: dict[str, Any],
volume: float = 0.5,
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None,
lambda: ytdl.extract_info(url, download=not stream),
)
if "entries" in data:
if not data["entries"]:
raise Exception("no results found!")
data = data["entries"][0]
if "url" not in data:
raise Exception("no url returned!")
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
),
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url={self.original_url} duration={self.duration}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)

View File

@@ -3,15 +3,11 @@ from .utils import Command, match, match_token, tokenize
__all__ = [ __all__ = [
"bot", "bot",
"tools",
"utils",
"voice",
"Command", "Command",
"match", "match",
"match_token", "match_token",
"tokenize", "tokenize",
"tools",
"utils",
"voice",
] ]
def __reload_module__():
globals().update({k: v for k, v in vars(utils).items() if not k.startswith("_")})

View File

@@ -1,18 +1,65 @@
import os
import threading
import time import time
import disnake
import psutil
from yt_dlp import version
import arguments import arguments
import commands import commands
import utils from constants import EMBED_COLOR
from state import start_time from state import client, start_time
from utils import format_duration, reply, surround
async def help(message): async def status(message):
await utils.reply( member_count = 0
message, channel_count = 0
", ".join( for guild in client.guilds:
[f"`{command.value}`" for command in commands.Command.__members__.values()] member_count += len(guild.members)
), channel_count += len(guild.channels)
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / 1048576
embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field(
name="Latency",
value=surround(f"{round(client.latency * 1000, 1)} ms"),
) )
embed.add_field(
name="Memory",
value=surround(f"{round(memory_usage, 1)} MiB"),
)
embed.add_field(
name="Threads",
value=surround(threading.active_count()),
)
embed.add_field(
name="Guilds",
value=surround(len(client.guilds)),
)
embed.add_field(
name="Members",
value=surround(member_count),
)
embed.add_field(
name="Channels",
value=surround(channel_count),
)
embed.add_field(
name="Disnake",
value=surround(disnake.__version__),
)
embed.add_field(
name="yt-dlp",
value=surround(version.__version__),
)
embed.add_field(
name="Uptime",
value=surround(format_duration(int(time.time() - start_time), short=True)),
)
await reply(message, embed=embed)
async def uptime(message): async def uptime(message):
@@ -31,8 +78,26 @@ async def uptime(message):
return return
if args.since: if args.since:
await utils.reply(message, f"{round(start_time)}") await reply(message, f"{round(start_time)}")
else: else:
await utils.reply( await reply(message, f"up {format_duration(int(time.time() - start_time))}")
message, f"up {utils.format_duration(int(time.time() - start_time))}"
)
async def ping(message):
await reply(
message,
embed=disnake.Embed(
title="Pong :ping_pong:",
description=f"Latency: **{round(client.latency * 1000, 1)} ms**",
color=EMBED_COLOR,
),
)
async def help(message):
await reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()],
),
)

View File

@@ -1,19 +1,175 @@
import re import re
import aiohttp
import disnake
import arguments import arguments
import commands import commands
import utils import utils
from constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
from state import client
async def lookup(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"look up a discord user or application by ID",
)
parser.add_argument(
"-a",
"--application",
action="store_true",
help="look up applications instead of users",
)
parser.add_argument(
"id",
type=int,
help="the ID to perform a search for",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.application:
session = aiohttp.ClientSession()
response = await (
await session.get(f"https://discord.com/api/v9/applications/{args.id}/rpc")
).json()
if "code" in response.keys():
await utils.reply(message, "application not found!")
return
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
)
embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
embed.add_field(
name="Public Bot",
value=f"{'`' + str(response['bot_public']) + '`' if 'bot_public' in response else 'No bot'}",
)
embed.add_field(name="Public Flags", value="`" + str(response["flags"]) + "`")
embed.add_field(
name="Terms of Service",
value=(
"None"
if "terms_of_service_url" not in response.keys()
else f"[Link]({response['terms_of_service_url']})"
),
)
embed.add_field(
name="Privacy Policy",
value=(
"None"
if "privacy_policy_url" not in response.keys()
else f"[Link]({response['privacy_policy_url']})"
),
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(response['id']))}:R>",
)
embed.add_field(
name="Default Invite URL",
value=(
"None"
if "install_params" not in response.keys()
else f"[Link](https://discord.com/oauth2/authorize?client_id={response['id']}&permissions={response['install_params']['permissions']}&scope={'%20'.join(response['install_params']['scopes'])})"
),
)
embed.add_field(
name="Custom Invite URL",
value=(
"None"
if "custom_install_url" not in response.keys()
else f"[Link]({response['custom_install_url']})"
),
)
bot_intents = []
for application_flag, intent_name in APPLICATION_FLAGS.items():
if response["flags"] & application_flag == application_flag:
if intent_name.replace(" (unverified)", "") not in bot_intents:
bot_intents.append(intent_name)
embed.add_field(
name="Application Flags",
value=", ".join(bot_intents) if bot_intents else "None",
)
bot_tags = ""
if "tags" in response.keys():
for tag in response["tags"]:
bot_tags += tag + ", "
embed.add_field(
name="Tags",
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
)
else:
try:
user = await client.fetch_user(args.id)
except Exception:
await utils.reply(message, "user not found!")
return
badges = ""
for flag, flag_name in PUBLIC_FLAGS.items():
if user.public_flags.value & flag == flag:
if flag_name != "None":
try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception as e:
raise Exception(
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id)
accent_color = 0x000000
if user_object.accent_color is not None:
accent_color = user_object.accent_color
embed = disnake.Embed(color=accent_color)
embed.add_field(
name="User ID",
value=f"`{user.id}`",
)
embed.add_field(
name="Discriminator",
value=f"`{user.name}#{user.discriminator}`",
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(user.id))}:R>",
)
embed.add_field(
name="Public Flags",
value=f"`{user.public_flags.value}` {badges}",
)
embed.add_field(
name="Bot User",
value=f"`{user.bot}`",
)
embed.add_field(
name="System User",
value=f"`{user.system}`",
)
embed.set_thumbnail(url=user.avatar if user.avatar else user.default_avatar)
if user_object.banner:
embed.set_image(url=user_object.banner)
await utils.reply(message, embed=embed)
async def clear(message): async def clear(message):
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
tokens[0], tokens[0],
"bulk delete messages in the current channel matching certain criteria", "bulk delete messages in the current channel matching specified criteria",
) )
parser.add_argument( parser.add_argument(
"count", "count",
type=lambda c: arguments.range_type(c, min=1, max=1000), type=lambda c: arguments.range_type(c, lower=1, upper=1000),
help="amount of messages to delete", help="amount of messages to delete",
) )
group = parser.add_mutually_exclusive_group() group = parser.add_mutually_exclusive_group()
@@ -54,12 +210,25 @@ async def clear(message):
action="store_true", action="store_true",
help="delete messages with reactions", help="delete messages with reactions",
) )
parser.add_argument(
"-A",
"--attachments",
action="store_true",
help="delete messages with attachments",
)
parser.add_argument( parser.add_argument(
"-d", "-d",
"--delete-command", "--delete-command",
action="store_true", action="store_true",
help="delete the command message as well", help="delete the command message as well",
) )
parser.add_argument(
"-I",
"--ignore-ids",
type=int,
action="append",
help="ignore messages with this id",
)
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):
return return
@@ -74,6 +243,8 @@ async def clear(message):
regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0) regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0)
def check(m): def check(m):
if (ids := args.ignore_ids) and m.id in ids:
return False
c = [] c = []
if regex: if regex:
c.append(regex.search(m.content)) c.append(regex.search(m.content))
@@ -86,12 +257,16 @@ async def clear(message):
c.append(m.author.id in i) c.append(m.author.id in i)
if args.reactions: if args.reactions:
c.append(len(m.reactions) > 0) c.append(len(m.reactions) > 0)
if args.attachments:
c.append(len(m.attachments) > 0)
return all(c) return all(c)
messages = len( messages = len(
await message.channel.purge( await message.channel.purge(
limit=args.count, check=check, oldest_first=args.oldest_first limit=args.count,
) check=check,
oldest_first=args.oldest_first,
),
) )
if not args.delete_command: if not args.delete_command:

View File

@@ -1,9 +1,10 @@
import enum from enum import Enum
from functools import lru_cache
import constants import constants
class Command(enum.Enum): class Command(Enum):
CLEAR = "clear" CLEAR = "clear"
CURRENT = "current" CURRENT = "current"
EXECUTE = "execute" EXECUTE = "execute"
@@ -11,7 +12,9 @@ class Command(enum.Enum):
HELP = "help" HELP = "help"
JOIN = "join" JOIN = "join"
LEAVE = "leave" LEAVE = "leave"
LOOKUP = "lookup"
PAUSE = "pause" PAUSE = "pause"
PING = "ping"
PLAY = "play" PLAY = "play"
PLAYING = "playing" PLAYING = "playing"
PURGE = "purge" PURGE = "purge"
@@ -19,19 +22,27 @@ class Command(enum.Enum):
RELOAD = "reload" RELOAD = "reload"
RESUME = "resume" RESUME = "resume"
SKIP = "skip" SKIP = "skip"
SPONSORBLOCK = "sponsorblock"
STATUS = "status"
UPTIME = "uptime" UPTIME = "uptime"
VOLUME = "volume" VOLUME = "volume"
@lru_cache
def match_token(token: str) -> list[Command]: def match_token(token: str) -> list[Command]:
if token.lower() == "r": match token.lower():
return [Command.RELOAD] case "r":
return [Command.RELOAD]
case "s":
return [Command.SKIP]
case "c":
return [Command.CURRENT]
if exact_match := list( if exact_match := list(
filter( filter(
lambda command: command.value == token.lower(), lambda command: command.value == token.lower(),
Command.__members__.values(), Command.__members__.values(),
) ),
): ):
return exact_match return exact_match
@@ -39,23 +50,28 @@ def match_token(token: str) -> list[Command]:
filter( filter(
lambda command: command.value.startswith(token.lower()), lambda command: command.value.startswith(token.lower()),
Command.__members__.values(), Command.__members__.values(),
) ),
) )
@lru_cache
def match(command: str) -> list[Command] | None: def match(command: str) -> list[Command] | None:
if tokens := tokenize(command): if tokens := tokenize(command):
return match_token(tokens[0]) return match_token(tokens[0])
def tokenize(string: str) -> list[str]: @lru_cache
def tokenize(string: str, remove_prefix: bool = True) -> list[str]:
tokens = [] tokens = []
token = "" token = ""
in_quotes = False in_quotes = False
quote_char = None quote_char = None
escape = False escape = False
for char in string[len(constants.PREFIX) :]: if remove_prefix:
string = string[len(constants.PREFIX) :]
for char in string:
if escape: if escape:
token += char token += char
escape = False escape = False

View File

@@ -1,486 +0,0 @@
import itertools
import disnake
import disnake_paginator
import arguments
import commands
import constants
import utils
import youtubedl
from state import client, players
async def queue_or_play(message, edited=False):
if message.guild.id not in players:
players[message.guild.id] = youtubedl.QueuedPlayer()
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "queue a song, list the queue, or resume playback"
)
parser.add_argument("query", nargs="?", help="yt-dlp URL or query to get song")
parser.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150) for the specified song",
)
parser.add_argument(
"-i",
"--remove-index",
type=int,
nargs="*",
help="remove queued songs by index",
)
parser.add_argument(
"-m",
"--match-multiple",
action="store_true",
help="continue removing queued after finding a match",
)
parser.add_argument(
"-c",
"--clear",
action="store_true",
help="remove all queued songs",
)
parser.add_argument(
"--now",
action="store_true",
help="play the specified song immediately",
)
parser.add_argument(
"--next",
action="store_true",
help="play the specified song next",
)
parser.add_argument(
"-t",
"--remove-title",
help="remove queued songs by title",
)
parser.add_argument(
"-q",
"--remove-queuer",
type=int,
help="remove queued songs by queuer",
)
if not (args := await parser.parse_args(message, tokens)):
return
await ensure_joined(message)
if len(tokens) == 1 and tokens[0].lower() != "play":
if not command_allowed(message, immutable=True):
return
elif not command_allowed(message):
return
if edited:
found = None
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found = queued
break
if found:
players[message.guild.id].queue.remove(found)
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
return
elif indices := args.remove_index:
targets = []
for i in indices:
if i <= 0 or i > len(players[message.guild.id].queue):
await utils.reply(message, f"invalid index `{i}`!")
return
targets.append(players[message.guild.id].queue[i - 1])
for target in targets:
if target in players[message.guild.id].queue:
players[message.guild.id].queue.remove(target)
if len(targets) == 1:
await utils.reply(message, f"**X** {targets[0].format()}")
else:
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif args.remove_title or args.remove_queuer:
targets = []
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.append(queued)
continue
if q := args.remove_queuer:
if q == queued.trigger_message.author.id:
targets.append(queued)
if not args.match_multiple:
targets = targets[:1]
for target in targets:
players[message.guild.id].queue.remove(target)
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif query := args.query:
if (
not message.channel.permissions_for(message.author).manage_channels
and len(
list(
filter(
lambda queued: queued.trigger_message.author.id
== message.author.id,
players[message.guild.id].queue,
)
)
)
>= 5
and not len(message.guild.voice_client.channel.members) == 2
):
await utils.reply(
message,
"you can only queue **5 items** without the manage channels permission!",
)
return
try:
async with message.channel.typing():
player = await youtubedl.YTDLSource.from_url(
query, loop=client.loop, stream=True
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(
message, f"**failed to queue:** `{e}`", suppress_embeds=True
)
return
queued = youtubedl.QueuedSong(player, message)
if args.now or args.next:
players[message.guild.id].queue_add_front(queued)
else:
players[message.guild.id].queue_add(queued)
if not message.guild.voice_client.source:
play_next(message, first=True)
elif args.now:
message.guild.voice_client.stop()
else:
await utils.reply(
message,
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
)
elif tokens[0].lower() == "play":
await resume(message)
else:
if players[message.guild.id].queue:
formatted_duration = utils.format_duration(
sum(
[
queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue
]
),
natural=True,
)
def embed(description):
e = disnake.Embed(
description=description,
color=constants.EMBED_COLOR,
)
if formatted_duration and len(players[message.guild.id].queue) > 1:
e.set_footer(text=f"{formatted_duration} in total")
return e
await disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=constants.EMBED_COLOR,
segments=list(
map(
embed,
[
"\n\n".join(
[
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch
]
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue), 10
)
],
)
),
).start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
"nothing is queued!",
)
async def playing(message):
if not command_allowed(message, immutable=True):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "get information about the currently playing song"
)
parser.add_argument(
"-d",
"--description",
action="store_true",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=constants.EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
bar_length = 35
progress = source.original.progress / source.duration
embed = disnake.Embed(
color=constants.EMBED_COLOR,
title=source.title,
url=source.original_url,
description=f"{'⏸️ ' if message.guild.voice_client.is_paused() else ''}"
f"`[{'#'*int(progress * bar_length)}{'-'*int((1 - progress) * bar_length)}]` "
f"**{youtubedl.format_duration(int(source.original.progress))}** / **{youtubedl.format_duration(source.duration)}** (**{round(progress * 100)}%**)",
)
embed.add_field(name="Volume", value=f"{int(source.volume*100)}%")
embed.add_field(name="Views", value=f"{source.view_count:,}")
embed.add_field(
name="Queuer",
value=players[message.guild.id].current.trigger_message.author.mention,
)
embed.set_image(source.thumbnail_url)
await utils.reply(
message,
embed=embed,
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
if not command_allowed(message):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
parser.add_argument(
"seconds",
type=lambda v: arguments.range_type(v, min=0, max=300),
help="the amount of seconds to fast forward",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(args.seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def skip(message):
if not command_allowed(message):
return
if not players[message.guild.id].queue:
message.guild.voice_client.stop()
await utils.reply(
message,
"the queue is empty now!",
)
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
if not message.guild.voice_client.source:
play_next(message)
async def join(message):
if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel)
await message.channel.connect()
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def volume(message):
if not command_allowed(message, immutable=True):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)
def delete_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
def play_after_callback(e, message, once):
if e:
print(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
message.guild.voice_client.stop()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
try:
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
except disnake.opus.OpusNotLoaded:
utils.load_opus()
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
embed = disnake.Embed(
color=constants.EMBED_COLOR,
title=queued.player.title,
url=queued.player.original_url,
)
embed.add_field(name="Volume", value=f"{int(queued.player.volume*100)}%")
embed.add_field(name="Views", value=f"{queued.player.view_count:,}")
embed.add_field(
name="Queuer",
value=players[message.guild.id].current.trigger_message.author.mention,
)
embed.set_image(queued.player.thumbnail_url)
if first:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return
if immutable:
return message.channel.id == message.guild.voice_client.channel.id
else:
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@@ -0,0 +1,20 @@
from .channel import join, leave
from .playback import fast_forward, pause, playing, resume, volume
from .queue import queue_or_play, skip
from .sponsorblock import sponsorblock_command
from .utils import remove_queued
__all__ = [
"fast_forward",
"join",
"leave",
"pause",
"playing",
"queue_or_play",
"remove_queued",
"resume",
"skip",
"skip",
"sponsorblock_command",
"volume",
]

24
commands/voice/channel.py Normal file
View File

@@ -0,0 +1,24 @@
import utils
from .utils import command_allowed
async def join(message):
if message.author.voice:
if message.guild.voice_client:
await message.guild.voice_client.move_to(message.channel)
else:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
return
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)

168
commands/voice/playback.py Normal file
View File

@@ -0,0 +1,168 @@
import disnake_paginator
import arguments
import commands
import sponsorblock
import utils
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed
async def playing(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"get information about the currently playing song",
)
parser.add_argument(
"-d",
"--description",
action="store_true",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
await utils.reply(
message,
embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused(),
),
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the current sponsorblock segment")
parser.add_argument(
"-s",
"--seconds",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=300),
help="the number of seconds to fast forward instead",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
seconds = args.seconds
if not seconds:
video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id,
)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
progress = message.guild.voice_client.source.original.progress
for segment in video["segments"]:
begin, end = map(float, segment["segment"])
if progress >= begin and progress < end:
seconds = end - message.guild.voice_client.source.original.progress
if not seconds:
await utils.reply(message, "no sponsorblock segment is currently playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def volume(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)

270
commands/voice/queue.py Normal file
View File

@@ -0,0 +1,270 @@
import itertools
import disnake
import disnake_paginator
import arguments
import audio
import commands
import utils
from constants import EMBED_COLOR
from state import client, players, trusted_users
from .playback import resume
from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"queue a song, list the queue, or resume playback",
)
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150) for the specified song",
)
parser.add_argument(
"-i",
"--remove-index",
type=int,
nargs="*",
help="remove queued songs by index",
)
parser.add_argument(
"-m",
"--match-multiple",
action="store_true",
help="continue removing queued after finding a match",
)
parser.add_argument(
"-c",
"--clear",
action="store_true",
help="remove all queued songs",
)
parser.add_argument(
"--now",
action="store_true",
help="play the specified song immediately",
)
parser.add_argument(
"--next",
action="store_true",
help="play the specified song next",
)
parser.add_argument(
"-t",
"--remove-title",
help="remove queued songs by title",
)
parser.add_argument(
"-q",
"--remove-queuer",
type=int,
help="remove queued songs by queuer",
)
if not (args := await parser.parse_args(message, tokens)):
return
await ensure_joined(message)
if len(tokens) == 1 and tokens[0].lower() != "play":
if not command_allowed(message, immutable=True):
return
elif not command_allowed(message):
return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited:
found = next(
filter(
lambda queued: queued.trigger_message.id == message.id,
players[message.guild.id].queue,
),
None,
)
if found:
players[message.guild.id].queue.remove(found)
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
elif indices := args.remove_index:
targets = []
for i in indices:
if i <= 0 or i > len(players[message.guild.id].queue):
await utils.reply(message, f"invalid index `{i}`!")
return
targets.append(players[message.guild.id].queue[i - 1])
for target in targets:
if target in players[message.guild.id].queue:
players[message.guild.id].queue.remove(target)
if len(targets) == 1:
await utils.reply(message, f"**removed** {targets[0].format()}")
else:
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif args.remove_title or args.remove_queuer:
targets = set()
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.add(queued)
if q := args.remove_queuer:
if q == queued.trigger_message.author.id:
targets.add(queued)
targets = list(targets)
if not args.match_multiple:
targets = targets[:1]
for target in targets:
players[message.guild.id].queue.remove(target)
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif query := args.query:
if (
not message.channel.permissions_for(message.author).manage_channels
and len(
list(
filter(
lambda queued: queued.trigger_message.author.id
== message.author.id,
players[message.guild.id].queue,
),
),
)
>= 5
and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
):
await utils.reply(
message,
"you can only queue **5 items** without the manage channels permission!",
)
return
try:
async with message.channel.typing():
player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query),
loop=client.loop,
stream=True,
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(message, f"failed to queue: `{e}`")
return
queued = audio.queue.Song(player, message)
if args.now or args.next:
players[message.guild.id].queue_push_front(queued)
else:
players[message.guild.id].queue_push(queued)
if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!")
return
elif not message.guild.voice_client.source:
play_next(message, first=True)
elif args.now:
message.guild.voice_client.stop()
else:
await utils.reply(
message,
f"**{1 if args.next else len(players[message.guild.id].queue)}.** {queued.format()}",
)
utils.cooldown(message, 2)
elif tokens[0].lower() == "play":
await resume(message)
else:
if players[message.guild.id].queue:
formatted_duration = utils.format_duration(
sum(
[
queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue
],
),
natural=True,
)
def embed(description):
e = disnake.Embed(
description=description,
color=EMBED_COLOR,
)
if formatted_duration and len(players[message.guild.id].queue) > 1:
e.set_footer(text=f"{formatted_duration} in total")
return e
await disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
segments=list(
map(
embed,
[
"\n\n".join(
[
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch
],
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue),
10,
)
],
),
),
).start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
"nothing is queued!",
)
async def skip(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
parser.add_argument(
"-n",
"--next",
action="store_true",
help="skip the next song",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not players[message.guild.id].queue:
message.guild.voice_client.stop()
await utils.reply(
message,
"the queue is empty now!",
)
elif args.next:
next = players[message.guild.id].queue.pop()
await utils.reply(message, f"**skipped** {next.format()}")
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
if not message.guild.voice_client.source:
play_next(message)

View File

@@ -0,0 +1,47 @@
import disnake
import audio
import sponsorblock
import utils
from constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES
from state import players
from .utils import command_allowed
async def sponsorblock_command(message):
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
progress = message.guild.voice_client.source.original.progress
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
text = []
for segment in video["segments"]:
begin, end = map(int, segment["segment"])
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES:
category = SPONSORBLOCK_CATEGORY_NAMES[category]
current = "**" if progress >= begin and progress < end else ""
text.append(
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
)
await utils.reply(
message,
embed=disnake.Embed(
title="Sponsorblock segments",
description="\n".join(text),
color=EMBED_COLOR,
),
)

72
commands/voice/utils.py Normal file
View File

@@ -0,0 +1,72 @@
from logging import error
import disnake
import utils
from state import client, players
def play_after_callback(e, message, once):
if e:
error(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
if not message.guild.voice_client:
return
message.guild.voice_client.stop()
if not disnake.opus.is_loaded():
utils.load_opus()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play(
queued.player,
after=lambda e: play_after_callback(e, message, once),
)
embed = queued.embed()
if first and len(players[message.guild.id].queue) == 0:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
def remove_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return False
if immutable:
return True
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@@ -15,25 +15,97 @@ YTDL_OPTIONS = {
"source_address": "0.0.0.0", "source_address": "0.0.0.0",
} }
BAR_LENGTH = 35
EMBED_COLOR = 0xFF6600 EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712] OWNERS = [531392146767347712]
PREFIX = "%" PREFIX = "%"
SPONSORBLOCK_CATEGORY_NAMES = {
"music_offtopic": "non-music",
"selfpromo": "self promotion",
"sponsor": "sponsored",
}
REACTIONS = {
"cat": ["🐈"],
"dog": ["🐕"],
"gn": ["💤", "😪", "😴", "🛌"],
"pizza": ["🍕"],
}
RELOADABLE_MODULES = [ RELOADABLE_MODULES = [
"arguments", "arguments",
"audio",
"audio.discord",
"audio.queue",
"audio.utils",
"audio.youtubedl",
"commands", "commands",
"commands.bot", "commands.bot",
"commands.tools", "commands.tools",
"commands.utils", "commands.utils",
"commands.voice", "commands.voice",
"commands.voice.channel",
"commands.voice.playback",
"commands.voice.playing",
"commands.voice.queue",
"commands.voice.sponsorblock",
"commands.voice.utils",
"constants", "constants",
"core", "core",
"events", "events",
"extra", "extra",
"fun",
"sponsorblock",
"tasks", "tasks",
"utils", "utils",
"utils.common",
"utils.discord",
"voice", "voice",
"youtubedl", "yt_dlp",
"yt_dlp.version",
] ]
PUBLIC_FLAGS = {
1 << 0: "Discord Employee",
1 << 1: "Discord Partner",
1 << 2: "HypeSquad Events",
1 << 3: "Bug Hunter Level 1",
1 << 6: "HypeSquad Bravery",
1 << 7: "HypeSquad Brilliance",
1 << 8: "HypeSquad Balance",
1 << 9: "Early Supporter",
1 << 10: "Team User",
1 << 14: "Bug Hunter Level 2",
1 << 16: "Verified Bot",
1 << 17: "Verified Bot Developer",
1 << 18: "Discord Certified Moderator",
1 << 19: "HTTP Interactions Only",
1 << 22: "Active Developer",
}
BADGE_EMOJIS = {
"Discord Employee": "<:DiscordStaff:879666899980546068>",
"Discord Partner": "<:DiscordPartner:879668340434534400>",
"HypeSquad Events": "<:HypeSquadEvents:879666970310606848>",
"Bug Hunter Level 1": "<:BugHunter1:879666851448234014>",
"HypeSquad Bravery": "<:HypeSquadBravery:879666945153175612>",
"HypeSquad Brilliance": "<:HypeSquadBrilliance:879666956884643861>",
"HypeSquad Balance": "<:HypeSquadBalance:879666934717771786>",
"Early Supporter": "<:EarlySupporter:879666916493496400>",
"Team User": "<:TeamUser:890866907996127305>",
"Bug Hunter Level 2": "<:BugHunter2:879666866971357224>",
"Verified Bot": "<:VerifiedBot:879670687554498591>",
"Verified Bot Developer": "<:VerifiedBotDeveloper:879669786550890507>",
"Discord Certified Moderator": "<:DiscordModerator:879666882976837654>",
"HTTP Interactions Only": "<:HTTPInteractionsOnly:1047141867806015559>",
"Active Developer": "<:ActiveDeveloper:1047141451244523592>",
}
APPLICATION_FLAGS = {
1 << 12: "Presence Intent",
1 << 13: "Presence Intent (unverified)",
1 << 14: "Guild Members Intent",
1 << 15: "Guild Members Intent (unverified)",
1 << 16: "Unusual Growth (verification suspended)",
1 << 18: "Message Content Intent",
1 << 19: "Message Content Intent (unverified)",
1 << 23: "Suports Application Commands",
}
SECRETS = { SECRETS = {
"TOKEN": os.getenv("BOT_TOKEN"), "TOKEN": os.getenv("BOT_TOKEN"),

121
core.py
View File

@@ -3,21 +3,24 @@ import contextlib
import importlib import importlib
import inspect import inspect
import io import io
import signal
import textwrap import textwrap
import time import time
import traceback import traceback
from logging import debug
import disnake import disnake
import disnake_paginator import disnake_paginator
import commands import commands
import constants
import utils import utils
from state import client, command_locks, idle_tracker from commands import Command as C
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
from state import client, command_cooldowns, command_locks, idle_tracker
async def on_message(message, edited=False): async def on_message(message, edited=False):
if not message.content.startswith(constants.PREFIX) or message.author.bot: if not message.content.startswith(PREFIX) or message.author.bot:
return return
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
@@ -38,52 +41,56 @@ async def on_message(message, edited=False):
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}", f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
) )
return return
matched = matched[0]
if message.guild.id not in command_locks: if (message.guild.id, message.author.id) not in command_locks:
command_locks[message.guild.id] = asyncio.Lock() command_locks[(message.guild.id, message.author.id)] = asyncio.Lock()
await command_locks[(message.guild.id, message.author.id)].acquire()
C = commands.Command
try: try:
match matched[0]: if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
case C.RELOAD if message.author.id in constants.OWNERS: if (end_time := cooldowns.get(matched)) and (
reloaded_modules = set() remaining_time := round(end_time - time.time()) > 0
rreload(reloaded_modules, __import__("core")) ):
rreload(reloaded_modules, __import__("extra")) await utils.reply(
for module in filter( message,
lambda v: inspect.ismodule(v) f"please wait **{utils.format_duration(remaining_time, natural=True)}** before using this command again!",
and v.__name__ in constants.RELOADABLE_MODULES, )
globals().values(), return
):
rreload(reloaded_modules, module) match matched:
case C.RELOAD if message.author.id in OWNERS:
start = time.time()
reloaded_modules = reload()
end = time.time()
debug(
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s",
)
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
case C.EXECUTE if message.author.id in constants.OWNERS:
case C.EXECUTE if message.author.id in OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`") code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]: for replacement in ["python", "py"]:
if code.startswith(replacement): if code.startswith(replacement):
code = code[len(replacement) :] code = code[len(replacement) :]
stdout = io.StringIO()
try: try:
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout): with contextlib.redirect_stdout(stdout):
if "#globals" in code: wrapped_code = (
exec( f"async def run_code():\n{textwrap.indent(code, ' ')}"
f"async def run_code():\n{textwrap.indent(code, ' ')}", )
globals(), if "# globals" in code:
) exec(wrapped_code, globals())
await globals()["run_code"]() await globals()["run_code"]()
else: else:
dictionary = dict(locals(), **globals()) dictionary = dict(locals(), **globals())
exec( exec(wrapped_code, dictionary, dictionary)
f"async def run_code():\n{textwrap.indent(code, ' ')}",
dictionary,
dictionary,
)
await dictionary["run_code"]() await dictionary["run_code"]()
output = stdout.getvalue() output = stdout.getvalue()
except Exception as e: except Exception as e:
output = "`" + str(e) + "`" output = "`" + str(e) + "`"
output = utils.filter_secrets(output) output = utils.filter_secrets(output)
if len(output) > 2000: if len(output) > 2000:
@@ -92,25 +99,24 @@ async def on_message(message, edited=False):
prefix="```\n", prefix="```\n",
suffix="```", suffix="```",
invalid_user_function=utils.invalid_user_handler, invalid_user_function=utils.invalid_user_handler,
color=constants.EMBED_COLOR, color=EMBED_COLOR,
segments=disnake_paginator.split(output), segments=disnake_paginator.split(output),
).start(utils.MessageInteractionWrapper(message)) ).start(utils.MessageInteractionWrapper(message))
elif len(output.strip()) == 0: elif len(output.strip()) == 0:
await utils.add_check_reaction(message) await utils.add_check_reaction(message)
else: else:
await utils.reply(message, output) await utils.reply(message, output)
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
await commands.tools.clear(message) await commands.tools.clear(message)
case C.JOIN: case C.JOIN:
await commands.voice.join(message) await commands.voice.join(message)
case C.LEAVE: case C.LEAVE:
await commands.voice.leave(message) await commands.voice.leave(message)
case C.QUEUE | C.PLAY: case C.QUEUE | C.PLAY:
async with command_locks[message.guild.id]: await commands.voice.queue_or_play(message, edited)
await commands.voice.queue_or_play(message, edited)
case C.SKIP: case C.SKIP:
async with command_locks[message.guild.id]: await commands.voice.skip(message)
await commands.voice.skip(message)
case C.RESUME: case C.RESUME:
await commands.voice.resume(message) await commands.voice.resume(message)
case C.PAUSE: case C.PAUSE:
@@ -125,33 +131,45 @@ async def on_message(message, edited=False):
await commands.voice.playing(message) await commands.voice.playing(message)
case C.FAST_FORWARD: case C.FAST_FORWARD:
await commands.voice.fast_forward(message) await commands.voice.fast_forward(message)
case C.STATUS:
await commands.bot.status(message)
case C.PING:
await commands.bot.ping(message)
case C.LOOKUP:
await commands.tools.lookup(message)
case C.SPONSORBLOCK:
await commands.voice.sponsorblock_command(message)
except Exception as e: except Exception as e:
await utils.reply( await utils.reply(
message, message,
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```", f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
) )
raise e
finally:
command_locks[(message.guild.id, message.author.id)].release()
async def on_voice_state_update(_, before, after): async def on_voice_state_update(_, before, after):
def is_empty(channel): def is_empty(channel):
return [m.id for m in (channel.members if channel else [])] == [client.user.id] return [m.id for m in (channel.members if channel else [])] == [client.user.id]
c = None channel = None
if is_empty(before.channel): if is_empty(before.channel):
c = before.channel channel = before.channel
elif is_empty(after.channel): elif is_empty(after.channel):
c = after.channel channel = after.channel
if c:
await c.guild.voice_client.disconnect() if channel:
await channel.guild.voice_client.disconnect()
def rreload(reloaded_modules, module): def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__) reloaded_modules.add(module.__name__)
for submodule in filter( for submodule in filter(
lambda v: inspect.ismodule(v) lambda sm: inspect.ismodule(sm)
and v.__name__ in constants.RELOADABLE_MODULES and sm.__name__ in RELOADABLE_MODULES
and v.__name__ not in reloaded_modules, and sm.__name__ not in reloaded_modules,
vars(module).values(), vars(module).values(),
): ):
rreload(reloaded_modules, submodule) rreload(reloaded_modules, submodule)
@@ -160,3 +178,18 @@ def rreload(reloaded_modules, module):
if "__reload_module__" in dir(module): if "__reload_module__" in dir(module):
module.__reload_module__() module.__reload_module__()
def reload(*_):
reloaded_modules = set()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
return reloaded_modules
signal.signal(signal.SIGUSR1, reload)

View File

@@ -1,11 +1,12 @@
import asyncio import asyncio
import threading import threading
import time from logging import debug, info, warning
import commands import commands
import core import core
import fun
import tasks import tasks
from state import client, start_time from state import client
def prepare(): def prepare():
@@ -20,15 +21,16 @@ def prepare():
async def on_bulk_message_delete(messages): async def on_bulk_message_delete(messages):
commands.voice.delete_queued(messages) commands.voice.remove_queued(messages)
async def on_message(message): async def on_message(message):
await core.on_message(message) await core.on_message(message)
await fun.on_message(message)
async def on_message_delete(message): async def on_message_delete(message):
commands.voice.delete_queued([message]) commands.voice.remove_queued([message])
async def on_message_edit(before, after): async def on_message_edit(before, after):
@@ -38,19 +40,29 @@ async def on_message_edit(before, after):
await core.on_message(after, edited=True) await core.on_message(after, edited=True)
async def on_ready():
print(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
async def on_voice_state_update(member, before, after): async def on_voice_state_update(member, before, after):
await core.on_voice_state_update(member, before, after) await core.on_voice_state_update(member, before, after)
async def on_ready():
info(f"logged in as {client.user}")
async def on_connect():
debug("connected to the gateway!")
async def on_disconnect():
warning("disconnected from the gateway!")
for event_type, handlers in client.get_listeners().items(): for event_type, handlers in client.get_listeners().items():
for handler in handlers: for handler in handlers:
client.remove_listener(handler, event_type) client.remove_listener(handler, event_type)
client.add_listener(on_bulk_message_delete, "on_bulk_message_delete") client.add_listener(on_bulk_message_delete, "on_bulk_message_delete")
client.add_listener(on_connect, "on_connect")
client.add_listener(on_disconnect, "on_disconnect")
client.add_listener(on_message, "on_message") client.add_listener(on_message, "on_message")
client.add_listener(on_message_delete, "on_message_delete") client.add_listener(on_message_delete, "on_message_delete")
client.add_listener(on_message_edit, "on_message_edit") client.add_listener(on_message_edit, "on_message_edit")

View File

@@ -4,15 +4,19 @@ import string
import disnake import disnake
import youtube_transcript_api import youtube_transcript_api
from state import client, players from state import client, kill, players
async def transcript( async def transcript(
message, languages=["en"], max_messages=6, min_messages=3, upper=True message,
languages=["en"],
max_messages=6,
min_messages=3,
upper=True,
): ):
initial_id = message.guild.voice_client.source.id initial_id = message.guild.voice_client.source.id
transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts( transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
initial_id initial_id,
) )
try: try:
transcript = transcript_list.find_manually_created_transcript(languages).fetch() transcript = transcript_list.find_manually_created_transcript(languages).fetch()
@@ -39,13 +43,18 @@ async def transcript(
) )
if len(messages) > max_messages: if len(messages) > max_messages:
try: try:
await message.channel.delete_messages( count = min(min_messages, len(messages))
[messages.pop() for _ in range(max_messages - min_messages)] if count == 1:
) await messages.pop().delete()
else:
await message.channel.delete_messages(
[messages.pop() for _ in range(count)],
)
except Exception: except Exception:
pass pass
if message.guild.voice_client.source.id != initial_id: if (message.guild.voice_client.source.id != initial_id) or kill["transcript"]:
kill["transcript"] = False
break break
@@ -72,19 +81,20 @@ def messages_per_second(limit=500):
average = 1 average = 1
print( print(
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** " f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**" f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**",
) )
async def auto_count(channel_id): async def auto_count(channel_id: int):
if (channel := await client.fetch_channel(channel_id)) and isinstance( if (channel := await client.fetch_channel(channel_id)) and isinstance(
channel, disnake.TextChannel channel,
disnake.TextChannel,
): ):
last_message = (await channel.history(limit=1).flatten())[0] last_message = (await channel.history(limit=1).flatten())[0]
try: try:
result = str( result = str(
int("".join(filter(lambda d: d in string.digits, last_message.content))) int("".join(filter(lambda d: d in string.digits, last_message.content)))
+ 1 + 1,
) )
except Exception: except Exception:
result = "where number" result = "where number"

13
fun.py Normal file
View File

@@ -0,0 +1,13 @@
import random
import commands
from constants import REACTIONS
async def on_message(message):
if random.random() < 0.01:
tokens = commands.tokenize(message.content, remove_prefix=False)
for keyword, options in REACTIONS.items():
if keyword in tokens:
await message.add_reaction(random.choice(options))
break

13
main.py
View File

@@ -1,7 +1,20 @@
import logging
import constants import constants
import events import events
from state import client from state import client
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(
format=(
"%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s"
if __debug__
else "%(asctime)s %(levelname)s %(message)s"
),
datefmt="%Y-%m-%d %T",
level=logging.DEBUG if __debug__ else logging.INFO,
)
logging.getLogger("disnake").setLevel(logging.WARNING)
events.prepare() events.prepare()
client.run(constants.SECRETS["TOKEN"]) client.run(constants.SECRETS["TOKEN"])

View File

@@ -1,6 +1,8 @@
aiohttp
audioop-lts audioop-lts
disnake disnake
disnake_paginator disnake_paginator
psutil
PyNaCl PyNaCl
youtube_transcript_api youtube_transcript_api
yt-dlp yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz

37
sponsorblock.py Normal file
View File

@@ -0,0 +1,37 @@
import hashlib
import json
import aiohttp
from state import sponsorblock_cache
categories = json.dumps(
[
"interaction",
"intro",
"music_offtopic",
"outro",
"preview",
"selfpromo",
"sponsor",
],
)
async def get_segments(video_id: str):
if video_id in sponsorblock_cache:
return sponsorblock_cache[video_id]
hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
session = aiohttp.ClientSession()
response = await session.get(
f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}",
params={"categories": categories},
)
if response.status == 200 and (
results := list(
filter(lambda v: video_id == v["videoID"], await response.json()),
)
):
sponsorblock_cache[video_id] = results[0]
return results[0]

View File

@@ -1,32 +1,20 @@
import collections
import time import time
import disnake import disnake
from utils import LimitedSizeDict
class LimitedSizeDict(collections.OrderedDict):
def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", 1000)
super().__init__(*args, **kwds)
self._check_size_limit()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)
intents = disnake.Intents.default() intents = disnake.Intents.default()
intents.message_content = True intents.message_content = True
intents.members = True intents.members = True
client = disnake.Client(intents=intents) client = disnake.Client(intents=intents)
command_cooldowns = LimitedSizeDict()
command_locks = LimitedSizeDict() command_locks = LimitedSizeDict()
idle_tracker = {"is_idle": False, "last_used": time.time()} idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False}
message_responses = LimitedSizeDict() message_responses = LimitedSizeDict()
players = {} players = {}
sponsorblock_cache = LimitedSizeDict()
start_time = time.time() start_time = time.time()
trusted_users = []

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import time import time
from logging import debug, error
import disnake import disnake
@@ -7,6 +8,8 @@ from state import client, idle_tracker, players
async def cleanup(): async def cleanup():
debug("spawned cleanup thread")
while True: while True:
await asyncio.sleep(3600) await asyncio.sleep(3600)
@@ -16,10 +19,14 @@ async def cleanup():
targets.append(guild_id) targets.append(guild_id)
for target in targets: for target in targets:
del players[target] del players[target]
debug(f"cleanup thread removed {len(targets)} empty players")
if ( if (
not idle_tracker["is_idle"] not idle_tracker["is_idle"]
and time.time() - idle_tracker["last_used"] >= 3600 and time.time() - idle_tracker["last_used"] >= 3600
): ):
await client.change_presence(status=disnake.Status.idle) try:
idle_tracker["is_idle"] = True await client.change_presence(status=disnake.Status.idle)
idle_tracker["is_idle"] = True
except Exception as e:
error(f"failed to change status to idle: {e}")

View File

@@ -1,3 +1,3 @@
from . import test_format_duration from . import test_filter_secrets, test_format_duration
__all__ = ["test_format_duration"] __all__ = ["test_filter_secrets", "test_format_duration"]

View File

@@ -0,0 +1,21 @@
import unittest
import utils
class TestFilterSecrets(unittest.TestCase):
def test_filter_secrets(self):
secret = "PLACEHOLDER_TOKEN"
self.assertFalse(
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}),
)
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
self.assertFalse(
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}),
)

View File

@@ -1,61 +1,107 @@
import unittest import unittest
import audio
import utils import utils
import youtubedl
class TestFormatDuration(unittest.TestCase): class TestFormatDuration(unittest.TestCase):
def test_youtubedl(self): def test_audio(self):
self.assertEqual(youtubedl.format_duration(0), "00:00") def f(s):
self.assertEqual(youtubedl.format_duration(0.5), "00:00") return audio.utils.format_duration(s)
self.assertEqual(youtubedl.format_duration(60.5), "01:00")
self.assertEqual(youtubedl.format_duration(1), "00:01") self.assertEqual(f(0), "00:00")
self.assertEqual(youtubedl.format_duration(60), "01:00") self.assertEqual(f(0.5), "00:00")
self.assertEqual(youtubedl.format_duration(60 + 30), "01:30") self.assertEqual(f(60.5), "01:00")
self.assertEqual(youtubedl.format_duration(60 * 60), "01:00:00") self.assertEqual(f(1), "00:01")
self.assertEqual(youtubedl.format_duration(60 * 60 + 30), "01:00:30") self.assertEqual(f(60), "01:00")
self.assertEqual(f(60 + 30), "01:30")
self.assertEqual(f(60 * 60), "01:00:00")
self.assertEqual(f(60 * 60 + 30), "01:00:30")
def test_utils(self): def test_utils(self):
self.assertEqual(utils.format_duration(0), "") def f(s):
self.assertEqual(utils.format_duration(60 * 60 * 24 * 7), "1 week") return utils.format_duration(s)
self.assertEqual(utils.format_duration(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual( self.assertEqual(
utils.format_duration((60 * 60 * 24 * 21) - 1), f((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds", "2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
) )
self.assertEqual(utils.format_duration(60), "1 minute") self.assertEqual(f(60), "1 minute")
self.assertEqual(utils.format_duration(60 * 2), "2 minutes") self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(utils.format_duration(60 * 59), "59 minutes") self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(utils.format_duration(60 * 60), "1 hour") self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(utils.format_duration(60 * 60 * 2), "2 hours") self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(utils.format_duration(1), "1 second") self.assertEqual(f(1), "1 second")
self.assertEqual(utils.format_duration(60 + 5), "1 minute, 5 seconds") self.assertEqual(f(60 + 5), "1 minute, 5 seconds")
self.assertEqual(utils.format_duration(60 * 60 + 30), "1 hour, 30 seconds") self.assertEqual(f(60 * 60 + 30), "1 hour, 30 seconds")
self.assertEqual( self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds")
utils.format_duration(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds" self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds")
)
self.assertEqual(
utils.format_duration(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds"
)
def test_utils_natural(self): def test_utils_natural(self):
def format(seconds: int): def f(s):
return utils.format_duration(seconds, natural=True) return utils.format_duration(s, natural=True)
self.assertEqual(format(0), "") self.assertEqual(f(0), "")
self.assertEqual(format(60 * 60 * 24 * 7), "1 week") self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(format(60 * 60 * 24 * 21), "3 weeks") self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual( self.assertEqual(
format((60 * 60 * 24 * 21) - 1), f((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds", "2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
) )
self.assertEqual(format(60), "1 minute") self.assertEqual(f(60), "1 minute")
self.assertEqual(format(60 * 2), "2 minutes") self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(format(60 * 59), "59 minutes") self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(format(60 * 60), "1 hour") self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(format(60 * 60 * 2), "2 hours") self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(format(1), "1 second") self.assertEqual(f(1), "1 second")
self.assertEqual(format(60 + 5), "1 minute and 5 seconds") self.assertEqual(f(60 + 5), "1 minute and 5 seconds")
self.assertEqual(format(60 * 60 + 30), "1 hour and 30 seconds") self.assertEqual(f(60 * 60 + 30), "1 hour and 30 seconds")
self.assertEqual(format(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds") self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
self.assertEqual(format(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds") self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
def test_utils_short(self):
def f(s):
return utils.format_duration(s, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m 5s")
self.assertEqual(f(60 * 60 + 30), "1h 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w 30s")
def test_utils_natural_short(self):
def f(s):
return utils.format_duration(s, natural=True, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m and 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m and 5s")
self.assertEqual(f(60 * 60 + 30), "1h and 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m and 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w and 30s")

115
utils.py
View File

@@ -1,115 +0,0 @@
import os
import disnake
import constants
from state import message_responses
class ChannelResponseWrapper:
def __init__(self, message):
self.message = message
self.sent_message = None
async def send_message(self, **kwargs):
if "ephemeral" in kwargs:
del kwargs["ephemeral"]
self.sent_message = await reply(self.message, **kwargs)
async def edit_message(self, content=None, embed=None, view=None):
if self.sent_message:
content = content or self.sent_message.content
if not embed and len(self.sent_message.embeds) > 0:
embed = self.sent_message.embeds[0]
await self.sent_message.edit(content=content, embed=embed, view=view)
class MessageInteractionWrapper:
def __init__(self, message):
self.message = message
self.author = message.author
self.response = ChannelResponseWrapper(message)
async def edit_original_message(self, content=None, embed=None, view=None):
await self.response.edit_message(content=content, embed=embed, view=view)
def format_duration(duration: int, natural: bool = False):
def format_plural(noun, count):
return noun if count == 1 else noun + "s"
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks} {format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days} {format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours} {format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes} {format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration} {format_plural('second', duration)}")
if not natural or len(segments) <= 1:
return ", ".join(segments)
return ", ".join(segments[:-1]) + f" and {segments[-1]}"
async def add_check_reaction(message):
await message.add_reaction("")
async def reply(message, *args, **kwargs):
if message.id in message_responses:
await message_responses[message.id].edit(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
else:
response = await message.reply(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
message_responses[message.id] = response
return message_responses[message.id]
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
)
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!", ephemeral=True
)
def filter_secrets(text: str) -> str:
for secret_name, secret in constants.SECRETS.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text
def load_opus():
print("opus wasn't automatically loaded! trying to load manually...")
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
if os.path.exists(path):
try:
disnake.opus.load_opus(path)
print(f"successfully loaded opus from {path}")
return
except Exception as e:
print(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")

28
utils/__init__.py Normal file
View File

@@ -0,0 +1,28 @@
from .common import LimitedSizeDict, filter_secrets, format_duration, surround
from .discord import (
ChannelResponseWrapper,
MessageInteractionWrapper,
add_check_reaction,
channel_send,
cooldown,
invalid_user_handler,
load_opus,
reply,
snowflake_timestamp,
)
__all__ = [
"add_check_reaction",
"channel_send",
"ChannelResponseWrapper",
"cooldown",
"filter_secrets",
"format_duration",
"invalid_user_handler",
"LimitedSizeDict",
"load_opus",
"MessageInteractionWrapper",
"reply",
"snowflake_timestamp",
"surround",
]

64
utils/common.py Normal file
View File

@@ -0,0 +1,64 @@
from collections import OrderedDict
from constants import SECRETS
def surround(inner: str, outer="```") -> str:
return outer + str(inner) + outer
def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
def format_plural(noun, count):
if short:
return noun[0]
return " " + (noun if count == 1 else noun + "s")
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks}{format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days}{format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours}{format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes}{format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration}{format_plural('second', duration)}")
separator = " " if short else ", "
if not natural or len(segments) <= 1:
return separator.join(segments)
return separator.join(segments[:-1]) + f" and {segments[-1]}"
def filter_secrets(text: str, secrets=SECRETS) -> str:
for secret_name, secret in secrets.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 100)
super().__init__(*args, **kwargs)
self._check_size_limit()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)

118
utils/discord.py Normal file
View File

@@ -0,0 +1,118 @@
import time
from logging import error, info
from pathlib import Path
import disnake
import commands
from constants import OWNERS
from state import command_cooldowns, message_responses
def cooldown(message, cooldown_time: int):
if message.author.id in OWNERS:
return
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
command = possible_commands[0]
end_time = time.time() + cooldown_time
if message.author.id in command_cooldowns:
command_cooldowns[message.author.id][command] = end_time
else:
command_cooldowns[message.author.id] = {command: end_time}
async def reply(message, *args, **kwargs):
if message.id in message_responses:
if len(args) == 0:
kwargs["content"] = None
elif len(kwargs) == 0:
kwargs["embeds"] = []
try:
await message_responses[message.id].edit(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
return
except Exception:
pass
try:
response = await message.reply(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
except Exception:
response = await channel_send(message, *args, **kwargs)
message_responses[message.id] = response
return message_responses[message.id]
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
def load_opus():
for path in filter(
lambda p: Path(p).exists(),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try:
disnake.opus.load_opus(path)
info(f"successfully loaded opus from {path}")
return
except Exception as e:
error(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")
def snowflake_timestamp(snowflake) -> int:
return round(((snowflake >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message):
await message.add_reaction("")
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!",
ephemeral=True,
)
class ChannelResponseWrapper:
def __init__(self, message):
self.message = message
self.sent_message = None
async def send_message(self, **kwargs):
kwargs.pop("ephemeral", None)
self.sent_message = await reply(self.message, **kwargs)
async def edit_message(self, content=None, embed=None, view=None):
if self.sent_message:
content = content or self.sent_message.content
if not embed and len(self.sent_message.embeds) > 0:
embed = self.sent_message.embeds[0]
await self.sent_message.edit(content=content, embed=embed, view=view)
class MessageInteractionWrapper:
def __init__(self, message):
self.message = message
self.author = message.author
self.response = ChannelResponseWrapper(message)
async def edit_original_message(self, content=None, embed=None, view=None):
await self.response.edit_message(content=content, embed=embed, view=view)

View File

@@ -1,137 +0,0 @@
import asyncio
import collections
from dataclasses import dataclass
from typing import Any, Optional
import disnake
import yt_dlp
import constants
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
class CustomAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class YTDLSource(disnake.PCMVolumeTransformer):
def __init__(
self, source: CustomAudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.title = data.get("title")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
data = data["entries"][0]
return cls(
CustomAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
)
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
@dataclass
class QueuedSong:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
if multiline:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
)
else:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def __str__(self):
return self.__repr__()
@dataclass
class QueuedPlayer:
queue = collections.deque()
current: Optional[QueuedSong] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_add(self, item):
self.queue.append(item)
def queue_add_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)