Compare commits

..

No commits in common. "27a460fa6e4cbefe4cdc63a445c7d0bf6f8013bf" and "655b552c10fc93de97ee704416711fb7393471a0" have entirely different histories.

17 changed files with 348 additions and 601 deletions

View File

@ -1,64 +1,18 @@
import os
import threading
import time
import disnake
import psutil
import arguments
import commands
import utils
from constants import EMBED_COLOR
from state import client, start_time
from state import start_time
async def status(message):
member_count = 0
channel_count = 0
for guild in client.guilds:
member_count += len(guild.members)
channel_count += len(guild.channels)
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / 1048576
embed = disnake.Embed(color=EMBED_COLOR)
embed.add_field(
name="Latency",
value=f"```{round(client.latency * 1000, 1)} ms```",
async def help(message):
await utils.reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()]
),
)
embed.add_field(
name="RSS",
value=f"```{round(memory_usage, 1)} MiB```",
)
embed.add_field(
name="Threads",
value=f"```{threading.active_count()}```",
)
embed.add_field(
name="Guilds",
value=f"```{len(client.guilds)}```",
)
embed.add_field(
name="Members",
value=f"```{member_count}```",
)
embed.add_field(
name="Channels",
value=f"```{channel_count}```",
)
embed.add_field(
name="Commands",
value=f"```{len(commands.Command.__members__)}```",
)
embed.add_field(
name="Disnake",
value=f"```{disnake.__version__}```",
)
embed.add_field(
name="Uptime",
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
)
await utils.reply(message, embed=embed)
async def uptime(message):
@ -82,12 +36,3 @@ async def uptime(message):
await utils.reply(
message, f"up {utils.format_duration(int(time.time() - start_time))}"
)
async def help(message):
await utils.reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()]
),
)

View File

@ -1,10 +1,9 @@
from enum import Enum
from functools import lru_cache
import enum
import constants
class Command(Enum):
class Command(enum.Enum):
CLEAR = "clear"
CURRENT = "current"
EXECUTE = "execute"
@ -20,12 +19,10 @@ class Command(Enum):
RELOAD = "reload"
RESUME = "resume"
SKIP = "skip"
STATUS = "status"
UPTIME = "uptime"
VOLUME = "volume"
@lru_cache
def match_token(token: str) -> list[Command]:
if token.lower() == "r":
return [Command.RELOAD]
@ -46,13 +43,11 @@ def match_token(token: str) -> list[Command]:
)
@lru_cache
def match(command: str) -> list[Command] | None:
if tokens := tokenize(command):
return match_token(tokens[0])
@lru_cache
def tokenize(string: str) -> list[str]:
tokens = []
token = ""

View File

@ -5,14 +5,11 @@ import disnake_paginator
import arguments
import commands
import constants
import utils
import youtubedl
from constants import EMBED_COLOR
from state import client, players
from .playback import resume
from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False):
if message.guild.id not in players:
@ -106,7 +103,7 @@ async def queue_or_play(message, edited=False):
players[message.guild.id].queue.remove(target)
if len(targets) == 1:
await utils.reply(message, f"**removed** {targets[0].format()}")
await utils.reply(message, f"**X** {targets[0].format()}")
else:
await utils.reply(
message,
@ -180,8 +177,6 @@ async def queue_or_play(message, edited=False):
message,
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
)
utils.cooldown(message, 3)
elif tokens[0].lower() == "play":
await resume(message)
else:
@ -199,7 +194,7 @@ async def queue_or_play(message, edited=False):
def embed(description):
e = disnake.Embed(
description=description,
color=EMBED_COLOR,
color=constants.EMBED_COLOR,
)
if formatted_duration and len(players[message.guild.id].queue) > 1:
e.set_footer(text=f"{formatted_duration} in total")
@ -207,7 +202,7 @@ async def queue_or_play(message, edited=False):
await disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
color=constants.EMBED_COLOR,
segments=list(
map(
embed,
@ -232,18 +227,98 @@ async def queue_or_play(message, edited=False):
)
async def skip(message):
async def playing(message):
if not command_allowed(message, immutable=True):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
parser = arguments.ArgumentParser(
tokens[0], "get information about the currently playing song"
)
parser.add_argument(
"-n",
"--next",
"-d",
"--description",
action="store_true",
help="skip the next song",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=constants.EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
bar_length = 35
progress = source.original.progress / source.duration
embed = disnake.Embed(
color=constants.EMBED_COLOR,
title=source.title,
url=source.original_url,
description=f"{'⏸️ ' if message.guild.voice_client.is_paused() else ''}"
f"`[{'#'*int(progress * bar_length)}{'-'*int((1 - progress) * bar_length)}]` "
f"**{youtubedl.format_duration(int(source.original.progress))}** / **{youtubedl.format_duration(source.duration)}** (**{round(progress * 100)}%**)",
)
embed.add_field(name="Volume", value=f"{int(source.volume*100)}%")
embed.add_field(name="Views", value=f"{source.view_count:,}")
embed.add_field(
name="Queuer",
value=players[message.guild.id].current.trigger_message.author.mention,
)
embed.set_image(source.thumbnail_url)
await utils.reply(
message,
embed=embed,
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
if not command_allowed(message):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
parser.add_argument(
"seconds",
type=lambda v: arguments.range_type(v, min=0, max=300),
help="the amount of seconds to fast forward",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(args.seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def skip(message):
if not command_allowed(message):
return
@ -253,11 +328,159 @@ async def skip(message):
message,
"the queue is empty now!",
)
elif args.next:
next = players[message.guild.id].queue.pop()
await utils.reply(message, f"**skipped** {next.format()}")
else:
message.guild.voice_client.stop()
await utils.add_check_reaction(message)
if not message.guild.voice_client.source:
play_next(message)
async def join(message):
if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel)
await message.channel.connect()
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def volume(message):
if not command_allowed(message, immutable=True):
return
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)
def delete_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
def play_after_callback(e, message, once):
if e:
print(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
message.guild.voice_client.stop()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
try:
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
except disnake.opus.OpusNotLoaded:
utils.load_opus()
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
embed = disnake.Embed(
color=constants.EMBED_COLOR,
title=queued.player.title,
url=queued.player.original_url,
)
embed.add_field(name="Volume", value=f"{int(queued.player.volume*100)}%")
embed.add_field(name="Views", value=f"{queued.player.view_count:,}")
embed.add_field(
name="Queuer",
value=players[message.guild.id].current.trigger_message.author.mention,
)
embed.set_image(queued.player.thumbnail_url)
if first:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return
if immutable:
return message.channel.id == message.guild.voice_client.channel.id
else:
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@ -1,18 +0,0 @@
from .channel import join, leave
from .playback import fast_forward, pause, playing, resume, volume
from .queue import queue_or_play, skip
from .utils import remove_queued
__all__ = [
"join",
"leave",
"fast_forward",
"playing",
"queue_or_play",
"skip",
"resume",
"pause",
"skip",
"remove_queued",
"volume",
]

View File

@ -1,19 +0,0 @@
import utils
from .utils import command_allowed
async def join(message):
if message.guild.voice_client:
return await message.guild.voice_client.move_to(message.channel)
await message.channel.connect()
await utils.add_check_reaction(message)
async def leave(message):
if not command_allowed(message):
return
await message.guild.voice_client.disconnect()
await utils.add_check_reaction(message)

View File

@ -1,143 +0,0 @@
import disnake_paginator
import arguments
import commands
import utils
from constants import EMBED_COLOR
from state import players
from .utils import command_allowed
async def playing(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0], "get information about the currently playing song"
)
parser.add_argument(
"-d",
"--description",
action="store_true",
help="get the description",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if source := message.guild.voice_client.source:
if args.description:
if description := source.description:
paginator = disnake_paginator.ButtonPaginator(
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
title=source.title,
segments=disnake_paginator.split(description),
)
for embed in paginator.embeds:
embed.url = source.original_url
await paginator.start(utils.MessageInteractionWrapper(message))
else:
await utils.reply(
message,
source.description or "no description found!",
)
return
await utils.reply(
message,
embed=players[message.guild.id].current.embed(
is_paused=message.guild.voice_client.is_paused()
),
)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def resume(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_paused():
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is paused!",
)
async def pause(message):
if not command_allowed(message):
return
if message.guild.voice_client.is_playing():
message.guild.voice_client.pause()
await utils.add_check_reaction(message)
else:
await utils.reply(
message,
"nothing is playing!",
)
async def fast_forward(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
parser.add_argument(
"seconds",
type=lambda v: arguments.range_type(v, min=0, max=300),
help="the amount of seconds to fast forward",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
message.guild.voice_client.pause()
message.guild.voice_client.source.original.fast_forward(args.seconds)
message.guild.voice_client.resume()
await utils.add_check_reaction(message)
async def volume(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
parser.add_argument(
"volume",
nargs="?",
type=lambda v: arguments.range_type(v, min=0, max=150),
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)):
return
if not command_allowed(message, immutable=True):
return
if not message.guild.voice_client.source:
await utils.reply(message, "nothing is playing!")
return
if args.volume is None:
await utils.reply(
message,
f"{int(message.guild.voice_client.source.volume * 100)}",
)
else:
if not command_allowed(message):
return
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)

View File

@ -1,72 +0,0 @@
from logging import error
import disnake
import utils
from state import client, players
def play_after_callback(e, message, once):
if e:
error(f"player error: {e}")
if not once:
play_next(message)
def play_next(message, once=False, first=False):
if not message.guild.voice_client:
return
message.guild.voice_client.stop()
if message.guild.id in players and players[message.guild.id].queue:
queued = players[message.guild.id].queue_pop()
try:
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
except disnake.opus.OpusNotLoaded:
utils.load_opus()
message.guild.voice_client.play(
queued.player, after=lambda e: play_after_callback(e, message, once)
)
embed = queued.embed()
if first and len(players[message.guild.id].queue) == 0:
client.loop.create_task(utils.reply(message, embed=embed))
else:
client.loop.create_task(utils.channel_send(message, embed=embed))
def remove_queued(messages):
if messages[0].guild.id not in players:
return
if len(players[messages[0].guild.id].queue) == 0:
return
found = []
for message in messages:
for queued in players[message.guild.id].queue:
if queued.trigger_message.id == message.id:
found.append(queued)
for queued in found:
players[messages[0].guild.id].queue.remove(queued)
async def ensure_joined(message):
if message.guild.voice_client is None:
if message.author.voice:
await message.author.voice.channel.connect()
else:
await utils.reply(message, "you are not connected to a voice channel!")
def command_allowed(message, immutable=False):
if not message.guild.voice_client:
return
if immutable:
return message.channel.id == message.guild.voice_client.channel.id
else:
if not message.author.voice:
return False
return message.author.voice.channel.id == message.guild.voice_client.channel.id

View File

@ -15,7 +15,6 @@ YTDL_OPTIONS = {
"source_address": "0.0.0.0",
}
BAR_LENGTH = 35
EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712]
PREFIX = "%"
@ -26,11 +25,6 @@ RELOADABLE_MODULES = [
"commands.tools",
"commands.utils",
"commands.voice",
"commands.voice.channel",
"commands.voice.playback",
"commands.voice.playing",
"commands.voice.queue",
"commands.voice.utils",
"constants",
"core",
"events",

67
core.py
View File

@ -6,20 +6,18 @@ import io
import textwrap
import time
import traceback
from logging import debug
import disnake
import disnake_paginator
import commands
import constants
import utils
from commands import Command as C
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
from state import client, command_cooldowns, command_locks, idle_tracker
from state import client, command_locks, idle_tracker
async def on_message(message, edited=False):
if not message.content.startswith(PREFIX) or message.author.bot:
if not message.content.startswith(constants.PREFIX) or message.author.bot:
return
tokens = commands.tokenize(message.content)
@ -40,44 +38,26 @@ async def on_message(message, edited=False):
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
)
return
matched = matched[0]
if (message.guild.id, message.author.id) not in command_locks:
command_locks[(message.guild.id, message.author.id)] = asyncio.Lock()
await command_locks[(message.guild.id, message.author.id)].acquire()
if message.guild.id not in command_locks:
command_locks[message.guild.id] = asyncio.Lock()
C = commands.Command
try:
if cooldowns := command_cooldowns.get(message.author.id):
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
end_time
):
await utils.reply(
message,
f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
)
return
match matched:
case C.RELOAD if message.author.id in OWNERS:
match matched[0]:
case C.RELOAD if message.author.id in constants.OWNERS:
reloaded_modules = set()
start = time.time()
rreload(reloaded_modules, __import__("core"))
rreload(reloaded_modules, __import__("extra"))
for module in filter(
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
lambda v: inspect.ismodule(v)
and v.__name__ in constants.RELOADABLE_MODULES,
globals().values(),
):
rreload(reloaded_modules, module)
end = time.time()
if __debug__:
debug(
f"reloaded {len(reloaded_modules)} modules in {round(end-start, 2)}s"
)
await utils.add_check_reaction(message)
case C.EXECUTE if message.author.id in OWNERS:
case C.EXECUTE if message.author.id in constants.OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]:
if code.startswith(replacement):
@ -112,22 +92,24 @@ async def on_message(message, edited=False):
prefix="```\n",
suffix="```",
invalid_user_function=utils.invalid_user_handler,
color=EMBED_COLOR,
color=constants.EMBED_COLOR,
segments=disnake_paginator.split(output),
).start(utils.MessageInteractionWrapper(message))
elif len(output.strip()) == 0:
await utils.add_check_reaction(message)
else:
await utils.reply(message, output)
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
await commands.tools.clear(message)
case C.JOIN:
await commands.voice.join(message)
case C.LEAVE:
await commands.voice.leave(message)
case C.QUEUE | C.PLAY:
async with command_locks[message.guild.id]:
await commands.voice.queue_or_play(message, edited)
case C.SKIP:
async with command_locks[message.guild.id]:
await commands.voice.skip(message)
case C.RESUME:
await commands.voice.resume(message)
@ -143,29 +125,24 @@ async def on_message(message, edited=False):
await commands.voice.playing(message)
case C.FAST_FORWARD:
await commands.voice.fast_forward(message)
case C.STATUS:
await commands.bot.status(message)
except Exception as e:
await utils.reply(
message,
f"exception occurred while processing command: ```\n{"".join(traceback.format_exception(e)).replace("`", "\\`")}```",
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
)
raise e
finally:
command_locks[(message.guild.id, message.author.id)].release()
async def on_voice_state_update(_, before, after):
def is_empty(channel):
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
channel = None
c = None
if is_empty(before.channel):
channel = before.channel
c = before.channel
elif is_empty(after.channel):
channel = after.channel
if channel:
await channel.guild.voice_client.disconnect()
c = after.channel
if c:
await c.guild.voice_client.disconnect()
def rreload(reloaded_modules, module):
@ -173,7 +150,7 @@ def rreload(reloaded_modules, module):
for submodule in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in RELOADABLE_MODULES
and v.__name__ in constants.RELOADABLE_MODULES
and v.__name__ not in reloaded_modules,
vars(module).values(),
):

View File

@ -1,7 +1,6 @@
import asyncio
import threading
import time
from logging import info
import commands
import core
@ -21,7 +20,7 @@ def prepare():
async def on_bulk_message_delete(messages):
commands.voice.remove_queued(messages)
commands.voice.delete_queued(messages)
async def on_message(message):
@ -29,7 +28,7 @@ async def on_message(message):
async def on_message_delete(message):
commands.voice.remove_queued([message])
commands.voice.delete_queued([message])
async def on_message_edit(before, after):
@ -40,7 +39,7 @@ async def on_message_edit(before, after):
async def on_ready():
info(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
print(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
async def on_voice_state_update(member, before, after):

View File

@ -4,7 +4,7 @@ import string
import disnake
import youtube_transcript_api
from state import client, kill, players
from state import client, players
async def transcript(
@ -39,17 +39,13 @@ async def transcript(
)
if len(messages) > max_messages:
try:
count = max_messages - min_messages
if count == 1:
await messages.pop().delete()
else:
await message.channel.delete_messages(
[messages.pop() for _ in range(count)]
[messages.pop() for _ in range(max_messages - min_messages)]
)
except Exception:
pass
if (message.guild.voice_client.source.id != initial_id) or kill["transcript"]:
if message.guild.voice_client.source.id != initial_id:
break

11
main.py
View File

@ -1,18 +1,7 @@
import logging
import constants
import events
from state import client
if __name__ == "__main__":
logging.basicConfig(
format=(
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
if __debug__
else "%(asctime)s %(levelname)s %(message)s"
),
datefmt="%Y-%m-%d %T",
level=logging.DEBUG if __debug__ else logging.INFO,
)
events.prepare()
client.run(constants.SECRETS["TOKEN"])

View File

@ -1,13 +1,13 @@
import collections
import time
from collections import OrderedDict
import disnake
class LimitedSizeDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.size_limit = kwargs.pop("size_limit", 1000)
super().__init__(*args, **kwargs)
class LimitedSizeDict(collections.OrderedDict):
def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", 1000)
super().__init__(*args, **kwds)
self._check_size_limit()
def __setitem__(self, key, value):
@ -25,10 +25,8 @@ intents.message_content = True
intents.members = True
client = disnake.Client(intents=intents)
command_cooldowns = LimitedSizeDict()
command_locks = LimitedSizeDict()
idle_tracker = {"is_idle": False, "last_used": time.time()}
kill = {"transcript": False}
message_responses = LimitedSizeDict()
players = {}
start_time = time.time()

View File

@ -1,6 +1,5 @@
import asyncio
import time
from logging import debug, error
import disnake
@ -8,10 +7,8 @@ from state import client, idle_tracker, players
async def cleanup():
debug("spawned cleanup thread")
while True:
await asyncio.sleep(3600 * 12)
await asyncio.sleep(3600)
targets = []
for guild_id, player in players.items():
@ -19,15 +16,10 @@ async def cleanup():
targets.append(guild_id)
for target in targets:
del players[target]
if __debug__:
debug(f"cleanup removed {len(targets)} empty players")
if (
not idle_tracker["is_idle"]
and time.time() - idle_tracker["last_used"] >= 3600
):
try:
await client.change_presence(status=disnake.Status.idle)
idle_tracker["is_idle"] = True
except Exception as e:
error(f"failed to change status to idle: {e}")

View File

@ -6,102 +6,56 @@ import youtubedl
class TestFormatDuration(unittest.TestCase):
def test_youtubedl(self):
def f(s):
return youtubedl.format_duration(s)
self.assertEqual(f(0), "00:00")
self.assertEqual(f(0.5), "00:00")
self.assertEqual(f(60.5), "01:00")
self.assertEqual(f(1), "00:01")
self.assertEqual(f(60), "01:00")
self.assertEqual(f(60 + 30), "01:30")
self.assertEqual(f(60 * 60), "01:00:00")
self.assertEqual(f(60 * 60 + 30), "01:00:30")
self.assertEqual(youtubedl.format_duration(0), "00:00")
self.assertEqual(youtubedl.format_duration(0.5), "00:00")
self.assertEqual(youtubedl.format_duration(60.5), "01:00")
self.assertEqual(youtubedl.format_duration(1), "00:01")
self.assertEqual(youtubedl.format_duration(60), "01:00")
self.assertEqual(youtubedl.format_duration(60 + 30), "01:30")
self.assertEqual(youtubedl.format_duration(60 * 60), "01:00:00")
self.assertEqual(youtubedl.format_duration(60 * 60 + 30), "01:00:30")
def test_utils(self):
def f(s):
return utils.format_duration(s)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(utils.format_duration(0), "")
self.assertEqual(utils.format_duration(60 * 60 * 24 * 7), "1 week")
self.assertEqual(utils.format_duration(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
utils.format_duration((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
)
self.assertEqual(f(60), "1 minute")
self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(f(1), "1 second")
self.assertEqual(f(60 + 5), "1 minute, 5 seconds")
self.assertEqual(f(60 * 60 + 30), "1 hour, 30 seconds")
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds")
self.assertEqual(utils.format_duration(60), "1 minute")
self.assertEqual(utils.format_duration(60 * 2), "2 minutes")
self.assertEqual(utils.format_duration(60 * 59), "59 minutes")
self.assertEqual(utils.format_duration(60 * 60), "1 hour")
self.assertEqual(utils.format_duration(60 * 60 * 2), "2 hours")
self.assertEqual(utils.format_duration(1), "1 second")
self.assertEqual(utils.format_duration(60 + 5), "1 minute, 5 seconds")
self.assertEqual(utils.format_duration(60 * 60 + 30), "1 hour, 30 seconds")
self.assertEqual(
utils.format_duration(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds"
)
self.assertEqual(
utils.format_duration(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds"
)
def test_utils_natural(self):
def f(s):
return utils.format_duration(s, natural=True)
def format(seconds: int):
return utils.format_duration(seconds, natural=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(format(0), "")
self.assertEqual(format(60 * 60 * 24 * 7), "1 week")
self.assertEqual(format(60 * 60 * 24 * 21), "3 weeks")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
format((60 * 60 * 24 * 21) - 1),
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
)
self.assertEqual(f(60), "1 minute")
self.assertEqual(f(60 * 2), "2 minutes")
self.assertEqual(f(60 * 59), "59 minutes")
self.assertEqual(f(60 * 60), "1 hour")
self.assertEqual(f(60 * 60 * 2), "2 hours")
self.assertEqual(f(1), "1 second")
self.assertEqual(f(60 + 5), "1 minute and 5 seconds")
self.assertEqual(f(60 * 60 + 30), "1 hour and 30 seconds")
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
def test_utils_short(self):
def f(s):
return utils.format_duration(s, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m 5s")
self.assertEqual(f(60 * 60 + 30), "1h 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w 30s")
def test_utils_natural_short(self):
def f(s):
return utils.format_duration(s, natural=True, short=True)
self.assertEqual(f(0), "")
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
self.assertEqual(
f((60 * 60 * 24 * 21) - 1),
"2w 6d 23h 59m and 59s",
)
self.assertEqual(f(60), "1m")
self.assertEqual(f(60 * 2), "2m")
self.assertEqual(f(60 * 59), "59m")
self.assertEqual(f(60 * 60), "1h")
self.assertEqual(f(60 * 60 * 2), "2h")
self.assertEqual(f(1), "1s")
self.assertEqual(f(60 + 5), "1m and 5s")
self.assertEqual(f(60 * 60 + 30), "1h and 30s")
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m and 30s")
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w and 30s")
self.assertEqual(format(60), "1 minute")
self.assertEqual(format(60 * 2), "2 minutes")
self.assertEqual(format(60 * 59), "59 minutes")
self.assertEqual(format(60 * 60), "1 hour")
self.assertEqual(format(60 * 60 * 2), "2 hours")
self.assertEqual(format(1), "1 second")
self.assertEqual(format(60 + 5), "1 minute and 5 seconds")
self.assertEqual(format(60 * 60 + 30), "1 hour and 30 seconds")
self.assertEqual(format(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
self.assertEqual(format(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")

View File

@ -1,12 +1,9 @@
import os
import time
from logging import error, info, warning
import disnake
import commands
import constants
from state import command_cooldowns, message_responses
from state import message_responses
class ChannelResponseWrapper:
@ -37,50 +34,35 @@ class MessageInteractionWrapper:
await self.response.edit_message(content=content, embed=embed, view=view)
def cooldown(message, cooldown_time: int):
possible_commands = commands.match(message.content)
if not possible_commands or len(possible_commands) > 1:
return
command = possible_commands[0]
end_time = time.time() + cooldown_time
if message.author.id in command_cooldowns:
command_cooldowns[message.author.id][command] = end_time
else:
command_cooldowns[message.author.id] = {command: end_time}
def format_duration(duration: int, natural: bool = False, short: bool = False):
def format_duration(duration: int, natural: bool = False):
def format_plural(noun, count):
if short:
return noun[0]
return " " + (noun if count == 1 else noun + "s")
return noun if count == 1 else noun + "s"
segments = []
weeks, duration = divmod(duration, 604800)
if weeks > 0:
segments.append(f"{weeks}{format_plural('week', weeks)}")
segments.append(f"{weeks} {format_plural('week', weeks)}")
days, duration = divmod(duration, 86400)
if days > 0:
segments.append(f"{days}{format_plural('day', days)}")
segments.append(f"{days} {format_plural('day', days)}")
hours, duration = divmod(duration, 3600)
if hours > 0:
segments.append(f"{hours}{format_plural('hour', hours)}")
segments.append(f"{hours} {format_plural('hour', hours)}")
minutes, duration = divmod(duration, 60)
if minutes > 0:
segments.append(f"{minutes}{format_plural('minute', minutes)}")
segments.append(f"{minutes} {format_plural('minute', minutes)}")
if duration > 0:
segments.append(f"{duration}{format_plural('second', duration)}")
segments.append(f"{duration} {format_plural('second', duration)}")
separator = " " if short else ", "
if not natural or len(segments) <= 1:
return separator.join(segments)
return separator.join(segments[:-1]) + f" and {segments[-1]}"
return ", ".join(segments)
return ", ".join(segments[:-1]) + f" and {segments[-1]}"
async def add_check_reaction(message):
@ -121,13 +103,13 @@ def filter_secrets(text: str) -> str:
def load_opus():
warning("opus wasn't automatically loaded! trying to load manually...")
print("opus wasn't automatically loaded! trying to load manually...")
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
if os.path.exists(path):
try:
disnake.opus.load_opus(path)
info(f"successfully loaded opus from {path}")
print(f"successfully loaded opus from {path}")
return
except Exception as e:
error(f"failed to load opus from {path}: {e}")
print(f"failed to load opus from {path}: {e}")
raise Exception("could not locate working opus library")

View File

@ -6,9 +6,9 @@ from typing import Any, Optional
import disnake
import yt_dlp
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
import constants
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
class CustomAudioSource(disnake.AudioSource):
@ -40,13 +40,9 @@ class YTDLSource(disnake.PCMVolumeTransformer):
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
@ -103,47 +99,6 @@ class QueuedSong:
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#'*int(progress * BAR_LENGTH)}{'-'*int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**live**]"
)
),
)
embed.add_field(
name="Uploader",
value=f"[{self.player.uploader}]({self.player.uploader_url})",
)
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
embed.add_field(name="Volume", value=f"{int(self.player.volume*100)}%")
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@ -179,4 +134,4 @@ def format_duration(duration: int | float) -> str:
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)