-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.py
320 lines (261 loc) · 9.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
Copyright (C) School Simplified - All Rights Reserved
* Permission is granted to use this application as a code reference for educational purposes.
* Written by School Simplified, IT Dept. <timmy@schoolsimplified.org>, March 2022
"""
__author__ = "School Simplified, IT Dept."
__author_email__ = "timmy@schoolsimplified.org"
import asyncio
import faulthandler
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Union
import discord
import uvicorn
from alive_progress import alive_bar
from discord import app_commands
from discord.ext import commands
from discord_sentry_reporting import use_sentry
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, status, Header
from slowapi.errors import RateLimitExceeded
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
# from googletrans import Translator
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from pygit2 import Repository, GIT_DESCRIBE_TAGS
import core.common
from core import database
from core.common import get_extensions
from core.logging_module import get_log
from core.special_methods import (
before_invoke_,
initializeDB,
main_mode_check_,
on_app_command_error_,
on_command_error_,
on_ready_,
on_command_, authenticate_user, JSONPayload, create_gsuite,
)
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
load_dotenv()
faulthandler.enable()
logger = logging.getLogger("discord")
logger.setLevel(logging.INFO)
_log = get_log(__name__)
_log.info("Starting Timmy...")
"""class TimmyTranslator(app_commands.Translator):
async def load(self) -> None:
_log.info('Translator loaded')
async def unload(self) -> None:
_log.info('Translator unloaded')
async def translate(self, string: app_commands.locale_str, locale: discord.Locale, context: app_commands.TranslationContext):
translator = Translator()
locale = str(locale)
#reg_local = None
if "-" in locale:
#locale = locale.split("-")[1]
locale = locale.split("-")[0]
_log.info(locale)
try:
translated_text = translator.translate(string, dest=locale).text
except Exception as e:
return None
return translated_text"""
class TimmyCommandTree(app_commands.CommandTree):
def __init__(self, bot):
super().__init__(bot)
self.bot = bot
async def interaction_check(self, interaction: discord.Interaction, /) -> bool:
blacklisted_users = [p.discordID for p in database.Blacklist]
if interaction.user.avatar is None:
await interaction.response.send_message("Due to a discord limitation, you must have an avatar set to use this command.")
return False
if interaction.user.id in blacklisted_users:
await interaction.response.send_message(
"You have been blacklisted from using commands!", ephemeral=True
)
return False
return True
async def on_error(
self, interaction: discord.Interaction, error: app_commands.AppCommandError
):
await on_app_command_error_(self.bot, interaction, error)
class Timmy(commands.Bot):
"""
Generates a Timmy Instance.
"""
def __init__(self, uptime: time.time):
super().__init__(
command_prefix=commands.when_mentioned_or(os.getenv("PREFIX")),
intents=discord.Intents.all(),
case_insensitive=True,
tree_cls=TimmyCommandTree,
activity=discord.Activity(
type=discord.ActivityType.watching, name="/help | ssimpl.org/timmy"
),
)
self.help_command = None
self.before_invoke(self.analytics_before_invoke)
self.add_check(self.check)
self._start_time = uptime
async def on_ready(self):
await on_ready_(self)
async def on_command_error(self, ctx: commands.Context, error: Exception):
await on_command_error_(self, ctx, error)
async def on_command(self, ctx: commands.Context):
await on_command_(self, ctx)
async def analytics_before_invoke(self, ctx: commands.Context):
await before_invoke_(ctx)
async def check(self, ctx: commands.Context):
return await main_mode_check_(ctx)
async def setup_hook(self) -> None:
with alive_bar(
len(get_extensions()),
ctrl_c=False,
bar="bubbles",
title="Initializing Cogs:",
) as bar:
for ext in get_extensions():
try:
await bot.load_extension(ext)
except commands.ExtensionAlreadyLoaded:
await bot.unload_extension(ext)
await bot.load_extension(ext)
except commands.ExtensionNotFound:
raise commands.ExtensionNotFound(ext)
bar()
# await bot.tree.set_translator(TimmyTranslator())
async def is_owner(self, user: discord.User):
admin_ids = []
query = database.Administrators.select().where(
database.Administrators.TierLevel >= 3
)
for admin in query:
admin_ids.append(admin.discordID)
if user.id in admin_ids:
return True
return await super().is_owner(user)
@property
def version(self):
"""
Returns the current version of the bot.
"""
repo = Repository(".")
current_commit = repo.head
current_branch = repo.head.shorthand
version = ... # type: str
if current_branch == "HEAD":
current_tag = repo.describe(committish=current_commit, describe_strategy=GIT_DESCRIBE_TAGS)
version = f"{current_tag} (stable)"
else:
version = "development"
version += f" | {str(current_commit.target)[:7]}"
return version
@property
def author(self):
"""
Returns the author of the bot.
"""
return __author__
@property
def author_email(self):
"""
Returns the author email of the bot.
"""
return __author_email__
@property
def start_time(self):
"""
Returns the time the bot was started.
"""
return self._start_time
bot = Timmy(time.time())
@bot.event
async def on_interaction(interaction: discord.Interaction):
if interaction.type == discord.InteractionType.application_command:
database.CommandAnalytics.create(
command=interaction.command.name,
guild_id=interaction.guild.id,
user=interaction.user.id,
date=datetime.now(),
command_type="slash",
).save()
if os.getenv("DSN_SENTRY") is not None:
sentry_logging = LoggingIntegration(
level=logging.INFO, # Capture info and above as breadcrumbs
event_level=logging.ERROR, # Send errors as events
)
# Traceback tracking, DO NOT MODIFY THIS
use_sentry(
bot,
dsn=os.getenv("DSN_SENTRY"),
traces_sample_rate=1.0,
integrations=[FlaskIntegration(), sentry_logging],
)
initializeDB(bot)
@bot.tree.context_menu(name='Prompts an assignment title.', guild=discord.Object(id=core.common.MainID.g_main))
async def report_message(interaction: discord.Interaction, message: discord.Message):
await message.reply(
"Please send a screenshot or image of the assignment title so that the Helper team can confirm its not a "
"quiz or test of any kind!"
)
await interaction.response.send_message(
"Prompted!", ephemeral=True
)
"""if not os.getenv("StartAPI"):
bot.run(os.getenv("TOKEN"))"""
@app.on_event("startup")
async def startup_event():
asyncio.create_task(bot.start(os.getenv("TOKEN")))
@app.get("/")
async def root():
current_time = float(time.time())
difference = int(round(current_time - float(bot.start_time)))
text = str(timedelta(seconds=difference))
return {
"uptime": text,
"version": bot.version,
"author": bot.author,
"author_email": bot.author_email,
"start_time": bot.start_time,
"latency": bot.latency,
}
"""@app.post("/send")
@limiter.limit("5/minute")
async def send_route(token: str = Header(), payload: JSONPayload = Depends()):
user = authenticate_user(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# check if user is authorized to use this route
if payload.action not in user.authorized_routes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authorized to use this route",
headers={"WWW-Authenticate": "Bearer"},
)
if user.disabled:
raise HTTPException(
status_code=400, detail="Inactive user"
)
if payload.action == "route_send":
print("Creating new record")
print(payload.payload)
return status.HTTP_200_OK
elif payload.action == "gsuite":
create_gsuite(payload)"""
if __name__ == "__main__":
if os.getenv("StartAPI") == "True":
uvicorn.run(app, host="0.0.0.0", port=80)
else:
bot.run(os.getenv("TOKEN"))