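"""Chat Insights: a small tkinter app that loads local PDF/TXT files or a
Wikipedia page, splits the text into overlapping chunks, stores their
embeddings in a local Chroma collection, and assembles a retrieval-augmented
prompt for an OpenAI completion model to answer questions about the loaded
documents."""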
import tkinter as tk
from tkinter import ttk
from tkinter import filedialog
from langchain.document_loaders import PyPDFLoader
#from langchain.document_loaders import Docx2txtLoader
from langchain.document_loaders import UnstructuredFileLoader
from langchain.utilities import WikipediaAPIWrapper
import re
import openai
import os
import threading
import pickle
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
#from InstructorEmbedding import INSTRUCTOR  # only needed for the commented-out Instructor embedding below
client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="chromadb"  # Optional, defaults to .chromadb/ in the current directory
))
emb_fn = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="msmarco-distilbert-base-tas-b")
#emb_fn = embedding_functions.InstructorEmbeddingFunction(model_name="hkunlp/instructor-large", device="cuda", instruction="Represent the document for retrieval: ")
collection = client.get_or_create_collection(name="docs", embedding_function=emb_fn)
chunk_size = 400
n_chunks = 15
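# chunk_size is measured in characters (not tokens); n_chunks is how many of
# the most similar chunks are retrieved for each question.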
def preprocess(text):
    """Collapse newlines and runs of whitespace into single spaces."""
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    return text
def to_text(file, url):
    """Extract plain text from a Wikipedia page title (url=True) or a local .txt/.pdf file."""
    text = ""
    if url:
        wikipedia = WikipediaAPIWrapper()
        pages = wikipedia.run(file)
        page = preprocess(pages)
        text += page
    elif file:
        print(f"Loaded file name: {file}")
        file_extension = os.path.splitext(file)[1]
        # Check if the file extension is ".txt" or ".pdf"
        if file_extension == ".txt":
            print("File extension is .txt")
            loader = UnstructuredFileLoader(file)
        elif file_extension == ".pdf":
            print("File extension is .pdf")
            loader = PyPDFLoader(file)
        else:
            print("File extension not supported")
            return text  # return the empty string so callers don't receive None
        pages = loader.load_and_split()
        for page in pages:
            page = page.page_content
            page = preprocess(page)
            text += page
    else:
        text = ""
        print("No input provided")
    return text
def text_to_chunks(text, overlap_percentage=0.2):
    """Split text into fixed-size chunks with a sliding-window overlap."""
    chunks = []
    overlap_size = int(chunk_size * overlap_percentage)
    start = 0
    end = chunk_size
    counter = 1
    while start < len(text):
        chunk = text[start:end]
        chunk_with_counter = f'[{counter}] {chunk}'
        chunks.append(chunk_with_counter + '\n\n')
        start += chunk_size - overlap_size
        end = start + chunk_size
        counter += 1
    return chunks
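# A quick sanity check of the overlap arithmetic: with chunk_size = 400 and
# overlap_percentage = 0.2 the stride is 400 - 80 = 320 characters, so chunk 1
# covers characters 0-399, chunk 2 covers 320-719, and neighbouring chunks
# share an 80-character window. For example:
#   chunks = text_to_chunks("a" * 1000)
#   len(chunks)  # 4 chunks, starting at characters 0, 320, 640 and 960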
def registra_file(string, filename):
    """Append a string to the pickled list stored in `filename`."""
    try:
        # Load existing data from the file
        data = get_file_list(filename)
        if data is None:
            data = []  # Create an empty list if the file doesn't exist or is empty
        # Append the new string to the data list
        data.append(string)
        # Save the updated data back to the file
        with open(filename, 'wb') as file:
            pickle.dump(data, file)
        print(f"File {filename} recorded in the db")
    except Exception as e:
        print(f"Error saving file '{filename}': {str(e)}")
def get_file_list(filename):
    try:
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        return data
    except FileNotFoundError:
        print(f"File '{filename}' not found.")
        return None
    except Exception as e:
        print(f"Error loading file '{filename}': {str(e)}")
        return None
class SemanticSearch:
    def fit(self, data, filename):
        """Embed a document's chunks and store them in the collection."""
        print(f"Number of chunks in the document: {len(data)}")
        id_list = [f"{filename}_{index}" for index in range(1, len(data) + 1)]
        print(f"id list: {id_list}")
        meta_data = [{"file": filename} for _ in range(len(data))]
        collection.add(documents=data, metadatas=meta_data, ids=id_list)
        registra_file(filename, 'files.pkl')

    # Return the top n chunks most similar to the question
    def __call__(self, text):  # text is the user's question
        results = collection.query(
            query_texts=text,
            n_results=n_chunks,
            include=["documents"]
        )
        return results['documents']
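# Note: Chroma returns results['documents'] as a list of result lists, one per
# query text, so a single question yields [[chunk1, chunk2, ...]]; this is why
# generate_answer below flattens each element with ' '.join(c).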
def generate_text(openAI_key, prompt, engine="text-davinci-003"):
    openai.api_key = openAI_key
    completions = openai.Completion.create(
        engine=engine,
        prompt=prompt,
        max_tokens=500,  # bounds the length of the answer, and therefore the cost of the call
        n=1,
        stop=None,
        temperature=0.7,  # controls randomness of the output
    )
    message = completions.choices[0].text
    tokens_used = completions['usage']['total_tokens']
    print(f"{tokens_used} tokens were consumed")
    return message
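# Note: text-davinci-003 and the openai.Completion endpoint are legacy; this
# function assumes an older (0.x) openai package, as newer versions of the
# library expose a different client interface.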
def generate_answer(question, openAI_key):
    # Retrieve the chunks: __call__ compares the question's embedding with the
    # document embeddings and returns the n closest text snippets
    topn_chunks = recommender(question)
    prompt = 'search results:\n\n'
    for c in topn_chunks:
        prompt += ' '.join(c)
    prompt += "\n\nInstructions: Compose a comprehensive reply to the query using the search results given. " \
              "If the search results mention multiple subjects with the same name, create separate answers for each. " \
              "Only include information found in the results and don't add any additional information. " \
              "Make sure the answer is correct and don't output false content. " \
              "If the text does not relate to the query, simply state 'No answer to your question was found in the text'. " \
              "Ignore outlier search results which have nothing to do with the question. Only answer what is asked. " \
              "The answer should be short and concise. Answer step-by-step."
    prompt += f"\n\nQuery: {question}\nAnswer:"
    # Optional debug logging of the prompt:
    # with open('docs/domande.txt', 'a') as f:
    #     f.write('\n\n\nprompt:\n' + prompt)
    # The OpenAI call is left commented out; for now the function returns the
    # assembled prompt so it can be inspected in the text area.
    #answer = generate_text(openAI_key, prompt)
    answer = prompt
    print(f"The prompt is:\n{prompt}")
    return answer
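# A minimal usage sketch (the key below is a placeholder, not a real value):
#   answer = generate_answer("Who wrote the loaded paper?", "sk-...")
# With the OpenAI call commented out above, `answer` is the full prompt string.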
def ask_question():
    query = query_entry.get()
    api_key = key_entry.get()
    answer = generate_answer(query, api_key)
    text_area.pack()
    text_area.delete(1.0, tk.END)
    text_area.insert(tk.END, answer)
def clean_collection():
    """Remove every loaded document's chunks and the pickled file list."""
    file_list = get_file_list('files.pkl')
    if file_list:  # guard against None when no pickle exists yet
        for file in file_list:
            collection.delete(where={"file": file})
            print(f"Removed chunks of: {file}")
    if os.path.exists("files.pkl"):
        # Delete the file
        os.remove("files.pkl")
        print("Removed the pickle file")
    create_scrollable_list(window, [], clear_list=True)
def load_file():
    """Load the selected files and/or Wikipedia page, chunk them and index them."""
    global file_paths
    progress_bar.pack()
    progress_bar.start()
    url = url_entry.get()
    if url:
        print(f"Wikipedia page: {url}")
        text = to_text(url, True)
        chunks = text_to_chunks(text)
        print(chunks)
        recommender.fit(chunks, url)
    else:
        print("No Wikipedia page given")
    if file_paths:  # guard against None when no files were selected
        for file in file_paths:
            text = to_text(file, False)
            chunks = text_to_chunks(text)
            file = os.path.basename(file)
            print(f"File name: {file}")
            recommender.fit(chunks, file)
    # Refresh the scrollable list
    file_list = get_file_list('files.pkl')
    create_scrollable_list(window, file_list)
    progress_bar.stop()
    progress_bar.pack_forget()
    stop_event.set()  # Signal that the work is finished
# Runs load_file on a background thread so the GUI stays responsive
def start_thread():
    global thread, stop_event
    if thread and thread.is_alive():
        print("Already running")
        return  # Do nothing if a thread is already running
    stop_event = threading.Event()  # Fresh event for the new run
    thread = threading.Thread(target=load_file)
    thread.start()
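# Caveat: load_file updates tkinter widgets from the worker thread. tkinter is
# not thread-safe, so a more defensive design would hand results back to the
# main loop (e.g. via window.after or a queue) instead of touching widgets there.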
def input_file():
    global file_paths
    filetypes = [("PDF Files", "*.pdf"), ("Text Files", "*.txt"), ("Word Files", "*.docx")]
    file_paths = filedialog.askopenfilenames(filetypes=filetypes)
my_listbox = None

def create_scrollable_list(root, elements, clear_list=False):
    global my_listbox
    # Clear first, so an empty element list can still flush the listbox
    if clear_list and my_listbox is not None:
        my_listbox.delete(0, tk.END)
    if not elements:
        return my_listbox  # Nothing to add
    if my_listbox is None:
        # Create a frame to hold the listbox and scrollbar
        frame = tk.Frame(root)
        frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        # Create a scrollbar
        scrollbar = tk.Scrollbar(frame)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        # Create a new listbox and associate it with the scrollbar
        my_listbox = tk.Listbox(frame, yscrollcommand=scrollbar.set)
        my_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        # Configure the scrollbar to control the listbox
        scrollbar.config(command=my_listbox.yview)
    else:
        # Clear the existing elements in the listbox
        my_listbox.delete(0, tk.END)
    # Add elements to the listbox
    for element in elements:
        my_listbox.insert(tk.END, element)
    return my_listbox
def update_sizes(event=None):
    window.update_idletasks()  # Update the window to get the current size
    text_area.configure(width=(window.winfo_width() // 10), height=(window.winfo_height() // 25))
recommender = SemanticSearch()
# Create the main window
window = tk.Tk()
window.title("Chat Insights")
window.geometry("600x400")
file_paths = None
button_frame = tk.Frame(window)
button_frame.pack()
select_button = tk.Button(button_frame, text="Select Files", command=input_file)
select_button.pack(side=tk.LEFT)
upload_button = tk.Button(button_frame, text="Upload", command=start_thread)
upload_button.pack(side=tk.LEFT)
delete_button = tk.Button(button_frame, text="Flush", command=clean_collection)
delete_button.pack(side=tk.LEFT)
stop_event = threading.Event()
thread = None
progress_bar = ttk.Progressbar(window, mode='indeterminate')
progress_bar.pack_forget()
url_label = tk.Label(window, text="Enter a Wikipedia page:")
url_label.pack()
url_entry = tk.Entry(window, width=60)
url_entry.pack()
key_label = tk.Label(window, text="API Key:")
key_label.pack()
key_entry = tk.Entry(window, width=60)
key_entry.pack()
query_label = tk.Label(window, text="Ask your question:")
query_label.pack()
query_entry = tk.Entry(window, width=80)
query_entry.pack()
submit_button = tk.Button(window, text="Ask", command=ask_question)
submit_button.pack()
# Create a text area to display the answer (packed the first time a question is asked)
text_area = tk.Text(window)
# Make the input boxes and text area adjust dynamically
window.bind('<Configure>', update_sizes)
query_entry.pack_propagate(False)
text_area.pack_propagate(False)
# Create the scrollable list
file_list = get_file_list('files.pkl')
my_listbox = create_scrollable_list(window, file_list)
# Start the main event loop
window.mainloop()