Compare commits
No commits in common. "c420f3de6bd85efc8a8a76f335e2e97e542ca501" and "27a460fa6e4cbefe4cdc63a445c7d0bf6f8013bf" have entirely different histories.
c420f3de6b
...
27a460fa6e
@ -11,3 +11,7 @@ __all__ = [
|
|||||||
"match_token",
|
"match_token",
|
||||||
"tokenize",
|
"tokenize",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def __reload_module__():
|
||||||
|
globals().update({k: v for k, v in vars(utils).items() if not k.startswith("_")})
|
||||||
|
@ -84,17 +84,6 @@ async def uptime(message):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def ping(message):
|
|
||||||
await utils.reply(
|
|
||||||
message,
|
|
||||||
embed=disnake.Embed(
|
|
||||||
title="Pong :ping_pong:",
|
|
||||||
description=f"Latency: **{round(client.latency * 1000, 1)} ms**",
|
|
||||||
color=EMBED_COLOR,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def help(message):
|
async def help(message):
|
||||||
await utils.reply(
|
await utils.reply(
|
||||||
message,
|
message,
|
||||||
|
@ -1,159 +1,8 @@
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
import disnake
|
|
||||||
import requests
|
|
||||||
|
|
||||||
import arguments
|
import arguments
|
||||||
import commands
|
import commands
|
||||||
import utils
|
import utils
|
||||||
from constants import APPLICATION_FLAGS, BADGE_EMOJIS, EMBED_COLOR, PUBLIC_FLAGS
|
|
||||||
from state import client
|
|
||||||
|
|
||||||
|
|
||||||
async def lookup(message):
|
|
||||||
tokens = commands.tokenize(message.content)
|
|
||||||
parser = arguments.ArgumentParser(
|
|
||||||
tokens[0],
|
|
||||||
"look up a user or application on discord by their ID",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-a",
|
|
||||||
"--application",
|
|
||||||
action="store_true",
|
|
||||||
help="search for applications instead of users",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"id",
|
|
||||||
type=int,
|
|
||||||
help="the ID to perform a search for",
|
|
||||||
)
|
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
|
||||||
return
|
|
||||||
|
|
||||||
if args.application:
|
|
||||||
response = requests.get(
|
|
||||||
f"https://discord.com/api/v9/applications/{args.id}/rpc"
|
|
||||||
).json()
|
|
||||||
if "code" in response.keys():
|
|
||||||
await utils.reply(message, "application not found!")
|
|
||||||
return
|
|
||||||
|
|
||||||
embed = disnake.Embed(description=response["description"], color=EMBED_COLOR)
|
|
||||||
embed.set_thumbnail(
|
|
||||||
url=f"https://cdn.discordapp.com/app-icons/{response['id']}/{response['icon']}.webp"
|
|
||||||
)
|
|
||||||
embed.add_field(name="Application Name", value=response["name"])
|
|
||||||
embed.add_field(name="Application ID", value="`" + response["id"] + "`")
|
|
||||||
embed.add_field(
|
|
||||||
name="Public Bot",
|
|
||||||
value=f"{'`' + str(response['bot_public']) + '`' if 'bot_public' in response else 'No bot'}",
|
|
||||||
)
|
|
||||||
embed.add_field(name="Public Flags", value="`" + str(response["flags"]) + "`")
|
|
||||||
embed.add_field(
|
|
||||||
name="Terms of Service",
|
|
||||||
value=(
|
|
||||||
"None"
|
|
||||||
if "terms_of_service_url" not in response.keys()
|
|
||||||
else f"[Link]({response['terms_of_service_url']})"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Privacy Policy",
|
|
||||||
value=(
|
|
||||||
"None"
|
|
||||||
if "privacy_policy_url" not in response.keys()
|
|
||||||
else f"[Link]({response['privacy_policy_url']})"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Creation Time",
|
|
||||||
value=f"<t:{utils.parse_snowflake(int(response['id']))}:R>",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Default Invite URL",
|
|
||||||
value=(
|
|
||||||
"None"
|
|
||||||
if "install_params" not in response.keys()
|
|
||||||
else f"[Link](https://discord.com/oauth2/authorize?client_id={response['id']}&permissions={response['install_params']['permissions']}&scope={'%20'.join(response['install_params']['scopes'])})"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Custom Invite URL",
|
|
||||||
value=(
|
|
||||||
"None"
|
|
||||||
if "custom_install_url" not in response.keys()
|
|
||||||
else f"[Link]({response['custom_install_url']})"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
bot_intents = []
|
|
||||||
for application_flag, intent_name in APPLICATION_FLAGS.items():
|
|
||||||
if response["flags"] & application_flag == application_flag:
|
|
||||||
if intent_name.replace(" (unverified)", "") not in bot_intents:
|
|
||||||
bot_intents.append(intent_name)
|
|
||||||
embed.add_field(
|
|
||||||
name="Application Flags",
|
|
||||||
value=", ".join(bot_intents) if bot_intents else "None",
|
|
||||||
)
|
|
||||||
|
|
||||||
bot_tags = ""
|
|
||||||
if "tags" in response.keys():
|
|
||||||
for tag in response["tags"]:
|
|
||||||
bot_tags += tag + ", "
|
|
||||||
embed.add_field(
|
|
||||||
name="Tags", value="None" if bot_tags == "" else bot_tags[:-2], inline=False
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
user = await client.fetch_user(args.id)
|
|
||||||
except Exception:
|
|
||||||
await utils.reply(message, "user not found!")
|
|
||||||
return
|
|
||||||
|
|
||||||
badges = ""
|
|
||||||
for flag, flag_name in PUBLIC_FLAGS.items():
|
|
||||||
if user.public_flags.value & flag == flag:
|
|
||||||
if flag_name != "None":
|
|
||||||
try:
|
|
||||||
badges += BADGE_EMOJIS[PUBLIC_FLAGS[flag]]
|
|
||||||
except Exception:
|
|
||||||
raise Exception(f"unable to find badge: {PUBLIC_FLAGS[flag]}")
|
|
||||||
|
|
||||||
accent_color = 0x000000
|
|
||||||
user_object = await client.fetch_user(user.id)
|
|
||||||
if user_object.accent_color is None:
|
|
||||||
accent_color = user_object.accent_color
|
|
||||||
|
|
||||||
embed = disnake.Embed(color=accent_color)
|
|
||||||
embed.add_field(
|
|
||||||
name="User ID",
|
|
||||||
value=f"`{user.id}`",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Discriminator",
|
|
||||||
value=f"`{user.name}#{user.discriminator}`",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Creation Time",
|
|
||||||
value=f"<t:{utils.parse_snowflake(int(user.id))}:R>",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Public Flags",
|
|
||||||
value=f"`{user.public_flags.value}` {badges}",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="Bot User",
|
|
||||||
value=f"`{user.bot}`",
|
|
||||||
)
|
|
||||||
embed.add_field(
|
|
||||||
name="System User",
|
|
||||||
value=f"`{user.system}`",
|
|
||||||
)
|
|
||||||
embed.set_thumbnail(url=user.avatar if user.avatar else user.default_avatar)
|
|
||||||
if user_object.banner:
|
|
||||||
embed.set_image(url=user_object.banner)
|
|
||||||
|
|
||||||
await utils.reply(message, embed=embed)
|
|
||||||
|
|
||||||
|
|
||||||
async def clear(message):
|
async def clear(message):
|
||||||
@ -205,25 +54,12 @@ async def clear(message):
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="delete messages with reactions",
|
help="delete messages with reactions",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"-A",
|
|
||||||
"--attachments",
|
|
||||||
action="store_true",
|
|
||||||
help="delete messages with attachments",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-d",
|
"-d",
|
||||||
"--delete-command",
|
"--delete-command",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="delete the command message as well",
|
help="delete the command message as well",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"-I",
|
|
||||||
"--ignore-ids",
|
|
||||||
type=int,
|
|
||||||
action="append",
|
|
||||||
help="ignore messages with this id",
|
|
||||||
)
|
|
||||||
if not (args := await parser.parse_args(message, tokens)):
|
if not (args := await parser.parse_args(message, tokens)):
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -238,8 +74,6 @@ async def clear(message):
|
|||||||
regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0)
|
regex = re.compile(r, re.IGNORECASE if args.case_insensitive else 0)
|
||||||
|
|
||||||
def check(m):
|
def check(m):
|
||||||
if (ids := args.ignore_ids) and m.id in ids:
|
|
||||||
return False
|
|
||||||
c = []
|
c = []
|
||||||
if regex:
|
if regex:
|
||||||
c.append(regex.search(m.content))
|
c.append(regex.search(m.content))
|
||||||
@ -252,8 +86,6 @@ async def clear(message):
|
|||||||
c.append(m.author.id in i)
|
c.append(m.author.id in i)
|
||||||
if args.reactions:
|
if args.reactions:
|
||||||
c.append(len(m.reactions) > 0)
|
c.append(len(m.reactions) > 0)
|
||||||
if args.attachments:
|
|
||||||
c.append(len(m.attachments) > 0)
|
|
||||||
return all(c)
|
return all(c)
|
||||||
|
|
||||||
messages = len(
|
messages = len(
|
||||||
|
@ -12,9 +12,7 @@ class Command(Enum):
|
|||||||
HELP = "help"
|
HELP = "help"
|
||||||
JOIN = "join"
|
JOIN = "join"
|
||||||
LEAVE = "leave"
|
LEAVE = "leave"
|
||||||
LOOKUP = "lookup"
|
|
||||||
PAUSE = "pause"
|
PAUSE = "pause"
|
||||||
PING = "ping"
|
|
||||||
PLAY = "play"
|
PLAY = "play"
|
||||||
PLAYING = "playing"
|
PLAYING = "playing"
|
||||||
PURGE = "purge"
|
PURGE = "purge"
|
||||||
|
@ -181,7 +181,7 @@ async def queue_or_play(message, edited=False):
|
|||||||
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
|
f"**{len(players[message.guild.id].queue)}.** {queued.format()}",
|
||||||
)
|
)
|
||||||
|
|
||||||
utils.cooldown(message, 2)
|
utils.cooldown(message, 3)
|
||||||
elif tokens[0].lower() == "play":
|
elif tokens[0].lower() == "play":
|
||||||
await resume(message)
|
await resume(message)
|
||||||
else:
|
else:
|
||||||
|
@ -16,16 +16,19 @@ def play_after_callback(e, message, once):
|
|||||||
def play_next(message, once=False, first=False):
|
def play_next(message, once=False, first=False):
|
||||||
if not message.guild.voice_client:
|
if not message.guild.voice_client:
|
||||||
return
|
return
|
||||||
|
|
||||||
message.guild.voice_client.stop()
|
message.guild.voice_client.stop()
|
||||||
|
|
||||||
if not disnake.opus.is_loaded():
|
|
||||||
utils.load_opus()
|
|
||||||
|
|
||||||
if message.guild.id in players and players[message.guild.id].queue:
|
if message.guild.id in players and players[message.guild.id].queue:
|
||||||
queued = players[message.guild.id].queue_pop()
|
queued = players[message.guild.id].queue_pop()
|
||||||
message.guild.voice_client.play(
|
try:
|
||||||
queued.player, after=lambda e: play_after_callback(e, message, once)
|
message.guild.voice_client.play(
|
||||||
)
|
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||||
|
)
|
||||||
|
except disnake.opus.OpusNotLoaded:
|
||||||
|
utils.load_opus()
|
||||||
|
message.guild.voice_client.play(
|
||||||
|
queued.player, after=lambda e: play_after_callback(e, message, once)
|
||||||
|
)
|
||||||
|
|
||||||
embed = queued.embed()
|
embed = queued.embed()
|
||||||
if first and len(players[message.guild.id].queue) == 0:
|
if first and len(players[message.guild.id].queue) == 0:
|
||||||
|
45
constants.py
45
constants.py
@ -35,56 +35,11 @@ RELOADABLE_MODULES = [
|
|||||||
"core",
|
"core",
|
||||||
"events",
|
"events",
|
||||||
"extra",
|
"extra",
|
||||||
"fun",
|
|
||||||
"tasks",
|
"tasks",
|
||||||
"utils",
|
"utils",
|
||||||
"voice",
|
"voice",
|
||||||
"youtubedl",
|
"youtubedl",
|
||||||
]
|
]
|
||||||
PUBLIC_FLAGS = {
|
|
||||||
1: "Discord Employee",
|
|
||||||
2: "Discord Partner",
|
|
||||||
4: "HypeSquad Events",
|
|
||||||
8: "Bug Hunter Level 1",
|
|
||||||
64: "HypeSquad Bravery",
|
|
||||||
128: "HypeSquad Brilliance",
|
|
||||||
256: "HypeSquad Balance",
|
|
||||||
512: "Early Supporter",
|
|
||||||
1024: "Team User",
|
|
||||||
16384: "Bug Hunter Level 2",
|
|
||||||
65536: "Verified Bot",
|
|
||||||
131072: "Verified Bot Developer",
|
|
||||||
262144: "Discord Certified Moderator",
|
|
||||||
524288: "HTTP Interactions Only",
|
|
||||||
4194304: "Active Developer",
|
|
||||||
}
|
|
||||||
BADGE_EMOJIS = {
|
|
||||||
"Discord Employee": "<:DiscordStaff:879666899980546068>",
|
|
||||||
"Discord Partner": "<:DiscordPartner:879668340434534400>",
|
|
||||||
"HypeSquad Events": "<:HypeSquadEvents:879666970310606848>",
|
|
||||||
"Bug Hunter Level 1": "<:BugHunter1:879666851448234014>",
|
|
||||||
"HypeSquad Bravery": "<:HypeSquadBravery:879666945153175612>",
|
|
||||||
"HypeSquad Brilliance": "<:HypeSquadBrilliance:879666956884643861>",
|
|
||||||
"HypeSquad Balance": "<:HypeSquadBalance:879666934717771786>",
|
|
||||||
"Early Supporter": "<:EarlySupporter:879666916493496400>",
|
|
||||||
"Team User": "<:TeamUser:890866907996127305>",
|
|
||||||
"Bug Hunter Level 2": "<:BugHunter2:879666866971357224>",
|
|
||||||
"Verified Bot": "<:VerifiedBot:879670687554498591>",
|
|
||||||
"Verified Bot Developer": "<:VerifiedBotDeveloper:879669786550890507>",
|
|
||||||
"Discord Certified Moderator": "<:DiscordModerator:879666882976837654>",
|
|
||||||
"HTTP Interactions Only": "<:HTTPInteractionsOnly:1047141867806015559>",
|
|
||||||
"Active Developer": "<:ActiveDeveloper:1047141451244523592>",
|
|
||||||
}
|
|
||||||
APPLICATION_FLAGS = {
|
|
||||||
1 << 12: "Presence Intent",
|
|
||||||
1 << 13: "Presence Intent (unverified)",
|
|
||||||
1 << 14: "Guild Members Intent",
|
|
||||||
1 << 15: "Guild Members Intent (unverified)",
|
|
||||||
1 << 16: "Unusual Growth (verification suspended)",
|
|
||||||
1 << 18: "Message Content Intent",
|
|
||||||
1 << 19: "Message Content Intent (unverified)",
|
|
||||||
1 << 23: "Suports Application Commands",
|
|
||||||
}
|
|
||||||
|
|
||||||
SECRETS = {
|
SECRETS = {
|
||||||
"TOKEN": os.getenv("BOT_TOKEN"),
|
"TOKEN": os.getenv("BOT_TOKEN"),
|
||||||
|
31
core.py
31
core.py
@ -47,7 +47,7 @@ async def on_message(message, edited=False):
|
|||||||
await command_locks[(message.guild.id, message.author.id)].acquire()
|
await command_locks[(message.guild.id, message.author.id)].acquire()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if (cooldowns := command_cooldowns.get(message.author.id)) and not edited:
|
if cooldowns := command_cooldowns.get(message.author.id):
|
||||||
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
|
if (end_time := cooldowns.get(matched)) and int(time.time()) < int(
|
||||||
end_time
|
end_time
|
||||||
):
|
):
|
||||||
@ -73,33 +73,37 @@ async def on_message(message, edited=False):
|
|||||||
end = time.time()
|
end = time.time()
|
||||||
if __debug__:
|
if __debug__:
|
||||||
debug(
|
debug(
|
||||||
f"reloaded {len(reloaded_modules)} modules in {round(end - start, 2)}s"
|
f"reloaded {len(reloaded_modules)} modules in {round(end-start, 2)}s"
|
||||||
)
|
)
|
||||||
|
|
||||||
await utils.add_check_reaction(message)
|
await utils.add_check_reaction(message)
|
||||||
|
|
||||||
case C.EXECUTE if message.author.id in OWNERS:
|
case C.EXECUTE if message.author.id in OWNERS:
|
||||||
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
|
code = message.content[len(tokens[0]) + 1 :].strip().strip("`")
|
||||||
for replacement in ["python", "py"]:
|
for replacement in ["python", "py"]:
|
||||||
if code.startswith(replacement):
|
if code.startswith(replacement):
|
||||||
code = code[len(replacement) :]
|
code = code[len(replacement) :]
|
||||||
|
|
||||||
|
stdout = io.StringIO()
|
||||||
try:
|
try:
|
||||||
stdout = io.StringIO()
|
|
||||||
with contextlib.redirect_stdout(stdout):
|
with contextlib.redirect_stdout(stdout):
|
||||||
wrapped_code = (
|
if "#globals" in code:
|
||||||
f"async def run_code():\n{textwrap.indent(code, ' ')}"
|
exec(
|
||||||
)
|
f"async def run_code():\n{textwrap.indent(code, ' ')}",
|
||||||
if "# globals" in code:
|
globals(),
|
||||||
exec(wrapped_code, globals())
|
)
|
||||||
await globals()["run_code"]()
|
await globals()["run_code"]()
|
||||||
else:
|
else:
|
||||||
dictionary = dict(locals(), **globals())
|
dictionary = dict(locals(), **globals())
|
||||||
exec(wrapped_code, dictionary, dictionary)
|
exec(
|
||||||
|
f"async def run_code():\n{textwrap.indent(code, ' ')}",
|
||||||
|
dictionary,
|
||||||
|
dictionary,
|
||||||
|
)
|
||||||
await dictionary["run_code"]()
|
await dictionary["run_code"]()
|
||||||
output = stdout.getvalue()
|
output = stdout.getvalue()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
output = "`" + str(e) + "`"
|
output = "`" + str(e) + "`"
|
||||||
|
|
||||||
output = utils.filter_secrets(output)
|
output = utils.filter_secrets(output)
|
||||||
|
|
||||||
if len(output) > 2000:
|
if len(output) > 2000:
|
||||||
@ -115,7 +119,6 @@ async def on_message(message, edited=False):
|
|||||||
await utils.add_check_reaction(message)
|
await utils.add_check_reaction(message)
|
||||||
else:
|
else:
|
||||||
await utils.reply(message, output)
|
await utils.reply(message, output)
|
||||||
|
|
||||||
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
|
case C.CLEAR | C.PURGE if message.author.id in OWNERS:
|
||||||
await commands.tools.clear(message)
|
await commands.tools.clear(message)
|
||||||
case C.JOIN:
|
case C.JOIN:
|
||||||
@ -142,14 +145,10 @@ async def on_message(message, edited=False):
|
|||||||
await commands.voice.fast_forward(message)
|
await commands.voice.fast_forward(message)
|
||||||
case C.STATUS:
|
case C.STATUS:
|
||||||
await commands.bot.status(message)
|
await commands.bot.status(message)
|
||||||
case C.PING:
|
|
||||||
await commands.bot.ping(message)
|
|
||||||
case C.LOOKUP:
|
|
||||||
await commands.tools.lookup(message)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await utils.reply(
|
await utils.reply(
|
||||||
message,
|
message,
|
||||||
f"exception occurred while processing command: ```\n{''.join(traceback.format_exception(e)).replace('`', '\\`')}```",
|
f"exception occurred while processing command: ```\n{"".join(traceback.format_exception(e)).replace("`", "\\`")}```",
|
||||||
)
|
)
|
||||||
raise e
|
raise e
|
||||||
finally:
|
finally:
|
||||||
|
@ -3,8 +3,6 @@ import threading
|
|||||||
import time
|
import time
|
||||||
from logging import info
|
from logging import info
|
||||||
|
|
||||||
import fun
|
|
||||||
|
|
||||||
import commands
|
import commands
|
||||||
import core
|
import core
|
||||||
import tasks
|
import tasks
|
||||||
@ -28,7 +26,6 @@ async def on_bulk_message_delete(messages):
|
|||||||
|
|
||||||
async def on_message(message):
|
async def on_message(message):
|
||||||
await core.on_message(message)
|
await core.on_message(message)
|
||||||
await fun.on_message(message)
|
|
||||||
|
|
||||||
|
|
||||||
async def on_message_delete(message):
|
async def on_message_delete(message):
|
||||||
|
2
extra.py
2
extra.py
@ -39,7 +39,7 @@ async def transcript(
|
|||||||
)
|
)
|
||||||
if len(messages) > max_messages:
|
if len(messages) > max_messages:
|
||||||
try:
|
try:
|
||||||
count = min(min_messages, len(messages))
|
count = max_messages - min_messages
|
||||||
if count == 1:
|
if count == 1:
|
||||||
await messages.pop().delete()
|
await messages.pop().delete()
|
||||||
else:
|
else:
|
||||||
|
7
fun.py
7
fun.py
@ -1,7 +0,0 @@
|
|||||||
import random
|
|
||||||
|
|
||||||
|
|
||||||
async def on_message(message):
|
|
||||||
if "gn" in message.content:
|
|
||||||
if random.random() < 0.01:
|
|
||||||
await message.add_reaction(random.choice(["💤", "😪", "😴", "🛌"]))
|
|
2
main.py
2
main.py
@ -14,7 +14,5 @@ if __name__ == "__main__":
|
|||||||
datefmt="%Y-%m-%d %T",
|
datefmt="%Y-%m-%d %T",
|
||||||
level=logging.DEBUG if __debug__ else logging.INFO,
|
level=logging.DEBUG if __debug__ else logging.INFO,
|
||||||
)
|
)
|
||||||
logging.getLogger("disnake").setLevel(logging.WARNING)
|
|
||||||
|
|
||||||
events.prepare()
|
events.prepare()
|
||||||
client.run(constants.SECRETS["TOKEN"])
|
client.run(constants.SECRETS["TOKEN"])
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
from . import test_filter_secrets, test_format_duration
|
from . import test_format_duration
|
||||||
|
|
||||||
__all__ = ["test_format_duration", "test_filter_secrets"]
|
__all__ = ["test_format_duration"]
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
import utils
|
|
||||||
|
|
||||||
|
|
||||||
class TestFilterSecrets(unittest.TestCase):
|
|
||||||
def test_filter_secrets(self):
|
|
||||||
secret = "PLACEHOLDER_TOKEN"
|
|
||||||
self.assertFalse(
|
|
||||||
secret in utils.filter_secrets(f"HELLO{secret}WORLD", {"TOKEN": secret})
|
|
||||||
)
|
|
||||||
self.assertFalse(secret in utils.filter_secrets(secret, {"TOKEN": secret}))
|
|
||||||
self.assertFalse(
|
|
||||||
secret in utils.filter_secrets(f"123{secret}", {"TOKEN": secret})
|
|
||||||
)
|
|
||||||
self.assertFalse(
|
|
||||||
secret in utils.filter_secrets(f"{secret}{secret}", {"TOKEN": secret})
|
|
||||||
)
|
|
||||||
self.assertFalse(
|
|
||||||
secret in utils.filter_secrets(f"{secret}@#(*&*$)", {"TOKEN": secret})
|
|
||||||
)
|
|
33
utils.py
33
utils.py
@ -1,6 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from logging import error, info
|
from logging import error, info, warning
|
||||||
|
|
||||||
import disnake
|
import disnake
|
||||||
|
|
||||||
@ -83,20 +83,12 @@ def format_duration(duration: int, natural: bool = False, short: bool = False):
|
|||||||
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
return separator.join(segments[:-1]) + f" and {segments[-1]}"
|
||||||
|
|
||||||
|
|
||||||
def parse_snowflake(id):
|
|
||||||
return round(((id >> 22) + 1420070400000) / 1000)
|
|
||||||
|
|
||||||
|
|
||||||
async def add_check_reaction(message):
|
async def add_check_reaction(message):
|
||||||
await message.add_reaction("✅")
|
await message.add_reaction("✅")
|
||||||
|
|
||||||
|
|
||||||
async def reply(message, *args, **kwargs):
|
async def reply(message, *args, **kwargs):
|
||||||
if message.id in message_responses:
|
if message.id in message_responses:
|
||||||
if len(args) == 0:
|
|
||||||
kwargs["content"] = None
|
|
||||||
elif len(kwargs) == 0:
|
|
||||||
kwargs["embeds"] = []
|
|
||||||
await message_responses[message.id].edit(
|
await message_responses[message.id].edit(
|
||||||
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
*args, **kwargs, allowed_mentions=disnake.AllowedMentions.none()
|
||||||
)
|
)
|
||||||
@ -120,8 +112,8 @@ async def invalid_user_handler(interaction):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def filter_secrets(text: str, secrets=constants.SECRETS) -> str:
|
def filter_secrets(text: str) -> str:
|
||||||
for secret_name, secret in secrets.items():
|
for secret_name, secret in constants.SECRETS.items():
|
||||||
if not secret:
|
if not secret:
|
||||||
continue
|
continue
|
||||||
text = text.replace(secret, f"<{secret_name}>")
|
text = text.replace(secret, f"<{secret_name}>")
|
||||||
@ -129,14 +121,13 @@ def filter_secrets(text: str, secrets=constants.SECRETS) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def load_opus():
|
def load_opus():
|
||||||
for path in filter(
|
warning("opus wasn't automatically loaded! trying to load manually...")
|
||||||
lambda p: os.path.exists(p),
|
for path in ["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"]:
|
||||||
["/usr/lib64/libopus.so.0", "/usr/lib/libopus.so.0"],
|
if os.path.exists(path):
|
||||||
):
|
try:
|
||||||
try:
|
disnake.opus.load_opus(path)
|
||||||
disnake.opus.load_opus(path)
|
info(f"successfully loaded opus from {path}")
|
||||||
info(f"successfully loaded opus from {path}")
|
return
|
||||||
return
|
except Exception as e:
|
||||||
except Exception as e:
|
error(f"failed to load opus from {path}: {e}")
|
||||||
error(f"failed to load opus from {path}: {e}")
|
|
||||||
raise Exception("could not locate working opus library")
|
raise Exception("could not locate working opus library")
|
||||||
|
18
youtubedl.py
18
youtubedl.py
@ -114,7 +114,7 @@ class QueuedSong:
|
|||||||
url=self.player.original_url,
|
url=self.player.original_url,
|
||||||
description=(
|
description=(
|
||||||
f"{'⏸️ ' if is_paused else ''}"
|
f"{'⏸️ ' if is_paused else ''}"
|
||||||
f"`[{'#' * int(progress * BAR_LENGTH)}{'-' * int((1 - progress) * BAR_LENGTH)}]` "
|
f"`[{'#'*int(progress * BAR_LENGTH)}{'-'*int((1 - progress) * BAR_LENGTH)}]` "
|
||||||
+ (
|
+ (
|
||||||
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
f"**{format_duration(int(self.player.original.progress))}** / **{format_duration(self.player.duration)}** (**{round(progress * 100)}%**)"
|
||||||
if self.player.duration
|
if self.player.duration
|
||||||
@ -123,20 +123,14 @@ class QueuedSong:
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.player.uploader_url:
|
embed.add_field(
|
||||||
embed.add_field(
|
name="Uploader",
|
||||||
name="Uploader",
|
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
||||||
value=f"[{self.player.uploader}]({self.player.uploader_url})",
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
embed.add_field(
|
|
||||||
name="Uploader",
|
|
||||||
value=self.player.uploader,
|
|
||||||
)
|
|
||||||
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
|
embed.add_field(name="Likes", value=f"{self.player.like_count:,}")
|
||||||
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
embed.add_field(name="Views", value=f"{self.player.view_count:,}")
|
||||||
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
embed.add_field(name="Published", value=f"<t:{self.player.timestamp}>")
|
||||||
embed.add_field(name="Volume", value=f"{int(self.player.volume * 100)}%")
|
embed.add_field(name="Volume", value=f"{int(self.player.volume*100)}%")
|
||||||
|
|
||||||
embed.set_image(self.player.thumbnail_url)
|
embed.set_image(self.player.thumbnail_url)
|
||||||
embed.set_footer(
|
embed.set_footer(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user