audio: split into more modules
This commit is contained in:
parent
af0896a6a0
commit
69f4d6967f
212
audio.py
212
audio.py
@ -1,212 +0,0 @@
|
||||
import asyncio
|
||||
import audioop
|
||||
import collections
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
import disnake
|
||||
import yt_dlp
|
||||
|
||||
from constants import BAR_LENGTH, EMBED_COLOR, YTDL_OPTIONS
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
|
||||
|
||||
class TrackedAudioSource(disnake.AudioSource):
|
||||
def __init__(self, source):
|
||||
self._source = source
|
||||
self.read_count = 0
|
||||
|
||||
def read(self) -> bytes:
|
||||
data = self._source.read()
|
||||
if data:
|
||||
self.read_count += 1
|
||||
return data
|
||||
|
||||
def fast_forward(self, seconds: int):
|
||||
for _ in range(int(seconds / 0.02)):
|
||||
self.read()
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return self.read_count * 0.02
|
||||
|
||||
|
||||
class PCMVolumeTransformer(disnake.AudioSource):
|
||||
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
|
||||
if original.is_opus():
|
||||
raise disnake.ClientException("AudioSource must not be Opus encoded.")
|
||||
|
||||
self.original = original
|
||||
self.volume = volume
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.original.cleanup()
|
||||
|
||||
def read(self) -> bytes:
|
||||
ret = self.original.read()
|
||||
return audioop.mul(ret, 2, self.volume)
|
||||
|
||||
|
||||
class YTDLSource(PCMVolumeTransformer):
|
||||
def __init__(
|
||||
self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5
|
||||
):
|
||||
super().__init__(source, volume)
|
||||
|
||||
self.description = data.get("description")
|
||||
self.duration = data.get("duration")
|
||||
self.id = data.get("id")
|
||||
self.like_count = data.get("like_count")
|
||||
self.original_url = data.get("original_url")
|
||||
self.thumbnail_url = data.get("thumbnail")
|
||||
self.timestamp = data.get("timestamp")
|
||||
self.title = data.get("title")
|
||||
self.uploader = data.get("uploader")
|
||||
self.uploader_url = data.get("uploader_url")
|
||||
self.view_count = data.get("view_count")
|
||||
|
||||
@classmethod
|
||||
async def from_url(
|
||||
cls,
|
||||
url,
|
||||
*,
|
||||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||
stream: bool = False,
|
||||
):
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
data: Any = await loop.run_in_executor(
|
||||
None, lambda: ytdl.extract_info(url, download=not stream)
|
||||
)
|
||||
|
||||
if "entries" in data:
|
||||
if not data["entries"]:
|
||||
raise Exception("no entries provided by yt-dlp!")
|
||||
data = data["entries"][0]
|
||||
|
||||
return cls(
|
||||
TrackedAudioSource(
|
||||
disnake.FFmpegPCMAudio(
|
||||
data["url"] if stream else ytdl.prepare_filename(data),
|
||||
before_options="-vn -reconnect 1",
|
||||
)
|
||||
),
|
||||
data=data,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueuedSong:
|
||||
player: YTDLSource
|
||||
trigger_message: disnake.Message
|
||||
|
||||
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
|
||||
if multiline:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
|
||||
+ (
|
||||
f", **queued by:** <@{self.trigger_message.author.id}>"
|
||||
if show_queuer
|
||||
else ""
|
||||
)
|
||||
)
|
||||
else:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
|
||||
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
||||
)
|
||||
|
||||
def embed(self, is_paused=False):
|
||||
progress = 0
|
||||
if self.player.duration:
|
||||
progress = self.player.original.progress / self.player.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=EMBED_COLOR,
|
||||
title=self.player.title,
|
||||
url=self.player.original_url,
|
||||
description=(
|
||||
f"{'⏸️ ' if is_paused else ''}"
|
||||
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
|
||||
+ (
|
||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||
if self.player.duration
|
||||
else "[**live**]"
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
if self.player.uploader_url:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
||||
)
|
||||
else:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=self.player.uploader,
|
||||
)
|
||||
embed.add_field(
|
||||
name="Likes",
|
||||
value=f"{self.player.like_count:,}"
|
||||
if self.player.like_count
|
||||
else "Unknown",
|
||||
)
|
||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
||||
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
|
||||
|
||||
embed.set_image(self.player.thumbnail_url)
|
||||
embed.set_footer(
|
||||
text=f"queued by {self.trigger_message.author.name}",
|
||||
icon_url=(
|
||||
self.trigger_message.author.avatar.url
|
||||
if self.trigger_message.author.avatar
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueuedPlayer:
|
||||
queue = collections.deque()
|
||||
current: Optional[QueuedSong] = None
|
||||
|
||||
def queue_pop(self):
|
||||
popped = self.queue.popleft()
|
||||
self.current = popped
|
||||
return popped
|
||||
|
||||
def queue_add(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def queue_add_front(self, item):
|
||||
self.queue.appendleft(item)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
def format_duration(duration: int | float) -> str:
|
||||
hours, duration = divmod(int(duration), 3600)
|
||||
minutes, duration = divmod(duration, 60)
|
||||
segments = [hours, minutes, duration]
|
||||
if len(segments) == 3 and segments[0] == 0:
|
||||
del segments[0]
|
||||
return f"{':'.join(f'{s:0>2}' for s in segments)}"
|
||||
|
||||
|
||||
def __reload_module__():
|
||||
global ytdl
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
3
audio/__init__.py
Normal file
3
audio/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from . import discord, queue, utils, youtubedl
|
||||
|
||||
__all__ = ["utils", "queue", "youtubedl", "discord"]
|
39
audio/discord.py
Normal file
39
audio/discord.py
Normal file
@ -0,0 +1,39 @@
|
||||
import audioop
|
||||
|
||||
import disnake
|
||||
|
||||
|
||||
class TrackedAudioSource(disnake.AudioSource):
|
||||
def __init__(self, source):
|
||||
self._source = source
|
||||
self.read_count = 0
|
||||
|
||||
def read(self) -> bytes:
|
||||
data = self._source.read()
|
||||
if data:
|
||||
self.read_count += 1
|
||||
return data
|
||||
|
||||
def fast_forward(self, seconds: int):
|
||||
for _ in range(int(seconds / 0.02)):
|
||||
self.read()
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return self.read_count * 0.02
|
||||
|
||||
|
||||
class PCMVolumeTransformer(disnake.AudioSource):
|
||||
def __init__(self, original: TrackedAudioSource, volume: float = 1.0) -> None:
|
||||
if original.is_opus():
|
||||
raise disnake.ClientException("AudioSource must not be Opus encoded.")
|
||||
|
||||
self.original = original
|
||||
self.volume = volume
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.original.cleanup()
|
||||
|
||||
def read(self) -> bytes:
|
||||
ret = self.original.read()
|
||||
return audioop.mul(ret, 2, self.volume)
|
107
audio/queue.py
Normal file
107
audio/queue.py
Normal file
@ -0,0 +1,107 @@
|
||||
import collections
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import disnake
|
||||
|
||||
from constants import BAR_LENGTH, EMBED_COLOR
|
||||
from utils import format_duration
|
||||
|
||||
from .youtubedl import YTDLSource
|
||||
|
||||
|
||||
@dataclass
|
||||
class Song:
|
||||
player: YTDLSource
|
||||
trigger_message: disnake.Message
|
||||
|
||||
def format(self, show_queuer=False, hide_preview=False, multiline=False) -> str:
|
||||
if multiline:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''})\n**duration:** {format_duration(self.player.duration) if self.player.duration else '[live]'}"
|
||||
+ (
|
||||
f", **queued by:** <@{self.trigger_message.author.id}>"
|
||||
if show_queuer
|
||||
else ""
|
||||
)
|
||||
)
|
||||
else:
|
||||
return (
|
||||
f"[`{self.player.title}`]({'<' if hide_preview else ''}{self.player.original_url}{'>' if hide_preview else ''}) [**{format_duration(self.player.duration) if self.player.duration else 'live'}**]"
|
||||
+ (f" (<@{self.trigger_message.author.id}>)" if show_queuer else "")
|
||||
)
|
||||
|
||||
def embed(self, is_paused=False):
|
||||
progress = 0
|
||||
if self.player.duration:
|
||||
progress = self.player.original.progress / self.player.duration
|
||||
|
||||
embed = disnake.Embed(
|
||||
color=EMBED_COLOR,
|
||||
title=self.player.title,
|
||||
url=self.player.original_url,
|
||||
description=(
|
||||
f"{'⏸️ ' if is_paused else ''}"
|
||||
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
|
||||
+ (
|
||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||
if self.player.duration
|
||||
else "[**live**]"
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
if self.player.uploader_url:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
||||
)
|
||||
else:
|
||||
embed.add_field(
|
||||
name="Uploader",
|
||||
value=self.player.uploader,
|
||||
)
|
||||
embed.add_field(
|
||||
name="Likes",
|
||||
value=f"{self.player.like_count:,}"
|
||||
if self.player.like_count
|
||||
else "Unknown",
|
||||
)
|
||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
||||
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
|
||||
|
||||
embed.set_image(self.player.thumbnail_url)
|
||||
embed.set_footer(
|
||||
text=f"queued by {self.trigger_message.author.name}",
|
||||
icon_url=(
|
||||
self.trigger_message.author.avatar.url
|
||||
if self.trigger_message.author.avatar
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Player:
|
||||
queue = collections.deque()
|
||||
current: Optional[Song] = None
|
||||
|
||||
def queue_pop(self):
|
||||
popped = self.queue.popleft()
|
||||
self.current = popped
|
||||
return popped
|
||||
|
||||
def queue_add(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def queue_add_front(self, item):
|
||||
self.queue.appendleft(item)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
7
audio/utils.py
Normal file
7
audio/utils.py
Normal file
@ -0,0 +1,7 @@
|
||||
def format_duration(duration: int | float) -> str:
|
||||
hours, duration = divmod(int(duration), 3600)
|
||||
minutes, duration = divmod(duration, 60)
|
||||
segments = [hours, minutes, duration]
|
||||
if len(segments) == 3 and segments[0] == 0:
|
||||
del segments[0]
|
||||
return f"{':'.join(f'{s:0>2}' for s in segments)}"
|
69
audio/youtubedl.py
Normal file
69
audio/youtubedl.py
Normal file
@ -0,0 +1,69 @@
|
||||
import asyncio
|
||||
from typing import Any, Optional
|
||||
|
||||
import disnake
|
||||
import yt_dlp
|
||||
|
||||
from constants import YTDL_OPTIONS
|
||||
|
||||
from .discord import PCMVolumeTransformer, TrackedAudioSource
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
||||
|
||||
|
||||
class YTDLSource(PCMVolumeTransformer):
|
||||
def __init__(
|
||||
self, source: TrackedAudioSource, *, data: dict[str, Any], volume: float = 0.5
|
||||
):
|
||||
super().__init__(source, volume)
|
||||
|
||||
self.description = data.get("description")
|
||||
self.duration = data.get("duration")
|
||||
self.id = data.get("id")
|
||||
self.like_count = data.get("like_count")
|
||||
self.original_url = data.get("original_url")
|
||||
self.thumbnail_url = data.get("thumbnail")
|
||||
self.timestamp = data.get("timestamp")
|
||||
self.title = data.get("title")
|
||||
self.uploader = data.get("uploader")
|
||||
self.uploader_url = data.get("uploader_url")
|
||||
self.view_count = data.get("view_count")
|
||||
|
||||
@classmethod
|
||||
async def from_url(
|
||||
cls,
|
||||
url,
|
||||
*,
|
||||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||
stream: bool = False,
|
||||
):
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
data: Any = await loop.run_in_executor(
|
||||
None, lambda: ytdl.extract_info(url, download=not stream)
|
||||
)
|
||||
|
||||
if "entries" in data:
|
||||
if not data["entries"]:
|
||||
raise Exception("no entries provided by yt-dlp!")
|
||||
data = data["entries"][0]
|
||||
|
||||
return cls(
|
||||
TrackedAudioSource(
|
||||
disnake.FFmpegPCMAudio(
|
||||
data["url"] if stream else ytdl.prepare_filename(data),
|
||||
before_options="-vn -reconnect 1",
|
||||
)
|
||||
),
|
||||
data=data,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YTDLSource title={self.title} original_url=<{self.original_url}> duration={self.duration}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
def __reload_module__():
|
||||
global ytdl
|
||||
ytdl = yt_dlp.YoutubeDL(YTDL_OPTIONS)
|
@ -16,7 +16,7 @@ from .utils import command_allowed, ensure_joined, play_next
|
||||
|
||||
async def queue_or_play(message, edited=False):
|
||||
if message.guild.id not in players:
|
||||
players[message.guild.id] = audio.QueuedPlayer()
|
||||
players[message.guild.id] = audio.queue.Player()
|
||||
|
||||
tokens = commands.tokenize(message.content)
|
||||
parser = arguments.ArgumentParser(
|
||||
@ -154,7 +154,7 @@ async def queue_or_play(message, edited=False):
|
||||
|
||||
try:
|
||||
async with message.channel.typing():
|
||||
player = await audio.YTDLSource.from_url(
|
||||
player = await audio.youtubedl.YTDLSource.from_url(
|
||||
" ".join(query), loop=client.loop, stream=True
|
||||
)
|
||||
player.volume = float(args.volume) / 100.0
|
||||
@ -162,7 +162,7 @@ async def queue_or_play(message, edited=False):
|
||||
await utils.reply(message, f"**failed to queue:** `{e}`")
|
||||
return
|
||||
|
||||
queued = audio.QueuedSong(player, message)
|
||||
queued = audio.queue.Song(player, message)
|
||||
|
||||
if args.now or args.next:
|
||||
players[message.guild.id].queue_add_front(queued)
|
||||
|
@ -32,7 +32,7 @@ async def sponsorblock_command(message):
|
||||
|
||||
current = "**" if progress >= begin and progress < end else ""
|
||||
text.append(
|
||||
f"{current}`{audio.format_duration(begin)}` - `{audio.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
|
||||
f"{current}`{audio.utils.format_duration(begin)}` - `{audio.utils.format_duration(end)}`: {category_name if category_name else 'Unknown'}{current}"
|
||||
)
|
||||
|
||||
await utils.reply(
|
||||
|
@ -22,6 +22,10 @@ PREFIX = "%"
|
||||
RELOADABLE_MODULES = [
|
||||
"arguments",
|
||||
"audio",
|
||||
"audio.discord",
|
||||
"audio.queue",
|
||||
"audio.utils",
|
||||
"audio.youtubedl",
|
||||
"commands",
|
||||
"commands.bot",
|
||||
"commands.tools",
|
||||
|
@ -7,7 +7,7 @@ import utils
|
||||
class TestFormatDuration(unittest.TestCase):
|
||||
def test_audio(self):
|
||||
def f(s):
|
||||
return audio.format_duration(s)
|
||||
return audio.utils.format_duration(s)
|
||||
|
||||
self.assertEqual(f(0), "00:00")
|
||||
self.assertEqual(f(0.5), "00:00")
|
||||
|
Loading…
x
Reference in New Issue
Block a user