-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
155 lines (136 loc) · 5.69 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import traceback
import asyncio
import requests
from aiohttp import web
from motor.motor_asyncio import AsyncIOMotorClient
# Custom Imports
from Imports.log_imports import logger
from Imports.discord_imports import *
from Imports.depend_imports import *
from dotenv import load_dotenv
# Load environment variables from a .env file (if any) into os.environ.
load_dotenv()

# Substrings that mark an environment variable as sensitive. They are matched
# case-insensitively anywhere in the name, so DISCORD_TOKEN, MONGO_URI or
# api_key are masked too -- a prefix-only check would print those in clear text.
_SENSITIVE_ENV_MARKERS = (
    "TOKEN",
    "PASS",
    "SECRET",
    "KEY",
    "URI",
    "URL",
    "AUTH",
    "CREDENTIAL",
)

# Print loaded environment variables (masking sensitive info)
print("\033[93mLoaded Environment Variables:\033[0m")
for key, value in os.environ.items():
    if any(marker in key.upper() for marker in _SENSITIVE_ENV_MARKERS):
        print(f"{key} = [REDACTED]")
    else:
        print(f"{key} = {value}")
class BotSetup(commands.AutoShardedBot):
    """Sharded Discord bot that reads its token from MongoDB and loads cogs.

    Instance fields:
      prefix           -- text command prefix (mentions are accepted as well).
      mongo_client     -- Motor client; ``None`` until ``get_token_from_db``
                          creates it, released again by ``close``.
      DB_NAME, COLLECTION_NAME, token_field
                       -- database, collection and document field that hold
                          the bot token.
    """

    def __init__(self):
        # Intents.all() enables every intent, so no individual flag (such as
        # ``members``) needs to be switched on afterwards.
        intents = discord.Intents.all()
        self.prefix = "..."
        super().__init__(
            command_prefix=commands.when_mentioned_or(self.prefix),
            intents=intents,
            help_command=None,
            shard_count=5,
            shard_reconnect_interval=10,
            heartbeat_timeout=120,
        )
        self.mongo_client = None
        self.DB_NAME = "Bot"
        self.COLLECTION_NAME = "information"
        self.token_field = "Token"

    async def on_ready(self):
        """Print the logged-in user once the bot reports ready."""
        print(f"\033[92mLogged in as {self.user} (ID: {self.user.id})\033[0m")

    async def get_token_from_db(self):
        """Return the bot token stored in MongoDB.

        The connection string is read from the ``MONGO_URI`` environment
        variable. The token is the ``token_field`` value of the first document
        in ``DB_NAME.COLLECTION_NAME`` that has that field.

        Raises:
            ValueError: if ``MONGO_URI`` is unset or no document has the field.
        """
        mongo_url = os.getenv("MONGO_URI")
        if not mongo_url:
            raise ValueError("No MONGO_URI found in environment variables")

        # Reuse an existing client: creating a new one per call would leave
        # the previous client (and its connections) open and unreachable.
        if self.mongo_client is None:
            self.mongo_client = AsyncIOMotorClient(mongo_url)

        collection = self.mongo_client[self.DB_NAME][self.COLLECTION_NAME]
        token_data = await collection.find_one({self.token_field: {"$exists": True}})
        if not token_data:
            raise ValueError("No token found in the database")
        return token_data.get(self.token_field)

    async def close(self):
        """Close the Discord connection, then release the MongoDB client."""
        try:
            await super().close()
        finally:
            # Release the client even if the Discord shutdown raised.
            if self.mongo_client is not None:
                self.mongo_client.close()
                self.mongo_client = None

    async def start_bot(self):
        """Load cogs, fetch the token and run the bot until it stops.

        Returns early (after logging) when the stored token is empty. Errors
        raised while the bot runs are logged rather than propagated.
        """
        await self.setup()
        token = await self.get_token_from_db()
        if not token:
            logger.error("No token found. Please check the database.")
            return
        try:
            await self.start(token)
        except KeyboardInterrupt:
            await self.close()
        except Exception as e:
            logger.error(f"An error occurred while logging in: {e}\n{traceback.format_exc()}")
        finally:
            if self.is_closed():
                print("Bot is closed, cleaning up.")
            else:
                print("Bot is still running.")
            # Always go through close(): it also releases the Mongo client,
            # which must happen even when the Discord side is already closed.
            await self.close()

    async def setup(self):
        """Load every cog found in the ``Cogs`` and ``Events`` packages."""
        print("\n\033[94m• —— Cogs/\033[0m")
        await self.import_cogs("Cogs")
        print("\n\033[94m• —— Events/\033[0m")
        await self.import_cogs("Events")
        print("\n\033[94m===== Setup Completed =====\033[0m")

    async def import_cogs(self, dir_name):
        """Import each ``.py`` file in *dir_name* and register its cog classes.

        *dir_name* is used both as a directory to list and as the package name
        to import from. Every module attribute whose type is
        ``commands.CogMeta`` is instantiated with the bot and added, unless a
        cog is already registered under that attribute name.
        """
        for filename in os.listdir(dir_name):
            if not filename.endswith(".py"):
                continue
            print(f"\033[94m| ├── {filename}\033[0m")
            module_name = os.path.splitext(filename)[0]
            # A non-empty fromlist makes __import__ return the submodule
            # itself instead of the top-level package.
            module = __import__(f"{dir_name}.{module_name}", fromlist=[""])
            for obj_name in dir(module):
                obj = getattr(module, obj_name)
                if isinstance(obj, commands.CogMeta) and not self.get_cog(obj_name):
                    await self.add_cog(obj(self))
                    print(f"\033[92m| | └── {obj_name}\033[0m")
async def check_rate_limit():
    """Probe the Discord API once and wait if the rate limit is exhausted.

    Requests ``/users/@me`` with the bot token. If the response reports no
    remaining requests (HTTP 200 with ``X-RateLimit-Remaining`` <= 0) or is
    itself rate limited (HTTP 429), sleeps for the advertised number of
    seconds. Any other status is only logged.

    Raises:
        ValueError: propagated from the token lookup.
        requests.RequestException: on network failure or timeout.
    """
    url = "https://discord.com/api/v10/users/@me"

    # A throwaway bot instance is used only for its token lookup; release the
    # Mongo client it opens so the connection does not linger.
    probe = BotSetup()
    try:
        token = await probe.get_token_from_db()
    finally:
        if probe.mongo_client is not None:
            probe.mongo_client.close()
            probe.mongo_client = None

    headers = {"Authorization": f"Bot {token}"}

    # requests is blocking: run it in the default executor so the event loop
    # is not stalled, and bound the wait with a timeout.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, lambda: requests.get(url, headers=headers, timeout=10)
    )

    if response.status_code == 200:
        remaining_requests = int(response.headers.get("X-RateLimit-Remaining", 1))
        rate_limit_reset_after = float(response.headers.get("X-RateLimit-Reset-After", 0))
        if remaining_requests <= 0:
            logger.error(f"Rate limit exceeded. Retry after {rate_limit_reset_after} seconds.")
            print(f"Rate limit exceeded. Please wait for {rate_limit_reset_after} seconds before retrying.")
            await asyncio.sleep(rate_limit_reset_after)
    elif response.status_code == 429:
        # Already rate limited: honour Retry-After instead of reporting a
        # generic failure. A missing or unparsable header means no wait.
        try:
            retry_after = float(response.headers.get("Retry-After", 0))
        except ValueError:
            retry_after = 0.0
        logger.error(f"Rate limit exceeded. Retry after {retry_after} seconds.")
        print(f"Rate limit exceeded. Please wait for {retry_after} seconds before retrying.")
        await asyncio.sleep(retry_after)
    else:
        logger.error(f"Failed to check rate limit. Status code: {response.status_code}")
async def start_http_server():
    """Start a small aiohttp server that answers ``GET /`` with a status text.

    Listens on all interfaces at the port given by the ``PORT`` environment
    variable (default 8080). Any failure during startup is logged and
    reported on stdout instead of being raised.
    """

    async def _index(_request):
        # Fixed body for the root route.
        return web.Response(text="Bot is running", content_type="text/html")

    try:
        application = web.Application()
        application.router.add_get("/", _index)

        app_runner = web.AppRunner(application)
        await app_runner.setup()

        listen_port = int(os.getenv("PORT", 8080))
        await web.TCPSite(app_runner, "0.0.0.0", listen_port).start()
        print(f"HTTP server started on port {listen_port}")
    except Exception as exc:
        logger.error(f"Failed to start HTTP server: {exc}")
        print("Failed to start HTTP server.")
async def main():
    """Run the startup sequence: rate-limit probe, HTTP server, then the bot.

    A ``discord.HTTPException`` with status 429 is logged and followed by a
    sleep of ``Retry-After`` seconds; every other exception is logged with its
    traceback. The bot is closed on the way out in all cases.
    """
    bot = BotSetup()
    try:
        await check_rate_limit()
        # The HTTP server is started before the bot.
        await start_http_server()
        await bot.start_bot()
    except discord.HTTPException as http_err:
        if http_err.status != 429:
            logger.error(f"An error occurred: {http_err}\n{traceback.format_exc()}")
        else:
            wait_seconds = int(http_err.response.headers.get("Retry-After", 0))
            logger.error(f"Rate limit exceeded. Retry after {wait_seconds} seconds.")
            print(f"Rate limit exceeded. Please wait for {wait_seconds} seconds before retrying.")
            await asyncio.sleep(wait_seconds)
    except Exception as err:
        logger.error(f"An error occurred: {err}\n{traceback.format_exc()}")
    finally:
        await bot.close()
if __name__ == "__main__":
    # Load the env file under .github/ as well, then run the async entry point.
    dotenv_file = os.path.join(".github", ".env")
    load_dotenv(dotenv_path=dotenv_file)
    asyncio.run(main())