diff --git a/audio.py b/audio.py deleted file mode 100644 index cdfb99f..0000000 --- a/audio.py +++ /dev/null @@ -1,212 +0,0 @@ -import asyncio -import audioop -import collections -from dataclasses import dataclass -from typing import Any, Optional - -import disnake -import yt_dlp - -from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS - -ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS) - - -class TrackedAudioSource(disnake.AudioSource): - def __init__(self, source): - self._source = source - self.read_count = 0 - - def read(self) -> bytes: - data = self._source.read() - if data: - self.read_count += 1 - return data - - def fast_forward(self, seconds: int): - for _ in range(int(seconds / 0.02)): - self.read() - - @property - def progress(self) -> float: - return self.read_count * 0.02 - - -class PCMVolumeTransformer(disnake.AudioSource): - def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None: - if original.is_opus(): - raise disnake.ClientException("AudioSource must not be Opus encoded.") - - self.original = original - self.volume = volume - - def cleanup(self) -> None: - self.original.cleanup() - - def read(self) -> bytes: - ret = self.original.read() - return audioop.mul(ret, 2, self.volume) - - -class YTDLSource(PCMVolumeTransformer): - def __init__( - self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5 - ): - super().__init__(source, volume) - - self.description = data.get("description") - self.duration = data.get("duration") - self.id = data.get("id") - self.like_count = data.get("like_count") - self.original_url = data.get("original_url") - self.thumbnail_url = data.get("thumbnail") - self.timestamp = data.get("timestamp") - self.title = data.get("title") - self.uploader = data.get("uploader") - self.uploader_url = data.get("uploader_url") - self.view_count = data.get("view_count") - - @classmethod - async def from_url( - cls, - url, - *, - loop: Optional[asyncio.AbstractEventLoop] = None, - stream: bool = False, - ): - loop = loop or asyncio.get_event_loop() - data: Any = await loop.run_in_executor( - None, lambda: ytdl.extract_info(url, download=not stream) - ) - - if "entries" in data: - if not data["entries"]: - raise Exception("no entries provided by yt-dlp!") - data = data["entries"][0] - - return cls( - TrackedAudioSource( - disnake.FFmpegPCMAudio( - data["url"] if stream else ytdl.prepare_filename(data), - before_options="-vn -reconnect 1", - ) - ), - data=data, - ) - - def __repr__(self): - return f" duration={self.duration}>" - - def __str__(self): - return self.__repr__() - - -@dataclass -class QueuedSong: - player: YTDLSource - trigger_message: disnake.Message - - def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str: - if multiline: - return ( - f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}" - + ( - f", **queued by:** <@{self.trigger_message.author.id}>" - if show_queuer - else "" - ) - ) - else: - return ( - f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]" - + (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "") - ) - - def embed(self, is_paused=False): - progress = 0 - if self.player.duration: - progress = self.player.original.progress / self.player.duration - - embed = disnake.Embed( - color=EMBED_COLOR, - title=self.player.title, - url=self.player.original_url, - description=( - f"{'⏸️ ' if is_paused else ''}" - f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` " - + ( - f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)" - if self.player.duration - else "[**live**]" - ) - ), - ) - - if self.player.uploader_url: - embed.add_field( - name="Uploader", - value=f"[{self.player.uploader}]({self.player.uploader_url})", - ) - else: - embed.add_field( - name="Uploader", - value=self.player.uploader, - ) - embed.add_field( - name="Likes", - value=f"{self.player.like_count:,}" - if self.player.like_count - else "Unknown", - ) - embed.add_field(name="Views", value=f"{self.player.view_count:,}") - embed.add_field(name="Published", value=f"") - embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%") - - embed.set_image(self.player.thumbnail_url) - embed.set_footer( - text=f"queued by {self.trigger_message.author.name}", - icon_url=( - self.trigger_message.author.avatar.url - if self.trigger_message.author.avatar - else None - ), - ) - - return embed - - def __str__(self): - return self.__repr__() - - -@dataclass -class QueuedPlayer: - queue = collections.deque() - current: Optional[QueuedSong] = None - - def queue_pop(self): - popped = self.queue.popleft() - self.current = popped - return popped - - def queue_add(self, item): - self.queue.append(item) - - def queue_add_front(self, item): - self.queue.appendleft(item) - - def __str__(self): - return self.__repr__() - - -def format_duration(duration: int | float) -> str: - hours, duration = divmod(int(duration), 3600) - minutes, duration = divmod(duration, 60) - segments = [hours, minutes, duration] - if len(segments) == 3 and segments[0] == 0: - del segments[0] - return f"{':'.join(f'{s:0>2}' for s in segments)}" - - -def __reload_module__(): - global ytdl - ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS) diff --git a/audio/__init__.py b/audio/__init__.py new file mode 100644 index 0000000..c7d670c --- /dev/null +++ b/audio/__init__.py @@ -0,0 +1,3 @@ +from . import discord, queue, utils, youtubedl + +__all__ = ["utils", "queue", "youtubedl", "discord"] diff --git a/audio/discord.py b/audio/discord.py new file mode 100644 index 0000000..fb5c0a4 --- /dev/null +++ b/audio/discord.py @@ -0,0 +1,39 @@ +import audioop + +import disnake + + +class TrackedAudioSource(disnake.AudioSource): + def __init__(self, source): + self._source = source + self.read_count = 0 + + def read(self) -> bytes: + data = self._source.read() + if data: + self.read_count += 1 + return data + + def fast_forward(self, seconds: int): + for _ in range(int(seconds / 0.02)): + self.read() + + @property + def progress(self) -> float: + return self.read_count * 0.02 + + +class PCMVolumeTransformer(disnake.AudioSource): + def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None: + if original.is_opus(): + raise disnake.ClientException("AudioSource must not be Opus encoded.") + + self.original = original + self.volume = volume + + def cleanup(self) -> None: + self.original.cleanup() + + def read(self) -> bytes: + ret = self.original.read() + return audioop.mul(ret, 2, self.volume) diff --git a/audio/queue.py b/audio/queue.py new file mode 100644 index 0000000..9eb47ed --- /dev/null +++ b/audio/queue.py @@ -0,0 +1,107 @@ +import collections +from dataclasses import dataclass +from typing import Optional + +import disnake + +from constants import BAR_LENGTH, EMBED_COLOR +from utils import format_duration + +from .youtubedl import YTDLSource + + +@dataclass +class Song: + player: YTDLSource + trigger_message: disnake.Message + + def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str: + if multiline: + return ( + f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}" + + ( + f", **queued by:** <@{self.trigger_message.author.id}>" + if show_queuer + else "" + ) + ) + else: + return ( + f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]" + + (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "") + ) + + def embed(self, is_paused=False): + progress = 0 + if self.player.duration: + progress = self.player.original.progress / self.player.duration + + embed = disnake.Embed( + color=EMBED_COLOR, + title=self.player.title, + url=self.player.original_url, + description=( + f"{'⏸️ ' if is_paused else ''}" + f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` " + + ( + f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)" + if self.player.duration + else "[**live**]" + ) + ), + ) + + if self.player.uploader_url: + embed.add_field( + name="Uploader", + value=f"[{self.player.uploader}]({self.player.uploader_url})", + ) + else: + embed.add_field( + name="Uploader", + value=self.player.uploader, + ) + embed.add_field( + name="Likes", + value=f"{self.player.like_count:,}" + if self.player.like_count + else "Unknown", + ) + embed.add_field(name="Views", value=f"{self.player.view_count:,}") + embed.add_field(name="Published", value=f"") + embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%") + + embed.set_image(self.player.thumbnail_url) + embed.set_footer( + text=f"queued by {self.trigger_message.author.name}", + icon_url=( + self.trigger_message.author.avatar.url + if self.trigger_message.author.avatar + else None + ), + ) + + return embed + + def __str__(self): + return self.__repr__() + + +@dataclass +class Player: + queue = collections.deque() + current: Optional[Song] = None + + def queue_pop(self): + popped = self.queue.popleft() + self.current = popped + return popped + + def queue_add(self, item): + self.queue.append(item) + + def queue_add_front(self, item): + self.queue.appendleft(item) + + def __str__(self): + return self.__repr__() diff --git a/audio/utils.py b/audio/utils.py new file mode 100644 index 0000000..e091b71 --- /dev/null +++ b/audio/utils.py @@ -0,0 +1,7 @@ +def format_duration(duration: int | float) -> str: + hours, duration = divmod(int(duration), 3600) + minutes, duration = divmod(duration, 60) + segments = [hours, minutes, duration] + if len(segments) == 3 and segments[0] == 0: + del segments[0] + return f"{':'.join(f'{s:0>2}' for s in segments)}" diff --git a/audio/youtubedl.py b/audio/youtubedl.py new file mode 100644 index 0000000..f004fe9 --- /dev/null +++ b/audio/youtubedl.py @@ -0,0 +1,69 @@ +import asyncio +from typing import Any, Optional + +import disnake +import yt_dlp + +from constants import YTDL_OPTIONS + +from .discord import PCMVolumeTransformer, TrackedAudioSource + +ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS) + + +class YTDLSource(PCMVolumeTransformer): + def __init__( + self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5 + ): + super().__init__(source, volume) + + self.description = data.get("description") + self.duration = data.get("duration") + self.id = data.get("id") + self.like_count = data.get("like_count") + self.original_url = data.get("original_url") + self.thumbnail_url = data.get("thumbnail") + self.timestamp = data.get("timestamp") + self.title = data.get("title") + self.uploader = data.get("uploader") + self.uploader_url = data.get("uploader_url") + self.view_count = data.get("view_count") + + @classmethod + async def from_url( + cls, + url, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + stream: bool = False, + ): + loop = loop or asyncio.get_event_loop() + data: Any = await loop.run_in_executor( + None, lambda: ytdl.extract_info(url, download=not stream) + ) + + if "entries" in data: + if not data["entries"]: + raise Exception("no entries provided by yt-dlp!") + data = data["entries"][0] + + return cls( + TrackedAudioSource( + disnake.FFmpegPCMAudio( + data["url"] if stream else ytdl.prepare_filename(data), + before_options="-vn -reconnect 1", + ) + ), + data=data, + ) + + def __repr__(self): + return f" duration={self.duration}>" + + def __str__(self): + return self.__repr__() + + +def __reload_module__(): + global ytdl + ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS) diff --git a/commands/voice/queue.py b/commands/voice/queue.py index 9519ce9..bb32ce1 100644 --- a/commands/voice/queue.py +++ b/commands/voice/queue.py @@ -16,7 +16,7 @@ from .utils import command_allowed, ensure_joined, play_next async def queue_or_play(message, edited=False): if message.guild.id not in players: - players[message.guild.id] = audio.QueuedPlayer() + players[message.guild.id] = audio.queue.Player() tokens = commands.tokenize(message.content) parser = arguments.ArgumentParser( @@ -154,7 +154,7 @@ async def queue_or_play(message, edited=False): try: async with message.channel.typing(): - player = await audio.YTDLSource.from_url( + player = await audio.youtubedl.YTDLSource.from_url( " ".join(query), loop=client.loop, stream=True ) player.volume = float(args.volume) / 100.0 @@ -162,7 +162,7 @@ async def queue_or_play(message, edited=False): await utils.reply(message, f"**failed to queue:** `{e}`") return - queued = audio.QueuedSong(player, message) + queued = audio.queue.Song(player, message) if args.now or args.next: players[message.guild.id].queue_add_front(queued) diff --git a/commands/voice/sponsorblock.py b/commands/voice/sponsorblock.py index 2477806..ecb83d3 100644 --- a/commands/voice/sponsorblock.py +++ b/commands/voice/sponsorblock.py @@ -32,7 +32,7 @@ async def sponsorblock_command(message): current = "**" if progress >= begin and progress < end else "" text.append( - f"{current}`{audio.format_duration(begin)}` - `{audio.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}" + f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}" ) await utils.reply( diff --git a/constants.py b/constants.py index 1376a49..a8d7dc0 100644 --- a/constants.py +++ b/constants.py @@ -22,6 +22,10 @@ PREFIX = "%" RELOADABLE_MODULES = [ "arguments", "audio", + "audio.discord", + "audio.queue", + "audio.utils", + "audio.youtubedl", "commands", "commands.bot", "commands.tools", diff --git a/tests/test_format_duration.py b/tests/test_format_duration.py index a4d7b6f..9fb2c7d 100644 --- a/tests/test_format_duration.py +++ b/tests/test_format_duration.py @@ -7,7 +7,7 @@ import utils class TestFormatDuration(unittest.TestCase): def test_audio(self): def f(s): - return audio.format_duration(s) + return audio.utils.format_duration(s) self.assertEqual(f(0), "00:00") self.assertEqual(f(0.5), "00:00")