# llm_analyze.py
import os
import time

import requests

from config import LLM_CONFIG, QUERY_CONFIG
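
# config.py is not shown here; from the lookups below, LLM_CONFIG and
# QUERY_CONFIG each map a name to a list whose first element is a dict of
# settings. An illustrative sketch (assumed shape, values are placeholders):
#
#   LLM_CONFIG = {
#       "openrouter_llama": [{"BASEURL": "https://openrouter.ai/api/v1",
#                             "APIKEY": "<key>",
#                             "MODEL": "<model id>"}],
#   }
#   QUERY_CONFIG = {
#       "TARIFLISTE_ABFRAGE": [{"QUERY": "<prompt text>"}],
#   }
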
def llm_analyze(llm_model_name, query_name, context=None):
    """
    Sends a query to a specified LLM model and returns the response.

    Args:
        llm_model_name (str): The name of the LLM model to use
            (e.g., 'amp1', 'openrouter_llama3.3:70b').
        query_name (str): The name of the query to use (e.g., 'BEZUGSPREIS_ABFRAGE').
        context (str, optional): An optional context string appended to the query.
            Defaults to None.

    Returns:
        str: The response from the LLM model, or None if an error occurs.
    """
    llm_config = LLM_CONFIG.get(llm_model_name)
    if not llm_config:
        print(f"Error: No configuration found for LLM model: {llm_model_name}")
        return None

    query_config = QUERY_CONFIG.get(query_name)
    if not query_config:
        print(f"Error: No query found for query name: {query_name}")
        return None

    query = query_config[0].get("QUERY")
    # Append the context to the query if provided
    if context:
        query = f"{query}\n\n{context}"

    base_url = llm_config[0].get("BASEURL")
    api_key = llm_config[0].get("APIKEY")
    model = llm_config[0].get("MODEL")

    headers = {"Content-Type": "application/json"}
    if 'amp1' in llm_model_name:
        # amp1 uses the legacy completions endpoint with a bare prompt
        endpoint = f"{base_url}/v1/completions"
        data = {"prompt": query, "model": model}
    else:
        # openrouter and the OpenAI-compatible fallback share the same
        # chat/completions request shape
        headers['Authorization'] = f'Bearer {api_key}'
        endpoint = f"{base_url}/chat/completions"
        data = {
            "model": model,
            "messages": [{"role": "user", "content": query}]
        }

    try:
        response = requests.post(endpoint, headers=headers, json=data)
        response.raise_for_status()  # Raise an exception for bad status codes
        if 'amp1' in llm_model_name:
            return response.json()['choices'][0]['text']
        return response.json()['choices'][0]['message']['content']
    except requests.exceptions.RequestException as e:
        print(f"Error during request to {llm_model_name}: {e}")
        return None
    except KeyError as e:
        print(f"Error parsing response from {llm_model_name}: {e}")
        print(response.text)
        return None
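
# Minimal usage sketch (hedged: 'arli_nemo' and 'TARIFLISTE_ABFRAGE' are names
# the __main__ block below suggests exist in config.py):
#
#   answer = llm_analyze('arli_nemo', 'TARIFLISTE_ABFRAGE', context='<page text>')
#   if answer:
#       print(answer)
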
def llmanalyze_files(llm_model='arli_nemo', files='crawl_', query_to_use='TARIFLISTE_ABFRAGE', maxtokens=12000):
    """
    Processes files in the 'data/crawls' directory, sends them to the LLM for
    analysis, and appends the results to a dated report file.

    Args:
        llm_model (str): The name of the LLM model to use.
        files (str): Substring that selects which files in 'data/crawls' to process.
        query_to_use (str): The name of the query to use.
        maxtokens (int, optional): The maximum number of tokens to use from a file.
            Defaults to 12000.

    Returns:
        str: The path of the report file.
    """
    flist = [f for f in os.listdir('data/crawls') if files in f]
    print(flist)

    # Create or open the report file (append mode, one report per day)
    report_file_path = f'data/crawls/report_{time.strftime("%Y%m%d")}.txt'
    with open(report_file_path, 'a', encoding='utf-8') as report_file:
        # Loop through the file list
        for i, f in enumerate(flist, start=1):
            with open(f'data/crawls/{f}', 'r', encoding='utf-8') as file:
                example_context = file.read()
            # Rough token estimate: ~4 characters per token
            tokens = round(len(example_context) / 4)
            if tokens > maxtokens:
                print(f"Context is too long ({tokens} tokens). Truncated at {maxtokens} tokens.")
                example_context = example_context[:maxtokens * 4]
            print(f' {llm_model} ({i}/{len(flist)}) analyzing {f} / {tokens} tokens')
            result = llm_analyze(llm_model, query_to_use, context=example_context)
            if result:
                # Print response in light grey
                print(f"\033[37mResponse from {llm_model}:\n{result[:2000]}\033[0m")
                # The provider name (Stromanbieter) is the second
                # underscore-separated part of the filename
                Stromanbietername = f.split('_')[1]
                # Append the result to the report file
                report_file.write(f"-- Stromanbieter: {Stromanbietername}\n{result}\n\n")
                # Wait 2s between requests
                time.sleep(2)
            else:
                print(f"Failed to get a response from {llm_model}.")
                break
    return report_file_path
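
# The report written above is plain text with one block per provider:
#
#   -- Stromanbieter: <name>
#   <LLM result>
#
# solidify_report() below feeds that whole file back to the LLM to condense it.
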
def solidify_report(report_file_path='Default', query_to_use='Standard', llm_model='arli_nemo', ending='solid.txt'):
    """
    Reads the content of a report file, sends it to the LLM for solidification
    using the specified query, and saves the solidified report to a new file.

    Args:
        report_file_path (str): The path to the report file.
        query_to_use (str): The name of the query to use for solidification.
        llm_model (str, optional): The name of the LLM model to use. Defaults to 'arli_nemo'.
        ending (str, optional): Suffix appended to the solidified report's filename.
            Defaults to 'solid.txt'.

    Returns:
        str: The path of the solidified report file, or None if an error occurs.
    """
    try:
        with open(report_file_path, 'r', encoding='utf-8') as report_file:
            report_content = report_file.read()
        print(f"Analyzing report with {llm_model} and query: {query_to_use}")
        solidified_result = llm_analyze(llm_model, query_to_use, context=report_content)
        if solidified_result:
            # Create the solidified report file next to the original
            solid_report_file_path = f'{os.path.splitext(report_file_path)[0]}_{ending}'
            with open(solid_report_file_path, 'w', encoding='utf-8') as solid_report_file:
                solid_report_file.write(solidified_result)
            print(f"Solidified report saved to: {solid_report_file_path}")
            return solid_report_file_path
        else:
            print(f"Failed to get a solidified response from {llm_model}.")
            return None
    except FileNotFoundError:
        print(f"Error: Report file not found at {report_file_path}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None


def del_files(contains='report_', doesnotcontain=''):
    """
    Deletes files in 'data/crawls' whose names contain `contains` and, if
    `doesnotcontain` is non-empty, do not contain `doesnotcontain`.
    Note: an empty `contains` matches every file.
    """
    for f in os.listdir('data/crawls'):
        should_delete = True
        # Only check 'contains' if it's not empty
        if contains:
            should_delete = contains in f
        # Only check 'doesnotcontain' if it's not empty
        if should_delete and doesnotcontain:
            should_delete = doesnotcontain not in f
        if should_delete:
            try:
                full_path = os.path.join('data/crawls', f)
                os.remove(full_path)
            except OSError as e:
                print(f"Error deleting {f}: {e}")


if __name__ == '__main__':
    # Example model choices (must exist in config.py):
    # llm_model = 'openrouter_llama'
    # llm_model = 'groq_r1'
    # llm_model = 'amp1_gemma'
    # llm_model = 'arli_nemo'
    # query_to_use = 'TARIFLISTE_ABFRAGE'

    # Step 1: delete files containing 'report_' but not 'solid', then analyze
    # the crawl files. Toggle the step with the if-flag.
    if True:
        del_files(contains='report_', doesnotcontain='solid')
        report_file_path = llmanalyze_files(
            llm_model='mistral_large',
            files='crawl_',
            query_to_use='TARIFLISTE_ABFRAGE')

    # Step 2: solidify the report into a tariff table. If step 1 was skipped,
    # fall back to a previously generated report.
    if True:
        try:
            report_file_path
        except NameError:
            report_file_path = 'data/crawls/report_20250131.txt'
        del_files(contains='solid')
        solidify_report(
            report_file_path=report_file_path,
            query_to_use='TARIF_TABELLE',
            llm_model='groq_r1',
            ending='tab.md')