Compare commits

..

3 Commits

11 changed files with 155 additions and 36 deletions

View File

@ -3,13 +3,16 @@ import inspect
from state import reloaded_modules
from . import tools, utils, voice
from . import bot, tools, utils, voice
from .utils import *
def __reload_module__():
for name, module in globals().items():
if inspect.ismodule(module):
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)

43
commands/bot.py Normal file
View File

@ -0,0 +1,43 @@
import importlib
import inspect
import time
import arguments
import constants
from state import reloaded_modules, start_time
import commands
import utils
async def uptime(message):
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"print bot uptime",
)
parser.add_argument(
"-s",
"--since",
action="store_true",
help="bot up since",
)
if not (args := await parser.parse_args(message, tokens)):
return
if args.since:
await utils.reply(message, f"{round(start_time)}")
else:
await utils.reply(message, f"up {round(time.time() - start_time)} seconds")
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -1,23 +1,77 @@
import importlib
import inspect
import re
import arguments
import commands
import constants
from state import reloaded_modules
async def clear(message):
tokens = commands.tokenize(message.content)[1:]
if len(tokens) < 2:
await message.reply("no count and/or regex supplied!", mention_author=False)
tokens = commands.tokenize(message.content)
parser = arguments.ArgumentParser(
tokens[0],
"bulk delete messages in the current channel matching certain criteria",
)
parser.add_argument(
"count",
type=int,
choices=range(1, 1001),
metavar="[1-1000]",
help="amount of messages to delete",
)
parser.add_argument(
"-r",
"--regex",
required=False,
help="delete messages with content matching this regex",
)
parser.add_argument(
"-i",
"--author-id",
type=int,
action="append",
help="delete messages whose author matches this id",
)
parser.add_argument(
"-o",
"--oldest-first",
action="store_true",
help="delete oldest messages first",
)
if not (args := await parser.parse_args(message, tokens)):
return
def check(m):
c = []
if r := args.regex:
c.append(re.match(r, m.content))
if i := args.author_id:
c.append(m.author.id in i)
return all(c)
message_count = len(
await message.channel.purge(
limit=int(tokens[0]), check=lambda m: re.match(tokens[1], m.content)
limit=args.count, check=check, oldest_first=args.oldest_first
)
)
try:
await message.reply(
f"successfully purged **{message_count} {'message' if message_count == 1 else 'messages'}**",
f"purged **{message_count} {'message' if message_count == 1 else 'messages'}**",
mention_author=False,
)
except:
pass
def __reload_module__():
for name, module in globals().items():
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()

View File

@ -4,16 +4,18 @@ import constants
class Command(enum.Enum):
RELOAD = "reload"
EXECUTE = "execute"
CLEAR = "clear"
EXECUTE = "execute"
JOIN = "join"
LEAVE = "leave"
QUEUE = "queue"
PLAY = "play"
SKIP = "skip"
RESUME = "resume"
PAUSE = "pause"
PLAY = "play"
PURGE = "purge"
QUEUE = "queue"
RELOAD = "reload"
RESUME = "resume"
SKIP = "skip"
UPTIME = "uptime"
VOLUME = "volume"

View File

@ -3,6 +3,7 @@ import inspect
import arguments
import commands
import constants
import utils
import ytdlp
from state import client, playback_queue, reloaded_modules
@ -227,7 +228,10 @@ def generate_queue_list(queue: list):
def __reload_module__():
for name, module in globals().items():
if inspect.ismodule(module):
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)

View File

@ -3,8 +3,9 @@ import os
EMBED_COLOR = 0xFF6600
OWNERS = [531392146767347712]
PREFIX = "%"
RELOAD_BLACKLISTED_MODULES = ["re", "argparse"]
ytdl_format_options = {
YTDL_OPTIONS = {
"default_search": "auto",
"format": "bestaudio/best",
"ignoreerrors": False,
@ -19,6 +20,6 @@ ytdl_format_options = {
}
secrets = {
SECRETS = {
"TOKEN": os.getenv("BOT_TOKEN"),
}

View File

@ -23,14 +23,15 @@ async def on_message(message):
if len(matched) > 1:
await message.reply(
f"ambiguous command, could be {' or '.join(['`' + match.value + '`' for match in matched])}",
f"ambiguous command, could be {' or '.join([f'`{match.value}`' for match in matched])}",
mention_author=False,
)
return
C = commands.Command
try:
match matched[0]:
case commands.Command.EXECUTE if message.author.id in constants.OWNERS:
case C.EXECUTE if message.author.id in constants.OWNERS:
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
for replacement in ["python", "py"]:
if code.startswith(replacement):
@ -75,22 +76,24 @@ async def on_message(message):
await message.add_reaction("")
else:
await message.channel.send(output)
case commands.Command.CLEAR:
case C.CLEAR | C.PURGE:
await commands.tools.clear(message)
case commands.Command.JOIN:
case C.JOIN:
await commands.voice.join(message)
case commands.Command.LEAVE:
case C.LEAVE:
await commands.voice.leave(message)
case commands.Command.QUEUE | commands.Command.PLAY:
case C.QUEUE | C.PLAY:
await commands.voice.queue_or_play(message)
case commands.Command.SKIP:
case C.SKIP:
await commands.voice.skip(message)
case commands.Command.RESUME:
case C.RESUME:
await commands.voice.resume(message)
case commands.Command.PAUSE:
case C.PAUSE:
await commands.voice.pause(message)
case commands.Command.VOLUME:
case C.VOLUME:
await commands.voice.volume(message)
case C.UPTIME:
await commands.bot.uptime(message)
except Exception as e:
await message.reply(
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
@ -100,7 +103,10 @@ async def on_message(message):
def __reload_module__():
for name, module in globals().items():
if inspect.ismodule(module):
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)

12
main.py
View File

@ -2,13 +2,12 @@ import importlib
import inspect
import time
import commands
import constants
import core
import events
from state import client, reloaded_modules
from state import client, reloaded_modules, start_time
start_time = time.time()
import commands
@client.event
@ -36,7 +35,10 @@ async def on_message(message):
commands.Command.RELOAD
]:
for name, module in globals().items():
if inspect.ismodule(module):
if (
inspect.ismodule(module)
and name not in constants.RELOAD_BLACKLISTED_MODULES
):
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
@ -48,4 +50,4 @@ async def on_message(message):
await events.on_message(message)
client.run(constants.secrets["TOKEN"])
client.run(constants.SECRETS["TOKEN"])

View File

@ -1,5 +1,9 @@
import time
import disnake
start_time = time.time()
playback_queue = {}
reloaded_modules = set()

View File

@ -14,7 +14,7 @@ async def invalid_user_handler(interaction):
def filter_secrets(text: str) -> str:
for secret_name, secret in constants.secrets.items():
for secret_name, secret in constants.SECRETS.items():
if not secret:
continue
text = text.replace(secret, f"<{secret_name}>")

View File

@ -9,7 +9,7 @@ import yt_dlp
import constants
from state import reloaded_modules
ytdl = yt_dlp.YoutubeDL(constants.ytdl_format_options)
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)
class YTDLSource(disnake.PCMVolumeTransformer):
@ -45,11 +45,11 @@ class YTDLSource(disnake.PCMVolumeTransformer):
def __reload_module__():
for name, module in globals().items():
if inspect.ismodule(module):
if inspect.ismodule(module) and name not in constants.RELOAD_BLACKLISTED_MODULES:
importlib.reload(module)
if "__reload_module__" in dir(module) and name not in reloaded_modules:
reloaded_modules.add(name)
module.__reload_module__()
global ytdl
ytdl = yt_dlp.YoutubeDL(constants.ytdl_format_options)
ytdl = yt_dlp.YoutubeDL(constants.YTDL_OPTIONS)