Compare commits

...

185 Commits

Author SHA1 Message Date
019e60450f refactor: tweak descriptions 2025-06-10 20:59:11 -04:00
7672107c68 chore(requirements): use latest yt-dlp from github 2025-06-10 20:30:41 -04:00
ee6ea4eed4 refactor(audio/queue): save uploader field value 2025-06-08 17:29:36 -04:00
fed280e6c5 fix(audio/queue): check for uploader name 2025-06-08 13:46:57 -04:00
1a8f84b333 feat: reload when SIGUSR1 is received 2025-06-08 13:11:50 -04:00
94bdb91eb0 feat: add trusted user list 2025-06-08 13:11:50 -04:00
5c030a0557 refactor(commands): tweak descriptions 2025-06-08 12:57:34 -04:00
5344e89c26 feat(audio/queue): add timestamps 2025-06-08 12:26:30 -04:00
80e6d422e5 fix: add missing character in format string 2025-05-29 11:45:18 -04:00
71fad98d3d refactor(cleanup): reduce interval 2025-05-02 18:19:16 -04:00
83d784c917 refactor: reduce LimitedSizeDict size 2025-05-02 18:18:34 -04:00
f4b7e0f5ce refactor(commands/voice): clean up some code 2025-05-01 18:29:26 -04:00
1316fb593c refactor(commands/voice/queue): delay player creation 2025-05-01 18:29:26 -04:00
b6d105a519 refactor(sponsorblock): hashPrefix -> hash_prefix 2025-04-25 21:27:04 -04:00
ec31250153 refactor: follow more guidelines 2025-04-03 17:53:46 -04:00
f360566824 refactor: minor changes 2025-03-28 21:22:19 -04:00
b0c96a11cd feat(fun): add more reactions 2025-03-28 21:22:19 -04:00
062676df26 fix: handle missing sponsorblock category names 2025-02-26 16:21:42 -05:00
5430f7c632 refactor(utils): add surround function 2025-02-25 17:55:21 -05:00
0a8482c030 refactor(constants): use bit-shifts for public flags 2025-02-25 17:49:45 -05:00
87c88f796d refactor(audio/youtubedl): remove redundant <> from repr 2025-02-25 17:47:38 -05:00
d08744ebb2 refactor(sponsorblock): clean up categories 2025-02-25 17:46:42 -05:00
0b3425a658 fix(commands/voice/join): check message author voice channel before joining 2025-02-25 17:42:44 -05:00
e7105f1828 chore(docker): run with debug output 2025-02-13 20:32:56 -05:00
4f7bd903b8 fix(youtubedl): handle error when no url 2025-02-13 18:59:27 -05:00
22249ecf7a refactor: remove useless debug checks
Debug messages shouldn't be printed in the first place if debug isn't on.
2025-02-13 16:31:52 -05:00
5610fc7acd refactor: minor cleanups 2025-02-13 16:31:30 -05:00
c73260badb feat(commands): add c as alias for current 2025-02-13 16:28:51 -05:00
2645f33940 fix: reload all util modules 2025-02-12 19:04:16 -05:00
8d76a107c5 refactor(utils/cooldown): ignore owners 2025-02-12 19:02:50 -05:00
8ee7693b91 refactor(audio/youtubedl): improve empty entries error message 2025-02-12 15:29:55 -05:00
0f5532a14a refactor(commands/voice/queue): remove extra bold from queue error 2025-02-12 15:29:11 -05:00
c8c4756cc3 fix(commands/voice/join): only join when author is in voice channel 2025-02-12 11:56:30 -05:00
ea09f291e5 refactor(core): tweak cooldown calculation
Should prevent issues where cooldown is 0 when formatted.
2025-02-10 18:08:06 -05:00
c7658f84dc chore: ignore .venv directory 2025-02-09 22:20:42 -05:00
b562ea4ac5 audio/queue: display player metadata correctly 2025-02-09 17:44:50 -05:00
623de96463 fix(audio/queue): correctly import format_duration 2025-02-09 03:39:17 -05:00
97f4787b39 audio/queue/player: rename queue_add to queue_push 2025-02-09 03:35:41 -05:00
69f4d6967f audio: split into more modules 2025-02-09 03:25:05 -05:00
af0896a6a0 audio: rename from youtubedl 2025-02-09 03:04:44 -05:00
9a58bc964d refactor(commands/voice/queue): remove redundant suppress_embeds 2025-02-06 19:59:30 -05:00
65168d38f9 refactor(youtubedl): completely remove limits from PCMVolumeTransformer 2025-02-05 21:55:13 -05:00
70ed37737c refactor(youtubedl): CustomAudioSource -> TrackedAudioSource 2025-02-05 21:54:42 -05:00
3719fc69b5 refactor(youtubedl): use CustomAudioSource as hint for PCMVolumeTransformer 2025-02-05 21:54:42 -05:00
94837f0e77 refactor: fix casing and add type hints 2025-02-05 21:42:21 -05:00
d63155d0fb chore(requirements): add aiohttp 2025-02-04 21:27:47 -05:00
40cd8238dd refactor: log messages on gateway connection state change 2025-02-04 17:04:36 -05:00
81e30c7e70 refactor(commands/utils): add override for skip 2025-02-03 16:40:01 -05:00
a1d63f1bb1 refactor(youtubedl): raise exception if no entries 2025-02-03 16:26:23 -05:00
1a24754549 refactor(commands/voice/queue): check for voice_client 2025-02-03 16:24:47 -05:00
2c6d05b33d fix(youtubedl): show placeholder if no likes 2025-02-01 18:44:28 -05:00
117438be76 refactor(fun): simplify if statement 2025-02-01 18:43:56 -05:00
fbdd442a8e style(commands/voice/playback): format again 2025-02-01 16:09:55 -05:00
ca9f811e8f fix(commands/voice/sponsorblock): use youtubedl duration and make sponsorblock default 2025-01-25 19:03:26 -05:00
26f81bd58f feat(commands/voice): add sponsorblock command 2025-01-24 17:51:25 -05:00
256156b9d2 fix(extra): reset state.kill 2025-01-24 16:26:31 -05:00
10f7ce991c fix(commands/voice/queue): show correct index for queue --next 2025-01-23 19:25:01 -05:00
fc06b312cd fix(commands/tools/lookup): get accent color properly 2025-01-23 19:24:50 -05:00
98f61c623c feat(youtubedl): add unlimited PCMVolumeTransformer 2025-01-23 19:19:17 -05:00
0e69a039a1 feat(commands/utils/tokenize): add remove_prefix parameter 2025-01-23 14:50:39 -05:00
3930175c79 fix: send another message if reply fails 2025-01-22 23:18:46 -05:00
03a8014d2f refactor(sponsorblock): use params and include sponsor category as well 2025-01-22 14:53:21 -05:00
8ef4c85bd8 refactor: use aiohttp for requests 2025-01-22 14:49:29 -05:00
640e750e3d feat: add sponsorblock integration
Add the -S option to the fast forward command.
2025-01-22 14:47:00 -05:00
3ca58a4c19 refactor(commands/voice/queue): allow query with spaces 2025-01-19 17:53:58 -05:00
a8d69e079d refactor: allow reloading yt_dlp 2025-01-19 04:44:00 -05:00
7f07e1bd71 refactor(commands/bot/status): show yt-dlp version 2025-01-14 23:40:12 -05:00
5bdfddbbbd fix(commands/voice): clean up command_allowed check 2025-01-13 01:20:17 -05:00
d0ddf9ee47 fix(commands/voice/join): check if channels correct before joining 2025-01-13 01:08:55 -05:00
de5d4d2793 refactor: parse_snowflake -> snowflake_timestamp 2025-01-10 23:48:45 -05:00
7fdaf7b379 chore(docker): add psutil build dependencyes 2025-01-09 20:26:59 -05:00
b8b566cb57 chore(requirements): add psutil 2025-01-09 20:23:22 -05:00
234d6b438b refactor(utils): split into separate files 2025-01-09 20:04:40 -05:00
c420f3de6b refactor(core): clean up execute command 2025-01-09 16:57:14 -05:00
c80b926b35 feat: add fun module 2025-01-09 16:57:14 -05:00
23c29e1fa0 refactor(utils): remove opus load warning 2025-01-09 16:53:00 -05:00
8cbd7d6aef style: format with ruff 0.9 2025-01-09 16:31:32 -05:00
57809fe26d refactor(commands/tools): clean up 2025-01-09 16:25:52 -05:00
8a4f12fcce refactor(commands/voice/utils): check if loaded opus manually 2025-01-09 16:20:36 -05:00
1e4271217d refactor(commands): get rid of custom module reloader 2025-01-09 16:20:16 -05:00
c892358fef fix(utils/reply): remove content or embeds when editing message 2025-01-08 16:36:00 -05:00
849af9d394 feat(commands/tools): add lookup 2025-01-08 16:29:10 -05:00
aa4632b4dd feat(commands/bot): add ping 2025-01-08 16:03:24 -05:00
79fd40a8e3 feat(commands/voice/clear): add --attachments 2025-01-08 14:27:40 -05:00
d6150a664f fix(commands/tools/clear): check for ignore_ids 2025-01-08 14:25:58 -05:00
f00ac9c977 refactor(commands/voice/queue): reduce cooldown to 2 seconds 2025-01-08 14:20:51 -05:00
50651db89e test: add secret filtering 2025-01-08 13:52:27 -05:00
bb3c379755 fix(youtubedl): check for uploader_url when creating embed 2025-01-08 13:31:18 -05:00
7a400da1ee feat(commands/tools/clear): add --ignore-ids 2025-01-08 13:20:58 -05:00
78f2d0a568 fix(extra/transcript): calculate count properly 2025-01-08 13:18:56 -05:00
d5d8c56ba1 fix(commands): don't check cooldown for edited messages 2025-01-08 13:08:30 -05:00
93c67f707c refactor: set disnake log level to WARNING 2025-01-08 13:06:21 -05:00
27a460fa6e refactor(commands): use format_duration for cooldown message 2025-01-08 10:28:04 -05:00
ffdd25d849 style: format files 2025-01-08 10:18:57 -05:00
32c7be659b perf(commands/utils): use lru cache for matching 2025-01-08 10:18:46 -05:00
672ae02e16 feat(commands): add cooldown system 2025-01-08 10:17:06 -05:00
7c2e17e0d3 refactor: use from imports for constants 2025-01-08 09:28:01 -05:00
07b3bde22d test: check for short formatting cases 2025-01-08 09:28:00 -05:00
6afbce5d8f feat(utils): add support for short formatting 2025-01-08 09:25:37 -05:00
be77e62e53 feat(commands/bot): add status 2025-01-08 09:25:11 -05:00
e3982c064d feat: add proper logging 2025-01-08 08:58:16 -05:00
d56bac1b2f refactor: improve debgu messages 2025-01-08 08:37:40 -05:00
d6bc67f17a refactor: clean up some import and names 2025-01-08 08:32:59 -05:00
8d0bec4cf2 fix(tasks): add error handling to presence changing 2025-01-08 08:26:36 -05:00
c34bdc2bfd refactor(youtubedl): channel -> uploader and add url 2025-01-07 17:19:17 -05:00
6c5e92aec2 refactor(extra): use delete() if only one message 2025-01-07 16:40:25 -05:00
5e5a91d879 feat(youtubedl): add more fields to embed 2025-01-07 16:31:27 -05:00
41f9beb6e8 refactor(commands/voice): move embed generation into separate file 2025-01-07 16:11:32 -05:00
8ee5d01bf6 refactor(commands/voice): split into separate files 2025-01-07 15:29:28 -05:00
2ea3d74e8a fix(commands/voice): return early if voice_client doesn't exist 2025-01-07 14:49:13 -05:00
c04cf1b05f feat(state): add kill dict 2025-01-07 14:34:23 -05:00
803eae2adc feat(commands/voice/skip): add --next 2025-01-07 12:38:08 -05:00
1b781ac6a0 refactor(commands/voice): check if command allowed after argparse
So the help command always works.
2025-01-07 12:37:56 -05:00
5824fcdf16 refactor(commands/voice): minor description updates 2025-01-07 12:37:33 -05:00
930169346b fix(commands/voice/queue): only allow first when queue is actually empty 2025-01-07 12:37:02 -05:00
655b552c10 feat(commands/voice): show embed for newly playing song 2025-01-06 20:57:13 -05:00
d52266300c refactor(commands/voice/queue): only show footer when more than one queued 2025-01-06 19:26:40 -05:00
bb70e5d057 feat(extra): add upper parameter to transcript 2025-01-06 18:56:46 -05:00
8cd3115ed2 feat(extra): add limit to messages_per_second 2025-01-06 18:49:42 -05:00
c69f1c7d26 feat: add extra module for runtime code execution 2025-01-06 18:35:05 -05:00
f9489a869d fix(commands/voice): reply to message if first queued 2025-01-06 18:03:05 -05:00
8b871fb102 refactor(youtubedl): expose video id 2025-01-06 16:36:37 -05:00
5295d75257 refactor(commands): add current as an alias for playing 2025-01-06 16:32:13 -05:00
MightyCoderX
dfe05cc548 chore: dockerize bot (#1)
Co-authored-by: ErrorNoInternet <errornointernet@envs.net>
2025-01-06 16:12:56 -05:00
eeca6ec5d9 fix: manually load opus if not loaded already 2025-01-06 16:11:27 -05:00
b9e5f1899e refactor(commands/voice): throw error if playback fails 2025-01-06 15:55:16 -05:00
cf98497c99 refactor(commands/voice): tweak error message 2025-01-06 15:08:57 -05:00
71be016461 test(format_duration): youtubedl duration formatting 2025-01-06 14:45:37 -05:00
a5503751a5 fix(youtubedl): always cast to int when formatting duration 2025-01-06 14:41:43 -05:00
da5db1e73a feat(utils): use improved disnake_paginator interaction wrapper
Adds edit support (via utils.reply)
2025-01-06 14:31:08 -05:00
729fc28f1b test: add format duration tests 2025-01-06 14:06:06 -05:00
439095116f feat(commands/voice): add ff 2025-01-06 13:43:55 -05:00
f06d8075ea feat(youtubedl): add fast_forward 2025-01-06 13:00:33 -05:00
7c4041c662 feat(commands/voice/queue): allow removing multiple indices 2025-01-06 12:46:46 -05:00
5333559b25 feat(utils/format_duration): add natural 2025-01-06 12:08:34 -05:00
74629ad984 feat(commands/voice/queue): add immutability checks 2025-01-06 11:59:43 -05:00
b0e378105e feat(commands/voice): allow immutable commands from non-vc members 2025-01-06 11:53:19 -05:00
290e85a1c1 fix(commands/voice): check if player exists before playing next 2025-01-06 11:53:02 -05:00
42735f9a60 refactor: tweak duration bracket bolding 2025-01-06 11:36:27 -05:00
c0173b87e9 fix: don't edit on channel send 2025-01-06 11:14:49 -05:00
d3fd79e87f feat(commands/voice/playing): add --description 2025-01-06 10:45:38 -05:00
d9d35a2672 refactor(constants): sort variables 2025-01-06 10:20:14 -05:00
6887ebe087 refactor: tweak some descriptions 2025-01-06 10:12:58 -05:00
5216d611c3 feat(commands/voice/playing): add percentage 2025-01-06 10:01:44 -05:00
2204c24e29 refactor(commands/voice/queue): don't show queuer on first queue 2025-01-06 09:55:28 -05:00
0aef94db2d feat: add player progress tracking 2025-01-06 09:51:22 -05:00
e2834532b2 fix(tasks/cleanup): use .items() 2025-01-06 07:14:32 -05:00
63a2db8278 refactor: clean up initialization code 2025-01-06 00:48:02 -05:00
84768653a9 fix(commands/voice): return early if no player 2025-01-06 00:39:05 -05:00
565dbb6f47 refactor: minor fixes everywhere 2025-01-06 00:39:04 -05:00
ebde4f1310 fix: runs threads for background tasks 2025-01-05 19:42:22 -05:00
82cd56ace8 refactor(state): switch to LimitedSizeDict 2025-01-05 19:24:44 -05:00
3c4480c834 feat: keep track of message responses for all commands 2025-01-05 19:24:19 -05:00
3848deb887 feat: set status to idle appropriately 2025-01-05 19:23:50 -05:00
d7ab46a20e feat(commands/voice/queue): allow queueing more than 5 if only member in channel 2025-01-05 18:33:36 -05:00
ac2fc1c52d refactor: add helper for sending messages in channel 2025-01-05 18:26:45 -05:00
c27998abc7 refactor(commands/voice): add check reaction for join and leave 2025-01-05 18:17:24 -05:00
614359abd1 fix(commands/voice/queue): actually pass edited 2025-01-05 18:06:13 -05:00
4a1ef90059 refactor: tweak some messages 2025-01-05 17:44:15 -05:00
cf2348c918 refactor: remove dynamic handler system
Can just use client.add_listener instead
2025-01-05 17:33:52 -05:00
17bcdc687d feat: delete queued songs on message delete 2025-01-05 17:26:36 -05:00
d5623502b0 feat(commands/voice/queue): replace queued song on message edit 2025-01-05 17:04:34 -05:00
186eda4934 refactor(youtubedl): clean up formatting and use dataclasses 2025-01-05 16:59:49 -05:00
5281236e0f fix(events): use add_listener to allow reloading
Also ignore messages whose embeds have changed.
2025-01-05 16:59:21 -05:00
100f829e7a fix(commands/voice/queue): clean up queue failure message 2025-01-05 16:58:14 -05:00
9790ef2914 feat(commands/voice): limit to 5 queued songs without manage channels permission 2025-01-05 16:51:39 -05:00
1a1272956c feat(commands/voice/queue): use paginators for list 2025-01-05 16:51:23 -05:00
19c7bc477f feat(commands/voice/queue): add --next 2025-01-05 16:50:32 -05:00
e540b266c7 feat(commands/execute): track sent messages 2025-01-05 16:49:33 -05:00
116ed896ab fix(commands/utils): fix partial exact matching
play & playing
2025-01-05 16:48:46 -05:00
6930f964c5 feat(commands/voice): add playing 2025-01-05 16:48:22 -05:00
f08b9f0766 refactor(youtubedl): use collections.deque for player queue 2025-01-03 20:01:19 -05:00
1c7b8797bc chore: add readme 2025-01-03 19:09:24 -05:00
92a0eee92c feat(core): automatically leave voice channel when empty 2025-01-03 18:53:22 -05:00
409373ee27 fix(commands/voice): handle edge case where player dies 2025-01-01 16:52:15 -05:00
642d4aef81 refactor(commands/voice/queue): tweak descriptions 2025-01-01 16:39:47 -05:00
d0ddb36812 fix: put lock on skip command 2025-01-01 15:38:07 -05:00
db2be32a43 feat(utils/format_duration): add weeks 2025-01-01 15:33:57 -05:00
0fa1123a87 refactor(commands/voice/queue): remove shorthand flag from --now 2025-01-01 15:18:21 -05:00
2f0e849c84 fix(commands/voice/queue): don't allow negative indices 2025-01-01 15:11:02 -05:00
7e27c9158b feat(commands/voice/queue): add --page 2024-12-31 22:18:23 -05:00
64919008a5 feat(commands/voice): send message when song plays 2024-12-31 21:45:28 -05:00
99f5d3d338 fix(commands/voice/queue): fix --remove-multiple 2024-12-31 20:37:49 -05:00
e85d90fb36 feat(commands/voice/queue): add --duration 2024-12-31 20:03:40 -05:00
a05c14263b refactor(commands/voice/queue): clean up argument parsing groups 2024-12-31 19:54:35 -05:00
38 changed files with 1973 additions and 568 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.env
.venv
__pycache__

10
Dockerfile Normal file
View File

@@ -0,0 +1,10 @@
FROM python:3.13-alpine
RUN apk --no-cache add ffmpeg gcc linux-headers musl-dev opus python3-dev
WORKDIR /bot
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "main.py"]

3
README.md Normal file
View File

@@ -0,0 +1,3 @@
# ErrorNoCord
Discord music bot for testing purposes, with live reloading support

View File

@@ -8,7 +8,9 @@ import utils
class ArgumentParser:
def __init__(self, command, description):
self.parser = argparse.ArgumentParser(
command, description=description, exit_on_error=False
command,
description=description,
exit_on_error=False,
)
def print_help(self):
@@ -26,21 +28,20 @@ class ArgumentParser:
async def parse_args(self, message, tokens) -> argparse.Namespace | None:
try:
with contextlib.redirect_stdout(io.StringIO()):
args = self.parser.parse_args(tokens[1:])
return args
return self.parser.parse_args(tokens[1:])
except SystemExit:
await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e:
await utils.reply(message, f"`{e}`")
def range_type(string, min=0, max=100):
def range_type(string: str, lower=0, upper=100) -> int:
try:
value = int(string)
except ValueError:
raise argparse.ArgumentTypeError(f"value not a valid integer")
except ValueError as e:
raise argparse.ArgumentTypeError("value is not a valid integer") from e
if min <= value <= max:
if lower <= value <= upper:
return value
else:
raise argparse.ArgumentTypeError(f"value not in range {min}-{max}")
raise argparse.ArgumentTypeError(f"value is not in range {lower}-{upper}")

8
audio/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from . import discord, queue, utils, youtubedl
__all__ = [
"discord",
"queue",
"utils",
"youtubedl",
]

39
audio/discord.py Normal file
View File

@@ -0,0 +1,39 @@
import audioop
import disnake
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)

112
audio/queue.py Normal file
View File

@@ -0,0 +1,112 @@
import collections
from dataclasses import dataclass
from typing import ClassVar, Optional
import disnake
from constants import BAR_LENGTH, EMBED_COLOR
from .utils import format_duration
from .youtubedl import YTDLSource
@dataclass
class Song:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
title = f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})"
duration = (
format_duration(self.player.duration) if self.player.duration else "stream"
)
if multiline:
queue_time = (
self.trigger_message.edited_at or self.trigger_message.created_at
)
return f"{title}\n**duration:** {duration}" + (
f", **queued by:** <@{self.trigger_message.author.id}> <t:{round(queue_time.timestamp())}:R>"
if show_queuer
else ""
)
return f"{title} [**{duration}**]" + (
f" (<@{self.trigger_message.author.id}>)" if show_queuer else ""
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**stream**]"
)
),
timestamp=self.trigger_message.edited_at or self.trigger_message.created_at,
)
uploader_value = None
if self.player.uploader_url:
if self.player.uploader:
uploader_value = f"[{self.player.uploader}]({self.player.uploader_url})"
else:
uploader_value = self.player.uploader_url
elif self.player.uploader:
uploader_value = self.player.uploader
if uploader_value:
embed.add_field(name="Uploader", value=uploader_value)
if self.player.like_count:
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
if self.player.view_count:
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
if self.player.timestamp:
embed.add_field(name="Published", value=f"<t:{int(self.player.timestamp)}>")
if self.player.volume:
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
if self.player.thumbnail_url:
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"Queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class Player:
queue: ClassVar = collections.deque()
current: Optional[Song] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_push(self, item):
self.queue.append(item)
def queue_push_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()

7
audio/utils.py Normal file
View File

@@ -0,0 +1,7 @@
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"

76
audio/youtubedl.py Normal file
View File

@@ -0,0 +1,76 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
from constants import YTDL_OPTIONS
from .discord import PCMVolumeTransformer, TrackedAudioSource
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self,
source: TrackedAudioSource,
*,
data: dict[str, Any],
volume: float = 0.5,
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None,
lambda: ytdl.extract_info(url, download=not stream),
)
if "entries" in data:
if not data["entries"]:
raise Exception("no results found!")
data = data["entries"][0]
if "url" not in data:
raise Exception("no url returned!")
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
),
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url={self.original_url} duration={self.duration}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)

View File

@@ -1,6 +1,13 @@
from . import bot, tools, utils, voice
from .utils import *
from .utils import Command, match, match_token, tokenize
def __reload_module__():
globals().update({k: v for k, v in vars(utils).items() if not k.startswith("_")})
__all__ = [
"bot",
"Command",
"match",
"match_token",
"tokenize",
"tools",
"utils",
"voice",
]

View File

@@ -1,18 +1,65 @@
import os
import threading
import time
import disnake
import psutil
from yt_dlp import version
import arguments
import commands
import utils
from state import start_time
from constants import EMBED_COLOR
from state import client, start_time
from utils import format_duration, reply, surround
async def help(message):
await utils.reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()]
),
async def status(message):
member_count = 0
channel_count = 0
for guild in client.guilds:
member_count += len(guild.members)
channel_count += len(guild.channels)
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / 1048576
embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field(
name="Latency",
value=surround(f"{round(client.latency * 1000, 1)} ms"),
)
embed.add_field(
name="Memory",
value=surround(f"{round(memory_usage, 1)} MiB"),
)
embed.add_field(
name="Threads",
value=surround(threading.active_count()),
)
embed.add_field(
name="Guilds",
value=surround(len(client.guilds)),
)
embed.add_field(
name="Members",
value=surround(member_count),
)
embed.add_field(
name="Channels",
value=surround(channel_count),
)
embed.add_field(
name="Disnake",
value=surround(disnake.__version__),
)
embed.add_field(
name="yt-dlp",
value=surround(version.__version__),
)
embed.add_field(
name="Uptime",
value=surround(format_duration(int(time.time() - start_time), short=True)),
)
await reply(message, embed=embed)
async def uptime(message):
@@ -31,26 +78,26 @@ async def uptime(message):
return
if args.since:
await utils.reply(message, f"{round(start_time)}")
await reply(message, f"{round(start_time)}")
else:
format_plural = lambda noun, count: noun if count == 1 else noun + "s"
await reply(message, f"up {format_duration(int(time.time() - start_time))}")
segments = []
duration = int(time.time() - start_time)
days, duration = divmod(duration, 86400)
if days >= 1:
segments.append(f"{days} {format_plural('day', days)}")
async def ping(message):
await reply(
message,
embed=disnake.Embed(
title="Pong :ping_pong:",
description=f"Latency: **{round(client.latency * 1000, 1)} ms**",
color=EMBED_COLOR,
),
)
hours, duration = divmod(duration, 3600)
if hours >= 1:
segments.append(f"{hours} {format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes >= 1:
segments.append(f"{minutes} {format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration} {format_plural('second', duration)}")
await utils.reply(message, f"up {', '.join(segments)}")
async def help(message):
await reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()],
),
)

View File

@@ -1,20 +1,175 @@
import re
import arguments
import aiohttp
import disnake
import arguments
import commands
import utils
from constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
from state import client
async def lookup(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"look up a discord user or application by ID",
)
parser.add_argument(
"-a",
"--application",
action="store_true",
help="look up applications instead of users",
)
parser.add_argument(
"id",
type=int,
help="the ID to perform a search for",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.application:
session = aiohttp.ClientSession()
response = await (
await session.get(f"https://discord.com/api/v9/applications/{args.id}/rpc")
).json()
if "code" in response.keys():
await utils.reply(message, "application not found!")
return
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
embed.set_thumbnail(
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp",
)
embed.add_field(name="Application Name", value=response["name"])
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
embed.add_field(
name="Public Bot",
value=f"{'`' + str(response['bot_public']) + '`' if 'bot_public' in response else 'No bot'}",
)
embed.add_field(name="Public Flags", value="`" + str(response["flags"]) + "`")
embed.add_field(
name="Terms of Service",
value=(
"None"
if "terms_of_service_url" not in response.keys()
else f"[Link]({response['terms_of_service_url']})"
),
)
embed.add_field(
name="Privacy Policy",
value=(
"None"
if "privacy_policy_url" not in response.keys()
else f"[Link]({response['privacy_policy_url']})"
),
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(response['id']))}:R>",
)
embed.add_field(
name="Default Invite URL",
value=(
"None"
if "install_params" not in response.keys()
else f"[Link](https://discord.com/oauth2/authorize?client_id={response['id']}&permissions={response['install_params']['permissions']}&scope={'%20'.join(response['install_params']['scopes'])})"
),
)
embed.add_field(
name="Custom Invite URL",
value=(
"None"
if "custom_install_url" not in response.keys()
else f"[Link]({response['custom_install_url']})"
),
)
bot_intents = []
for application_flag, intent_name in APPLICATION_FLAGS.items():
if response["flags"] & application_flag == application_flag:
if intent_name.replace(" (unverified)", "") not in bot_intents:
bot_intents.append(intent_name)
embed.add_field(
name="Application Flags",
value=", ".join(bot_intents) if bot_intents else "None",
)
bot_tags = ""
if "tags" in response.keys():
for tag in response["tags"]:
bot_tags += tag + ", "
embed.add_field(
name="Tags",
value="None" if bot_tags == "" else bot_tags[:-2],
inline=False,
)
else:
try:
user = await client.fetch_user(args.id)
except Exception:
await utils.reply(message, "user not found!")
return
badges = ""
for flag, flag_name in PUBLIC_FLAGS.items():
if user.public_flags.value & flag == flag:
if flag_name != "None":
try:
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
except Exception as e:
raise Exception(
f"unable to find badge: {PUBLIC_FLAGS[flag]}"
) from e
user_object = await client.fetch_user(user.id)
accent_color = 0x000000
if user_object.accent_color is not None:
accent_color = user_object.accent_color
embed = disnake.Embed(color=accent_color)
embed.add_field(
name="User ID",
value=f"`{user.id}`",
)
embed.add_field(
name="Discriminator",
value=f"`{user.name}#{user.discriminator}`",
)
embed.add_field(
name="Creation Time",
value=f"<t:{utils.snowflake_timestamp(int(user.id))}:R>",
)
embed.add_field(
name="Public Flags",
value=f"`{user.public_flags.value}` {badges}",
)
embed.add_field(
name="Bot User",
value=f"`{user.bot}`",
)
embed.add_field(
name="System User",
value=f"`{user.system}`",
)
embed.set_thumbnail(url=user.avatar if user.avatar else user.default_avatar)
if user_object.banner:
embed.set_image(url=user_object.banner)
await utils.reply(message, embed=embed)
async def clear(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"bulk delete messages in the current channel matching certain criteria",
"bulk delete messages in the current channel matching specified criteria",
)
parser.add_argument(
"count",
type=lambda c: arguments.range_type(c, min=1, max=1000),
type=lambda c: arguments.range_type(c, lower=1, upper=1000),
help="amount of messages to delete",
)
group = parser.add_mutually_exclusive_group()
@@ -55,19 +210,32 @@ async def clear(message):
action="store_true",
help="delete messages with reactions",
)
parser.add_argument(
"-A",
"--attachments",
action="store_true",
help="delete messages with attachments",
)
parser.add_argument(
"-d",
"--delete-command",
action="store_true",
help="delete the command message as well",
)
parser.add_argument(
"-I",
"--ignore-ids",
type=int,
action="append",
help="ignore messages with this id",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.delete_command:
try:
await message.delete()
except:
except Exception:
pass
regex = None
@@ -75,6 +243,8 @@ async def clear(message):
regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0)
def check(m):
if (ids := args.ignore_ids) and m.id in ids:
return False
c = []
if regex:
c.append(regex.search(m.content))
@@ -87,12 +257,16 @@ async def clear(message):
c.append(m.author.id in i)
if args.reactions:
c.append(len(m.reactions) > 0)
if args.attachments:
c.append(len(m.attachments) > 0)
return all(c)
messages = len(
await message.channel.purge(
limit=args.count, check=check, oldest_first=args.oldest_first
)
limit=args.count,
check=check,
oldest_first=args.oldest_first,
),
)
if not args.delete_command:
@@ -101,5 +275,5 @@ async def clear(message):
message,
f"purged **{messages}/{args.count} {'message' if args.count == 1 else 'messages'}**",
)
except:
except Exception:
pass

View File

@@ -1,50 +1,77 @@
import enum
from enum import Enum
from functools import lru_cache
import constants
class Command(enum.Enum):
class Command(Enum):
CLEAR = "clear"
CURRENT = "current"
EXECUTE = "execute"
FAST_FORWARD = "ff"
HELP = "help"
JOIN = "join"
LEAVE = "leave"
LOOKUP = "lookup"
PAUSE = "pause"
PING = "ping"
PLAY = "play"
PLAYING = "playing"
PURGE = "purge"
QUEUE = "queue"
RELOAD = "reload"
RESUME = "resume"
SKIP = "skip"
SPONSORBLOCK = "sponsorblock"
STATUS = "status"
UPTIME = "uptime"
VOLUME = "volume"
@lru_cache
def match_token(token: str) -> list[Command]:
if token.lower() == "r":
return [Command.RELOAD]
match token.lower():
case "r":
return [Command.RELOAD]
case "s":
return [Command.SKIP]
case "c":
return [Command.CURRENT]
if exact_match := list(
filter(
lambda command: command.value == token.lower(),
Command.__members__.values(),
),
):
return exact_match
return list(
filter(
lambda command: command.value.startswith(token.lower()),
Command.__members__.values(),
)
),
)
@lru_cache
def match(command: str) -> list[Command] | None:
if tokens := tokenize(command):
return match_token(tokens[0])
def tokenize(string: str) -> list[str]:
@lru_cache
def tokenize(string: str, remove_prefix: bool = True) -> list[str]:
tokens = []
token = ""
in_quotes = False
quote_char = None
escape = False
for char in string[len(constants.PREFIX) :]:
if remove_prefix:
string = string[len(constants.PREFIX) :]
for char in string:
if escape:
token += char
escape = False

View File

@@ -1,290 +0,0 @@
import arguments
import commands
import utils
import youtubedl
from state import client, players
async def queue_or_play(message):
await ensure_joined(message)
if not command_allowed(message):
return
if message.guild.id not in players:
players[message.guild.id] = youtubedl.QueuedPlayer()
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "queue a song, list the queue, or resume playback"
)
parser.add_argument("query", nargs="?", help="yt-dlp URL or query to get song")
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)",
)
group.add_argument(
"-i",
"--remove-index",
type=int,
help="remove a queued song by index",
)
group.add_argument(
"-m",
"--remove-multiple",
action="store_true",
help="continue removing queued songs after finding a match",
)
group.add_argument(
"-c",
"--clear",
action="store_true",
help="remove all queued songs",
)
parser.add_argument(
"-n",
"--now",
action="store_true",
help="play the specified song immediately",
)
parser.add_argument(
"-t",
"--remove-title",
help="remove queued songs by title",
)
parser.add_argument(
"-q",
"--remove-queuer",
type=int,
help="remove queued songs by queuer",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
return
elif i := args.remove_index:
try:
queued = players[message.guild.id].queue[i - 1]
del players[message.guild.id].queue[i - 1]
await utils.reply(message, f"**x** {queued.format()}")
except:
await utils.reply(message, "invalid index!")
elif args.remove_title or args.remove_queuer:
targets = []
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.append(queued)
continue
if q := args.remove_queuer:
if q == queued.queuer:
targets.append(queued)
if args.remove_multiple:
targets = targets[:1]
for target in targets:
players[message.guild.id].queue.remove(target)
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif query := args.query:
try:
async with message.channel.typing():
player = await youtubedl.YTDLSource.from_url(
query, loop=client.loop, stream=True
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(
message,
f"**unable to queue {query}:** {e}",
)
return
queued = youtubedl.QueuedSong(player, message.author.id)
if args.now:
players[message.guild.id].queue_add_front(queued)
else:
players[message.guild.id].queue_add(queued)
if (
not message.guild.voice_client.is_playing()
and not message.guild.voice_client.is_paused()
):
await utils.reply(message, f"**0.** {queued.format()}")
play_next(message)
elif args.now:
message.guild.voice_client.stop()
await utils.reply(message, f"**0.** {queued.format()}")
else:
await utils.reply(
message,
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
)
else:
if tokens[0].lower() == "play":
message.guild.voice_client.resume()
await utils.reply(
message,
"resumed!",
)
else:
currently_playing = (
lambda: f"**0.** {'(paused) ' if message.guild.voice_client.is_paused() else ''} {players[message.guild.id].current.format(with_queuer=True)}"
)
queue_list = lambda: "\n".join(
[
f"**{i + 1}.** {queued.format(with_queuer=True, hide_preview=True)}"
for i, queued in enumerate(players[message.guild.id].queue)
]
)
if (
not players[message.guild.id].queue
and not message.guild.voice_client.source
):
await utils.reply(
message,
"nothing is playing or queued!",
)
elif not players[message.guild.id].queue:
await utils.reply(message, currently_playing())
elif not message.guild.voice_client.source:
await utils.reply(
message,
queue_list(),
)
else:
await utils.reply(
message,
currently_playing() + "\n" + queue_list(),
)
async def skip(message):
if not command_allowed(message):
return
if not players[message.guild.id].queue:
message.guild.voice_client.stop()
await utils.reply(
message,
"the queue is empty now!",
)
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
async def join(message):
if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel)
await message.channel.connect()
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def volume(message):
if not command_allowed(message):
return
if not message.guild.voice_client:
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not message.guild.voice_client.source:
await utils.reply(
message,
f"nothing is playing!",
)
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)
def play_after_callback(e, message, once):
if e:
print(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False):
message.guild.voice_client.stop()
if players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "You are not connected to a voice channel.")
def command_allowed(message):
if not message.author.voice or not message.guild.voice_client:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@@ -0,0 +1,20 @@
from .channel import join, leave
from .playback import fast_forward, pause, playing, resume, volume
from .queue import queue_or_play, skip
from .sponsorblock import sponsorblock_command
from .utils import remove_queued
__all__ = [
"fast_forward",
"join",
"leave",
"pause",
"playing",
"queue_or_play",
"remove_queued",
"resume",
"skip",
"skip",
"sponsorblock_command",
"volume",
]

24
commands/voice/channel.py Normal file
View File

@@ -0,0 +1,24 @@
import utils
from .utils import command_allowed
async def join(message):
if message.author.voice:
if message.guild.voice_client:
await message.guild.voice_client.move_to(message.channel)
else:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
return
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)

168
commands/voice/playback.py Normal file
View File

@@ -0,0 +1,168 @@
import disnake_paginator
import arguments
import commands
import sponsorblock
import utils
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed
async def playing(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"get information about the currently playing song",
)
parser.add_argument(
"-d",
"--description",
action="store_true",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
await utils.reply(
message,
embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused(),
),
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the current sponsorblock segment")
parser.add_argument(
"-s",
"--seconds",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=300),
help="the number of seconds to fast forward instead",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
seconds = args.seconds
if not seconds:
video = await sponsorblock.get_segments(
players[message.guild.id].current.player.id,
)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
progress = message.guild.voice_client.source.original.progress
for segment in video["segments"]:
begin, end = map(float, segment["segment"])
if progress >= begin and progress < end:
seconds = end - message.guild.voice_client.source.original.progress
if not seconds:
await utils.reply(message, "no sponsorblock segment is currently playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def volume(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)

270
commands/voice/queue.py Normal file
View File

@@ -0,0 +1,270 @@
import itertools
import disnake
import disnake_paginator
import arguments
import audio
import commands
import utils
from constants import EMBED_COLOR
from state import client, players, trusted_users
from .playback import resume
from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"queue a song, list the queue, or resume playback",
)
parser.add_argument("query", nargs="*", help="yt-dlp URL or query to get song")
parser.add_argument(
"-v",
"--volume",
default=50,
type=lambda v: arguments.range_type(v, lower=0, upper=150),
help="the volume level (0 - 150) for the specified song",
)
parser.add_argument(
"-i",
"--remove-index",
type=int,
nargs="*",
help="remove queued songs by index",
)
parser.add_argument(
"-m",
"--match-multiple",
action="store_true",
help="continue removing queued after finding a match",
)
parser.add_argument(
"-c",
"--clear",
action="store_true",
help="remove all queued songs",
)
parser.add_argument(
"--now",
action="store_true",
help="play the specified song immediately",
)
parser.add_argument(
"--next",
action="store_true",
help="play the specified song next",
)
parser.add_argument(
"-t",
"--remove-title",
help="remove queued songs by title",
)
parser.add_argument(
"-q",
"--remove-queuer",
type=int,
help="remove queued songs by queuer",
)
if not (args := await parser.parse_args(message, tokens)):
return
await ensure_joined(message)
if len(tokens) == 1 and tokens[0].lower() != "play":
if not command_allowed(message, immutable=True):
return
elif not command_allowed(message):
return
if message.guild.id not in players:
players[message.guild.id] = audio.queue.Player()
if edited:
found = next(
filter(
lambda queued: queued.trigger_message.id == message.id,
players[message.guild.id].queue,
),
None,
)
if found:
players[message.guild.id].queue.remove(found)
if args.clear:
players[message.guild.id].queue.clear()
await utils.add_check_reaction(message)
elif indices := args.remove_index:
targets = []
for i in indices:
if i <= 0 or i > len(players[message.guild.id].queue):
await utils.reply(message, f"invalid index `{i}`!")
return
targets.append(players[message.guild.id].queue[i - 1])
for target in targets:
if target in players[message.guild.id].queue:
players[message.guild.id].queue.remove(target)
if len(targets) == 1:
await utils.reply(message, f"**removed** {targets[0].format()}")
else:
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif args.remove_title or args.remove_queuer:
targets = set()
for queued in players[message.guild.id].queue:
if t := args.remove_title:
if t in queued.player.title:
targets.add(queued)
if q := args.remove_queuer:
if q == queued.trigger_message.author.id:
targets.add(queued)
targets = list(targets)
if not args.match_multiple:
targets = targets[:1]
for target in targets:
players[message.guild.id].queue.remove(target)
await utils.reply(
message,
f"removed **{len(targets)}** queued {'song' if len(targets) == 1 else 'songs'}",
)
elif query := args.query:
if (
not message.channel.permissions_for(message.author).manage_channels
and len(
list(
filter(
lambda queued: queued.trigger_message.author.id
== message.author.id,
players[message.guild.id].queue,
),
),
)
>= 5
and not len(message.guild.voice_client.channel.members) == 2
and message.author.id not in trusted_users
):
await utils.reply(
message,
"you can only queue **5 items** without the manage channels permission!",
)
return
try:
async with message.channel.typing():
player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query),
loop=client.loop,
stream=True,
)
player.volume = float(args.volume) / 100.0
except Exception as e:
await utils.reply(message, f"failed to queue: `{e}`")
return
queued = audio.queue.Song(player, message)
if args.now or args.next:
players[message.guild.id].queue_push_front(queued)
else:
players[message.guild.id].queue_push(queued)
if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!")
return
elif not message.guild.voice_client.source:
play_next(message, first=True)
elif args.now:
message.guild.voice_client.stop()
else:
await utils.reply(
message,
f"**{1 if args.next else len(players[message.guild.id].queue)}.** {queued.format()}",
)
utils.cooldown(message, 2)
elif tokens[0].lower() == "play":
await resume(message)
else:
if players[message.guild.id].queue:
formatted_duration = utils.format_duration(
sum(
[
queued.player.duration if queued.player.duration else 0
for queued in players[message.guild.id].queue
],
),
natural=True,
)
def embed(description):
e = disnake.Embed(
description=description,
color=EMBED_COLOR,
)
if formatted_duration and len(players[message.guild.id].queue) > 1:
e.set_footer(text=f"{formatted_duration} in total")
return e
await disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
segments=list(
map(
embed,
[
"\n\n".join(
[
f"**{i + 1}.** {queued.format(show_queuer=True, hide_preview=True, multiline=True)}"
for i, queued in batch
],
)
for batch in itertools.batched(
enumerate(players[message.guild.id].queue),
10,
)
],
),
),
).start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
"nothing is queued!",
)
async def skip(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the song currently playing")
parser.add_argument(
"-n",
"--next",
action="store_true",
help="skip the next song",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not players[message.guild.id].queue:
message.guild.voice_client.stop()
await utils.reply(
message,
"the queue is empty now!",
)
elif args.next:
next = players[message.guild.id].queue.pop()
await utils.reply(message, f"**skipped** {next.format()}")
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
if not message.guild.voice_client.source:
play_next(message)

View File

@@ -0,0 +1,47 @@
import disnake
import audio
import sponsorblock
import utils
from constants import EMBED_COLOR, SPONSORBLOCK_CATEGORY_NAMES
from state import players
from .utils import command_allowed
async def sponsorblock_command(message):
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
progress = message.guild.voice_client.source.original.progress
video = await sponsorblock.get_segments(players[message.guild.id].current.player.id)
if not video:
await utils.reply(
message,
"no sponsorblock segments were found for this video!",
)
return
text = []
for segment in video["segments"]:
begin, end = map(int, segment["segment"])
if (category := segment["category"]) in SPONSORBLOCK_CATEGORY_NAMES:
category = SPONSORBLOCK_CATEGORY_NAMES[category]
current = "**" if progress >= begin and progress < end else ""
text.append(
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category}{current}",
)
await utils.reply(
message,
embed=disnake.Embed(
title="Sponsorblock segments",
description="\n".join(text),
color=EMBED_COLOR,
),
)

72
commands/voice/utils.py Normal file
View File

@@ -0,0 +1,72 @@
from logging import error
import disnake
import utils
from state import client, players
def play_after_callback(e, message, once):
if e:
error(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
if not message.guild.voice_client:
return
message.guild.voice_client.stop()
if not disnake.opus.is_loaded():
utils.load_opus()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
message.guild.voice_client.play(
queued.player,
after=lambda e: play_after_callback(e, message, once),
)
embed = queued.embed()
if first and len(players[message.guild.id].queue) == 0:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
def remove_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return False
if immutable:
return True
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@@ -1,24 +1,7 @@
import os
EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712]
PREFIX = "%"
RELOADABLE_MODULES = [
"arguments",
"commands",
"commands.bot",
"commands.tools",
"commands.utils",
"commands.voice",
"constants",
"core",
"events",
"utils",
"voice",
"youtubedl",
]
YTDL_OPTIONS = {
"color": "never",
"default_search": "auto",
"format": "bestaudio/best",
"ignoreerrors": False,
@@ -32,6 +15,97 @@ YTDL_OPTIONS = {
"source_address": "0.0.0.0",
}
BAR_LENGTH = 35
EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712]
PREFIX = "%"
SPONSORBLOCK_CATEGORY_NAMES = {
"music_offtopic": "non-music",
"selfpromo": "self promotion",
"sponsor": "sponsored",
}
REACTIONS = {
"cat": ["🐈"],
"dog": ["🐕"],
"gn": ["💤", "😪", "😴", "🛌"],
"pizza": ["🍕"],
}
RELOADABLE_MODULES = [
"arguments",
"audio",
"audio.discord",
"audio.queue",
"audio.utils",
"audio.youtubedl",
"commands",
"commands.bot",
"commands.tools",
"commands.utils",
"commands.voice",
"commands.voice.channel",
"commands.voice.playback",
"commands.voice.playing",
"commands.voice.queue",
"commands.voice.sponsorblock",
"commands.voice.utils",
"constants",
"core",
"events",
"extra",
"fun",
"sponsorblock",
"tasks",
"utils",
"utils.common",
"utils.discord",
"voice",
"yt_dlp",
"yt_dlp.version",
]
PUBLIC_FLAGS = {
1 << 0: "Discord Employee",
1 << 1: "Discord Partner",
1 << 2: "HypeSquad Events",
1 << 3: "Bug Hunter Level 1",
1 << 6: "HypeSquad Bravery",
1 << 7: "HypeSquad Brilliance",
1 << 8: "HypeSquad Balance",
1 << 9: "Early Supporter",
1 << 10: "Team User",
1 << 14: "Bug Hunter Level 2",
1 << 16: "Verified Bot",
1 << 17: "Verified Bot Developer",
1 << 18: "Discord Certified Moderator",
1 << 19: "HTTP Interactions Only",
1 << 22: "Active Developer",
}
BADGE_EMOJIS = {
"Discord Employee": "<:DiscordStaff:879666899980546068>",
"Discord Partner": "<:DiscordPartner:879668340434534400>",
"HypeSquad Events": "<:HypeSquadEvents:879666970310606848>",
"Bug Hunter Level 1": "<:BugHunter1:879666851448234014>",
"HypeSquad Bravery": "<:HypeSquadBravery:879666945153175612>",
"HypeSquad Brilliance": "<:HypeSquadBrilliance:879666956884643861>",
"HypeSquad Balance": "<:HypeSquadBalance:879666934717771786>",
"Early Supporter": "<:EarlySupporter:879666916493496400>",
"Team User": "<:TeamUser:890866907996127305>",
"Bug Hunter Level 2": "<:BugHunter2:879666866971357224>",
"Verified Bot": "<:VerifiedBot:879670687554498591>",
"Verified Bot Developer": "<:VerifiedBotDeveloper:879669786550890507>",
"Discord Certified Moderator": "<:DiscordModerator:879666882976837654>",
"HTTP Interactions Only": "<:HTTPInteractionsOnly:1047141867806015559>",
"Active Developer": "<:ActiveDeveloper:1047141451244523592>",
}
APPLICATION_FLAGS = {
1 << 12: "Presence Intent",
1 << 13: "Presence Intent (unverified)",
1 << 14: "Guild Members Intent",
1 << 15: "Guild Members Intent (unverified)",
1 << 16: "Unusual Growth (verification suspended)",
1 << 18: "Message Content Intent",
1 << 19: "Message Content Intent (unverified)",
1 << 23: "Suports Application Commands",
}
SECRETS = {
"TOKEN": os.getenv("BOT_TOKEN"),

146
core.py
View File

@@ -3,20 +3,24 @@ import contextlib
import importlib
import inspect
import io
import signal
import textwrap
import time
import traceback
from logging import debug
import disnake
import disnake_paginator
import commands
import constants
import core
import utils
from state import command_locks
from commands import Command as C
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
from state import client, command_cooldowns, command_locks, idle_tracker
async def on_message(message):
if not message.content.startswith(constants.PREFIX) or message.author.bot:
async def on_message(message, edited=False):
if not message.content.startswith(PREFIX) or message.author.bot:
return
tokens = commands.tokenize(message.content)
@@ -26,83 +30,91 @@ async def on_message(message):
if not matched:
return
idle_tracker["last_used"] = time.time()
if idle_tracker["is_idle"]:
idle_tracker["is_idle"] = False
await client.change_presence(status=disnake.Status.online)
if len(matched) > 1:
await utils.reply(
message,
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
)
return
matched = matched[0]
if message.guild.id not in command_locks:
command_locks[message.guild.id] = asyncio.Lock()
if (message.guild.id, message.author.id) not in command_locks:
command_locks[(message.guild.id, message.author.id)] = asyncio.Lock()
await command_locks[(message.guild.id, message.author.id)].acquire()
C = commands.Command
try:
match matched[0]:
case C.RELOAD if message.author.id in constants.OWNERS:
reloaded_modules = set()
for module in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in constants.RELOADABLE_MODULES,
globals().values(),
):
core.rreload(reloaded_modules, module)
if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
if (end_time := cooldowns.get(matched)) and (
remaining_time := round(end_time - time.time()) > 0
):
await utils.reply(
message,
f"please wait **{utils.format_duration(remaining_time, natural=True)}** before using this command again!",
)
return
match matched:
case C.RELOAD if message.author.id in OWNERS:
start = time.time()
reloaded_modules = reload()
end = time.time()
debug(
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s",
)
await utils.add_check_reaction(message)
case C.EXECUTE if message.author.id in constants.OWNERS:
case C.EXECUTE if message.author.id in OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]:
if code.startswith(replacement):
code = code[len(replacement) :]
stdout = io.StringIO()
try:
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
if "#globals" in code:
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
globals(),
)
wrapped_code = (
f"async def run_code():\n{textwrap.indent(code, ' ')}"
)
if "# globals" in code:
exec(wrapped_code, globals())
await globals()["run_code"]()
else:
dictionary = dict(locals(), **globals())
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
dictionary,
dictionary,
)
exec(wrapped_code, dictionary, dictionary)
await dictionary["run_code"]()
output = stdout.getvalue()
except Exception as e:
output = "`" + str(e) + "`"
output = utils.filter_secrets(output)
if len(output) > 2000:
output = output.replace("`", "\\`")
pager = disnake_paginator.ButtonPaginator(
prefix=f"```\n",
await disnake_paginator.ButtonPaginator(
prefix="```\n",
suffix="```",
color=constants.EMBED_COLOR,
segments=disnake_paginator.split(output),
invalid_user_function=utils.invalid_user_handler,
)
await pager.start(
disnake_paginator.wrappers.MessageInteractionWrapper(message)
)
color=EMBED_COLOR,
segments=disnake_paginator.split(output),
).start(utils.MessageInteractionWrapper(message))
elif len(output.strip()) == 0:
await utils.add_check_reaction(message)
else:
await message.channel.send(output)
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
await utils.reply(message, output)
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
await commands.tools.clear(message)
case C.JOIN:
await commands.voice.join(message)
case C.LEAVE:
await commands.voice.leave(message)
case C.QUEUE | C.PLAY:
async with command_locks[message.guild.id]:
await commands.voice.queue_or_play(message)
await commands.voice.queue_or_play(message, edited)
case C.SKIP:
await commands.voice.skip(message)
case C.RESUME:
@@ -115,20 +127,49 @@ async def on_message(message):
await commands.bot.help(message)
case C.UPTIME:
await commands.bot.uptime(message)
case C.PLAYING | C.CURRENT:
await commands.voice.playing(message)
case C.FAST_FORWARD:
await commands.voice.fast_forward(message)
case C.STATUS:
await commands.bot.status(message)
case C.PING:
await commands.bot.ping(message)
case C.LOOKUP:
await commands.tools.lookup(message)
case C.SPONSORBLOCK:
await commands.voice.sponsorblock_command(message)
except Exception as e:
await utils.reply(
message,
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
)
raise e
finally:
command_locks[(message.guild.id, message.author.id)].release()
async def on_voice_state_update(_, before, after):
def is_empty(channel):
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
channel = None
if is_empty(before.channel):
channel = before.channel
elif is_empty(after.channel):
channel = after.channel
if channel:
await channel.guild.voice_client.disconnect()
def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__)
for submodule in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in constants.RELOADABLE_MODULES
and v.__name__ not in reloaded_modules,
lambda sm: inspect.ismodule(sm)
and sm.__name__ in RELOADABLE_MODULES
and sm.__name__ not in reloaded_modules,
vars(module).values(),
):
rreload(reloaded_modules, submodule)
@@ -137,3 +178,18 @@ def rreload(reloaded_modules, module):
if "__reload_module__" in dir(module):
module.__reload_module__()
def reload(*_):
reloaded_modules = set()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
return reloaded_modules
signal.signal(signal.SIGUSR1, reload)

View File

@@ -1,30 +1,70 @@
import asyncio
import threading
from logging import debug, info, warning
import commands
import core
import events
import fun
import tasks
from state import client
dynamic_handlers = {}
def prepare():
threading.Thread(
name="cleanup",
target=asyncio.run_coroutine_threadsafe,
args=(
tasks.cleanup(),
client.loop,
),
).start()
async def trigger_dynamic_handlers(event_type: str, *data):
if event_type in dynamic_handlers:
for dynamic_handler in dynamic_handlers[event_type]:
try:
await dynamic_handler(*data)
except Exception as e:
print(
f"error in dynamic event handler {dynamic_handler} for {event_type}: {e}"
)
async def on_bulk_message_delete(messages):
commands.voice.remove_queued(messages)
@client.event
async def on_message_edit(before, after):
await events.trigger_dynamic_handlers("on_message_edit", before, after)
await core.on_message(after)
@client.event
async def on_message(message):
await events.trigger_dynamic_handlers("on_message", message)
await core.on_message(message)
await fun.on_message(message)
async def on_message_delete(message):
commands.voice.remove_queued([message])
async def on_message_edit(before, after):
if before.content == after.content:
return
await core.on_message(after, edited=True)
async def on_voice_state_update(member, before, after):
await core.on_voice_state_update(member, before, after)
async def on_ready():
info(f"logged in as {client.user}")
async def on_connect():
debug("connected to the gateway!")
async def on_disconnect():
warning("disconnected from the gateway!")
for event_type, handlers in client.get_listeners().items():
for handler in handlers:
client.remove_listener(handler, event_type)
client.add_listener(on_bulk_message_delete, "on_bulk_message_delete")
client.add_listener(on_connect, "on_connect")
client.add_listener(on_disconnect, "on_disconnect")
client.add_listener(on_message, "on_message")
client.add_listener(on_message_delete, "on_message_delete")
client.add_listener(on_message_edit, "on_message_edit")
client.add_listener(on_ready, "on_ready")
client.add_listener(on_voice_state_update, "on_voice_state_update")

101
extra.py Normal file
View File

@@ -0,0 +1,101 @@
import asyncio
import string
import disnake
import youtube_transcript_api
from state import client, kill, players
async def transcript(
message,
languages=["en"],
max_messages=6,
min_messages=3,
upper=True,
):
initial_id = message.guild.voice_client.source.id
transcript_list = youtube_transcript_api.YouTubeTranscriptApi.list_transcripts(
initial_id,
)
try:
transcript = transcript_list.find_manually_created_transcript(languages).fetch()
except Exception:
transcript = transcript_list.find_generated_transcript(languages).fetch()
await message.channel.send("(autogenerated)")
messages = []
for line in transcript:
if (
players[message.guild.id].current.player.original.progress
>= line["start"] + line["duration"]
):
continue
while (
players[message.guild.id].current.player.original.progress < line["start"]
):
await asyncio.sleep(0.2)
messages.insert(
0,
await message.channel.send(line["text"].upper() if upper else line["text"]),
)
if len(messages) > max_messages:
try:
count = min(min_messages, len(messages))
if count == 1:
await messages.pop().delete()
else:
await message.channel.delete_messages(
[messages.pop() for _ in range(count)],
)
except Exception:
pass
if (message.guild.voice_client.source.id != initial_id) or kill["transcript"]:
kill["transcript"] = False
break
def messages_per_second(limit=500):
oldest = 2**64
newest = 0
guilds = set()
members = set()
cached_messages = list(client.cached_messages)[-limit:]
for message in cached_messages:
if message.guild:
guilds.add(message.guild.id)
members.add(message.author.id)
t = message.created_at.timestamp()
if t < oldest:
oldest = t
elif t > newest:
newest = t
average = round(len(cached_messages) / (newest - oldest), 1)
if average == 1.0:
average = 1
print(
f"I am receiving **{average} {'message' if average == 1 else 'messages'} per second** "
f"from **{len(members)} {'member' if len(members) == 1 else 'members'}** across **{len(guilds)} {'guild' if len(guilds) == 1 else 'guilds'}**",
)
async def auto_count(channel_id: int):
if (channel := await client.fetch_channel(channel_id)) and isinstance(
channel,
disnake.TextChannel,
):
last_message = (await channel.history(limit=1).flatten())[0]
try:
result = str(
int("".join(filter(lambda d: d in string.digits, last_message.content)))
+ 1,
)
except Exception:
result = "where number"
await channel.send(result)

13
fun.py Normal file
View File

@@ -0,0 +1,13 @@
import random
import commands
from constants import REACTIONS
async def on_message(message):
if random.random() < 0.01:
tokens = commands.tokenize(message.content, remove_prefix=False)
for keyword, options in REACTIONS.items():
if keyword in tokens:
await message.add_reaction(random.choice(options))
break

24
main.py
View File

@@ -1,14 +1,20 @@
import time
import logging
import constants
import events
from state import client, start_time
from state import client
if __name__ == "__main__":
logging.basicConfig(
format=(
"%(asctime)s %(levelname)s %(name)s:%(module)s %(message)s"
if __debug__
else "%(asctime)s %(levelname)s %(message)s"
),
datefmt="%Y-%m-%d %T",
level=logging.DEBUG if __debug__ else logging.INFO,
)
logging.getLogger("disnake").setLevel(logging.WARNING)
@client.event
async def on_ready():
await events.trigger_dynamic_handlers("on_ready")
print(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
client.run(constants.SECRETS["TOKEN"])
events.prepare()
client.run(constants.SECRETS["TOKEN"])

View File

@@ -1,5 +1,8 @@
aiohttp
audioop-lts
disnake
disnake_paginator
psutil
PyNaCl
yt-dlp
youtube_transcript_api
yt-dlp[default] @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz

37
sponsorblock.py Normal file
View File

@@ -0,0 +1,37 @@
import hashlib
import json
import aiohttp
from state import sponsorblock_cache
categories = json.dumps(
[
"interaction",
"intro",
"music_offtopic",
"outro",
"preview",
"selfpromo",
"sponsor",
],
)
async def get_segments(video_id: str):
if video_id in sponsorblock_cache:
return sponsorblock_cache[video_id]
hash_prefix = hashlib.sha256(video_id.encode()).hexdigest()[:4]
session = aiohttp.ClientSession()
response = await session.get(
f"https://sponsor.ajay.app/api/skipSegments/{hash_prefix}",
params={"categories": categories},
)
if response.status == 200 and (
results := list(
filter(lambda v: video_id == v["videoID"], await response.json()),
)
):
sponsorblock_cache[video_id] = results[0]
return results[0]

View File

@@ -2,11 +2,19 @@ import time
import disnake
players = {}
command_locks = {}
from utils import LimitedSizeDict
intents = disnake.Intents.default()
intents.message_content = True
intents.members = True
client = disnake.Client(intents=intents)
command_cooldowns = LimitedSizeDict()
command_locks = LimitedSizeDict()
idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False}
message_responses = LimitedSizeDict()
players = {}
sponsorblock_cache = LimitedSizeDict()
start_time = time.time()
trusted_users = []

32
tasks.py Normal file
View File

@@ -0,0 +1,32 @@
import asyncio
import time
from logging import debug, error
import disnake
from state import client, idle_tracker, players
async def cleanup():
debug("spawned cleanup thread")
while True:
await asyncio.sleep(3600)
targets = []
for guild_id, player in players.items():
if len(player.queue) == 0:
targets.append(guild_id)
for target in targets:
del players[target]
debug(f"cleanup thread removed {len(targets)} empty players")
if (
not idle_tracker["is_idle"]
and time.time() - idle_tracker["last_used"] >= 3600
):
try:
await client.change_presence(status=disnake.Status.idle)
idle_tracker["is_idle"] = True
except Exception as e:
error(f"failed to change status to idle: {e}")

3
tests/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from . import test_filter_secrets, test_format_duration
__all__ = ["test_filter_secrets", "test_format_duration"]

View File

@@ -0,0 +1,21 @@
import unittest
import utils
class TestFilterSecrets(unittest.TestCase):
def test_filter_secrets(self):
secret = "PLACEHOLDER_TOKEN"
self.assertFalse(
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret}),
)
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
self.assertFalse(
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret}),
)
self.assertFalse(
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret}),
)

View File

@@ -0,0 +1,107 @@
import unittest
import audio
import utils
class TestFormatDuration(unittest.TestCase):
def test_audio(self):
def f(s):
return audio.utils.format_duration(s)
self.assertEqual(f(0), "00:00")
self.assertEqual(f(0.5), "00:00")
self.assertEqual(f(60.5), "01:00")
self.assertEqual(f(1), "00:01")
self.assertEqual(f(60), "01:00")
self.assertEqual(f(60 + 30), "01:30")
self.assertEqual(f(60 * 60), "01:00:00")
self.assertEqual(f(60 * 60 + 30), "01:00:30")
def test_utils(self):
def f(s):
return utils.format_duration(s)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
)
self.assertEqual(f(60), "1 minute")
self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(f(1), "1 second")
self.assertEqual(f(60 + 5), "1 minute, 5 seconds")
self.assertEqual(f(60 * 60 + 30), "1 hour, 30 seconds")
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds")
def test_utils_natural(self):
def f(s):
return utils.format_duration(s, natural=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
)
self.assertEqual(f(60), "1 minute")
self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(f(1), "1 second")
self.assertEqual(f(60 + 5), "1 minute and 5 seconds")
self.assertEqual(f(60 * 60 + 30), "1 hour and 30 seconds")
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
def test_utils_short(self):
def f(s):
return utils.format_duration(s, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m 5s")
self.assertEqual(f(60 * 60 + 30), "1h 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w 30s")
def test_utils_natural_short(self):
def f(s):
return utils.format_duration(s, natural=True, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m and 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m and 5s")
self.assertEqual(f(60 * 60 + 30), "1h and 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m and 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w and 30s")

View File

@@ -1,25 +0,0 @@
import disnake
import constants
async def add_check_reaction(message):
await message.add_reaction("")
async def reply(message, *args):
await message.reply(*args, allowed_mentions=disnake.AllowedMentions.none())
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"You are not the intended receiver of this message!", ephemeral=True
)
def filter_secrets(text: str) -> str:
for secret_name, secret in constants.SECRETS.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text

28
utils/__init__.py Normal file
View File

@@ -0,0 +1,28 @@
from .common import LimitedSizeDict, filter_secrets, format_duration, surround
from .discord import (
ChannelResponseWrapper,
MessageInteractionWrapper,
add_check_reaction,
channel_send,
cooldown,
invalid_user_handler,
load_opus,
reply,
snowflake_timestamp,
)
__all__ = [
"add_check_reaction",
"channel_send",
"ChannelResponseWrapper",
"cooldown",
"filter_secrets",
"format_duration",
"invalid_user_handler",
"LimitedSizeDict",
"load_opus",
"MessageInteractionWrapper",
"reply",
"snowflake_timestamp",
"surround",
]

64
utils/common.py Normal file
View File

@@ -0,0 +1,64 @@
from collections import OrderedDict
from constants import SECRETS
def surround(inner: str, outer="```") -> str:
return outer + str(inner) + outer
def format_duration(duration: int, natural: bool = False, short: bool = False) -> str:
def format_plural(noun, count):
if short:
return noun[0]
return " " + (noun if count == 1 else noun + "s")
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks}{format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days}{format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours}{format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes}{format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration}{format_plural('second', duration)}")
separator = " " if short else ", "
if not natural or len(segments) <= 1:
return separator.join(segments)
return separator.join(segments[:-1]) + f" and {segments[-1]}"
def filter_secrets(text: str, secrets=SECRETS) -> str:
for secret_name, secret in secrets.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")
return text
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 100)
super().__init__(*args, **kwargs)
self._check_size_limit()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)

118
utils/discord.py Normal file
View File

@@ -0,0 +1,118 @@
import time
from logging import error, info
from pathlib import Path
import disnake
import commands
from constants import OWNERS
from state import command_cooldowns, message_responses
def cooldown(message, cooldown_time: int):
if message.author.id in OWNERS:
return
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
command = possible_commands[0]
end_time = time.time() + cooldown_time
if message.author.id in command_cooldowns:
command_cooldowns[message.author.id][command] = end_time
else:
command_cooldowns[message.author.id] = {command: end_time}
async def reply(message, *args, **kwargs):
if message.id in message_responses:
if len(args) == 0:
kwargs["content"] = None
elif len(kwargs) == 0:
kwargs["embeds"] = []
try:
await message_responses[message.id].edit(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
return
except Exception:
pass
try:
response = await message.reply(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
except Exception:
response = await channel_send(message, *args, **kwargs)
message_responses[message.id] = response
return message_responses[message.id]
async def channel_send(message, *args, **kwargs):
await message.channel.send(
*args,
**kwargs,
allowed_mentions=disnake.AllowedMentions.none(),
)
def load_opus():
for path in filter(
lambda p: Path(p).exists(),
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
):
try:
disnake.opus.load_opus(path)
info(f"successfully loaded opus from {path}")
return
except Exception as e:
error(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")
def snowflake_timestamp(snowflake) -> int:
return round(((snowflake >> 22) + 1420070400000) / 1000)
async def add_check_reaction(message):
await message.add_reaction("")
async def invalid_user_handler(interaction):
await interaction.response.send_message(
"you are not the intended receiver of this message!",
ephemeral=True,
)
class ChannelResponseWrapper:
def __init__(self, message):
self.message = message
self.sent_message = None
async def send_message(self, **kwargs):
kwargs.pop("ephemeral", None)
self.sent_message = await reply(self.message, **kwargs)
async def edit_message(self, content=None, embed=None, view=None):
if self.sent_message:
content = content or self.sent_message.content
if not embed and len(self.sent_message.embeds) > 0:
embed = self.sent_message.embeds[0]
await self.sent_message.edit(content=content, embed=embed, view=view)
class MessageInteractionWrapper:
def __init__(self, message):
self.message = message
self.author = message.author
self.response = ChannelResponseWrapper(message)
async def edit_original_message(self, content=None, embed=None, view=None):
await self.response.edit_message(content=content, embed=embed, view=view)

View File

@@ -1,104 +0,0 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
import constants
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
class YTDLSource(disnake.PCMVolumeTransformer):
def __init__(
self, source: disnake.AudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.title = data.get("title")
self.original_url = data.get("original_url")
self.duration = data.get("duration")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
data = data["entries"][0]
return cls(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
class QueuedPlayer:
def __init__(self):
self.queue = []
self.current = None
def queue_pop(self):
popped = self.queue[0]
del self.queue[0]
self.current = popped
return popped
def queue_add(self, item):
self.queue.append(item)
def queue_add_front(self, item):
self.queue.insert(0, item)
def __repr__(self):
return f"<QueuedPlayer current={self.current} queue={self.queue}>"
def __str__(self):
return self.__repr__()
class QueuedSong:
def __init__(self, player, queuer):
self.player = player
self.queuer = queuer
def format(self, with_queuer=False, hide_preview=False) -> str:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [{self.format_duration(self.player.duration) if self.player.duration else 'live'}]"
+ (f" (<@{self.queuer}>)" if with_queuer else "")
)
def format_duration(self, duration: int) -> str:
hours, duration = divmod(duration, 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"
def __repr__(self):
return f"<QueuedSong player={self.player} queuer={self.queuer}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)