This repository has been archived by the owner on Jun 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
135 lines (106 loc) · 4.62 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from json import load
from time import sleep
from telepotpro import Bot
from pony.orm import db_session
from telepotpro.namedtuple import (InlineQueryResultArticle, InputTextMessageContent,
InlineKeyboardMarkup, InlineKeyboardButton)
from modules.api import SubscriberAPI
from modules.database import Chat, Data
with open("settings.json") as f:
settings = load(f)
bot = Bot(settings['token'])
api = SubscriberAPI()
LAST_MILESTONE = None
@db_session
def updateData():
data = Data.get(id=0)
data.mrbeast, _ = api.get_subscribers("UCX6OQ3DkcsbYNE6H8uQQuVA")
data.tseries, _ = api.get_subscribers("UCq-Fj5jknLsUf-MWSy4_brA")
@db_session
def sendAlerts():
global LAST_MILESTONE
data = Data.get(id=0)
if LAST_MILESTONE is None:
LAST_MILESTONE = data.diff
return
# send every 500k milestone
if data.diff // 500_000 > LAST_MILESTONE // 500_000:
for chat in Chat.select(lambda c: c.wantsAlert):
bot.sendMessage(chat.chatId, f"📈 <b>Good news!</b>\n"
f"The gap between MrBeast and T-Series has reached {data.diff} subs 👀",
parse_mode="HTML")
# if overtaken
if LAST_MILESTONE < 0 and data.diff > 0:
for chat in Chat.select(lambda c: c.wantsAlert):
bot.sendMessage(chat.chatId, f"🚨 <b>BREAKING NEWS!</b>\n"
f"<b>MrBeast</b> has overtaken <b>T-Series</b> in subscribers! 🎉\n\n"
f"{leaderboard()}",
parse_mode="HTML")
LAST_MILESTONE = data.diff
@db_session
def leaderboard() -> str:
data = Data.get(id=0)
mrbeast = f"<b>MrBeast:</b> <code>{data.mrbeast:,}</code>"
tseries = f"<b>T-Series:</b> <code>{data.tseries:,}</code>"
p1 = mrbeast if data.mrbeast > data.tseries else tseries
p2 = tseries if data.mrbeast > data.tseries else mrbeast
return (f"<b>Current subscribers status:</b>\n"
f"🏆 {p1}\n"
f"🥈 {p2}\n"
f"<b>Gap:</b> <code>{data.diff:,}</code> subs")
@db_session
def reply(msg):
chatId = msg['chat']['id']
name = msg['from']['first_name']
text = msg.get("text").replace("@" + bot.getMe()['username'], "")
if not (chat := Chat.get(chatId=chatId)):
chat = Chat(chatId=chatId)
if text == "/start":
bot.sendMessage(chatId, f"<b>Hi, {name}!</b>\n"
f"I'm the BeastSeries Bot 🤖. I monitor the live subsriber count of MrBeast vs. T-Series.\n"
f"I can also send you notifications if you use /alert!\n\n"
f"{leaderboard()}\n\n"
f"<i>Hint: use</i> /subs <i>to only show the stats.\n"
f"The bot also works inline and in groups!</i>",
parse_mode="HTML", reply_markup=InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="💬 Try me inline!", switch_inline_query="")]]))
elif text == "/subs":
bot.sendMessage(chatId, leaderboard(), parse_mode="HTML")
elif text == "/alert":
chat.wantsAlert = not chat.wantsAlert
if chat.wantsAlert:
bot.sendMessage(chatId, "🔔 <i>Alerts have been successfully activated for this chat!</i>\n"
"Use /alert again to toggle them off.", parse_mode="HTML")
else:
bot.sendMessage(chatId, "🔕 <i>Alerts have been successfully deactivated for this chat.</i>\n"
"Use /alert again to toggle them on.", parse_mode="HTML")
elif text == "/users":
bot.sendMessage(chatId, f"👤 <b>Users:</b> {Chat.select().count()}", parse_mode="HTML")
@db_session
def query(msg):
queryId = msg['id']
data = Data.get(id=0)
results = [
InlineQueryResultArticle(
id="subs",
title="MrBeast vs. T-Series",
input_message_content=InputTextMessageContent(
message_text=leaderboard(),
parse_mode="HTML"
),
description=f"{data.mrbeast:,} vs. {data.tseries:,}",
thumb_url="https://i.imgur.com/lyZvq8T.png"
)
]
bot.answerInlineQuery(queryId, results, cache_time=60, is_personal=False)
def main():
with db_session:
if not Data.exists():
Data(id=0)
bot.message_loop({'chat': reply, 'inline_query': query})
while True:
updateData()
sendAlerts()
sleep(120)
if __name__ == "__main__":
main()