This repository has been archived by the owner on Dec 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
171 lines (131 loc) · 5.55 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os, sys, time, json, threading
from operagxdriver import start_opera_driver
from selenium.webdriver.common.by import By
from win32com.shell import shell, shellcon
from colorama import init, Fore
init(autoreset=True) # Initialize colorama for colored output
def print_debug(symbol_type_and_colour: int, message, thread_id):
colour = Fore.RESET # Initialize colour to a default value
symbol = "!!" # Default value for symbol
if symbol_type_and_colour == 1:
symbol = "+"
colour = Fore.GREEN
elif symbol_type_and_colour == 2:
symbol = "/"
colour = Fore.YELLOW
elif symbol_type_and_colour == 3:
symbol = "!"
colour = Fore.RED
print(
f"[{colour}{symbol}{Fore.RESET}] {colour}{message}{Fore.RESET} | {Fore.CYAN}(TID: {thread_id}){Fore.RESET}"
)
def load_config():
config_path = "config.json"
if os.path.exists(config_path):
with open(config_path, "r") as file:
return json.load(file)
def find_opera_gx_directory():
app_data_local = shell.SHGetFolderPath(0, shellcon.CSIDL_LOCAL_APPDATA, None, 0)
default_opera_gx_path = os.path.join(app_data_local, "Programs", "Opera GX")
default_opera_exe_path = os.path.join(default_opera_gx_path, "opera.exe")
if os.path.exists(default_opera_exe_path):
print_debug(1, f"Found Opera GX path: {default_opera_exe_path}", thread_id=None)
config = load_config() or {"opera_browser_exe": "", "opera_driver_exe": ""}
if not config["opera_browser_exe"]:
config["opera_browser_exe"] = default_opera_exe_path
with open("config.json", "w") as file:
json.dump(config, file, indent=4)
print(
"Default Opera GX path saved to config.json. Please restart the script."
)
sys.exit()
return True
print_debug(3, "Opera GX path not found", thread_id=None)
return False
def process_thread(thread_id):
config = load_config()
if not config["opera_browser_exe"] and not config["opera_driver_exe"]:
default_opera_path = find_opera_gx_directory()
custom_browser_path = input(f"Enter the path to Opera browser executable: ")
custom_driver_path = input("Enter the path to Opera WebDriver executable: ")
config["opera_browser_exe"] = custom_browser_path or default_opera_path
config["opera_driver_exe"] = custom_driver_path
elif not config["opera_browser_exe"]:
default_opera_path = find_opera_gx_directory()
custom_browser_path = input(f"Enter the path to Opera browser executable: ")
config["opera_browser_exe"] = custom_browser_path or config["opera_browser_exe"]
with open("config.json", "w") as file:
json.dump(config, file, indent=4)
elif not config["opera_driver_exe"]:
custom_driver_path = input("Enter the path to Opera WebDriver executable: ")
config["opera_driver_exe"] = custom_driver_path
with open("config.json", "w") as file:
json.dump(config, file, indent=4)
opera_driver_exe = config["opera_driver_exe"]
opera_browser_exe = config["opera_browser_exe"]
driver = None
try:
driver = start_opera_driver(
opera_browser_exe=opera_browser_exe,
opera_driver_exe=opera_driver_exe,
arguments=(
"--no-sandbox",
"--test-type",
"--no-default-browser-check",
"--no-first-run",
"--incognito",
"--start-maximized",
# "--headless",
),
)
initial_url = "https://www.opera.com/gx/discord-nitro"
driver.get(initial_url)
print_debug(2, f"Opening: {initial_url}", thread_id)
time.sleep(3)
driver.execute_script("document.getElementById('claim-button').click()")
print_debug(2, "Clicking claim button", thread_id)
time.sleep(3)
time.sleep(5)
for tab_index, tab_handle in enumerate(driver.window_handles):
driver.switch_to.window(tab_handle)
current_tab_url = driver.current_url
if "discord.com" in current_tab_url:
print_debug(1, f"Received URL: {current_tab_url}", thread_id)
with open("output.txt", "a") as file:
file.write(current_tab_url + "\n\n")
print_debug(2, "Writing to output.txt", thread_id)
break
except Exception as e:
print_debug(3, f"Thread {thread_id} - Error: {e}", Fore.RED, thread_id)
finally:
if driver:
try:
driver.quit()
time.sleep(5)
print_debug(1, "Successfully saved URL, closing...", thread_id)
except Exception as close_error:
print_debug(
3,
f"Thread {thread_id} - Error closing driver: {close_error}",
Fore.RED,
thread_id,
)
def main():
try:
num_threads = int(
input("Enter the number of threads to open (default is 1): ") or 1
)
while True:
threads = []
for i in range(num_threads):
thread = threading.Thread(
target=process_thread, args=(i + 1,), daemon=True
)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
except KeyboardInterrupt:
print("Script terminated by the user.")
if __name__ == "__main__":
main()