This repository has been archived by the owner on Sep 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.py
126 lines (113 loc) · 5.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import random, asyncio
from sys import stderr
from datetime import datetime
from loguru import logger
from questionary import Choice, select
from tabulate import tabulate
from config import ACCOUNTS, PROXIES, WITHDRAW_ADDRESSES
from core import *
from settings import *
from functions import *
async def checker(zetachains: list[Zetachain]) -> None:
tasks = [loop.create_task(zetachain.check()) for zetachain in zetachains]
result = await asyncio.gather(*tasks)
result.sort(key=lambda x: x['№'])
print(tabulate(result, headers="keys", tablefmt="rounded_grid"))
def select_zetachains(zetachains: list[Zetachain]) -> list[Zetachain]:
if len(zetachains) == 1: return zetachains
print('Выберите аккаунты для работы. Формат: \n'
'1 — для выбора только первого аккаунта\n'
'1,2,3 — для выбора первого, второго и третьего аккаунта\n'
'1-3 — для выбора аккаунтов от первого до третьего включительно\n'
'all — для выбора всех аккаунтов (или нажмите Enter)\n')
result = input('Введите ваш выбор: ')
if result == 'all' or result == '': return zetachains
try:
if ',' in result:
return [zetachains[int(i) - 1] for i in result.split(',')]
elif '-' in result:
return zetachains[int(result.split('-')[0]) - 1:int(result.split('-')[1])]
elif '-' not in result and ',' not in result:
return [zetachains[int(result) - 1]]
except ValueError:
raise ValueError('Некорректный выбор аккаунтов!')
async def get_module() -> str:
return await select(
message="Выберите модуль для работы: ",
instruction='(используйте стрелочки для навигации)',
choices=[
Choice("🧠 Автоматический Маршрут", 'work'),
Choice("🧠 Кастомный маршрут", 'custom_way'),
Choice("💲 Вывод с ОКХ", "okx_withdraw"),
Choice("📝 Регистрация по рефке", 'enroll'),
Choice("💸 Перевод самому себе", 'transfer'),
Choice("🔄 Свап на iZUMi", 'izumi_swap'),
Choice("➕ Добавить ликвидность на iZUMi", 'izumi_liquidity'),
Choice("🔄 Свап на Eddy Finance", 'eddy_swap'),
Choice("🌹 Минт $stZETA", "mint_stzeta"),
Choice("🥇 Минт нфт бейджа на Ultiverse", "ultiverse_mint"),
Choice("🎁 Клейм поинтов", 'claim'),
Choice("💰 Депозит на адрес для вывода", 'withdraw'),
Choice("📊 Чекер", 'checker'),
Choice("💹 Проверить баланс ОКХ", 'okx_balance'),
Choice("🔙 Вернуться к выбору аккаунтов", 'back'),
Choice("❌ Выход", "exit"),
],
qmark="\n❓ ",
pointer="👉 "
).ask_async()
async def main(zetachains: list[Zetachain]) -> None | bool:
module = await get_module()
if module == 'exit':
raise KeyboardInterrupt
elif module == 'checker':
await checker(zetachains)
return
elif module == 'back':
return True
elif module == 'okx_balance':
await get_okx_balance()
return
for zetachain in zetachains:
if module in ['work', 'custom_way', 'okx_withdraw']:
await globals()[module](zetachain)
else:
await run_solo_module(module, zetachain)
if zetachain != zetachains[-1]:
if not zetachain.acc.proxy:
await wait_for_new_ip()
else:
await sleep(*SLEEP_BETWEEN_ACCS)
if __name__ == '__main__':
logger.remove()
format='<white>{time:HH:mm:ss}</white> | <bold><level>{level: <7}</level></bold> | <level>{message}</level>'
logger.add(sink=stderr, format=format)
logger.add(sink=f'logs/{datetime.today().strftime("%Y-%m-%d")}.log', format=format)
logger.info(f'Найдено: {len(ACCOUNTS)} аккаунтов, {len(PROXIES)} прокси, {len(WITHDRAW_ADDRESSES)} адресов для вывода')
accs = [Account(_id, key_mnemonic, PROXIES.get(_id) if USE_PROXY else None, WITHDRAW_ADDRESSES.get(_id))
for _id, key_mnemonic in enumerate(ACCOUNTS, 1)]
if SHUFFLE_WALLETS:
random.shuffle(accs)
for i, acc in enumerate(accs, 1): acc.account_id = i
logger.warning('Аккаунты перемешаны!')
if USE_PROXY: logger.warning('Используются прокси!')
else: logger.warning('Прокси не используются!')
if DO_ACTION_ANYWAY: logger.warning('Действия выполняются в любом случае!')
ZETACHAINS = [Zetachain(acc) for acc in accs]
loop = asyncio.get_event_loop()
if USE_OKX: loop.run_until_complete(get_okx_balance())
else: logger.warning('OKX не используется!')
loop.run_until_complete(checker(ZETACHAINS))
zetachains = select_zetachains(ZETACHAINS)
while True:
try:
result = loop.run_until_complete(main(zetachains))
if result:
loop.run_until_complete(checker(ZETACHAINS))
zetachains = select_zetachains(ZETACHAINS)
except KeyboardInterrupt:
break
except Exception as e:
logger.critical(e)
break
print('👋👋👋')