-
Notifications
You must be signed in to change notification settings - Fork 104
/
main.py
235 lines (195 loc) · 8.25 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Copyright (C) 2023-2024 Fern Lane
This file is part of the GPT-Telegramus distribution
(see <https://github.com/F33RNI/GPT-Telegramus>)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import argparse
from ctypes import c_double
import json
import logging
import multiprocessing
import os
import sys
from typing import Dict
from _version import __version__
import logging_handler
import messages
import users_handler
import queue_handler
import bot_handler
import module_wrapper_global
# Default config file
CONFIG_FILE = "config.json"
CONFIG_COMPATIBLE_VERSIONS = [5, 6, 7, 8]
def load_and_parse_config(config_file: str) -> Dict:
"""Loads and parses config from main file and from module's config files
This is separate because of /restart command
Args:
config_file (str): path to main config file
Raises:
Exception: loading / parsing / version error
Returns:
Dict: loaded and parsed config
"""
logging.info(f"Loading config file {config_file}")
with open(config_file, "r", encoding="utf-8") as file:
config = json.loads(file.read())
# Check config version
config_version = config.get("config_version")
if config_version is None:
raise Exception("No config_version key! Please update your config file")
if not config_version in CONFIG_COMPATIBLE_VERSIONS:
raise Exception(
f"Your config version ({config_version}) is not compatible! "
f"Compatible versions: {', '.join(str(version) for version in CONFIG_COMPATIBLE_VERSIONS)}"
)
if config_version < max(CONFIG_COMPATIBLE_VERSIONS):
logging.warning(f"You config version {config_version} < {max(CONFIG_COMPATIBLE_VERSIONS)}! Please update it")
# List of enabled modules
enabled_modules = config.get("modules").get("enabled")
if len(enabled_modules) == 0:
raise Exception("No modules enabled")
logging.info(f"Enabled modules: {', '.join(enabled_modules)}")
# Load config of enabled modules and merge it into global config
module_configs_dir = config.get("files").get("module_configs_dir")
logging.info(f"Parsing {module_configs_dir} directory")
for file in os.listdir(module_configs_dir):
# Parse only .json files
if file.lower().endswith(".json"):
# Check if need to load it
module_name_from_file = os.path.splitext(os.path.basename(file))[0]
if module_name_from_file not in enabled_modules:
continue
# Parse and merge
logging.info(f"Adding config of {module_name_from_file} module")
with open(os.path.join(module_configs_dir, file), "r", encoding="utf-8") as file_:
module_config = json.loads(file_.read())
config[module_name_from_file] = module_config
return config
def parse_args() -> argparse.Namespace:
"""Parses cli arguments
Returns:
argparse.Namespace: parsed arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config",
type=str,
default=os.getenv("TELEGRAMUS_CONFIG_FILE", CONFIG_FILE),
required=False,
help=f"path to config.json file (Default: {os.getenv('TELEGRAMUS_CONFIG_FILE', CONFIG_FILE)})",
)
parser.add_argument("-v", "--version", action="version", version=__version__)
return parser.parse_args()
def main():
"""Main entry"""
# Multiprocessing fix for Windows
if sys.platform.startswith("win"):
multiprocessing.freeze_support()
# Parse arguments
args = parse_args()
# Initialize logging and start logging listener as process
logging_handler_ = logging_handler.LoggingHandler()
logging_handler_process = multiprocessing.Process(target=logging_handler_.configure_and_start_listener)
logging_handler_process.start()
logging_handler.worker_configurer(logging_handler_.queue, log_test_message=False)
# Log software version and GitHub link
logging.info(f"GPT-Telegramus version: {__version__}")
logging.info("https://github.com/F33RNI/GPT-Telegramus")
modules = {}
# Catch errors during initialization process
initialization_ok = False
try:
# Load config
config = multiprocessing.Manager().dict(load_and_parse_config(args.config))
# Create conversations and user images dirs (it's not necessary but just in case)
conversations_dir = config.get("files").get("conversations_dir")
if not os.path.exists(conversations_dir):
logging.info(f"Creating {conversations_dir} directory")
os.makedirs(conversations_dir)
user_images_dir = config.get("files").get("user_images_dir")
if not os.path.exists(user_images_dir):
logging.info(f"Creating {user_images_dir} directory")
os.makedirs(user_images_dir)
# Initialize users and messages handlers
users_handler_ = users_handler.UsersHandler(config)
messages_ = messages.Messages(users_handler_)
# Load messages
messages_.langs_load(config.get("files").get("messages_dir"))
web_cooldown_timer = multiprocessing.Value(c_double, 0.0)
web_request_lock = multiprocessing.Lock()
# modules = {} is a dictionary of ModuleWrapperGlobal (each enabled module)
# {
# "module_name": ModuleWrapperGlobal,
# ...
# }
for module_name in config.get("modules").get("enabled"):
logging.info(f"Trying to load and initialize {module_name} module")
use_web = (
module_name.startswith("lmao_")
and module_name in config.get("modules").get("lmao_web_for_modules", [])
and "lmao_web_api_url" in config.get("modules")
)
try:
module = module_wrapper_global.ModuleWrapperGlobal(
module_name,
config,
messages_,
users_handler_,
logging_handler_.queue,
use_web,
web_cooldown_timer=web_cooldown_timer,
web_request_lock=web_request_lock,
)
modules[module_name] = module
except Exception as e:
logging.error(f"Error initializing {module_name} module: {e} Module will be ignored")
# Initialize main classes
queue_handler_ = queue_handler.QueueHandler(
config, messages_, users_handler_, logging_handler_.queue, None, modules
)
bot_handler_ = bot_handler.BotHandler(
config,
args.config,
messages_,
users_handler_,
logging_handler_.queue,
queue_handler_,
modules,
web_cooldown_timer,
web_request_lock,
)
queue_handler_.prevent_shutdown_flag = bot_handler_.prevent_shutdown_flag
# At least, initialization did not raised any error
initialization_ok = True
except Exception as e:
logging.error("Initialization error", exc_info=e)
# Finally, start queue handler and bot polling (blocking)
if initialization_ok:
queue_handler_.start_processing_loop()
bot_handler_.start_bot()
# Stop queue handler
queue_handler_.stop_processing_loop()
# Close (stop) each module
for module_name, module in modules.items():
logging.info(f"Trying to close and unload {module_name} module")
try:
module.on_exit()
except Exception as e:
logging.error(f"Error closing {module_name} module", exc_info=e)
# Finally, stop logging loop
logging.info("GPT-Telegramus exited")
logging_handler_.queue.put(None)
if __name__ == "__main__":
main()