Compare commits

..

3 Commits

11 changed files with 155 additions and 36 deletions

View File

@ -3,13 +3,16 @@ import inspect
from state import reloaded_modules from state import reloaded_modules
from . import tools, utils, voice from . import bot, tools, utils, voice
from .utils import * from .utils import *
def __reload_module__(): def __reload_module__():
for name, module in globals().items(): for name, module in globals().items():
if inspect.ismodule(module): if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module) importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules: if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name) reloaded_modules.add(name)

43
commands/bot.py Normal file
View File

@ -0,0 +1,43 @@
import importlib
import inspect
import time
import arguments
import constants
from state import reloaded_modules, start_time
import commands
import utils
async def uptime(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"print bot uptime",
)
parser.add_argument(
"-s",
"--since",
action="store_true",
help="bot up since",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.since:
await utils.reply(message, f"{round(start_time)}")
else:
await utils.reply(message, f"up {round(time.time() - start_time)} seconds")
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -1,23 +1,77 @@
import importlib
import inspect
import re import re
import arguments
import commands import commands
import constants
from state import reloaded_modules
async def clear(message): async def clear(message):
tokens = commands.tokenize(message.content)[1:] tokens = commands.tokenize(message.content)
if len(tokens) < 2: parser = arguments.ArgumentParser(
await message.reply("no count and/or regex supplied!", mention_author=False) tokens[0],
"bulk delete messages in the current channel matching certain criteria",
)
parser.add_argument(
"count",
type=int,
choices=range(1, 1001),
metavar="[1-1000]",
help="amount of messages to delete",
)
parser.add_argument(
"-r",
"--regex",
required=False,
help="delete messages with content matching this regex",
)
parser.add_argument(
"-i",
"--author-id",
type=int,
action="append",
help="delete messages whose author matches this id",
)
parser.add_argument(
"-o",
"--oldest-first",
action="store_true",
help="delete oldest messages first",
)
if not (args := await parser.parse_args(message, tokens)):
return return
def check(m):
c = []
if r := args.regex:
c.append(re.match(r, m.content))
if i := args.author_id:
c.append(m.author.id in i)
return all(c)
message_count = len( message_count = len(
await message.channel.purge( await message.channel.purge(
limit=int(tokens[0]), check=lambda m: re.match(tokens[1], m.content) limit=args.count, check=check, oldest_first=args.oldest_first
) )
) )
try: try:
await message.reply( await message.reply(
f"successfully purged **{message_count} {'message' if message_count == 1 else 'messages'}**", f"purged **{message_count} {'message' if message_count == 1 else 'messages'}**",
mention_author=False, mention_author=False,
) )
except: except:
pass pass
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -4,16 +4,18 @@ import constants
class Command(enum.Enum): class Command(enum.Enum):
RELOAD = "reload"
EXECUTE = "execute"
CLEAR = "clear" CLEAR = "clear"
EXECUTE = "execute"
JOIN = "join" JOIN = "join"
LEAVE = "leave" LEAVE = "leave"
QUEUE = "queue"
PLAY = "play"
SKIP = "skip"
RESUME = "resume"
PAUSE = "pause" PAUSE = "pause"
PLAY = "play"
PURGE = "purge"
QUEUE = "queue"
RELOAD = "reload"
RESUME = "resume"
SKIP = "skip"
UPTIME = "uptime"
VOLUME = "volume" VOLUME = "volume"

View File

@ -3,6 +3,7 @@ import inspect
import arguments import arguments
import commands import commands
import constants
import utils import utils
import ytdlp import ytdlp
from state import client, playback_queue, reloaded_modules from state import client, playback_queue, reloaded_modules
@ -227,7 +228,10 @@ def generate_queue_list(queue: list):
def __reload_module__(): def __reload_module__():
for name, module in globals().items(): for name, module in globals().items():
if inspect.ismodule(module): if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module) importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules: if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name) reloaded_modules.add(name)

View File

@ -3,8 +3,9 @@ import os
EMBED_COLOR = 0xFF6600 EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712] OWNERS = [531392146767347712]
PREFIX = "%" PREFIX = "%"
RELOAD_BLACKLISTED_MODULES = ["re", "argparse"]
ytdl_format_options = { YTDL_OPTIONS = {
"default_search": "auto", "default_search": "auto",
"format": "bestaudio/best", "format": "bestaudio/best",
"ignoreerrors": False, "ignoreerrors": False,
@ -19,6 +20,6 @@ ytdl_format_options = {
} }
secrets = { SECRETS = {
"TOKEN": os.getenv("BOT_TOKEN"), "TOKEN": os.getenv("BOT_TOKEN"),
} }

View File

@ -23,14 +23,15 @@ async def on_message(message):
if len(matched) > 1: if len(matched) > 1:
await message.reply( await message.reply(
f"ambiguous command, could be {' or '.join(['`' + match.value + '`' for match in matched])}", f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
mention_author=False, mention_author=False,
) )
return return
C = commands.Command
try: try:
match matched[0]: match matched[0]:
case commands.Command.EXECUTE if message.author.id in constants.OWNERS: case C.EXECUTE if message.author.id in constants.OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`") code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]: for replacement in ["python", "py"]:
if code.startswith(replacement): if code.startswith(replacement):
@ -75,22 +76,24 @@ async def on_message(message):
await message.add_reaction("") await message.add_reaction("")
else: else:
await message.channel.send(output) await message.channel.send(output)
case commands.Command.CLEAR: case C.CLEAR | C.PURGE:
await commands.tools.clear(message) await commands.tools.clear(message)
case commands.Command.JOIN: case C.JOIN:
await commands.voice.join(message) await commands.voice.join(message)
case commands.Command.LEAVE: case C.LEAVE:
await commands.voice.leave(message) await commands.voice.leave(message)
case commands.Command.QUEUE | commands.Command.PLAY: case C.QUEUE | C.PLAY:
await commands.voice.queue_or_play(message) await commands.voice.queue_or_play(message)
case commands.Command.SKIP: case C.SKIP:
await commands.voice.skip(message) await commands.voice.skip(message)
case commands.Command.RESUME: case C.RESUME:
await commands.voice.resume(message) await commands.voice.resume(message)
case commands.Command.PAUSE: case C.PAUSE:
await commands.voice.pause(message) await commands.voice.pause(message)
case commands.Command.VOLUME: case C.VOLUME:
await commands.voice.volume(message) await commands.voice.volume(message)
case C.UPTIME:
await commands.bot.uptime(message)
except Exception as e: except Exception as e:
await message.reply( await message.reply(
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```", f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
@ -100,7 +103,10 @@ async def on_message(message):
def __reload_module__(): def __reload_module__():
for name, module in globals().items(): for name, module in globals().items():
if inspect.ismodule(module): if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module) importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules: if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name) reloaded_modules.add(name)

12
main.py
View File

@ -2,13 +2,12 @@ import importlib
import inspect import inspect
import time import time
import commands
import constants import constants
import core import core
import events import events
from state import client, reloaded_modules from state import client, reloaded_modules, start_time
start_time = time.time() import commands
@client.event @client.event
@ -36,7 +35,10 @@ async def on_message(message):
commands.Command.RELOAD commands.Command.RELOAD
]: ]:
for name, module in globals().items(): for name, module in globals().items():
if inspect.ismodule(module): if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module) importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules: if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name) reloaded_modules.add(name)
@ -48,4 +50,4 @@ async def on_message(message):
await events.on_message(message) await events.on_message(message)
client.run(constants.secrets["TOKEN"]) client.run(constants.SECRETS["TOKEN"])

View File

@ -1,5 +1,9 @@
import time
import disnake import disnake
start_time = time.time()
playback_queue = {} playback_queue = {}
reloaded_modules = set() reloaded_modules = set()

View File

@ -14,7 +14,7 @@ async def invalid_user_handler(interaction):
def filter_secrets(text: str) -> str: def filter_secrets(text: str) -> str:
for secret_name, secret in constants.secrets.items(): for secret_name, secret in constants.SECRETS.items():
if not secret: if not secret:
continue continue
text = text.replace(secret, f"<{secret_name}>") text = text.replace(secret, f"<{secret_name}>")

View File

@ -9,7 +9,7 @@ import yt_dlp
import constants import constants
from state import reloaded_modules from state import reloaded_modules
ytdl = yt_dlp.YoutubeDL(constants.ytdl_format_options) ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
class YTDLSource(disnake.PCMVolumeTransformer): class YTDLSource(disnake.PCMVolumeTransformer):
@ -45,11 +45,11 @@ class YTDLSource(disnake.PCMVolumeTransformer):
def __reload_module__(): def __reload_module__():
for name, module in globals().items(): for name, module in globals().items():
if inspect.ismodule(module): if inspect.ismodule(module) and name not in constants.RELOAD_BLACKLISTED_MODULES:
importlib.reload(module) importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules: if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name) reloaded_modules.add(name)
module.__reload_module__() module.__reload_module__()
global ytdl global ytdl
ytdl = yt_dlp.YoutubeDL(constants.ytdl_format_options) ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)