Compare commits

..

4 Commits

10 changed files with 241 additions and 224 deletions

3
audio/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from . import discord, queue, utils, youtubedl
__all__ = ["utils", "queue", "youtubedl", "discord"]

39
audio/discord.py Normal file
View File

@ -0,0 +1,39 @@
import audioop
import disnake
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded.")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)

107
audio/queue.py Normal file
View File

@ -0,0 +1,107 @@
import collections
from dataclasses import dataclass
from typing import Optional
import disnake
from constants import BAR_LENGTH, EMBED_COLOR
from .utils import format_duration
from .youtubedl import YTDLSource
@dataclass
class Song:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
if multiline:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
)
else:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**live**]"
)
),
)
if self.player.uploader_url:
embed.add_field(
name="Uploader",
value=f"[{self.player.uploader}]({self.player.uploader_url})",
)
else:
embed.add_field(
name="Uploader",
value=self.player.uploader,
)
embed.add_field(
name="Likes",
value=f"{self.player.like_count:,}"
if self.player.like_count
else "Unknown",
)
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class Player:
queue = collections.deque()
current: Optional[Song] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_push(self, item):
self.queue.append(item)
def queue_push_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()

7
audio/utils.py Normal file
View File

@ -0,0 +1,7 @@
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"

69
audio/youtubedl.py Normal file
View File

@ -0,0 +1,69 @@
import asyncio
from typing import Any, Optional
import disnake
import yt_dlp
from constants import YTDL_OPTIONS
from .discord import PCMVolumeTransformer, TrackedAudioSource
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
if not data["entries"]:
raise Exception("no entries provided by yt-dlp!")
data = data["entries"][0]
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
)
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)

View File

@ -4,9 +4,9 @@ import disnake
import disnake_paginator import disnake_paginator
import arguments import arguments
import audio
import commands import commands
import utils import utils
import youtubedl
from constants import EMBED_COLOR from constants import EMBED_COLOR
from state import client, players from state import client, players
@ -16,7 +16,7 @@ from .utils import command_allowed, ensure_joined, play_next
async def queue_or_play(message, edited=False): async def queue_or_play(message, edited=False):
if message.guild.id not in players: if message.guild.id not in players:
players[message.guild.id] = youtubedl.QueuedPlayer() players[message.guild.id] = audio.queue.Player()
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
@ -154,7 +154,7 @@ async def queue_or_play(message, edited=False):
try: try:
async with message.channel.typing(): async with message.channel.typing():
player = await youtubedl.YTDLSource.from_url( player = await audio.youtubedl.YTDLSource.from_url(
" ".join(query), loop=client.loop, stream=True " ".join(query), loop=client.loop, stream=True
) )
player.volume = float(args.volume) / 100.0 player.volume = float(args.volume) / 100.0
@ -162,12 +162,12 @@ async def queue_or_play(message, edited=False):
await utils.reply(message, f"**failed to queue:** `{e}`") await utils.reply(message, f"**failed to queue:** `{e}`")
return return
queued = youtubedl.QueuedSong(player, message) queued = audio.queue.Song(player, message)
if args.now or args.next: if args.now or args.next:
players[message.guild.id].queue_add_front(queued) players[message.guild.id].queue_push_front(queued)
else: else:
players[message.guild.id].queue_add(queued) players[message.guild.id].queue_push(queued)
if not message.guild.voice_client: if not message.guild.voice_client:
await utils.reply(message, "unexpected disconnect from voice channel!") await utils.reply(message, "unexpected disconnect from voice channel!")

View File

@ -1,8 +1,8 @@
import disnake import disnake
import audio
import sponsorblock import sponsorblock
import utils import utils
import youtubedl
from constants import EMBED_COLOR from constants import EMBED_COLOR
from state import players from state import players
@ -32,7 +32,7 @@ async def sponsorblock_command(message):
current = "**" if progress >= begin and progress < end else "" current = "**" if progress >= begin and progress < end else ""
text.append( text.append(
f"{current}`{youtubedl.format_duration(begin)}` - `{youtubedl.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}" f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
) )
await utils.reply( await utils.reply(

View File

@ -21,6 +21,11 @@ OWNERS = [531392146767347712]
PREFIX = "%" PREFIX = "%"
RELOADABLE_MODULES = [ RELOADABLE_MODULES = [
"arguments", "arguments",
"audio",
"audio.discord",
"audio.queue",
"audio.utils",
"audio.youtubedl",
"commands", "commands",
"commands.bot", "commands.bot",
"commands.tools", "commands.tools",
@ -41,7 +46,6 @@ RELOADABLE_MODULES = [
"tasks", "tasks",
"utils", "utils",
"voice", "voice",
"youtubedl",
"yt_dlp", "yt_dlp",
"yt_dlp.version", "yt_dlp.version",
] ]

View File

@ -1,13 +1,13 @@
import unittest import unittest
import audio
import utils import utils
import youtubedl
class TestFormatDuration(unittest.TestCase): class TestFormatDuration(unittest.TestCase):
def test_youtubedl(self): def test_audio(self):
def f(s): def f(s):
return youtubedl.format_duration(s) return audio.utils.format_duration(s)
self.assertEqual(f(0), "00:00") self.assertEqual(f(0), "00:00")
self.assertEqual(f(0.5), "00:00") self.assertEqual(f(0.5), "00:00")

View File

@ -1,212 +0,0 @@
import asyncio
import audioop
import collections
from dataclasses import dataclass
from typing import Any, Optional
import disnake
import yt_dlp
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
class TrackedAudioSource(disnake.AudioSource):
def __init__(self, source):
self._source = source
self.read_count = 0
def read(self) -> bytes:
data = self._source.read()
if data:
self.read_count += 1
return data
def fast_forward(self, seconds: int):
for _ in range(int(seconds / 0.02)):
self.read()
@property
def progress(self) -> float:
return self.read_count * 0.02
class PCMVolumeTransformer(disnake.AudioSource):
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
if original.is_opus():
raise disnake.ClientException("AudioSource must not be Opus encoded.")
self.original = original
self.volume = volume
def cleanup(self) -> None:
self.original.cleanup()
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, self.volume)
class YTDLSource(PCMVolumeTransformer):
def __init__(
self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5
):
super().__init__(source, volume)
self.description = data.get("description")
self.duration = data.get("duration")
self.id = data.get("id")
self.like_count = data.get("like_count")
self.original_url = data.get("original_url")
self.thumbnail_url = data.get("thumbnail")
self.timestamp = data.get("timestamp")
self.title = data.get("title")
self.uploader = data.get("uploader")
self.uploader_url = data.get("uploader_url")
self.view_count = data.get("view_count")
@classmethod
async def from_url(
cls,
url,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
stream: bool = False,
):
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
if not data["entries"]:
raise Exception("no entries provided by yt-dlp!")
data = data["entries"][0]
return cls(
TrackedAudioSource(
disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data),
before_options="-vn -reconnect 1",
)
),
data=data,
)
def __repr__(self):
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
def __str__(self):
return self.__repr__()
@dataclass
class QueuedSong:
player: YTDLSource
trigger_message: disnake.Message
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
if multiline:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
+ (
f", **queued by:** <@{self.trigger_message.author.id}>"
if show_queuer
else ""
)
)
else:
return (
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
)
def embed(self, is_paused=False):
progress = 0
if self.player.duration:
progress = self.player.original.progress / self.player.duration
embed = disnake.Embed(
color=EMBED_COLOR,
title=self.player.title,
url=self.player.original_url,
description=(
f"{'⏸️ ' if is_paused else ''}"
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
+ (
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
if self.player.duration
else "[**live**]"
)
),
)
if self.player.uploader_url:
embed.add_field(
name="Uploader",
value=f"[{self.player.uploader}]({self.player.uploader_url})",
)
else:
embed.add_field(
name="Uploader",
value=self.player.uploader,
)
embed.add_field(
name="Likes",
value=f"{self.player.like_count:,}"
if self.player.like_count
else "Unknown",
)
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
embed.set_image(self.player.thumbnail_url)
embed.set_footer(
text=f"queued by {self.trigger_message.author.name}",
icon_url=(
self.trigger_message.author.avatar.url
if self.trigger_message.author.avatar
else None
),
)
return embed
def __str__(self):
return self.__repr__()
@dataclass
class QueuedPlayer:
queue = collections.deque()
current: Optional[QueuedSong] = None
def queue_pop(self):
popped = self.queue.popleft()
self.current = popped
return popped
def queue_add(self, item):
self.queue.append(item)
def queue_add_front(self, item):
self.queue.appendleft(item)
def __str__(self):
return self.__repr__()
def format_duration(duration: int | float) -> str:
hours, duration = divmod(int(duration), 3600)
minutes, duration = divmod(duration, 60)
segments = [hours, minutes, duration]
if len(segments) == 3 and segments[0] == 0:
del segments[0]
return f"{':'.join(f'{s:0>2}' for s in segments)}"
def __reload_module__():
global ytdl
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)