Compare commits
No commits in common. "27a460fa6e4cbefe4cdc63a445c7d0bf6f8013bf" and "655b552c10fc93de97ee704416711fb7393471a0" have entirely different histories.
27a460fa6e
...
655b552c10
@ -1,64 +1,18 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import disnake
|
||||
import psutil
|
||||
|
||||
import arguments
|
||||
import commands
|
||||
import utils
|
||||
from constants import EMBED_COLOR
|
||||
from state import client, start_time
|
||||
from state import start_time
|
||||
|
||||
|
||||
async def status(message):
|
||||
member_count = 0
|
||||
channel_count = 0
|
||||
for guild in client.guilds:
|
||||
member_count += len(guild.members)
|
||||
channel_count += len(guild.channels)
|
||||
process = psutil.Process(os.getpid())
|
||||
memory_usage = process.memory_info().rss / 1048576
|
||||
|
||||
embed = disnake.Embed(color=EMBED_COLOR)
|
||||
embed.add_field(
|
||||
name="Latency",
|
||||
value=f"```{round(client.latency * 1000, 1)} ms```",
|
||||
async def help(message):
|
||||
await utils.reply(
|
||||
message,
|
||||
", ".join(
|
||||
[f"`{command.value}`" for command in commands.Command.__members__.values()]
|
||||
),
|
||||
)
|
||||
embed.add_field(
|
||||
name="RSS",
|
||||
value=f"```{round(memory_usage, 1)} MiB```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Threads",
|
||||
value=f"```{threading.active_count()}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Guilds",
|
||||
value=f"```{len(client.guilds)}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Members",
|
||||
value=f"```{member_count}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Channels",
|
||||
value=f"```{channel_count}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Commands",
|
||||
value=f"```{len(commands.Command.__members__)}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Disnake",
|
||||
value=f"```{disnake.__version__}```",
|
||||
)
|
||||
embed.add_field(
|
||||
name="Uptime",
|
||||
value=f"```{utils.format_duration(int(time.time() - start_time), short=True)}```",
|
||||
)
|
||||
await utils.reply(message, embed=embed)
|
||||
|
||||
|
||||
async def uptime(message):
|
||||
@ -82,12 +36,3 @@ async def uptime(message):
|
||||
await utils.reply(
|
||||
message, f"up {utils.format_duration(int(time.time() - start_time))}"
|
||||
)
|
||||
|
||||
|
||||
async def help(message):
|
||||
await utils.reply(
|
||||
message,
|
||||
", ".join(
|
||||
[f"`{command.value}`" for command in commands.Command.__members__.values()]
|
||||
),
|
||||
)
|
||||
|
@ -1,10 +1,9 @@
|
||||
from enum import Enum
|
||||
from functools import lru_cache
|
||||
import enum
|
||||
|
||||
import constants
|
||||
|
||||
|
||||
class Command(Enum):
|
||||
class Command(enum.Enum):
|
||||
CLEAR = "clear"
|
||||
CURRENT = "current"
|
||||
EXECUTE = "execute"
|
||||
@ -20,12 +19,10 @@ class Command(Enum):
|
||||
RELOAD = "reload"
|
||||
RESUME = "resume"
|
||||
SKIP = "skip"
|
||||
STATUS = "status"
|
||||
UPTIME = "uptime"
|
||||
VOLUME = "volume"
|
||||
|
||||
|
||||
@lru_cache
|
||||
def match_token(token: str) -> list[Command]:
|
||||
if token.lower() == "r":
|
||||
return [Command.RELOAD]
|
||||
@ -46,13 +43,11 @@ def match_token(token: str) -> list[Command]:
|
||||
)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def match(command: str) -> list[Command] | None:
|
||||
if tokens := tokenize(command):
|
||||
return match_token(tokens[0])
|
||||
|
||||
|
||||
@lru_cache
|
||||
def tokenize(string: str) -> list[str]:
|
||||
tokens = []
|
||||
token = ""
|
||||
|
@ -5,14 +5,11 @@ import disnake_paginator
|
||||
|
||||
import arguments
|
||||
import commands
|
||||
import constants
|
||||
import utils
|
||||
import youtubedl
|
||||
from constants import EMBED_COLOR
|
||||
from state import client, players
|
||||
|
||||
from .playback import resume
|
||||
from .utils import command_allowed, ensure_joined, play_next
|
||||
|
||||
|
||||
async def queue_or_play(message, edited=False):
|
||||
if message.guild.id not in players:
|
||||
@ -106,7 +103,7 @@ async def queue_or_play(message, edited=False):
|
||||
players[message.guild.id].queue.remove(target)
|
||||
|
||||
if len(targets) == 1:
|
||||
await utils.reply(message, f"**removed** {targets[0].format()}")
|
||||
await utils.reply(message, f"**X** {targets[0].format()}")
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
@ -180,8 +177,6 @@ async def queue_or_play(message, edited=False):
|
||||
message,
|
||||
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
|
||||
)
|
||||
|
||||
utils.cooldown(message, 3)
|
||||
elif tokens[0].lower() == "play":
|
||||
await resume(message)
|
||||
else:
|
||||
@ -199,7 +194,7 @@ async def queue_or_play(message, edited=False):
|
||||
def embed(description):
|
||||
e = disnake.Embed(
|
||||
description=description,
|
||||
color=EMBED_COLOR,
|
||||
color=constants.EMBED_COLOR,
|
||||
)
|
||||
if formatted_duration and len(players[message.guild.id].queue) > 1:
|
||||
e.set_footer(text=f"{formatted_duration} in total")
|
||||
@ -207,7 +202,7 @@ async def queue_or_play(message, edited=False):
|
||||
|
||||
await disnake_paginator.ButtonPaginator(
|
||||
invalid_user_function=utils.invalid_user_handler,
|
||||
color=EMBED_COLOR,
|
||||
color=constants.EMBED_COLOR,
|
||||
segments=list(
|
||||
map(
|
||||
embed,
|
||||
@ -232,18 +227,98 @@ async def queue_or_play(message, edited=False):
|
||||
)
|
||||
|
||||
|
||||
async def skip(message):
|
||||
async def playing(message):
|
||||
if not command_allowed(message, immutable=True):
|
||||
return
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "skip the currently playing song")
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0], "get information about the currently playing song"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-n",
|
||||
"--next",
|
||||
"-d",
|
||||
"--description",
|
||||
action="store_true",
|
||||
help="skip the next song",
|
||||
help="get the description",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if source := message.guild.voice_client.source:
|
||||
if args.description:
|
||||
if description := source.description:
|
||||
paginator = disnake_paginator.ButtonPaginator(
|
||||
invalid_user_function=utils.invalid_user_handler,
|
||||
color=constants.EMBED_COLOR,
|
||||
title=source.title,
|
||||
segments=disnake_paginator.split(description),
|
||||
)
|
||||
for embed in paginator.embeds:
|
||||
embed.url = source.original_url
|
||||
await paginator.start(utils.MessageInteractionWrapper(message))
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
source.description or "no description found!",
|
||||
)
|
||||
return
|
||||
|
||||
bar_length = 35
|
||||
progress = source.original.progress / source.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=constants.EMBED_COLOR,
|
||||
title=source.title,
|
||||
url=source.original_url,
|
||||
description=f"{'⏸️ ' if message.guild.voice_client.is_paused() else ''}"
|
||||
f"`[{'#'*int(progress * bar_length)}{'-'*int((1 - progress) * bar_length)}]` "
|
||||
f"**{youtubedl.format_duration(int(source.original.progress))}** / **{youtubedl.format_duration(source.duration)}** (**{round(progress * 100)}%**)",
|
||||
)
|
||||
embed.add_field(name="Volume", value=f"{int(source.volume*100)}%")
|
||||
embed.add_field(name="Views", value=f"{source.view_count:,}")
|
||||
embed.add_field(
|
||||
name="Queuer",
|
||||
value=players[message.guild.id].current.trigger_message.author.mention,
|
||||
)
|
||||
embed.set_image(source.thumbnail_url)
|
||||
|
||||
await utils.reply(
|
||||
message,
|
||||
embed=embed,
|
||||
)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is playing!",
|
||||
)
|
||||
|
||||
|
||||
async def fast_forward(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
|
||||
parser.add_argument(
|
||||
"seconds",
|
||||
type=lambda v: arguments.range_type(v, min=0, max=300),
|
||||
help="the amount of seconds to fast forward",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if not message.guild.voice_client.source:
|
||||
await utils.reply(message, "nothing is playing!")
|
||||
return
|
||||
|
||||
message.guild.voice_client.pause()
|
||||
message.guild.voice_client.source.original.fast_forward(args.seconds)
|
||||
message.guild.voice_client.resume()
|
||||
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
async def skip(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
@ -253,11 +328,159 @@ async def skip(message):
|
||||
message,
|
||||
"the queue is empty now!",
|
||||
)
|
||||
elif args.next:
|
||||
next = players[message.guild.id].queue.pop()
|
||||
await utils.reply(message, f"**skipped** {next.format()}")
|
||||
else:
|
||||
message.guild.voice_client.stop()
|
||||
await utils.add_check_reaction(message)
|
||||
if not message.guild.voice_client.source:
|
||||
play_next(message)
|
||||
|
||||
|
||||
async def join(message):
|
||||
if message.guild.voice_client:
|
||||
return await message.guild.voice_client.move_to(message.channel)
|
||||
|
||||
await message.channel.connect()
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
async def leave(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
await message.guild.voice_client.disconnect()
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
async def resume(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.voice_client.is_paused():
|
||||
message.guild.voice_client.resume()
|
||||
await utils.add_check_reaction(message)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is paused!",
|
||||
)
|
||||
|
||||
|
||||
async def pause(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.voice_client.is_playing():
|
||||
message.guild.voice_client.pause()
|
||||
await utils.add_check_reaction(message)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is playing!",
|
||||
)
|
||||
|
||||
|
||||
async def volume(message):
|
||||
if not command_allowed(message, immutable=True):
|
||||
return
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
|
||||
parser.add_argument(
|
||||
"volume",
|
||||
nargs="?",
|
||||
type=lambda v: arguments.range_type(v, min=0, max=150),
|
||||
help="the volume level (0 - 150)",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if not message.guild.voice_client.source:
|
||||
await utils.reply(message, "nothing is playing!")
|
||||
return
|
||||
|
||||
if args.volume is None:
|
||||
await utils.reply(
|
||||
message,
|
||||
f"{int(message.guild.voice_client.source.volume * 100)}",
|
||||
)
|
||||
else:
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
message.guild.voice_client.source.volume = float(args.volume) / 100.0
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
def delete_queued(messages):
|
||||
if messages[0].guild.id not in players:
|
||||
return
|
||||
|
||||
if len(players[messages[0].guild.id].queue) == 0:
|
||||
return
|
||||
|
||||
found = []
|
||||
for message in messages:
|
||||
for queued in players[message.guild.id].queue:
|
||||
if queued.trigger_message.id == message.id:
|
||||
found.append(queued)
|
||||
for queued in found:
|
||||
players[messages[0].guild.id].queue.remove(queued)
|
||||
|
||||
|
||||
def play_after_callback(e, message, once):
|
||||
if e:
|
||||
print(f"player error: {e}")
|
||||
if not once:
|
||||
play_next(message)
|
||||
|
||||
|
||||
def play_next(message, once=False, first=False):
|
||||
message.guild.voice_client.stop()
|
||||
if message.guild.id in players and players[message.guild.id].queue:
|
||||
queued = players[message.guild.id].queue_pop()
|
||||
try:
|
||||
message.guild.voice_client.play(
|
||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||
)
|
||||
except disnake.opus.OpusNotLoaded:
|
||||
utils.load_opus()
|
||||
message.guild.voice_client.play(
|
||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||
)
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=constants.EMBED_COLOR,
|
||||
title=queued.player.title,
|
||||
url=queued.player.original_url,
|
||||
)
|
||||
embed.add_field(name="Volume", value=f"{int(queued.player.volume*100)}%")
|
||||
embed.add_field(name="Views", value=f"{queued.player.view_count:,}")
|
||||
embed.add_field(
|
||||
name="Queuer",
|
||||
value=players[message.guild.id].current.trigger_message.author.mention,
|
||||
)
|
||||
embed.set_image(queued.player.thumbnail_url)
|
||||
|
||||
if first:
|
||||
client.loop.create_task(utils.reply(message, embed=embed))
|
||||
else:
|
||||
client.loop.create_task(utils.channel_send(message, embed=embed))
|
||||
|
||||
|
||||
async def ensure_joined(message):
|
||||
if message.guild.voice_client is None:
|
||||
if message.author.voice:
|
||||
await message.author.voice.channel.connect()
|
||||
else:
|
||||
await utils.reply(message, "you are not connected to a voice channel!")
|
||||
|
||||
|
||||
def command_allowed(message, immutable=False):
|
||||
if not message.guild.voice_client:
|
||||
return
|
||||
if immutable:
|
||||
return message.channel.id == message.guild.voice_client.channel.id
|
||||
else:
|
||||
if not message.author.voice:
|
||||
return False
|
||||
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
@ -1,18 +0,0 @@
|
||||
from .channel import join, leave
|
||||
from .playback import fast_forward, pause, playing, resume, volume
|
||||
from .queue import queue_or_play, skip
|
||||
from .utils import remove_queued
|
||||
|
||||
__all__ = [
|
||||
"join",
|
||||
"leave",
|
||||
"fast_forward",
|
||||
"playing",
|
||||
"queue_or_play",
|
||||
"skip",
|
||||
"resume",
|
||||
"pause",
|
||||
"skip",
|
||||
"remove_queued",
|
||||
"volume",
|
||||
]
|
@ -1,19 +0,0 @@
|
||||
import utils
|
||||
|
||||
from .utils import command_allowed
|
||||
|
||||
|
||||
async def join(message):
|
||||
if message.guild.voice_client:
|
||||
return await message.guild.voice_client.move_to(message.channel)
|
||||
|
||||
await message.channel.connect()
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
async def leave(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
await message.guild.voice_client.disconnect()
|
||||
await utils.add_check_reaction(message)
|
@ -1,143 +0,0 @@
|
||||
import disnake_paginator
|
||||
|
||||
import arguments
|
||||
import commands
|
||||
import utils
|
||||
from constants import EMBED_COLOR
|
||||
from state import players
|
||||
|
||||
from .utils import command_allowed
|
||||
|
||||
|
||||
async def playing(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
tokens[0], "get information about the currently playing song"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--description",
|
||||
action="store_true",
|
||||
help="get the description",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if not command_allowed(message, immutable=True):
|
||||
return
|
||||
|
||||
if source := message.guild.voice_client.source:
|
||||
if args.description:
|
||||
if description := source.description:
|
||||
paginator = disnake_paginator.ButtonPaginator(
|
||||
invalid_user_function=utils.invalid_user_handler,
|
||||
color=EMBED_COLOR,
|
||||
title=source.title,
|
||||
segments=disnake_paginator.split(description),
|
||||
)
|
||||
for embed in paginator.embeds:
|
||||
embed.url = source.original_url
|
||||
await paginator.start(utils.MessageInteractionWrapper(message))
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
source.description or "no description found!",
|
||||
)
|
||||
return
|
||||
|
||||
await utils.reply(
|
||||
message,
|
||||
embed=players[message.guild.id].current.embed(
|
||||
is_paused=message.guild.voice_client.is_paused()
|
||||
),
|
||||
)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is playing!",
|
||||
)
|
||||
|
||||
|
||||
async def resume(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.voice_client.is_paused():
|
||||
message.guild.voice_client.resume()
|
||||
await utils.add_check_reaction(message)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is paused!",
|
||||
)
|
||||
|
||||
|
||||
async def pause(message):
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if message.guild.voice_client.is_playing():
|
||||
message.guild.voice_client.pause()
|
||||
await utils.add_check_reaction(message)
|
||||
else:
|
||||
await utils.reply(
|
||||
message,
|
||||
"nothing is playing!",
|
||||
)
|
||||
|
||||
|
||||
async def fast_forward(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "fast forward audio playback")
|
||||
parser.add_argument(
|
||||
"seconds",
|
||||
type=lambda v: arguments.range_type(v, min=0, max=300),
|
||||
help="the amount of seconds to fast forward",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
if not message.guild.voice_client.source:
|
||||
await utils.reply(message, "nothing is playing!")
|
||||
return
|
||||
|
||||
message.guild.voice_client.pause()
|
||||
message.guild.voice_client.source.original.fast_forward(args.seconds)
|
||||
message.guild.voice_client.resume()
|
||||
|
||||
await utils.add_check_reaction(message)
|
||||
|
||||
|
||||
async def volume(message):
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(tokens[0], "get or set the current volume level")
|
||||
parser.add_argument(
|
||||
"volume",
|
||||
nargs="?",
|
||||
type=lambda v: arguments.range_type(v, min=0, max=150),
|
||||
help="the volume level (0 - 150)",
|
||||
)
|
||||
if not (args := await parser.parse_args(message, tokens)):
|
||||
return
|
||||
|
||||
if not command_allowed(message, immutable=True):
|
||||
return
|
||||
|
||||
if not message.guild.voice_client.source:
|
||||
await utils.reply(message, "nothing is playing!")
|
||||
return
|
||||
|
||||
if args.volume is None:
|
||||
await utils.reply(
|
||||
message,
|
||||
f"{int(message.guild.voice_client.source.volume * 100)}",
|
||||
)
|
||||
else:
|
||||
if not command_allowed(message):
|
||||
return
|
||||
|
||||
message.guild.voice_client.source.volume = float(args.volume) / 100.0
|
||||
await utils.add_check_reaction(message)
|
@ -1,72 +0,0 @@
|
||||
from logging import error
|
||||
|
||||
import disnake
|
||||
|
||||
import utils
|
||||
from state import client, players
|
||||
|
||||
|
||||
def play_after_callback(e, message, once):
|
||||
if e:
|
||||
error(f"player error: {e}")
|
||||
if not once:
|
||||
play_next(message)
|
||||
|
||||
|
||||
def play_next(message, once=False, first=False):
|
||||
if not message.guild.voice_client:
|
||||
return
|
||||
|
||||
message.guild.voice_client.stop()
|
||||
if message.guild.id in players and players[message.guild.id].queue:
|
||||
queued = players[message.guild.id].queue_pop()
|
||||
try:
|
||||
message.guild.voice_client.play(
|
||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||
)
|
||||
except disnake.opus.OpusNotLoaded:
|
||||
utils.load_opus()
|
||||
message.guild.voice_client.play(
|
||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||
)
|
||||
|
||||
embed = queued.embed()
|
||||
if first and len(players[message.guild.id].queue) == 0:
|
||||
client.loop.create_task(utils.reply(message, embed=embed))
|
||||
else:
|
||||
client.loop.create_task(utils.channel_send(message, embed=embed))
|
||||
|
||||
|
||||
def remove_queued(messages):
|
||||
if messages[0].guild.id not in players:
|
||||
return
|
||||
|
||||
if len(players[messages[0].guild.id].queue) == 0:
|
||||
return
|
||||
|
||||
found = []
|
||||
for message in messages:
|
||||
for queued in players[message.guild.id].queue:
|
||||
if queued.trigger_message.id == message.id:
|
||||
found.append(queued)
|
||||
for queued in found:
|
||||
players[messages[0].guild.id].queue.remove(queued)
|
||||
|
||||
|
||||
async def ensure_joined(message):
|
||||
if message.guild.voice_client is None:
|
||||
if message.author.voice:
|
||||
await message.author.voice.channel.connect()
|
||||
else:
|
||||
await utils.reply(message, "you are not connected to a voice channel!")
|
||||
|
||||
|
||||
def command_allowed(message, immutable=False):
|
||||
if not message.guild.voice_client:
|
||||
return
|
||||
if immutable:
|
||||
return message.channel.id == message.guild.voice_client.channel.id
|
||||
else:
|
||||
if not message.author.voice:
|
||||
return False
|
||||
return message.author.voice.channel.id == message.guild.voice_client.channel.id
|
@ -15,7 +15,6 @@ YTDL_OPTIONS = {
|
||||
"source_address": "0.0.0.0",
|
||||
}
|
||||
|
||||
BAR_LENGTH = 35
|
||||
EMBED_COLOR = 0xFF6600
|
||||
OWNERS = [531392146767347712]
|
||||
PREFIX = "%"
|
||||
@ -26,11 +25,6 @@ RELOADABLE_MODULES = [
|
||||
"commands.tools",
|
||||
"commands.utils",
|
||||
"commands.voice",
|
||||
"commands.voice.channel",
|
||||
"commands.voice.playback",
|
||||
"commands.voice.playing",
|
||||
"commands.voice.queue",
|
||||
"commands.voice.utils",
|
||||
"constants",
|
||||
"core",
|
||||
"events",
|
||||
|
71
core.py
71
core.py
@ -6,20 +6,18 @@ import io
|
||||
import textwrap
|
||||
import time
|
||||
import traceback
|
||||
from logging import debug
|
||||
|
||||
import disnake
|
||||
import disnake_paginator
|
||||
|
||||
import commands
|
||||
import constants
|
||||
import utils
|
||||
from commands import Command as C
|
||||
from constants import EMBED_COLOR, OWNERS, PREFIX, RELOADABLE_MODULES
|
||||
from state import client, command_cooldowns, command_locks, idle_tracker
|
||||
from state import client, command_locks, idle_tracker
|
||||
|
||||
|
||||
async def on_message(message, edited=False):
|
||||
if not message.content.startswith(PREFIX) or message.author.bot:
|
||||
if not message.content.startswith(constants.PREFIX) or message.author.bot:
|
||||
return
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
@ -40,44 +38,26 @@ async def on_message(message, edited=False):
|
||||
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
|
||||
)
|
||||
return
|
||||
matched = matched[0]
|
||||
|
||||
if (message.guild.id, message.author.id) not in command_locks:
|
||||
command_locks[(message.guild.id, message.author.id)] = asyncio.Lock()
|
||||
await command_locks[(message.guild.id, message.author.id)].acquire()
|
||||
if message.guild.id not in command_locks:
|
||||
command_locks[message.guild.id] = asyncio.Lock()
|
||||
|
||||
C = commands.Command
|
||||
try:
|
||||
if cooldowns := command_cooldowns.get(message.author.id):
|
||||
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
|
||||
end_time
|
||||
):
|
||||
await utils.reply(
|
||||
message,
|
||||
f"please wait **{utils.format_duration(int(end_time - time.time()), natural=True)}** before using this command again!",
|
||||
)
|
||||
return
|
||||
|
||||
match matched:
|
||||
case C.RELOAD if message.author.id in OWNERS:
|
||||
match matched[0]:
|
||||
case C.RELOAD if message.author.id in constants.OWNERS:
|
||||
reloaded_modules = set()
|
||||
start = time.time()
|
||||
|
||||
rreload(reloaded_modules, __import__("core"))
|
||||
rreload(reloaded_modules, __import__("extra"))
|
||||
for module in filter(
|
||||
lambda v: inspect.ismodule(v) and v.__name__ in RELOADABLE_MODULES,
|
||||
lambda v: inspect.ismodule(v)
|
||||
and v.__name__ in constants.RELOADABLE_MODULES,
|
||||
globals().values(),
|
||||
):
|
||||
rreload(reloaded_modules, module)
|
||||
|
||||
end = time.time()
|
||||
if __debug__:
|
||||
debug(
|
||||
f"reloaded {len(reloaded_modules)} modules in {round(end-start, 2)}s"
|
||||
)
|
||||
|
||||
await utils.add_check_reaction(message)
|
||||
case C.EXECUTE if message.author.id in OWNERS:
|
||||
case C.EXECUTE if message.author.id in constants.OWNERS:
|
||||
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
|
||||
for replacement in ["python", "py"]:
|
||||
if code.startswith(replacement):
|
||||
@ -112,23 +92,25 @@ async def on_message(message, edited=False):
|
||||
prefix="```\n",
|
||||
suffix="```",
|
||||
invalid_user_function=utils.invalid_user_handler,
|
||||
color=EMBED_COLOR,
|
||||
color=constants.EMBED_COLOR,
|
||||
segments=disnake_paginator.split(output),
|
||||
).start(utils.MessageInteractionWrapper(message))
|
||||
elif len(output.strip()) == 0:
|
||||
await utils.add_check_reaction(message)
|
||||
else:
|
||||
await utils.reply(message, output)
|
||||
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
|
||||
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
|
||||
await commands.tools.clear(message)
|
||||
case C.JOIN:
|
||||
await commands.voice.join(message)
|
||||
case C.LEAVE:
|
||||
await commands.voice.leave(message)
|
||||
case C.QUEUE | C.PLAY:
|
||||
await commands.voice.queue_or_play(message, edited)
|
||||
async with command_locks[message.guild.id]:
|
||||
await commands.voice.queue_or_play(message, edited)
|
||||
case C.SKIP:
|
||||
await commands.voice.skip(message)
|
||||
async with command_locks[message.guild.id]:
|
||||
await commands.voice.skip(message)
|
||||
case C.RESUME:
|
||||
await commands.voice.resume(message)
|
||||
case C.PAUSE:
|
||||
@ -143,29 +125,24 @@ async def on_message(message, edited=False):
|
||||
await commands.voice.playing(message)
|
||||
case C.FAST_FORWARD:
|
||||
await commands.voice.fast_forward(message)
|
||||
case C.STATUS:
|
||||
await commands.bot.status(message)
|
||||
except Exception as e:
|
||||
await utils.reply(
|
||||
message,
|
||||
f"exception occurred while processing command: ```\n{"".join(traceback.format_exception(e)).replace("`", "\\`")}```",
|
||||
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
|
||||
)
|
||||
raise e
|
||||
finally:
|
||||
command_locks[(message.guild.id, message.author.id)].release()
|
||||
|
||||
|
||||
async def on_voice_state_update(_, before, after):
|
||||
def is_empty(channel):
|
||||
return [m.id for m in (channel.members if channel else [])] == [client.user.id]
|
||||
|
||||
channel = None
|
||||
c = None
|
||||
if is_empty(before.channel):
|
||||
channel = before.channel
|
||||
c = before.channel
|
||||
elif is_empty(after.channel):
|
||||
channel = after.channel
|
||||
if channel:
|
||||
await channel.guild.voice_client.disconnect()
|
||||
c = after.channel
|
||||
if c:
|
||||
await c.guild.voice_client.disconnect()
|
||||
|
||||
|
||||
def rreload(reloaded_modules, module):
|
||||
@ -173,7 +150,7 @@ def rreload(reloaded_modules, module):
|
||||
|
||||
for submodule in filter(
|
||||
lambda v: inspect.ismodule(v)
|
||||
and v.__name__ in RELOADABLE_MODULES
|
||||
and v.__name__ in constants.RELOADABLE_MODULES
|
||||
and v.__name__ not in reloaded_modules,
|
||||
vars(module).values(),
|
||||
):
|
||||
|
@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
from logging import info
|
||||
|
||||
import commands
|
||||
import core
|
||||
@ -21,7 +20,7 @@ def prepare():
|
||||
|
||||
|
||||
async def on_bulk_message_delete(messages):
|
||||
commands.voice.remove_queued(messages)
|
||||
commands.voice.delete_queued(messages)
|
||||
|
||||
|
||||
async def on_message(message):
|
||||
@ -29,7 +28,7 @@ async def on_message(message):
|
||||
|
||||
|
||||
async def on_message_delete(message):
|
||||
commands.voice.remove_queued([message])
|
||||
commands.voice.delete_queued([message])
|
||||
|
||||
|
||||
async def on_message_edit(before, after):
|
||||
@ -40,7 +39,7 @@ async def on_message_edit(before, after):
|
||||
|
||||
|
||||
async def on_ready():
|
||||
info(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
|
||||
print(f"logged in as {client.user} in {round(time.time() - start_time, 1)}s")
|
||||
|
||||
|
||||
async def on_voice_state_update(member, before, after):
|
||||
|
14
extra.py
14
extra.py
@ -4,7 +4,7 @@ import string
|
||||
import disnake
|
||||
import youtube_transcript_api
|
||||
|
||||
from state import client, kill, players
|
||||
from state import client, players
|
||||
|
||||
|
||||
async def transcript(
|
||||
@ -39,17 +39,13 @@ async def transcript(
|
||||
)
|
||||
if len(messages) > max_messages:
|
||||
try:
|
||||
count = max_messages - min_messages
|
||||
if count == 1:
|
||||
await messages.pop().delete()
|
||||
else:
|
||||
await message.channel.delete_messages(
|
||||
[messages.pop() for _ in range(count)]
|
||||
)
|
||||
await message.channel.delete_messages(
|
||||
[messages.pop() for _ in range(max_messages - min_messages)]
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if (message.guild.voice_client.source.id != initial_id) or kill["transcript"]:
|
||||
if message.guild.voice_client.source.id != initial_id:
|
||||
break
|
||||
|
||||
|
||||
|
11
main.py
11
main.py
@ -1,18 +1,7 @@
|
||||
import logging
|
||||
|
||||
import constants
|
||||
import events
|
||||
from state import client
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(
|
||||
format=(
|
||||
"%(asctime)s %(levelname)s %(name):%(module)s %(message)s"
|
||||
if __debug__
|
||||
else "%(asctime)s %(levelname)s %(message)s"
|
||||
),
|
||||
datefmt="%Y-%m-%d %T",
|
||||
level=logging.DEBUG if __debug__ else logging.INFO,
|
||||
)
|
||||
events.prepare()
|
||||
client.run(constants.SECRETS["TOKEN"])
|
||||
|
12
state.py
12
state.py
@ -1,13 +1,13 @@
|
||||
import collections
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
|
||||
import disnake
|
||||
|
||||
|
||||
class LimitedSizeDict(OrderedDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.size_limit = kwargs.pop("size_limit", 1000)
|
||||
super().__init__(*args, **kwargs)
|
||||
class LimitedSizeDict(collections.OrderedDict):
|
||||
def __init__(self, *args, **kwds):
|
||||
self.size_limit = kwds.pop("size_limit", 1000)
|
||||
super().__init__(*args, **kwds)
|
||||
self._check_size_limit()
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
@ -25,10 +25,8 @@ intents.message_content = True
|
||||
intents.members = True
|
||||
client = disnake.Client(intents=intents)
|
||||
|
||||
command_cooldowns = LimitedSizeDict()
|
||||
command_locks = LimitedSizeDict()
|
||||
idle_tracker = {"is_idle": False, "last_used": time.time()}
|
||||
kill = {"transcript": False}
|
||||
message_responses = LimitedSizeDict()
|
||||
players = {}
|
||||
start_time = time.time()
|
||||
|
14
tasks.py
14
tasks.py
@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
import time
|
||||
from logging import debug, error
|
||||
|
||||
import disnake
|
||||
|
||||
@ -8,10 +7,8 @@ from state import client, idle_tracker, players
|
||||
|
||||
|
||||
async def cleanup():
|
||||
debug("spawned cleanup thread")
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(3600 * 12)
|
||||
await asyncio.sleep(3600)
|
||||
|
||||
targets = []
|
||||
for guild_id, player in players.items():
|
||||
@ -19,15 +16,10 @@ async def cleanup():
|
||||
targets.append(guild_id)
|
||||
for target in targets:
|
||||
del players[target]
|
||||
if __debug__:
|
||||
debug(f"cleanup removed {len(targets)} empty players")
|
||||
|
||||
if (
|
||||
not idle_tracker["is_idle"]
|
||||
and time.time() - idle_tracker["last_used"] >= 3600
|
||||
):
|
||||
try:
|
||||
await client.change_presence(status=disnake.Status.idle)
|
||||
idle_tracker["is_idle"] = True
|
||||
except Exception as e:
|
||||
error(f"failed to change status to idle: {e}")
|
||||
await client.change_presence(status=disnake.Status.idle)
|
||||
idle_tracker["is_idle"] = True
|
||||
|
@ -6,102 +6,56 @@ import youtubedl
|
||||
|
||||
class TestFormatDuration(unittest.TestCase):
|
||||
def test_youtubedl(self):
|
||||
def f(s):
|
||||
return youtubedl.format_duration(s)
|
||||
|
||||
self.assertEqual(f(0), "00:00")
|
||||
self.assertEqual(f(0.5), "00:00")
|
||||
self.assertEqual(f(60.5), "01:00")
|
||||
self.assertEqual(f(1), "00:01")
|
||||
self.assertEqual(f(60), "01:00")
|
||||
self.assertEqual(f(60 + 30), "01:30")
|
||||
self.assertEqual(f(60 * 60), "01:00:00")
|
||||
self.assertEqual(f(60 * 60 + 30), "01:00:30")
|
||||
self.assertEqual(youtubedl.format_duration(0), "00:00")
|
||||
self.assertEqual(youtubedl.format_duration(0.5), "00:00")
|
||||
self.assertEqual(youtubedl.format_duration(60.5), "01:00")
|
||||
self.assertEqual(youtubedl.format_duration(1), "00:01")
|
||||
self.assertEqual(youtubedl.format_duration(60), "01:00")
|
||||
self.assertEqual(youtubedl.format_duration(60 + 30), "01:30")
|
||||
self.assertEqual(youtubedl.format_duration(60 * 60), "01:00:00")
|
||||
self.assertEqual(youtubedl.format_duration(60 * 60 + 30), "01:00:30")
|
||||
|
||||
def test_utils(self):
|
||||
def f(s):
|
||||
return utils.format_duration(s)
|
||||
|
||||
self.assertEqual(f(0), "")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
|
||||
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
|
||||
self.assertEqual(utils.format_duration(0), "")
|
||||
self.assertEqual(utils.format_duration(60 * 60 * 24 * 7), "1 week")
|
||||
self.assertEqual(utils.format_duration(60 * 60 * 24 * 21), "3 weeks")
|
||||
self.assertEqual(
|
||||
f((60 * 60 * 24 * 21) - 1),
|
||||
utils.format_duration((60 * 60 * 24 * 21) - 1),
|
||||
"2 weeks, 6 days, 23 hours, 59 minutes, 59 seconds",
|
||||
)
|
||||
self.assertEqual(f(60), "1 minute")
|
||||
self.assertEqual(f(60 * 2), "2 minutes")
|
||||
self.assertEqual(f(60 * 59), "59 minutes")
|
||||
self.assertEqual(f(60 * 60), "1 hour")
|
||||
self.assertEqual(f(60 * 60 * 2), "2 hours")
|
||||
self.assertEqual(f(1), "1 second")
|
||||
self.assertEqual(f(60 + 5), "1 minute, 5 seconds")
|
||||
self.assertEqual(f(60 * 60 + 30), "1 hour, 30 seconds")
|
||||
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds")
|
||||
self.assertEqual(utils.format_duration(60), "1 minute")
|
||||
self.assertEqual(utils.format_duration(60 * 2), "2 minutes")
|
||||
self.assertEqual(utils.format_duration(60 * 59), "59 minutes")
|
||||
self.assertEqual(utils.format_duration(60 * 60), "1 hour")
|
||||
self.assertEqual(utils.format_duration(60 * 60 * 2), "2 hours")
|
||||
self.assertEqual(utils.format_duration(1), "1 second")
|
||||
self.assertEqual(utils.format_duration(60 + 5), "1 minute, 5 seconds")
|
||||
self.assertEqual(utils.format_duration(60 * 60 + 30), "1 hour, 30 seconds")
|
||||
self.assertEqual(
|
||||
utils.format_duration(60 * 60 + 60 + 30), "1 hour, 1 minute, 30 seconds"
|
||||
)
|
||||
self.assertEqual(
|
||||
utils.format_duration(60 * 60 * 24 * 7 + 30), "1 week, 30 seconds"
|
||||
)
|
||||
|
||||
def test_utils_natural(self):
|
||||
def f(s):
|
||||
return utils.format_duration(s, natural=True)
|
||||
def format(seconds: int):
|
||||
return utils.format_duration(seconds, natural=True)
|
||||
|
||||
self.assertEqual(f(0), "")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7), "1 week")
|
||||
self.assertEqual(f(60 * 60 * 24 * 21), "3 weeks")
|
||||
self.assertEqual(format(0), "")
|
||||
self.assertEqual(format(60 * 60 * 24 * 7), "1 week")
|
||||
self.assertEqual(format(60 * 60 * 24 * 21), "3 weeks")
|
||||
self.assertEqual(
|
||||
f((60 * 60 * 24 * 21) - 1),
|
||||
format((60 * 60 * 24 * 21) - 1),
|
||||
"2 weeks, 6 days, 23 hours, 59 minutes and 59 seconds",
|
||||
)
|
||||
self.assertEqual(f(60), "1 minute")
|
||||
self.assertEqual(f(60 * 2), "2 minutes")
|
||||
self.assertEqual(f(60 * 59), "59 minutes")
|
||||
self.assertEqual(f(60 * 60), "1 hour")
|
||||
self.assertEqual(f(60 * 60 * 2), "2 hours")
|
||||
self.assertEqual(f(1), "1 second")
|
||||
self.assertEqual(f(60 + 5), "1 minute and 5 seconds")
|
||||
self.assertEqual(f(60 * 60 + 30), "1 hour and 30 seconds")
|
||||
self.assertEqual(f(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
|
||||
|
||||
def test_utils_short(self):
|
||||
def f(s):
|
||||
return utils.format_duration(s, short=True)
|
||||
|
||||
self.assertEqual(f(0), "")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
|
||||
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
|
||||
self.assertEqual(
|
||||
f((60 * 60 * 24 * 21) - 1),
|
||||
"2w 6d 23h 59m 59s",
|
||||
)
|
||||
self.assertEqual(f(60), "1m")
|
||||
self.assertEqual(f(60 * 2), "2m")
|
||||
self.assertEqual(f(60 * 59), "59m")
|
||||
self.assertEqual(f(60 * 60), "1h")
|
||||
self.assertEqual(f(60 * 60 * 2), "2h")
|
||||
self.assertEqual(f(1), "1s")
|
||||
self.assertEqual(f(60 + 5), "1m 5s")
|
||||
self.assertEqual(f(60 * 60 + 30), "1h 30s")
|
||||
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m 30s")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w 30s")
|
||||
|
||||
def test_utils_natural_short(self):
|
||||
def f(s):
|
||||
return utils.format_duration(s, natural=True, short=True)
|
||||
|
||||
self.assertEqual(f(0), "")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7), "1w")
|
||||
self.assertEqual(f(60 * 60 * 24 * 21), "3w")
|
||||
self.assertEqual(
|
||||
f((60 * 60 * 24 * 21) - 1),
|
||||
"2w 6d 23h 59m and 59s",
|
||||
)
|
||||
self.assertEqual(f(60), "1m")
|
||||
self.assertEqual(f(60 * 2), "2m")
|
||||
self.assertEqual(f(60 * 59), "59m")
|
||||
self.assertEqual(f(60 * 60), "1h")
|
||||
self.assertEqual(f(60 * 60 * 2), "2h")
|
||||
self.assertEqual(f(1), "1s")
|
||||
self.assertEqual(f(60 + 5), "1m and 5s")
|
||||
self.assertEqual(f(60 * 60 + 30), "1h and 30s")
|
||||
self.assertEqual(f(60 * 60 + 60 + 30), "1h 1m and 30s")
|
||||
self.assertEqual(f(60 * 60 * 24 * 7 + 30), "1w and 30s")
|
||||
self.assertEqual(format(60), "1 minute")
|
||||
self.assertEqual(format(60 * 2), "2 minutes")
|
||||
self.assertEqual(format(60 * 59), "59 minutes")
|
||||
self.assertEqual(format(60 * 60), "1 hour")
|
||||
self.assertEqual(format(60 * 60 * 2), "2 hours")
|
||||
self.assertEqual(format(1), "1 second")
|
||||
self.assertEqual(format(60 + 5), "1 minute and 5 seconds")
|
||||
self.assertEqual(format(60 * 60 + 30), "1 hour and 30 seconds")
|
||||
self.assertEqual(format(60 * 60 + 60 + 30), "1 hour, 1 minute and 30 seconds")
|
||||
self.assertEqual(format(60 * 60 * 24 * 7 + 30), "1 week and 30 seconds")
|
||||
|
46
utils.py
46
utils.py
@ -1,12 +1,9 @@
|
||||
import os
|
||||
import time
|
||||
from logging import error, info, warning
|
||||
|
||||
import disnake
|
||||
|
||||
import commands
|
||||
import constants
|
||||
from state import command_cooldowns, message_responses
|
||||
from state import message_responses
|
||||
|
||||
|
||||
class ChannelResponseWrapper:
|
||||
@ -37,50 +34,35 @@ class MessageInteractionWrapper:
|
||||
await self.response.edit_message(content=content, embed=embed, view=view)
|
||||
|
||||
|
||||
def cooldown(message, cooldown_time: int):
|
||||
possible_commands = commands.match(message.content)
|
||||
if not possible_commands or len(possible_commands) > 1:
|
||||
return
|
||||
command = possible_commands[0]
|
||||
|
||||
end_time = time.time() + cooldown_time
|
||||
if message.author.id in command_cooldowns:
|
||||
command_cooldowns[message.author.id][command] = end_time
|
||||
else:
|
||||
command_cooldowns[message.author.id] = {command: end_time}
|
||||
|
||||
|
||||
def format_duration(duration: int, natural: bool = False, short: bool = False):
|
||||
def format_duration(duration: int, natural: bool = False):
|
||||
def format_plural(noun, count):
|
||||
if short:
|
||||
return noun[0]
|
||||
return " " + (noun if count == 1 else noun + "s")
|
||||
return noun if count == 1 else noun + "s"
|
||||
|
||||
segments = []
|
||||
|
||||
weeks, duration = divmod(duration, 604800)
|
||||
if weeks > 0:
|
||||
segments.append(f"{weeks}{format_plural('week', weeks)}")
|
||||
segments.append(f"{weeks} {format_plural('week', weeks)}")
|
||||
|
||||
days, duration = divmod(duration, 86400)
|
||||
if days > 0:
|
||||
segments.append(f"{days}{format_plural('day', days)}")
|
||||
segments.append(f"{days} {format_plural('day', days)}")
|
||||
|
||||
hours, duration = divmod(duration, 3600)
|
||||
if hours > 0:
|
||||
segments.append(f"{hours}{format_plural('hour', hours)}")
|
||||
segments.append(f"{hours} {format_plural('hour', hours)}")
|
||||
|
||||
minutes, duration = divmod(duration, 60)
|
||||
if minutes > 0:
|
||||
segments.append(f"{minutes}{format_plural('minute', minutes)}")
|
||||
segments.append(f"{minutes} {format_plural('minute', minutes)}")
|
||||
|
||||
if duration > 0:
|
||||
segments.append(f"{duration}{format_plural('second', duration)}")
|
||||
segments.append(f"{duration} {format_plural('second', duration)}")
|
||||
|
||||
separator = " " if short else ", "
|
||||
if not natural or len(segments) <= 1:
|
||||
return separator.join(segments)
|
||||
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
||||
return ", ".join(segments)
|
||||
|
||||
return ", ".join(segments[:-1]) + f" and {segments[-1]}"
|
||||
|
||||
|
||||
async def add_check_reaction(message):
|
||||
@ -121,13 +103,13 @@ def filter_secrets(text: str) -> str:
|
||||
|
||||
|
||||
def load_opus():
|
||||
warning("opus wasn't automatically loaded! trying to load manually...")
|
||||
print("opus wasn't automatically loaded! trying to load manually...")
|
||||
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
disnake.opus.load_opus(path)
|
||||
info(f"successfully loaded opus from {path}")
|
||||
print(f"successfully loaded opus from {path}")
|
||||
return
|
||||
except Exception as e:
|
||||
error(f"failed to load opus from {path}: {e}")
|
||||
print(f"failed to load opus from {path}: {e}")
|
||||
raise Exception("could not locate working opus library")
|
||||
|
51
youtubedl.py
51
youtubedl.py
@ -6,9 +6,9 @@ from typing import Any, Optional
|
||||
import disnake
|
||||
import yt_dlp
|
||||
|
||||
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
|
||||
import constants
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
|
||||
|
||||
|
||||
class CustomAudioSource(disnake.AudioSource):
|
||||
@ -40,13 +40,9 @@ class YTDLSource(disnake.PCMVolumeTransformer):
|
||||
self.description = data.get("description")
|
||||
self.duration = data.get("duration")
|
||||
self.id = data.get("id")
|
||||
self.like_count = data.get("like_count")
|
||||
self.original_url = data.get("original_url")
|
||||
self.thumbnail_url = data.get("thumbnail")
|
||||
self.timestamp = data.get("timestamp")
|
||||
self.title = data.get("title")
|
||||
self.uploader = data.get("uploader")
|
||||
self.uploader_url = data.get("uploader_url")
|
||||
self.view_count = data.get("view_count")
|
||||
|
||||
@classmethod
|
||||
@ -103,47 +99,6 @@ class QueuedSong:
|
||||
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
||||
)
|
||||
|
||||
def embed(self, is_paused=False):
|
||||
progress = 0
|
||||
if self.player.duration:
|
||||
progress = self.player.original.progress / self.player.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=EMBED_COLOR,
|
||||
title=self.player.title,
|
||||
url=self.player.original_url,
|
||||
description=(
|
||||
f"{'⏸️ ' if is_paused else ''}"
|
||||
f"`[{'#'*int(progress * BAR_LENGTH)}{'-'*int((1 - progress) * BAR_LENGTH)}]` "
|
||||
+ (
|
||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||
if self.player.duration
|
||||
else "[**live**]"
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
||||
)
|
||||
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
|
||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
||||
embed.add_field(name="Volume", value=f"{int(self.player.volume*100)}%")
|
||||
|
||||
embed.set_image(self.player.thumbnail_url)
|
||||
embed.set_footer(
|
||||
text=f"queued by {self.trigger_message.author.name}",
|
||||
icon_url=(
|
||||
self.trigger_message.author.avatar.url
|
||||
if self.trigger_message.author.avatar
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
@ -179,4 +134,4 @@ def format_duration(duration: int | float) -> str:
|
||||
|
||||
def __reload_module__():
|
||||
global ytdl
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
|
||||
|
Loading…
x
Reference in New Issue
Block a user