Compare commits

...

9 Commits

13 changed files with 250 additions and 254 deletions

View File

@ -29,3 +29,11 @@ class ArgumentParser:
await utils.reply(message, f"```\n{self.print_help()}```") await utils.reply(message, f"```\n{self.print_help()}```")
except Exception as e: except Exception as e:
await utils.reply(message, f"`{e}`") await utils.reply(message, f"`{e}`")
def range_type(string, min=0, max=100):
value = int(string)
if min <= value <= max:
return value
else:
raise argparse.ArgumentTypeError("value not in range %s-%s" % (min, max))

View File

@ -1,21 +1,6 @@
import importlib
import inspect
from state import reloaded_modules
from . import bot, tools, utils, voice from . import bot, tools, utils, voice
from .utils import * from .utils import *
def __reload_module__(): def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()
globals().update({k: v for k, v in vars(utils).items() if not k.startswith("_")}) globals().update({k: v for k, v in vars(utils).items() if not k.startswith("_")})

View File

@ -1,12 +1,18 @@
import importlib
import inspect
import time import time
import arguments import arguments
import commands import commands
import constants
import utils import utils
from state import reloaded_modules, start_time from state import start_time
async def help(message):
await utils.reply(
message,
", ".join(
[f"`{command.value}`" for command in commands.Command.__members__.values()]
),
)
async def uptime(message): async def uptime(message):
@ -27,16 +33,28 @@ async def uptime(message):
if args.since: if args.since:
await utils.reply(message, f"{round(start_time)}") await utils.reply(message, f"{round(start_time)}")
else: else:
await utils.reply(message, f"up {round(time.time() - start_time)} seconds") format_plural = lambda noun, count: noun if count == 1 else noun + "s"
segments = []
duration = time.time() - start_time
def __reload_module__(): days, duration = divmod(duration, 86400)
for name, module in globals().items(): if days >= 1:
if ( days = int(days)
inspect.ismodule(module) segments.append(f"{days} {format_plural('day', days)}")
and name not in constants.RELOAD_BLACKLISTED_MODULES
): hours, duration = divmod(duration, 3600)
importlib.reload(module) if hours >= 1:
if "__reload_module__" in dir(module) and name not in reloaded_modules: hours = int(hours)
reloaded_modules.add(name) segments.append(f"{hours} {format_plural('hour', hours)}")
module.__reload_module__()
minutes, duration = divmod(duration, 60)
if minutes >= 1:
minutes = int(minutes)
segments.append(f"{minutes} {format_plural('minute', minutes)}")
seconds = int(duration)
if seconds > 0:
segments.append(f"{seconds} {format_plural('second', seconds)}")
await utils.reply(message, f"up {', '.join(segments)}")

View File

@ -1,11 +1,8 @@
import importlib
import inspect
import re import re
import arguments import arguments
import commands import commands
import constants import utils
from state import reloaded_modules
async def clear(message): async def clear(message):
@ -63,15 +60,3 @@ async def clear(message):
) )
except: except:
pass pass
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -6,6 +6,7 @@ import constants
class Command(enum.Enum): class Command(enum.Enum):
CLEAR = "clear" CLEAR = "clear"
EXECUTE = "execute" EXECUTE = "execute"
HELP = "help"
JOIN = "join" JOIN = "join"
LEAVE = "leave" LEAVE = "leave"
PAUSE = "pause" PAUSE = "pause"

View File

@ -1,12 +1,10 @@
import importlib import functools
import inspect
import arguments import arguments
import commands import commands
import constants
import utils import utils
import ytdlp import youtubedl
from state import client, playback_queue, reloaded_modules from state import client, player_current, player_queue
async def queue_or_play(message): async def queue_or_play(message):
@ -14,8 +12,8 @@ async def queue_or_play(message):
if not command_allowed(message): if not command_allowed(message):
return return
if message.guild.id not in playback_queue: if message.guild.id not in player_queue:
playback_queue[message.guild.id] = [] player_queue[message.guild.id] = []
tokens = commands.tokenize(message.content) tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser( parser = arguments.ArgumentParser(
@ -28,19 +26,28 @@ async def queue_or_play(message):
action="store_true", action="store_true",
help="clear all queued songs", help="clear all queued songs",
) )
parser.add_argument(
"-v",
"--volume",
default=50,
type=functools.partial(arguments.range_type, min=0, max=150),
metavar="[0-150]",
help="the volume level (0 - 150)",
)
if not (args := await parser.parse_args(message, tokens)): if not (args := await parser.parse_args(message, tokens)):
return return
if args.clear: if args.clear:
playback_queue[message.guild.id] = [] player_queue[message.guild.id] = []
await message.add_reaction("") await utils.add_check_reaction(message)
return return
elif query := args.query: elif query := args.query:
try: try:
async with message.channel.typing(): async with message.channel.typing():
player = await ytdlp.YTDLSource.from_url( player = await youtubedl.YTDLSource.from_url(
query, loop=client.loop, stream=True query, loop=client.loop, stream=True
) )
player.volume = float(args.volume) / 100.0
except Exception as e: except Exception as e:
await utils.reply( await utils.reply(
message, message,
@ -48,15 +55,16 @@ async def queue_or_play(message):
) )
return return
playback_queue[message.guild.id].append( player_queue[message.guild.id].insert(
{"player": player, "queuer": message.author.id} 0, {"player": player, "queuer": message.author.id}
) )
if ( if (
not message.guild.voice_client.is_playing() not message.guild.voice_client.is_playing()
and not message.guild.voice_client.is_paused() and not message.guild.voice_client.is_paused()
): ):
await play_next(message) await utils.reply(message, f"**now playing:** `{player.title}`")
play_next(message)
else: else:
await utils.reply( await utils.reply(
message, message,
@ -72,29 +80,29 @@ async def queue_or_play(message):
) )
else: else:
generate_currently_playing = ( generate_currently_playing = (
lambda: f"**0.** {'**paused:** ' if message.guild.voice_client.is_paused() else ''}`{message.guild.voice_client.source.title}`" lambda: f"**0.** {'**paused:** ' if message.guild.voice_client.is_paused() else ''}`{message.guild.voice_client.source.title}` (<@{player_current[message.guild.id]['queuer']}>)"
) )
if ( if (
not playback_queue[message.guild.id] not player_queue[message.guild.id]
and not message.guild.voice_client.source and not message.guild.voice_client.source
): ):
await utils.reply( await utils.reply(
message, message,
"nothing is playing or queued!", "nothing is playing or queued!",
) )
elif not playback_queue[message.guild.id]: elif not player_queue[message.guild.id]:
await utils.reply(message, generate_currently_playing()) await utils.reply(message, generate_currently_playing())
elif not message.guild.voice_client.source: elif not message.guild.voice_client.source:
await utils.reply( await utils.reply(
message, message,
generate_queue_list(playback_queue[message.guild.id]), generate_queue_list(player_queue[message.guild.id]),
) )
else: else:
await utils.reply( await utils.reply(
message, message,
generate_currently_playing() generate_currently_playing()
+ "\n" + "\n"
+ generate_queue_list(playback_queue[message.guild.id]), + generate_queue_list(player_queue[message.guild.id]),
) )
else: else:
await utils.reply( await utils.reply(
@ -107,14 +115,15 @@ async def skip(message):
if not command_allowed(message): if not command_allowed(message):
return return
if not playback_queue[message.guild.id]: if not player_queue[message.guild.id]:
message.guild.voice_client.stop() message.guild.voice_client.stop()
await utils.reply( await utils.reply(
message, message,
"the queue is empty now!", "the queue is empty now!",
) )
else: else:
await play_next(message) message.guild.voice_client.stop()
await utils.add_check_reaction(message)
async def join(message): async def join(message):
@ -165,8 +174,7 @@ async def volume(message):
parser.add_argument( parser.add_argument(
"volume", "volume",
nargs="?", nargs="?",
type=int, type=functools.partial(arguments.range_type, min=0, max=150),
choices=range(0, 151),
metavar="[0-150]", metavar="[0-150]",
help="the volume level (0 - 150)", help="the volume level (0 - 150)",
) )
@ -180,28 +188,24 @@ async def volume(message):
) )
return return
if args.volume: if args.volume is None:
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.reply(
message,
f"{args.volume}",
)
else:
await utils.reply( await utils.reply(
message, message,
f"{int(message.guild.voice_client.source.volume * 100)}", f"{int(message.guild.voice_client.source.volume * 100)}",
) )
else:
message.guild.voice_client.source.volume = float(args.volume) / 100.0
await utils.add_check_reaction(message)
async def play_next(message): def play_next(message, once=False):
while playback_queue[message.guild.id]:
queued = playback_queue[message.guild.id].pop()
await ensure_joined(message)
message.guild.voice_client.stop() message.guild.voice_client.stop()
if player_queue[message.guild.id]:
queued = player_queue[message.guild.id].pop()
player_current[message.guild.id] = queued
message.guild.voice_client.play( message.guild.voice_client.play(
queued["player"], after=lambda e: print(f"player error: {e}") if e else None queued["player"], after=lambda _: play_next(message) if not once else None
) )
await message.channel.send(f"**now playing:** {queued['player'].title}")
async def ensure_joined(message): async def ensure_joined(message):
@ -225,15 +229,3 @@ def generate_queue_list(queue: list):
for i, queued in enumerate(queue) for i, queued in enumerate(queue)
] ]
) )
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -3,7 +3,20 @@ import os
EMBED_COLOR = 0xFF6600 EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712] OWNERS = [531392146767347712]
PREFIX = "%" PREFIX = "%"
RELOAD_BLACKLISTED_MODULES = ["re", "argparse"] RELOADABLE_MODULES = [
"arguments",
"commands",
"commands.bot",
"commands.tools",
"commands.utils",
"commands.voice",
"constants",
"core",
"events",
"utils",
"voice",
"youtubedl",
]
YTDL_OPTIONS = { YTDL_OPTIONS = {
"default_search": "auto", "default_search": "auto",

139
core.py
View File

@ -1,7 +1,136 @@
message_handlers = {} import asyncio
import contextlib
import importlib
import inspect
import io
import textwrap
import traceback
import disnake_paginator
import commands
import constants
import core
import utils
from state import command_locks
async def trigger_message_handlers(event_type: str, *data): async def on_message(message):
if event_type in message_handlers: tokens = commands.tokenize(message.content)
for message_handler in message_handlers[event_type]: if not tokens:
await message_handler(*data) return
matched = commands.match_token(tokens[0])
if not matched:
return
if len(matched) > 1:
await utils.reply(
message,
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
)
return
C = commands.Command
try:
match matched[0]:
case C.RELOAD if message.author.id in constants.OWNERS:
reloaded_modules = set()
for module in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in constants.RELOADABLE_MODULES,
globals().values(),
):
core.rreload(reloaded_modules, module)
await utils.add_check_reaction(message)
case C.EXECUTE if message.author.id in constants.OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]:
if code.startswith(replacement):
code = code[len(replacement) :]
stdout = io.StringIO()
try:
with contextlib.redirect_stdout(stdout):
if "#globals" in code:
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
globals(),
)
await globals()["run_code"]()
else:
dictionary = dict(locals(), **globals())
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
dictionary,
dictionary,
)
await dictionary["run_code"]()
output = stdout.getvalue()
except Exception as e:
output = "`" + str(e) + "`"
output = utils.filter_secrets(output)
if len(output) > 2000:
output = output.replace("`", "\\`")
pager = disnake_paginator.ButtonPaginator(
prefix=f"```\n",
suffix="```",
color=constants.EMBED_COLOR,
segments=disnake_paginator.split(output),
invalid_user_function=utils.invalid_user_handler,
)
await pager.start(
disnake_paginator.wrappers.MessageInteractionWrapper(message)
)
elif len(output.strip()) == 0:
await utils.add_check_reaction(message)
else:
await message.channel.send(output)
case C.CLEAR | C.PURGE if message.author.id in constants.OWNERS:
await commands.tools.clear(message)
case C.JOIN:
await commands.voice.join(message)
case C.LEAVE:
await commands.voice.leave(message)
case C.QUEUE | C.PLAY:
if message.guild.id not in command_locks:
command_locks[message.guild.id] = asyncio.Lock()
async with command_locks[message.guild.id]:
await commands.voice.queue_or_play(message)
case C.SKIP:
await commands.voice.skip(message)
case C.RESUME:
await commands.voice.resume(message)
case C.PAUSE:
await commands.voice.pause(message)
case C.VOLUME:
await commands.voice.volume(message)
case C.HELP:
await commands.bot.help(message)
case C.UPTIME:
await commands.bot.uptime(message)
except Exception as e:
await utils.reply(
message,
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
)
def rreload(reloaded_modules, module):
reloaded_modules.add(module.__name__)
importlib.reload(module)
if "__reload_module__" in dir(module):
module.__reload_module__()
with contextlib.suppress(AttributeError):
for submodule in filter(
lambda v: inspect.ismodule(v)
and v.__name__ in constants.RELOADABLE_MODULES
and v.__name__ not in reloaded_modules,
map(lambda attr: getattr(module, attr), dir(module)),
):
rreload(reloaded_modules, submodule)
importlib.reload(module)

116
events.py
View File

@ -1,113 +1,7 @@
import contextlib dynamic_handlers = {}
import importlib
import inspect
import io
import textwrap
import traceback
import disnake_paginator
import commands
import constants
import utils
from state import reloaded_modules
async def on_message(message): async def trigger_dynamic_handlers(event_type: str, *data):
tokens = commands.tokenize(message.content) if event_type in dynamic_handlers:
if not tokens: for message_handler in dynamic_handlers[event_type]:
return await message_handler(*data)
matched = commands.match_token(tokens[0])
if not matched:
return
if len(matched) > 1:
await utils.reply(
message,
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
)
return
C = commands.Command
try:
match matched[0]:
case C.EXECUTE if message.author.id in constants.OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]:
if code.startswith(replacement):
code = code[len(replacement) :]
stdout = io.StringIO()
try:
with contextlib.redirect_stdout(stdout):
if "#globals" in code:
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
globals(),
)
await globals()["run_code"]()
else:
dictionary = dict(locals(), **globals())
exec(
f"async def run_code():\n{textwrap.indent(code, ' ')}",
dictionary,
dictionary,
)
await dictionary["run_code"]()
output = stdout.getvalue()
except Exception as e:
output = "`" + str(e) + "`"
output = utils.filter_secrets(output)
if len(output) > 2000:
output = output.replace("`", "\\`")
pager = disnake_paginator.ButtonPaginator(
prefix=f"```\n",
suffix="```",
color=constants.EMBED_COLOR,
segments=disnake_paginator.split(output),
invalid_user_function=utils.invalid_user_handler,
)
await pager.start(
disnake_paginator.wrappers.MessageInteractionWrapper(message)
)
elif len(output.strip()) == 0:
await message.add_reaction("")
else:
await message.channel.send(output)
case C.CLEAR | C.PURGE:
await commands.tools.clear(message)
case C.JOIN:
await commands.voice.join(message)
case C.LEAVE:
await commands.voice.leave(message)
case C.QUEUE | C.PLAY:
await commands.voice.queue_or_play(message)
case C.SKIP:
await commands.voice.skip(message)
case C.RESUME:
await commands.voice.resume(message)
case C.PAUSE:
await commands.voice.pause(message)
case C.VOLUME:
await commands.voice.volume(message)
case C.UPTIME:
await commands.bot.uptime(message)
except Exception as e:
await utils.reply(
message,
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
)
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

29
main.py
View File

@ -1,12 +1,9 @@
import importlib
import inspect
import time import time
import commands
import constants import constants
import core import core
import events import events
from state import client, reloaded_modules, start_time from state import client, start_time
@client.event @client.event
@ -16,37 +13,19 @@ async def on_ready():
@client.event @client.event
async def on_message_edit(before, after): async def on_message_edit(before, after):
await core.trigger_message_handlers("on_message_edit", before, after) await events.trigger_dynamic_handlers("on_message_edit", before, after)
await on_message(after) await on_message(after)
@client.event @client.event
async def on_message(message): async def on_message(message):
await core.trigger_message_handlers("on_message", message) await events.trigger_dynamic_handlers("on_message", message)
global reloaded_modules
if not message.content.startswith(constants.PREFIX): if not message.content.startswith(constants.PREFIX):
return return
if message.author.id in constants.OWNERS and commands.match(message.content) == [ await core.on_message(message)
commands.Command.RELOAD
]:
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()
reloaded_modules.clear()
await message.add_reaction("")
return
await events.on_message(message)
client.run(constants.SECRETS["TOKEN"]) client.run(constants.SECRETS["TOKEN"])

View File

@ -2,11 +2,12 @@ import time
import disnake import disnake
start_time = time.time() player_queue = {}
player_current = {}
playback_queue = {} command_locks = {}
reloaded_modules = set()
intents = disnake.Intents.default() intents = disnake.Intents.default()
intents.message_content = True intents.message_content = True
client = disnake.Client(intents=intents) client = disnake.Client(intents=intents)
start_time = time.time()

View File

@ -3,6 +3,10 @@ import disnake
import constants import constants
async def add_check_reaction(message):
await message.add_reaction("")
async def reply(message, *args): async def reply(message, *args):
await message.reply(*args, allowed_mentions=disnake.AllowedMentions.none()) await message.reply(*args, allowed_mentions=disnake.AllowedMentions.none())

View File

@ -1,13 +1,10 @@
import asyncio import asyncio
import importlib
import inspect
from typing import Any, Optional from typing import Any, Optional
import disnake import disnake
import yt_dlp import yt_dlp
import constants import constants
from state import reloaded_modules
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS) ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
@ -38,22 +35,12 @@ class YTDLSource(disnake.PCMVolumeTransformer):
return cls( return cls(
disnake.FFmpegPCMAudio( disnake.FFmpegPCMAudio(
data["url"] if stream else ytdl.prepare_filename(data), data["url"] if stream else ytdl.prepare_filename(data),
options="-vn -reconnect 1", before_options="-vn -reconnect 1",
), ),
data=data, data=data,
) )
def __reload_module__(): def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()
global ytdl global ytdl
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS) ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)