Compare commits
No commits in common. "27a460fa6e4cbefe4cdc63a445c7d0bf6f8013bf" and "655b552c10fc93de97ee704416711fb7393471a0" have entirely different histories.
27a460fa6e
...
655b552c10
@ -1,64 +1,18 @@
|
|||||||
import os
|
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import disnake
|
|
||||||
import psutil
|
|
||||||
|
|
||||||
import arguments
|
import arguments
|
||||||
import commands
|
import commands
|
||||||
import utils
|
import utils
|
||||||
from constants import EMBED_COLOR
|
from state import start_time
|
||||||
from state import client, start_time
|
|
||||||
|
|
||||||
|
|
||||||
async def status(message):
|
async def help(message):
|
||||||
member_count = 0
|
await utils.reply(
|
||||||
channel_count = 0
|
message,
|
||||||
for guild in client.guilds:
|
", ".join(
|
||||||
member_count += len(guild.members)
|
[f"`{command.value}`" for command in commands.Command.__members__.values()]
|
||||||
channel_count += len(guild.channels)
|
),
|
||||||
process = psutil.Process(os.getpid())
|
|
||||||
memory_usage = process.memory_info().rss / 1048576
|
|
||||||
|
|
||||||
embed = disnake.Embed(color=EMBED_COLOR)
|
|
||||||
embed.add_field(
|
|
||||||
name="Latency",
|
|
||||||
value=f"```{round(client.latency * 1000, 1)} ms```",
|
|
||||||
)
|
)
|
||||||
embed.add_field(
|
|
||||||
name="RSS",
|
|
||||||
value=f"```{round(memory_usage, 1)} MiB```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Threads",
|
|
||||||
value=f"```{threading.active_count()}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Guilds",
|
|
||||||
value=f"```{len(client.guilds)}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Members",
|
|
||||||
value=f"```{member_count}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Channels",
|
|
||||||
value=f"```{channel_count}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Commands",
|
|
||||||
value=f"```{len(commands.Command.__members__)}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Disnake",
|
|
||||||
value=f"```{disnake.__version__}```",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Uptime",
|
|
||||||
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
|
|
||||||
)
|
|
||||||
await utils.reply(message, embed=embed)
|
|
||||||
|
|
||||||
|
|
||||||
async def uptime(message):
|
async def uptime(message):
|
||||||
@ -82,12 +36,3 @@ async def uptime(message):
|
|||||||
await utils.reply(
|
await utils.reply(
|
||||||
message, f"up {utils.format_duration(int(time.time() - start_time))}"
|
message, f"up {utils.format_duration(int(time.time() - start_time))}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def help(message):
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
", ".join(
|
|
||||||
[f"`{command.value}`" for command in commands.Command.__members__.values()]
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
from enum import Enum
|
import enum
|
||||||
from functools import lru_cache
|
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
|
|
||||||
|
|
||||||
class Command(Enum):
|
class Command(enum.Enum):
|
||||||
CLEAR = "clear"
|
CLEAR = "clear"
|
||||||
CURRENT = "current"
|
CURRENT = "current"
|
||||||
EXECUTE = "execute"
|
EXECUTE = "execute"
|
||||||
@ -20,12 +19,10 @@ class Command(Enum):
|
|||||||
RELOAD = "reload"
|
RELOAD = "reload"
|
||||||
RESUME = "resume"
|
RESUME = "resume"
|
||||||
SKIP = "skip"
|
SKIP = "skip"
|
||||||
STATUS = "status"
|
|
||||||
UPTIME = "uptime"
|
UPTIME = "uptime"
|
||||||
VOLUME = "volume"
|
VOLUME = "volume"
|
||||||
|
|
||||||
|
|
||||||
@lru_cache
|
|
||||||
def match_token(token: str) -> list[Command]:
|
def match_token(token: str) -> list[Command]:
|
||||||
if token.lower() == "r":
|
if token.lower() == "r":
|
||||||
return [Command.RELOAD]
|
return [Command.RELOAD]
|
||||||
@ -46,13 +43,11 @@ def match_token(token: str) -> list[Command]:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@lru_cache
|
|
||||||
def match(command: str) -> list[Command] | None:
|
def match(command: str) -> list[Command] | None:
|
||||||
if tokens := tokenize(command):
|
if tokens := tokenize(command):
|
||||||
return match_token(tokens[0])
|
return match_token(tokens[0])
|
||||||
|
|
||||||
|
|
||||||
@lru_cache
|
|
||||||
def tokenize(string: str) -> list[str]:
|
def tokenize(string: str) -> list[str]:
|
||||||
tokens = []
|
tokens = []
|
||||||
token = ""
|
token = ""
|
||||||
|
@ -5,14 +5,11 @@ import disnake_paginator
|
|||||||
|
|
||||||
import arguments
|
import arguments
|
||||||
import commands
|
import commands
|
||||||
|
import constants
|
||||||
import utils
|
import utils
|
||||||
import youtubedl
|
import youtubedl
|
||||||
from constants import EMBED_COLOR
|
|
||||||
from state import client, players
|
from state import client, players
|
||||||
|
|
||||||
from .playback import resume
|
|
||||||
from .utils import command_allowed, ensure_joined, play_next
|
|
||||||
|
|
||||||
|
|
||||||
async def queue_or_play(message, edited=False):
|
async def queue_or_play(message, edited=False):
|
||||||
if message.guild.id not in players:
|
if message.guild.id not in players:
|
||||||
@ -106,7 +103,7 @@ async def queue_or_play(message, edited=False):
|
|||||||
players[message.guild.id].queue.remove(target)
|
players[message.guild.id].queue.remove(target)
|
||||||
|
|
||||||
if len(targets) == 1:
|
if len(targets) == 1:
|
||||||
await utils.reply(message, f"**removed** {targets[0].format()}")
|
await utils.reply(message, f"**X** {targets[0].format()}")
|
||||||
else:
|
else:
|
||||||
await utils.reply(
|
await utils.reply(
|
||||||
message,
|
message,
|
||||||
@ -180,8 +177,6 @@ async def queue_or_play(message, edited=False):
|
|||||||
message,
|
message,
|
||||||
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
|
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
|
||||||
)
|
)
|
||||||
|
|
||||||
utils.cooldown(message, 3)
|
|
||||||
elif tokens[0].lower() == "play":
|
elif tokens[0].lower() == "play":
|
||||||
await resume(message)
|
await resume(message)
|
||||||
else:
|
else:
|
||||||
@ -199,7 +194,7 @@ async def queue_or_play(message, edited=False):
|
|||||||
def embed(description):
|
def embed(description):
|
||||||
e = disnake.Embed(
|
e = disnake.Embed(
|
||||||
description=description,
|
description=description,
|
||||||
color=EMBED_COLOR,
|
color=constants.EMBED_COLOR,
|
||||||
)
|
)
|
||||||
if formatted_duration and len(players[message.guild.id].queue) > 1:
|
if formatted_duration and len(players[message.guild.id].queue) > 1:
|
||||||
e.set_footer(text=f"{formatted_duration} in total")
|
e.set_footer(text=f"{formatted_duration} in total")
|
||||||
@ -207,7 +202,7 @@ async def queue_or_play(message, edited=False):
|
|||||||
|
|
||||||
await disnake_paginator.ButtonPaginator(
|
await disnake_paginator.ButtonPaginator(
|
||||||
invalid_user_function=utils.invalid_user_handler,
|
invalid_user_function=utils.invalid_user_handler,
|
||||||
color=EMBED_COLOR,
|
color=constants.EMBED_COLOR,
|
||||||
segments=list(
|
segments=list(
|
||||||
map(
|
map(
|
||||||
embed,
|
embed,
|
||||||
@ -232,18 +227,98 @@ async def queue_or_play(message, edited=False):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def skip(message):
|
async def playing(message):
|
||||||
|
if not command_allowed(message, immutable=True):
|
||||||
|
return
|
||||||
|
|
||||||
tokens = commands.tokenize(message.content)
|
tokens = commands.tokenize(message.content)
|
||||||
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
|
parser = arguments.ArgumentParser(
|
||||||
|
tokens[0], "get information about the currently playing song"
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-n",
|
"-d",
|
||||||
"--next",
|
"--description",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="skip the next song",
|
help="get the description",
|
||||||
)
|
)
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
if not (args := await parser.parse_args(message, tokens)):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if source := message.guild.voice_client.source:
|
||||||
|
if args.description:
|
||||||
|
if description := source.description:
|
||||||
|
paginator = disnake_paginator.ButtonPaginator(
|
||||||
|
invalid_user_function=utils.invalid_user_handler,
|
||||||
|
color=constants.EMBED_COLOR,
|
||||||
|
title=source.title,
|
||||||
|
segments=disnake_paginator.split(description),
|
||||||
|
)
|
||||||
|
for embed in paginator.embeds:
|
||||||
|
embed.url = source.original_url
|
||||||
|
await paginator.start(utils.MessageInteractionWrapper(message))
|
||||||
|
else:
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
source.description or "no description found!",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
bar_length = 35
|
||||||
|
progress = source.original.progress / source.duration
|
||||||
|
|
||||||
|
embed = disnake.Embed(
|
||||||
|
color=constants.EMBED_COLOR,
|
||||||
|
title=source.title,
|
||||||
|
url=source.original_url,
|
||||||
|
description=f"{'⏸️ ' if message.guild.voice_client.is_paused() else ''}"
|
||||||
|
f"`[{'#'*int(progress * bar_length)}{'-'*int((1 - progress) * bar_length)}]` "
|
||||||
|
f"**{youtubedl.format_duration(int(source.original.progress))}** / **{youtubedl.format_duration(source.duration)}** (**{round(progress * 100)}%**)",
|
||||||
|
)
|
||||||
|
embed.add_field(name="Volume", value=f"{int(source.volume*100)}%")
|
||||||
|
embed.add_field(name="Views", value=f"{source.view_count:,}")
|
||||||
|
embed.add_field(
|
||||||
|
name="Queuer",
|
||||||
|
value=players[message.guild.id].current.trigger_message.author.mention,
|
||||||
|
)
|
||||||
|
embed.set_image(source.thumbnail_url)
|
||||||
|
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
embed=embed,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
"nothing is playing!",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def fast_forward(message):
|
||||||
|
if not command_allowed(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
tokens = commands.tokenize(message.content)
|
||||||
|
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
|
||||||
|
parser.add_argument(
|
||||||
|
"seconds",
|
||||||
|
type=lambda v: arguments.range_type(v, min=0, max=300),
|
||||||
|
help="the amount of seconds to fast forward",
|
||||||
|
)
|
||||||
|
if not (args := await parser.parse_args(message, tokens)):
|
||||||
|
return
|
||||||
|
|
||||||
|
if not message.guild.voice_client.source:
|
||||||
|
await utils.reply(message, "nothing is playing!")
|
||||||
|
return
|
||||||
|
|
||||||
|
message.guild.voice_client.pause()
|
||||||
|
message.guild.voice_client.source.original.fast_forward(args.seconds)
|
||||||
|
message.guild.voice_client.resume()
|
||||||
|
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
|
||||||
|
|
||||||
|
async def skip(message):
|
||||||
if not command_allowed(message):
|
if not command_allowed(message):
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -253,11 +328,159 @@ async def skip(message):
|
|||||||
message,
|
message,
|
||||||
"the queue is empty now!",
|
"the queue is empty now!",
|
||||||
)
|
)
|
||||||
elif args.next:
|
|
||||||
next = players[message.guild.id].queue.pop()
|
|
||||||
await utils.reply(message, f"**skipped** {next.format()}")
|
|
||||||
else:
|
else:
|
||||||
message.guild.voice_client.stop()
|
message.guild.voice_client.stop()
|
||||||
await utils.add_check_reaction(message)
|
await utils.add_check_reaction(message)
|
||||||
if not message.guild.voice_client.source:
|
if not message.guild.voice_client.source:
|
||||||
play_next(message)
|
play_next(message)
|
||||||
|
|
||||||
|
|
||||||
|
async def join(message):
|
||||||
|
if message.guild.voice_client:
|
||||||
|
return await message.guild.voice_client.move_to(message.channel)
|
||||||
|
|
||||||
|
await message.channel.connect()
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
|
||||||
|
|
||||||
|
async def leave(message):
|
||||||
|
if not command_allowed(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
await message.guild.voice_client.disconnect()
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
|
||||||
|
|
||||||
|
async def resume(message):
|
||||||
|
if not command_allowed(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
if message.guild.voice_client.is_paused():
|
||||||
|
message.guild.voice_client.resume()
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
else:
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
"nothing is paused!",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def pause(message):
|
||||||
|
if not command_allowed(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
if message.guild.voice_client.is_playing():
|
||||||
|
message.guild.voice_client.pause()
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
else:
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
"nothing is playing!",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def volume(message):
|
||||||
|
if not command_allowed(message, immutable=True):
|
||||||
|
return
|
||||||
|
|
||||||
|
tokens = commands.tokenize(message.content)
|
||||||
|
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
|
||||||
|
parser.add_argument(
|
||||||
|
"volume",
|
||||||
|
nargs="?",
|
||||||
|
type=lambda v: arguments.range_type(v, min=0, max=150),
|
||||||
|
help="the volume level (0 - 150)",
|
||||||
|
)
|
||||||
|
if not (args := await parser.parse_args(message, tokens)):
|
||||||
|
return
|
||||||
|
|
||||||
|
if not message.guild.voice_client.source:
|
||||||
|
await utils.reply(message, "nothing is playing!")
|
||||||
|
return
|
||||||
|
|
||||||
|
if args.volume is None:
|
||||||
|
await utils.reply(
|
||||||
|
message,
|
||||||
|
f"{int(message.guild.voice_client.source.volume * 100)}",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if not command_allowed(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
message.guild.voice_client.source.volume = float(args.volume) / 100.0
|
||||||
|
await utils.add_check_reaction(message)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_queued(messages):
|
||||||
|
if messages[0].guild.id not in players:
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(players[messages[0].guild.id].queue) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
found = []
|
||||||
|
for message in messages:
|
||||||
|
for queued in players[message.guild.id].queue:
|
||||||
|
if queued.trigger_message.id == message.id:
|
||||||
|
found.append(queued)
|
||||||
|
for queued in found:
|
||||||
|
players[messages[0].guild.id].queue.remove(queued)
|
||||||
|
|
||||||
|
|
||||||
|
def play_after_callback(e, message, once):
|
||||||
|
if e:
|
||||||
|
print(f"player error: {e}")
|
||||||
|
if not once:
|
||||||
|
play_next(message)
|
||||||
|
|
||||||
|
|
||||||
|
def play_next(message, once=False, first=False):
|
||||||
|
message.guild.voice_client.stop()
|
||||||
|
if message.guild.id in players and players[message.guild.id].queue:
|
||||||
|
queued = players[message.guild.id].queue_pop()
|
||||||
|
try:
|
||||||
|
message.guild.voice_client.play(
|
||||||
|
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||||
|
)
|
||||||
|
except disnake.opus.OpusNotLoaded:
|
||||||
|
utils.load_opus()
|
||||||
|
message.guild.voice_client.play(
|
||||||
|
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||||
|
)
|
||||||
|
|
||||||
|
embed = disnake.Embed(
|
||||||
|
color=constants.EMBED_COLOR,
|
||||||
|
title=queued.player.title,
|
||||||
|
url=queued.player.original_url,
|
||||||
|
)
|
||||||
|
embed.add_field(name="Volume", value=f"{int(queued.player.volume*100)}%")
|
||||||
|
embed.add_field(name="Views", value=f"{queued.player.view_count:,}")
|
||||||
|
embed.add_field(
|
||||||
|
name="Queuer",
|
||||||
|
value=players[message.guild.id].current.trigger_message.author.mention,
|
||||||
|
)
|
||||||
|
embed.set_image(queued.player.thumbnail_url)
|
||||||
|
|
||||||
|
if first:
|
||||||
|
client.loop.create_task(utils.reply(message, embed=embed))
|
||||||
|
else:
|
||||||
|
client.loop.create_task(utils.channel_send(message, embed=embed))
|
||||||
|
|
||||||
|
|
||||||
|
async def ensure_joined(message):
|
||||||
|
if message.guild.voice_client is None:
|
||||||
|
if message.author.voice:
|
||||||
|
await message.author.voice.channel.connect()
|
||||||
|
else:
|
||||||
|
await utils.reply(message, "you are not connected to a voice channel!")
|
||||||
|
|
||||||
|
|
||||||
|
def command_allowed(message, immutable=False):
|
||||||
|
if not message.guild.voice_client:
|
||||||
|
return
|
||||||
|
if immutable:
|
||||||
|
return message.channel.id == message.guild.voice_client.channel.id
|
||||||
|
else:
|
||||||
|
if not message.author.voice:
|
||||||
|
return False
|
||||||
|
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
@ -1,18 +0,0 @@
|
|||||||
from .channel import join, leave
|
|
||||||
from .playback import fast_forward, pause, playing, resume, volume
|
|
||||||
from .queue import queue_or_play, skip
|
|
||||||
from .utils import remove_queued
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"join",
|
|
||||||
"leave",
|
|
||||||
"fast_forward",
|
|
||||||
"playing",
|
|
||||||
"queue_or_play",
|
|
||||||
"skip",
|
|
||||||
"resume",
|
|
||||||
"pause",
|
|
||||||
"skip",
|
|
||||||
"remove_queued",
|
|
||||||
"volume",
|
|
||||||
]
|
|
@ -1,19 +0,0 @@
|
|||||||
import utils
|
|
||||||
|
|
||||||
from .utils import command_allowed
|
|
||||||
|
|
||||||
|
|
||||||
async def join(message):
|
|
||||||
if message.guild.voice_client:
|
|
||||||
return await message.guild.voice_client.move_to(message.channel)
|
|
||||||
|
|
||||||
await message.channel.connect()
|
|
||||||
await utils.add_check_reaction(message)
|
|
||||||
|
|
||||||
|
|
||||||
async def leave(message):
|
|
||||||
if not command_allowed(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
await message.guild.voice_client.disconnect()
|
|
||||||
await utils.add_check_reaction(message)
|
|
@ -1,143 +0,0 @@
|
|||||||
import disnake_paginator
|
|
||||||
|
|
||||||
import arguments
|
|
||||||
import commands
|
|
||||||
import utils
|
|
||||||
from constants import EMBED_COLOR
|
|
||||||
from state import players
|
|
||||||
|
|
||||||
from .utils import command_allowed
|
|
||||||
|
|
||||||
|
|
||||||
async def playing(message):
|
|
||||||
tokens = commands.tokenize(message.content)
|
|
||||||
parser = arguments.ArgumentParser(
|
|
||||||
tokens[0], "get information about the currently playing song"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-d",
|
|
||||||
"--description",
|
|
||||||
action="store_true",
|
|
||||||
help="get the description",
|
|
||||||
)
|
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
|
||||||
return
|
|
||||||
|
|
||||||
if not command_allowed(message, immutable=True):
|
|
||||||
return
|
|
||||||
|
|
||||||
if source := message.guild.voice_client.source:
|
|
||||||
if args.description:
|
|
||||||
if description := source.description:
|
|
||||||
paginator = disnake_paginator.ButtonPaginator(
|
|
||||||
invalid_user_function=utils.invalid_user_handler,
|
|
||||||
color=EMBED_COLOR,
|
|
||||||
title=source.title,
|
|
||||||
segments=disnake_paginator.split(description),
|
|
||||||
)
|
|
||||||
for embed in paginator.embeds:
|
|
||||||
embed.url = source.original_url
|
|
||||||
await paginator.start(utils.MessageInteractionWrapper(message))
|
|
||||||
else:
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
source.description or "no description found!",
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
embed=players[message.guild.id].current.embed(
|
|
||||||
is_paused=message.guild.voice_client.is_paused()
|
|
||||||
),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
"nothing is playing!",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def resume(message):
|
|
||||||
if not command_allowed(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
if message.guild.voice_client.is_paused():
|
|
||||||
message.guild.voice_client.resume()
|
|
||||||
await utils.add_check_reaction(message)
|
|
||||||
else:
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
"nothing is paused!",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def pause(message):
|
|
||||||
if not command_allowed(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
if message.guild.voice_client.is_playing():
|
|
||||||
message.guild.voice_client.pause()
|
|
||||||
await utils.add_check_reaction(message)
|
|
||||||
else:
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
"nothing is playing!",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def fast_forward(message):
|
|
||||||
tokens = commands.tokenize(message.content)
|
|
||||||
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
|
|
||||||
parser.add_argument(
|
|
||||||
"seconds",
|
|
||||||
type=lambda v: arguments.range_type(v, min=0, max=300),
|
|
||||||
help="the amount of seconds to fast forward",
|
|
||||||
)
|
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
|
||||||
return
|
|
||||||
|
|
||||||
if not command_allowed(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
if not message.guild.voice_client.source:
|
|
||||||
await utils.reply(message, "nothing is playing!")
|
|
||||||
return
|
|
||||||
|
|
||||||
message.guild.voice_client.pause()
|
|
||||||
message.guild.voice_client.source.original.fast_forward(args.seconds)
|
|
||||||
message.guild.voice_client.resume()
|
|
||||||
|
|
||||||
await utils.add_check_reaction(message)
|
|
||||||
|
|
||||||
|
|
||||||
async def volume(message):
|
|
||||||
tokens = commands.tokenize(message.content)
|
|
||||||
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
|
|
||||||
parser.add_argument(
|
|
||||||
"volume",
|
|
||||||
nargs="?",
|
|
||||||
type=lambda v: arguments.range_type(v, min=0, max=150),
|
|
||||||
help="the volume level (0 - 150)",
|
|
||||||
)
|
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
|
||||||
return
|
|
||||||
|
|
||||||
if not command_allowed(message, immutable=True):
|
|
||||||
return
|
|
||||||
|
|
||||||
if not message.guild.voice_client.source:
|
|
||||||
await utils.reply(message, "nothing is playing!")
|
|
||||||
return
|
|
||||||
|
|
||||||
if args.volume is None:
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
f"{int(message.guild.voice_client.source.volume * 100)}",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
if not command_allowed(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
message.guild.voice_client.source.volume = float(args.volume) / 100.0
|
|
||||||
await utils.add_check_reaction(message)
|
|
@ -1,72 +0,0 @@
|
|||||||
from logging import error
|
|
||||||
|
|
||||||
import disnake
|
|
||||||
|
|
||||||
import utils
|
|
||||||
from state import client, players
|
|
||||||
|
|
||||||
|
|
||||||
def play_after_callback(e, message, once):
|
|
||||||
if e:
|
|
||||||
error(f"player error: {e}")
|
|
||||||
if not once:
|
|
||||||
play_next(message)
|
|
||||||
|
|
||||||
|
|
||||||
def play_next(message, once=False, first=False):
|
|
||||||
if not message.guild.voice_client:
|
|
||||||
return
|
|
||||||
|
|
||||||
message.guild.voice_client.stop()
|
|
||||||
if message.guild.id in players and players[message.guild.id].queue:
|
|
||||||
queued = players[message.guild.id].queue_pop()
|
|
||||||
try:
|
|
||||||
message.guild.voice_client.play(
|
|
||||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
|
||||||
)
|
|
||||||
except disnake.opus.OpusNotLoaded:
|
|
||||||
utils.load_opus()
|
|
||||||
message.guild.voice_client.play(
|
|
||||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
|
||||||
)
|
|
||||||
|
|
||||||
embed = queued.embed()
|
|
||||||
if first and len(players[message.guild.id].queue) == 0:
|
|
||||||
client.loop.create_task(utils.reply(message, embed=embed))
|
|
||||||
else:
|
|
||||||
client.loop.create_task(utils.channel_send(message, embed=embed))
|
|
||||||
|
|
||||||
|
|
||||||
def remove_queued(messages):
|
|
||||||
if messages[0].guild.id not in players:
|
|
||||||
return
|
|
||||||
|
|
||||||
if len(players[messages[0].guild.id].queue) == 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
found = []
|
|
||||||
for message in messages:
|
|
||||||
for queued in players[message.guild.id].queue:
|
|
||||||
if queued.trigger_message.id == message.id:
|
|
||||||
found.append(queued)
|
|
||||||
for queued in found:
|
|
||||||
players[messages[0].guild.id].queue.remove(queued)
|
|
||||||
|
|
||||||
|
|
||||||
async def ensure_joined(message):
|
|
||||||
if message.guild.voice_client is None:
|
|
||||||
if message.author.voice:
|
|
||||||
await message.author.voice.channel.connect()
|
|
||||||
else:
|
|
||||||
await utils.reply(message, "you are not connected to a voice channel!")
|
|
||||||
|
|
||||||
|
|
||||||
def command_allowed(message, immutable=False):
|
|
||||||
if not message.guild.voice_client:
|
|
||||||
return
|
|
||||||
if immutable:
|
|
||||||
return message.channel.id == message.guild.voice_client.channel.id
|
|
||||||
else:
|
|
||||||
if not message.author.voice:
|
|
||||||
return False
|
|
||||||
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
|
@ -15,7 +15,6 @@ YTDL_OPTIONS = {
|
|||||||
"source_address": "0.0.0.0",
|
"source_address": "0.0.0.0",
|
||||||
}
|
}
|
||||||
|
|
||||||
BAR_LENGTH = 35
|
|
||||||
EMBED_COLOR = 0xFF6600
|
EMBED_COLOR = 0xFF6600
|
||||||
OWNERS = [531392146767347712]
|
OWNERS = [531392146767347712]
|
||||||
PREFIX = "%"
|
PREFIX = "%"
|
||||||
@ -26,11 +25,6 @@ RELOADABLE_MODULES = [
|
|||||||
"commands.tools",
|
"commands.tools",
|
||||||
"commands.utils",
|
"commands.utils",
|
||||||
"commands.voice",
|
"commands.voice",
|
||||||
"commands.voice.channel",
|
|
||||||
"commands.voice.playback",
|
|
||||||
"commands.voice.playing",
|
|
||||||
"commands.voice.queue",
|
|
||||||
"commands.voice.utils",
|
|
||||||
"constants",
|
"constants",
|
||||||
"core",
|
"core",
|
||||||
"events",
|
"events",
|
||||||
|
71
core.py
71
core.py
@ -6,20 +6,18 @@ import io
|
|||||||
import textwrap
|
import textwrap
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from logging import debug
|
|
||||||
|
|
||||||
import disnake
|
import disnake
|
||||||
import disnake_paginator
|
import disnake_paginator
|
||||||
|
|
||||||
import commands
|
import commands
|
||||||
|
import constants
|
||||||
import utils
|
import utils
|
||||||
from commands import Command as C
|
from state import client, command_locks, idle_tracker
|
||||||
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
|
|
||||||
from state import client, command_cooldowns, command_locks, idle_tracker
|
|
||||||
|
|
||||||
|
|
||||||
async def on_message(message, edited=False):
|
async def on_message(message, edited=False):
|
||||||
if not message.content.startswith(PREFIX) or message.author.bot:
|
if not message.content.startswith(constants.PREFIX) or message.author.bot:
|
||||||
return
|
return
|
||||||
|
|
||||||
tokens = commands.tokenize(message.content)
|
tokens = commands.tokenize(message.content)
|
||||||
@ -40,44 +38,26 @@ async def on_message(message, edited=False):
|
|||||||
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
|
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
matched = matched[0]
|
|
||||||
|
|
||||||
if (message.guild.id, message.author.id) not in command_locks:
|
if message.guild.id not in command_locks:
|
||||||
command_locks[(message.guild.id, message.author.id)] = asyncio.Lock()
|
command_locks[message.guild.id] = asyncio.Lock()
|
||||||
await command_locks[(message.guild.id, message.author.id)].acquire()
|
|
||||||
|
|
||||||
|
C = commands.Command
|
||||||
try:
|
try:
|
||||||
if cooldowns := command_cooldowns.get(message.author.id):
|
match matched[0]:
|
||||||
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
|
case C.RELOAD if message.author.id in constants.OWNERS:
|
||||||
end_time
|
|
||||||
):
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
match matched:
|
|
||||||
case C.RELOAD if message.author.id in OWNERS:
|
|
||||||
reloaded_modules = set()
|
reloaded_modules = set()
|
||||||
start = time.time()
|
|
||||||
|
|
||||||
rreload(reloaded_modules, __import__("core"))
|
rreload(reloaded_modules, __import__("core"))
|
||||||
rreload(reloaded_modules, __import__("extra"))
|
rreload(reloaded_modules, __import__("extra"))
|
||||||
for module in filter(
|
for module in filter(
|
||||||
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
|
lambda v: inspect.ismodule(v)
|
||||||
|
and v.__name__ in constants.RELOADABLE_MODULES,
|
||||||
globals().values(),
|
globals().values(),
|
||||||
):
|
):
|
||||||
rreload(reloaded_modules, module)
|
rreload(reloaded_modules, module)
|
||||||
|
|
||||||
end = time.time()
|
|
||||||
if __debug__:
|
|
||||||
debug(
|
|
||||||
f"reloaded {len(reloaded_modules)} modules in {round(end-start, 2)}s"
|
|
||||||
)
|
|
||||||
|
|
||||||
await utils.add_check_reaction(message)
|
await utils.add_check_reaction(message)
|
||||||
case C.EXECUTE if message.author.id in OWNERS:
|
case C.EXECUTE if message.author.id in constants.OWNERS:
|
||||||
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
|
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
|
||||||
for replacement in ["python", "py"]:
|
for replacement in ["python", "py"]:
|
||||||
if code.startswith(replacement):
|
if code.startswith(replacement):
|
||||||
@ -112,23 +92,25 @@ async def on_message(message, edited=False):
|
|||||||
prefix="```\n",
|
prefix="```\n",
|
||||||
suffix="```",
|
suffix="```",
|
||||||
invalid_user_function=utils.invalid_user_handler,
|
invalid_user_function=utils.invalid_user_handler,
|
||||||
color=EMBED_COLOR,
|
color=constants.EMBED_COLOR,
|
||||||
segments=disnake_paginator.split(output),
|
segments=disnake_paginator.split(output),
|
||||||
).start(utils.MessageInteractionWrapper(message))
|
).start(utils.MessageInteractionWrapper(message))
|
||||||
elif len(output.strip()) == 0:
|
elif len(output.strip()) == 0:
|
||||||
await utils.add_check_reaction(message)
|
await utils.add_check_reaction(message)
|
||||||
else:
|
else:
|
||||||
await utils.reply(message, output)
|
await utils.reply(message, output)
|
||||||
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
|
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
|
||||||
await commands.tools.clear(message)
|
await commands.tools.clear(message)
|
||||||
case C.JOIN:
|
case C.JOIN:
|
||||||
await commands.voice.join(message)
|
await commands.voice.join(message)
|
||||||
case C.LEAVE:
|
case C.LEAVE:
|
||||||
await commands.voice.leave(message)
|
await commands.voice.leave(message)
|
||||||
case C.QUEUE | C.PLAY:
|
case C.QUEUE | C.PLAY:
|
||||||
await commands.voice.queue_or_play(message, edited)
|
async with command_locks[message.guild.id]:
|
||||||
|
await commands.voice.queue_or_play(message, edited)
|
||||||
case C.SKIP:
|
case C.SKIP:
|
||||||
await commands.voice.skip(message)
|
async with command_locks[message.guild.id]:
|
||||||
|
await commands.voice.skip(message)
|
||||||
case C.RESUME:
|
case C.RESUME:
|
||||||
await commands.voice.resume(message)
|
await commands.voice.resume(message)
|
||||||
case C.PAUSE:
|
case C.PAUSE:
|
||||||
@ -143,29 +125,24 @@ async def on_message(message, edited=False):
|
|||||||
await commands.voice.playing(message)
|
await commands.voice.playing(message)
|
||||||
case C.FAST_FORWARD:
|
case C.FAST_FORWARD:
|
||||||
await commands.voice.fast_forward(message)
|
await commands.voice.fast_forward(message)
|
||||||
case C.STATUS:
|
|
||||||
await commands.bot.status(message)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await utils.reply(
|
await utils.reply(
|
||||||
message,
|
message,
|
||||||
f"exception occurred while processing command: ```\n{"".join(traceback.format_exception(e)).replace("`", "\\`")}```",
|
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
|
||||||
)
|
)
|
||||||
raise e
|
|
||||||
finally:
|
|
||||||
command_locks[(message.guild.id, message.author.id)].release()
|
|
||||||
|
|
||||||
|
|
||||||
async def on_voice_state_update(_, before, after):
|
async def on_voice_state_update(_, before, after):
|
||||||
def is_empty(channel):
|
def is_empty(channel):
|
||||||
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
|
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
|
||||||
|
|
||||||
channel = None
|
c = None
|
||||||
if is_empty(before.channel):
|
if is_empty(before.channel):
|
||||||
channel = before.channel
|
c = before.channel
|
||||||
elif is_empty(after.channel):
|
elif is_empty(after.channel):
|
||||||
channel = after.channel
|
c = after.channel
|
||||||
if channel:
|
if c:
|
||||||
await channel.guild.voice_client.disconnect()
|
await c.guild.voice_client.disconnect()
|
||||||
|
|
||||||
|
|
||||||
def rreload(reloaded_modules, module):
|
def rreload(reloaded_modules, module):
|
||||||
@ -173,7 +150,7 @@ def rreload(reloaded_modules, module):
|
|||||||
|
|
||||||
for submodule in filter(
|
for submodule in filter(
|
||||||
lambda v: inspect.ismodule(v)
|
lambda v: inspect.ismodule(v)
|
||||||
and v.__name__ in RELOADABLE_MODULES
|
and v.__name__ in constants.RELOADABLE_MODULES
|
||||||
and v.__name__ not in reloaded_modules,
|
and v.__name__ not in reloaded_modules,
|
||||||
vars(module).values(),
|
vars(module).values(),
|
||||||
):
|
):
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from logging import info
|
|
||||||
|
|
||||||
import commands
|
import commands
|
||||||
import core
|
import core
|
||||||
@ -21,7 +20,7 @@ def prepare():
|
|||||||
|
|
||||||
|
|
||||||
async def on_bulk_message_delete(messages):
|
async def on_bulk_message_delete(messages):
|
||||||
commands.voice.remove_queued(messages)
|
commands.voice.delete_queued(messages)
|
||||||
|
|
||||||
|
|
||||||
async def on_message(message):
|
async def on_message(message):
|
||||||
@ -29,7 +28,7 @@ async def on_message(message):
|
|||||||
|
|
||||||
|
|
||||||
async def on_message_delete(message):
|
async def on_message_delete(message):
|
||||||
commands.voice.remove_queued([message])
|
commands.voice.delete_queued([message])
|
||||||
|
|
||||||
|
|
||||||
async def on_message_edit(before, after):
|
async def on_message_edit(before, after):
|
||||||
@ -40,7 +39,7 @@ async def on_message_edit(before, after):
|
|||||||
|
|
||||||
|
|
||||||
async def on_ready():
|
async def on_ready():
|
||||||
info(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
|
print(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
|
||||||
|
|
||||||
|
|
||||||
async def on_voice_state_update(member, before, after):
|
async def on_voice_state_update(member, before, after):
|
||||||
|
14
extra.py
14
extra.py
@ -4,7 +4,7 @@ import string
|
|||||||
import disnake
|
import disnake
|
||||||
import youtube_transcript_api
|
import youtube_transcript_api
|
||||||
|
|
||||||
from state import client, kill, players
|
from state import client, players
|
||||||
|
|
||||||
|
|
||||||
async def transcript(
|
async def transcript(
|
||||||
@ -39,17 +39,13 @@ async def transcript(
|
|||||||
)
|
)
|
||||||
if len(messages) > max_messages:
|
if len(messages) > max_messages:
|
||||||
try:
|
try:
|
||||||
count = max_messages - min_messages
|
await message.channel.delete_messages(
|
||||||
if count == 1:
|
[messages.pop() for _ in range(max_messages - min_messages)]
|
||||||
await messages.pop().delete()
|
)
|
||||||
else:
|
|
||||||
await message.channel.delete_messages(
|
|
||||||
[messages.pop() for _ in range(count)]
|
|
||||||
)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if (message.guild.voice_client.source.id != initial_id) or kill["transcript"]:
|
if message.guild.voice_client.source.id != initial_id:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
11
main.py
11
main.py
@ -1,18 +1,7 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
import events
|
import events
|
||||||
from state import client
|
from state import client
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
logging.basicConfig(
|
|
||||||
format=(
|
|
||||||
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
|
|
||||||
if __debug__
|
|
||||||
else "%(asctime)s %(levelname)s %(message)s"
|
|
||||||
),
|
|
||||||
datefmt="%Y-%m-%d %T",
|
|
||||||
level=logging.DEBUG if __debug__ else logging.INFO,
|
|
||||||
)
|
|
||||||
events.prepare()
|
events.prepare()
|
||||||
client.run(constants.SECRETS["TOKEN"])
|
client.run(constants.SECRETS["TOKEN"])
|
||||||
|
12
state.py
12
state.py
@ -1,13 +1,13 @@
|
|||||||
|
import collections
|
||||||
import time
|
import time
|
||||||
from collections import OrderedDict
|
|
||||||
|
|
||||||
import disnake
|
import disnake
|
||||||
|
|
||||||
|
|
||||||
class LimitedSizeDict(OrderedDict):
|
class LimitedSizeDict(collections.OrderedDict):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwds):
|
||||||
self.size_limit = kwargs.pop("size_limit", 1000)
|
self.size_limit = kwds.pop("size_limit", 1000)
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwds)
|
||||||
self._check_size_limit()
|
self._check_size_limit()
|
||||||
|
|
||||||
def __setitem__(self, key, value):
|
def __setitem__(self, key, value):
|
||||||
@ -25,10 +25,8 @@ intents.message_content = True
|
|||||||
intents.members = True
|
intents.members = True
|
||||||
client = disnake.Client(intents=intents)
|
client = disnake.Client(intents=intents)
|
||||||
|
|
||||||
command_cooldowns = LimitedSizeDict()
|
|
||||||
command_locks = LimitedSizeDict()
|
command_locks = LimitedSizeDict()
|
||||||
idle_tracker = {"is_idle": False, "last_used": time.time()}
|
idle_tracker = {"is_idle": False, "last_used": time.time()}
|
||||||
kill = {"transcript": False}
|
|
||||||
message_responses = LimitedSizeDict()
|
message_responses = LimitedSizeDict()
|
||||||
players = {}
|
players = {}
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
14
tasks.py
14
tasks.py
@ -1,6 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from logging import debug, error
|
|
||||||
|
|
||||||
import disnake
|
import disnake
|
||||||
|
|
||||||
@ -8,10 +7,8 @@ from state import client, idle_tracker, players
|
|||||||
|
|
||||||
|
|
||||||
async def cleanup():
|
async def cleanup():
|
||||||
debug("spawned cleanup thread")
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(3600 * 12)
|
await asyncio.sleep(3600)
|
||||||
|
|
||||||
targets = []
|
targets = []
|
||||||
for guild_id, player in players.items():
|
for guild_id, player in players.items():
|
||||||
@ -19,15 +16,10 @@ async def cleanup():
|
|||||||
targets.append(guild_id)
|
targets.append(guild_id)
|
||||||
for target in targets:
|
for target in targets:
|
||||||
del players[target]
|
del players[target]
|
||||||
if __debug__:
|
|
||||||
debug(f"cleanup removed {len(targets)} empty players")
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not idle_tracker["is_idle"]
|
not idle_tracker["is_idle"]
|
||||||
and time.time() - idle_tracker["last_used"] >= 3600
|
and time.time() - idle_tracker["last_used"] >= 3600
|
||||||
):
|
):
|
||||||
try:
|
await client.change_presence(status=disnake.Status.idle)
|
||||||
await client.change_presence(status=disnake.Status.idle)
|
idle_tracker["is_idle"] = True
|
||||||
idle_tracker["is_idle"] = True
|
|
||||||
except Exception as e:
|
|
||||||
error(f"failed to change status to idle: {e}")
|
|
||||||
|
@ -6,102 +6,56 @@ import youtubedl
|
|||||||
|
|
||||||
class TestFormatDuration(unittest.TestCase):
|
class TestFormatDuration(unittest.TestCase):
|
||||||
def test_youtubedl(self):
|
def test_youtubedl(self):
|
||||||
def f(s):
|
self.assertEqual(youtubedl.format_duration(0), "00:00")
|
||||||
return youtubedl.format_duration(s)
|
self.assertEqual(youtubedl.format_duration(0.5), "00:00")
|
||||||
|
self.assertEqual(youtubedl.format_duration(60.5), "01:00")
|
||||||
self.assertEqual(f(0), "00:00")
|
self.assertEqual(youtubedl.format_duration(1), "00:01")
|
||||||
self.assertEqual(f(0.5), "00:00")
|
self.assertEqual(youtubedl.format_duration(60), "01:00")
|
||||||
self.assertEqual(f(60.5), "01:00")
|
self.assertEqual(youtubedl.format_duration(60 + 30), "01:30")
|
||||||
self.assertEqual(f(1), "00:01")
|
self.assertEqual(youtubedl.format_duration(60 * 60), "01:00:00")
|
||||||
self.assertEqual(f(60), "01:00")
|
self.assertEqual(youtubedl.format_duration(60 * 60 + 30), "01:00:30")
|
||||||
self.assertEqual(f(60 + 30), "01:30")
|
|
||||||
self.assertEqual(f(60 * 60), "01:00:00")
|
|
||||||
self.assertEqual(f(60 * 60 + 30), "01:00:30")
|
|
||||||
|
|
||||||
def test_utils(self):
|
def test_utils(self):
|
||||||
def f(s):
|
self.assertEqual(utils.format_duration(0), "")
|
||||||
return utils.format_duration(s)
|
self.assertEqual(utils.format_duration(60 * 60 * 24 * 7), "1 week")
|
||||||
|
self.assertEqual(utils.format_duration(60 * 60 * 24 * 21), "3 weeks")
|
||||||
self.assertEqual(f(0), "")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
f((60 * 60 * 24 * 21) - 1),
|
utils.format_duration((60 * 60 * 24 * 21) - 1),
|
||||||
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
|
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
|
||||||
)
|
)
|
||||||
self.assertEqual(f(60), "1 minute")
|
self.assertEqual(utils.format_duration(60), "1 minute")
|
||||||
self.assertEqual(f(60 * 2), "2 minutes")
|
self.assertEqual(utils.format_duration(60 * 2), "2 minutes")
|
||||||
self.assertEqual(f(60 * 59), "59 minutes")
|
self.assertEqual(utils.format_duration(60 * 59), "59 minutes")
|
||||||
self.assertEqual(f(60 * 60), "1 hour")
|
self.assertEqual(utils.format_duration(60 * 60), "1 hour")
|
||||||
self.assertEqual(f(60 * 60 * 2), "2 hours")
|
self.assertEqual(utils.format_duration(60 * 60 * 2), "2 hours")
|
||||||
self.assertEqual(f(1), "1 second")
|
self.assertEqual(utils.format_duration(1), "1 second")
|
||||||
self.assertEqual(f(60 + 5), "1 minute, 5 seconds")
|
self.assertEqual(utils.format_duration(60 + 5), "1 minute, 5 seconds")
|
||||||
self.assertEqual(f(60 * 60 + 30), "1 hour, 30 seconds")
|
self.assertEqual(utils.format_duration(60 * 60 + 30), "1 hour, 30 seconds")
|
||||||
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds")
|
self.assertEqual(
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds")
|
utils.format_duration(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds"
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
utils.format_duration(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds"
|
||||||
|
)
|
||||||
|
|
||||||
def test_utils_natural(self):
|
def test_utils_natural(self):
|
||||||
def f(s):
|
def format(seconds: int):
|
||||||
return utils.format_duration(s, natural=True)
|
return utils.format_duration(seconds, natural=True)
|
||||||
|
|
||||||
self.assertEqual(f(0), "")
|
self.assertEqual(format(0), "")
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
|
self.assertEqual(format(60 * 60 * 24 * 7), "1 week")
|
||||||
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
|
self.assertEqual(format(60 * 60 * 24 * 21), "3 weeks")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
f((60 * 60 * 24 * 21) - 1),
|
format((60 * 60 * 24 * 21) - 1),
|
||||||
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
|
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
|
||||||
)
|
)
|
||||||
self.assertEqual(f(60), "1 minute")
|
self.assertEqual(format(60), "1 minute")
|
||||||
self.assertEqual(f(60 * 2), "2 minutes")
|
self.assertEqual(format(60 * 2), "2 minutes")
|
||||||
self.assertEqual(f(60 * 59), "59 minutes")
|
self.assertEqual(format(60 * 59), "59 minutes")
|
||||||
self.assertEqual(f(60 * 60), "1 hour")
|
self.assertEqual(format(60 * 60), "1 hour")
|
||||||
self.assertEqual(f(60 * 60 * 2), "2 hours")
|
self.assertEqual(format(60 * 60 * 2), "2 hours")
|
||||||
self.assertEqual(f(1), "1 second")
|
self.assertEqual(format(1), "1 second")
|
||||||
self.assertEqual(f(60 + 5), "1 minute and 5 seconds")
|
self.assertEqual(format(60 + 5), "1 minute and 5 seconds")
|
||||||
self.assertEqual(f(60 * 60 + 30), "1 hour and 30 seconds")
|
self.assertEqual(format(60 * 60 + 30), "1 hour and 30 seconds")
|
||||||
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
|
self.assertEqual(format(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
|
self.assertEqual(format(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
|
||||||
|
|
||||||
def test_utils_short(self):
|
|
||||||
def f(s):
|
|
||||||
return utils.format_duration(s, short=True)
|
|
||||||
|
|
||||||
self.assertEqual(f(0), "")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
|
|
||||||
self.assertEqual(
|
|
||||||
f((60 * 60 * 24 * 21) - 1),
|
|
||||||
"2w 6d 23h 59m 59s",
|
|
||||||
)
|
|
||||||
self.assertEqual(f(60), "1m")
|
|
||||||
self.assertEqual(f(60 * 2), "2m")
|
|
||||||
self.assertEqual(f(60 * 59), "59m")
|
|
||||||
self.assertEqual(f(60 * 60), "1h")
|
|
||||||
self.assertEqual(f(60 * 60 * 2), "2h")
|
|
||||||
self.assertEqual(f(1), "1s")
|
|
||||||
self.assertEqual(f(60 + 5), "1m 5s")
|
|
||||||
self.assertEqual(f(60 * 60 + 30), "1h 30s")
|
|
||||||
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m 30s")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w 30s")
|
|
||||||
|
|
||||||
def test_utils_natural_short(self):
|
|
||||||
def f(s):
|
|
||||||
return utils.format_duration(s, natural=True, short=True)
|
|
||||||
|
|
||||||
self.assertEqual(f(0), "")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
|
|
||||||
self.assertEqual(
|
|
||||||
f((60 * 60 * 24 * 21) - 1),
|
|
||||||
"2w 6d 23h 59m and 59s",
|
|
||||||
)
|
|
||||||
self.assertEqual(f(60), "1m")
|
|
||||||
self.assertEqual(f(60 * 2), "2m")
|
|
||||||
self.assertEqual(f(60 * 59), "59m")
|
|
||||||
self.assertEqual(f(60 * 60), "1h")
|
|
||||||
self.assertEqual(f(60 * 60 * 2), "2h")
|
|
||||||
self.assertEqual(f(1), "1s")
|
|
||||||
self.assertEqual(f(60 + 5), "1m and 5s")
|
|
||||||
self.assertEqual(f(60 * 60 + 30), "1h and 30s")
|
|
||||||
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m and 30s")
|
|
||||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w and 30s")
|
|
||||||
|
46
utils.py
46
utils.py
@ -1,12 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
|
||||||
from logging import error, info, warning
|
|
||||||
|
|
||||||
import disnake
|
import disnake
|
||||||
|
|
||||||
import commands
|
|
||||||
import constants
|
import constants
|
||||||
from state import command_cooldowns, message_responses
|
from state import message_responses
|
||||||
|
|
||||||
|
|
||||||
class ChannelResponseWrapper:
|
class ChannelResponseWrapper:
|
||||||
@ -37,50 +34,35 @@ class MessageInteractionWrapper:
|
|||||||
await self.response.edit_message(content=content, embed=embed, view=view)
|
await self.response.edit_message(content=content, embed=embed, view=view)
|
||||||
|
|
||||||
|
|
||||||
def cooldown(message, cooldown_time: int):
|
def format_duration(duration: int, natural: bool = False):
|
||||||
possible_commands = commands.match(message.content)
|
|
||||||
if not possible_commands or len(possible_commands) > 1:
|
|
||||||
return
|
|
||||||
command = possible_commands[0]
|
|
||||||
|
|
||||||
end_time = time.time() + cooldown_time
|
|
||||||
if message.author.id in command_cooldowns:
|
|
||||||
command_cooldowns[message.author.id][command] = end_time
|
|
||||||
else:
|
|
||||||
command_cooldowns[message.author.id] = {command: end_time}
|
|
||||||
|
|
||||||
|
|
||||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
|
|
||||||
def format_plural(noun, count):
|
def format_plural(noun, count):
|
||||||
if short:
|
return noun if count == 1 else noun + "s"
|
||||||
return noun[0]
|
|
||||||
return " " + (noun if count == 1 else noun + "s")
|
|
||||||
|
|
||||||
segments = []
|
segments = []
|
||||||
|
|
||||||
weeks, duration = divmod(duration, 604800)
|
weeks, duration = divmod(duration, 604800)
|
||||||
if weeks > 0:
|
if weeks > 0:
|
||||||
segments.append(f"{weeks}{format_plural('week', weeks)}")
|
segments.append(f"{weeks} {format_plural('week', weeks)}")
|
||||||
|
|
||||||
days, duration = divmod(duration, 86400)
|
days, duration = divmod(duration, 86400)
|
||||||
if days > 0:
|
if days > 0:
|
||||||
segments.append(f"{days}{format_plural('day', days)}")
|
segments.append(f"{days} {format_plural('day', days)}")
|
||||||
|
|
||||||
hours, duration = divmod(duration, 3600)
|
hours, duration = divmod(duration, 3600)
|
||||||
if hours > 0:
|
if hours > 0:
|
||||||
segments.append(f"{hours}{format_plural('hour', hours)}")
|
segments.append(f"{hours} {format_plural('hour', hours)}")
|
||||||
|
|
||||||
minutes, duration = divmod(duration, 60)
|
minutes, duration = divmod(duration, 60)
|
||||||
if minutes > 0:
|
if minutes > 0:
|
||||||
segments.append(f"{minutes}{format_plural('minute', minutes)}")
|
segments.append(f"{minutes} {format_plural('minute', minutes)}")
|
||||||
|
|
||||||
if duration > 0:
|
if duration > 0:
|
||||||
segments.append(f"{duration}{format_plural('second', duration)}")
|
segments.append(f"{duration} {format_plural('second', duration)}")
|
||||||
|
|
||||||
separator = " " if short else ", "
|
|
||||||
if not natural or len(segments) <= 1:
|
if not natural or len(segments) <= 1:
|
||||||
return separator.join(segments)
|
return ", ".join(segments)
|
||||||
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
|
||||||
|
return ", ".join(segments[:-1]) + f" and {segments[-1]}"
|
||||||
|
|
||||||
|
|
||||||
async def add_check_reaction(message):
|
async def add_check_reaction(message):
|
||||||
@ -121,13 +103,13 @@ def filter_secrets(text: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def load_opus():
|
def load_opus():
|
||||||
warning("opus wasn't automatically loaded! trying to load manually...")
|
print("opus wasn't automatically loaded! trying to load manually...")
|
||||||
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
|
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
|
||||||
if os.path.exists(path):
|
if os.path.exists(path):
|
||||||
try:
|
try:
|
||||||
disnake.opus.load_opus(path)
|
disnake.opus.load_opus(path)
|
||||||
info(f"successfully loaded opus from {path}")
|
print(f"successfully loaded opus from {path}")
|
||||||
return
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error(f"failed to load opus from {path}: {e}")
|
print(f"failed to load opus from {path}: {e}")
|
||||||
raise Exception("could not locate working opus library")
|
raise Exception("could not locate working opus library")
|
||||||
|
51
youtubedl.py
51
youtubedl.py
@ -6,9 +6,9 @@ from typing import Any, Optional
|
|||||||
import disnake
|
import disnake
|
||||||
import yt_dlp
|
import yt_dlp
|
||||||
|
|
||||||
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
|
import constants
|
||||||
|
|
||||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
|
||||||
|
|
||||||
|
|
||||||
class CustomAudioSource(disnake.AudioSource):
|
class CustomAudioSource(disnake.AudioSource):
|
||||||
@ -40,13 +40,9 @@ class YTDLSource(disnake.PCMVolumeTransformer):
|
|||||||
self.description = data.get("description")
|
self.description = data.get("description")
|
||||||
self.duration = data.get("duration")
|
self.duration = data.get("duration")
|
||||||
self.id = data.get("id")
|
self.id = data.get("id")
|
||||||
self.like_count = data.get("like_count")
|
|
||||||
self.original_url = data.get("original_url")
|
self.original_url = data.get("original_url")
|
||||||
self.thumbnail_url = data.get("thumbnail")
|
self.thumbnail_url = data.get("thumbnail")
|
||||||
self.timestamp = data.get("timestamp")
|
|
||||||
self.title = data.get("title")
|
self.title = data.get("title")
|
||||||
self.uploader = data.get("uploader")
|
|
||||||
self.uploader_url = data.get("uploader_url")
|
|
||||||
self.view_count = data.get("view_count")
|
self.view_count = data.get("view_count")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -103,47 +99,6 @@ class QueuedSong:
|
|||||||
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
||||||
)
|
)
|
||||||
|
|
||||||
def embed(self, is_paused=False):
|
|
||||||
progress = 0
|
|
||||||
if self.player.duration:
|
|
||||||
progress = self.player.original.progress / self.player.duration
|
|
||||||
|
|
||||||
embed = disnake.Embed(
|
|
||||||
color=EMBED_COLOR,
|
|
||||||
title=self.player.title,
|
|
||||||
url=self.player.original_url,
|
|
||||||
description=(
|
|
||||||
f"{'⏸️ ' if is_paused else ''}"
|
|
||||||
f"`[{'#'*int(progress * BAR_LENGTH)}{'-'*int((1 - progress) * BAR_LENGTH)}]` "
|
|
||||||
+ (
|
|
||||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
|
||||||
if self.player.duration
|
|
||||||
else "[**live**]"
|
|
||||||
)
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
embed.add_field(
|
|
||||||
name="Uploader",
|
|
||||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
|
||||||
)
|
|
||||||
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
|
|
||||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
|
||||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
|
||||||
embed.add_field(name="Volume", value=f"{int(self.player.volume*100)}%")
|
|
||||||
|
|
||||||
embed.set_image(self.player.thumbnail_url)
|
|
||||||
embed.set_footer(
|
|
||||||
text=f"queued by {self.trigger_message.author.name}",
|
|
||||||
icon_url=(
|
|
||||||
self.trigger_message.author.avatar.url
|
|
||||||
if self.trigger_message.author.avatar
|
|
||||||
else None
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
return embed
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.__repr__()
|
return self.__repr__()
|
||||||
|
|
||||||
@ -179,4 +134,4 @@ def format_duration(duration: int | float) -> str:
|
|||||||
|
|
||||||
def __reload_module__():
|
def __reload_module__():
|
||||||
global ytdl
|
global ytdl
|
||||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user