-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch_engine_best.py
249 lines (214 loc) · 9.57 KB
/
search_engine_best.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import pandas as pd
from reader import ReadFile
from configuration import ConfigClass
from parser_module import Parse
from indexer import Indexer
from searcher import Searcher
import concurrent.futures
import timeit
import traceback
import requests
import utils
import configuration
import os
# DO NOT CHANGE THE CLASS NAME
class SearchEngine:
    """Search engine that indexes a parquet corpus and answers ranked queries.

    Indexing splits the corpus into three slices, indexes each slice in its
    own worker process and merges the partial results.  At query time every
    query term is expanded with its nearest words, fetched from the remote
    service at ``self.url`` and memoised in ``self.local_cache``.
    """

    # Number of nearest words requested from the remote service per term.
    _TOP_N = 3
    # Seconds to wait for the remote service before giving up on expansion.
    _REQUEST_TIMEOUT = 10

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation, but you must have a parser and an indexer.
    def __init__(self, config=None):
        self._config = config
        self._parser = Parse()
        self._indexer = Indexer(config)
        self._model = None
        self.url = "http://bionlp-www.utu.fi/wv_demo/nearest"
        # Constant part of the form posted to ``self.url``; the per-request
        # values are added in ``get_top_n``.
        self.data = {'form[1][name]': 'topn', 'form[0][name]': 'word', 'model_name': "English GoogleNews Negative300", }
        # word -> list of tokens returned by the remote service
        self.local_cache = {}

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation as you see fit.
    def build_index_from_parquet(self, fn):
        """
        Reads parquet file and passes it to the parser, then indexer.
        Input:
            fn - path to parquet file
        Output:
            No output, just modifies the internal _indexer object.
        """
        df = pd.read_parquet(fn, engine="pyarrow")
        corpus = df.values.tolist()

        # Split the corpus into three contiguous slices; the last slice also
        # takes the remainder so that no document is dropped.
        one_third = len(corpus) // 3
        with concurrent.futures.ProcessPoolExecutor() as executor:
            f1 = executor.submit(self.process_index, corpus[:one_third], 1)
            f2 = executor.submit(self.process_index, corpus[one_third:2 * one_third], 2)
            f3 = executor.submit(self.process_index, corpus[2 * one_third:], 3)
            indexer1 = f1.result()
            indexer2 = f2.result()
            indexer3 = f3.result()

        self._indexer.inverted_idx = self.merge_inverted_index(indexer1.inverted_idx, indexer2.inverted_idx,
                                                               indexer3.inverted_idx)
        self.three_way_external_merge(self._indexer.inverted_idx, indexer1.postingDict, indexer2.postingDict,
                                      indexer3.postingDict)
        self.merge_indexer(indexer1, indexer2, indexer3)
        self._indexer.save_index('idx_bench')

    def process_index(self, documents_list, num_thread):
        """
        Parses and indexes one slice of the corpus with a fresh parser/indexer.
        Input:
            documents_list - list of raw documents to parse and index.
            num_thread - number identifying this worker (passed to
                         indexer.set_thread and used in the printed messages).
        Output:
            The Indexer holding whatever was indexed.  If a document raises,
            the traceback is printed and the partially filled indexer is
            still returned.
        """
        config = ConfigClass()
        p = Parse()
        indexer = Indexer(config)
        indexer.set_thread(num_thread)
        print("Number of tweet in the process {} : {}".format(num_thread, len(documents_list)))
        start = timeit.default_timer()
        try:
            # Iterate over every document in the slice
            for document in documents_list:
                # parse the document
                parsed_document = p.parse_doc(document)
                # index the document data
                indexer.add_new_doc(parsed_document)
        except Exception:
            # Best effort: report the failure and keep what was indexed so far.
            print("Problem with process {}".format(num_thread))
            traceback.print_exc()
        stop = timeit.default_timer()
        print("Time of indexer and posting of process {} : ".format(num_thread), stop - start)
        return indexer

    def merge_indexer(self, *indexers):
        """
        Copies every tweet_index entry of the given indexers into
        self._indexer.tweet_index.  Later indexers win on duplicate doc ids.
        """
        for indexer in indexers:
            self._indexer.tweet_index.update(indexer.tweet_index)

    def merge_inverted_index(self, *inverted_index_dicts):
        """
        Merges several inverted indexes into a new dictionary.
        Input:
            inverted_index_dicts - dictionaries mapping term -> mutable
                                   sequence whose first two items are counters.
        Output:
            A new dictionary in which, for terms present in several inputs,
            items 0 and 1 are summed; remaining items come from the first
            input that contains the term.  The inputs are left unmodified.
        """
        super_dict = {}
        for inverted_index_dict in inverted_index_dicts:
            for term, value in inverted_index_dict.items():
                if term in super_dict:
                    super_dict[term][0] += value[0]
                    super_dict[term][1] += value[1]
                else:
                    # Copy, so the in-place sums above never alter the
                    # caller's per-process index.
                    super_dict[term] = list(value)
        return super_dict

    def three_way_external_merge(self, inverted_index, posting_file1, posting_file2, posting_file3):
        """
        Merges three partial posting dictionaries into "posting_file.txt".
        Terms are visited in sorted order.  A term starting with an upper-case
        letter whose lower-cased form exists in one of the posting
        dictionaries is folded into that lower-cased term: its postings are
        appended there, its counters are added to the lower-cased entry of
        inverted_index and its own entry is deleted.  Every other term is
        written as one line of the file and its line number is stored in
        inverted_index[term][2].
        Input:
            inverted_index - merged inverted index; modified in place.
            posting_file1..3 - dictionaries term -> list of postings;
                               lower-cased entries may be extended in place.
        Output:
            No output.  Any exception is printed and swallowed.
        """
        partial_postings = (posting_file1, posting_file2, posting_file3)
        try:
            line = 1
            with open("posting_file.txt", 'w', encoding="cp437", errors='ignore') as file:
                # sorted() snapshots the keys, so deleting entries inside the
                # loop is safe.
                for term in sorted(inverted_index.keys()):
                    posting_file = []
                    for partial in partial_postings:
                        if term in partial:
                            posting_file.extend(partial[term])

                    # term[:1] instead of term[0]: an empty term must not
                    # abort the whole merge with an IndexError.
                    lower_term = term.lower() if term[:1].isupper() else None
                    is_duplicate = False
                    if lower_term is not None:
                        # Fold into the first dictionary that knows the
                        # lower-cased form; it is written when its own turn
                        # comes later in the sorted order.
                        for partial in partial_postings:
                            if lower_term in partial:
                                partial[lower_term].extend(posting_file)
                                is_duplicate = True
                                break

                    if is_duplicate:
                        inverted_index[lower_term][0] += inverted_index[term][0]
                        inverted_index[lower_term][1] += inverted_index[term][1]
                        del inverted_index[term]
                    else:
                        file.write("{}: {} \n".format(term, posting_file))
                        inverted_index[term][2] = line
                        line += 1
        except Exception:
            traceback.print_exc()

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation as you see fit.
    def load_index(self, fn):
        """
        Loads a pre-computed index (or indices) so we can answer queries.
        Input:
            fn - file name of pickled index.
        """
        self._indexer.load_index(fn)

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation as you see fit.
    def load_precomputed_model(self, model_dir=None):
        """
        Loads a pre-computed model (or models) so we can answer queries.
        This is where you would load models like word2vec, LSI, LDA, etc. and
        assign to self._model, which is passed on to the searcher at query time.
        """
        pass

    def search(self, query):
        """
        Executes a query over an existing index and returns the number of
        relevant docs and an ordered list of search results.
        Input:
            query - string.
        Output:
            A tuple containing the number of relevant search results, and
            a list of tweet_ids where the first element is the most relevant
            and the last is the least relevant result.
        """
        query_as_list = self._parser.parse_sentence(query)
        query_expansion = self.query_expansion(query_as_list)
        self.add_similar_word_to_query(query_as_list, query_expansion)
        searcher = Searcher(self._parser, self._indexer, model=self._model)
        n_relevant, ranked_doc_ids = searcher.search(query_as_list)
        return n_relevant, ranked_doc_ids

    def add_similar_word_to_query(self, query_as_list, query_expansion):
        """
        Appends, in place, every expansion list of query_expansion to
        query_as_list.  Entries whose value is None are skipped.
        """
        for similar_words in query_expansion.values():
            if similar_words is not None:
                query_as_list.extend(similar_words)

    def query_expansion(self, query_as_list):
        """
        Looks up the expansion of every query term.
        Input:
            query_as_list - list of query terms.
        Output:
            Dictionary term -> list of tokens returned by get_top_n.  Terms
            for which get_top_n returns None are left out.
        """
        ret_dict = {}
        for word in query_as_list:
            # get_top_n consults self.local_cache before calling the service.
            result = self.get_top_n(word)
            if result is not None:
                # Store the expansion itself; storing the word here would make
                # the caller extend the query with its single characters.
                ret_dict[word] = result
        return ret_dict

    def get_top_n(self, word):
        """
        Returns the nearest words of `word` as reported by the service at
        self.url, using self.local_cache to avoid repeated requests.
        Input:
            word - the term to expand.
        Output:
            List of whitespace-separated tokens extracted from the response,
            or None when the request fails, the status is not 200 or the
            service reports that the word is not in its vocabulary.
        """
        if word in self.local_cache:
            return self.local_cache[word]

        # Build a per-request form instead of mutating the shared template.
        payload = dict(self.data)
        payload['form[1][value]'] = str(self._TOP_N)
        payload['form[0][value]'] = word

        # Post the request to the server; expansion is optional, so any
        # network problem simply means "no expansion".
        try:
            r = requests.post(self.url, data=payload, timeout=self._REQUEST_TIMEOUT)
        except requests.exceptions.RequestException:
            return None

        # Check request status (status_code is an int)
        if r.status_code != 200:
            return None

        # Get the text from the response
        response_text = r.text
        # Given word is not in the dictionary
        if "is not in the vocabulary" in response_text:
            return None

        # Strip the JSON/HTML wrapper so only the words remain.
        response_text = response_text.replace(r'{"tbl": "<div class=\"w2vresultblock bg-info\">\n\n', "") \
            .replace(r"\n", "").replace("</div>", " ").replace("</br>", " ").replace('"}', "")
        response_text = response_text.split()
        self.local_cache[word] = response_text
        return response_text
if __name__ == "__main__":
    # Script entry point: index the benchmark training parquet, then run one
    # sample query and print the ranked tweet ids it returns.
    benchmark_path = os.path.join('data', 'benchmark_data_train.snappy.parquet')
    search_engine = SearchEngine(config=configuration.ConfigClass())
    search_engine.build_index_from_parquet(benchmark_path)
    _, ranked_ids = search_engine.search('bioweapon')
    print(ranked_ids)