#!/usr/bin/env python3

# Python core modules:
from datetime import timedelta
from time import time

# Python PIP modules:
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs


def reteti_text_writer(
    batch_table: pa.Table,
    logger: object
) -> list:
    # Collect the paths of all files produced by write_dataset():
    text_file_paths = []

    def text_file_visitor(written_file):
        text_file_paths.append(written_file.path)

    # Start measuring time:
    processing_start = time()

    # Duplicate text_id as a partition column and
    # move the bulky text column to the end of the table:
    text_arrow_table = duckdb.sql(
        '''
            SELECT
                text_id AS partition,
                text_id,
                * EXCLUDE (text_id, text),
                text
            FROM batch_table
        '''
    ).arrow()

    # The number of texts in the batch sets the partition limit,
    # because every text is written in a partition of its own:
    text_id_total = duckdb.query(
        '''
            SELECT COUNT(text_id) AS text_id_total
            FROM text_arrow_table
        '''
    ).fetch_arrow_table().to_pandas()['text_id_total'].iloc[0]

    ds.write_dataset(
        text_arrow_table,
        format = 'arrow',
        filesystem = fs.LocalFileSystem(),
        base_dir = '/app/data/reteti-texts/texts',
        partitioning = ['partition'],
        basename_template = 'part-{i}.arrow',
        existing_data_behavior = 'overwrite_or_ignore',
        max_partitions = text_id_total,
        file_visitor = text_file_visitor
    )

    # Calculate, display and log processing time:
    processing_time = round((time() - processing_start), 3)
    processing_time_string = str(timedelta(seconds=processing_time))

    message = f'Text batch written in {processing_time_string}'
    print(message, flush=True)
    logger.info(message)

    return text_file_paths
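
# A note on the resulting layout (with a hypothetical text_id, for
# illustration only): every text lands in a partition directory of its own,
# so a text with text_id 42 is written to
# /app/data/reteti-texts/texts/42/part-0.arrow, which is exactly the
# path pattern reteti_text_extractor() below reads back from object storage.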


def reteti_text_extractor(
    dataset_filesystem: fs.S3FileSystem,
    bucket: str,
    text_id_arrow_table: pa.Table
) -> pd.DataFrame:
    # Compose the object storage path of every matching text,
    # one partition directory per text_id:
    text_id_list = text_id_arrow_table.to_pandas()['text_id'].to_list()

    text_paths = []

    for text_id in text_id_list:
        text_paths.append(f'{bucket}/texts/{text_id}/part-0.arrow')

    # Read all matching text files in parallel:
    text_arrow_table = ds.dataset(
        text_paths,
        format = 'arrow',
        filesystem = dataset_filesystem
    ).to_table(
        use_threads = True,
        fragment_readahead = 16
    )

    text_dataframe = None

    # Join the text contents with the search metadata and
    # order the results by matching tokens frequency:
    if text_id_arrow_table is not None and text_arrow_table is not None:
        text_dataframe = duckdb.query(
            '''
                SELECT
                    tiat.matching_tokens_frequency,
                    tiat.single_token_frequency,
                    tiat.matching_tokens,
                    tiat.hits,
                    tat.* EXCLUDE (text),
                    tat.text
                FROM
                    text_arrow_table AS tat
                    LEFT JOIN text_id_arrow_table AS tiat
                        ON tiat.text_id = tat.text_id
                ORDER BY matching_tokens_frequency DESC
            '''
        ).fetch_arrow_table().to_pandas()

    return text_dataframe
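

# A minimal, self-contained demonstration, not part of the original module:
# the column names, sample rows, endpoint, bucket and credentials below are
# placeholders chosen for illustration, and reteti_text_writer() assumes the
# hardcoded /app/data/reteti-texts/texts directory is writable.
if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('reteti')

    # Any Arrow table with text_id and text columns should work:
    batch_table = pa.table(
        {
            'text_id': [1, 2],
            'title':   ['First title', 'Second title'],
            'text':    ['First text.', 'Second text.']
        }
    )

    print(reteti_text_writer(batch_table, logger))

    # Reading the texts back requires an S3-compatible bucket and a search
    # result table holding the ranking columns the extractor selects:
    #
    # dataset_filesystem = fs.S3FileSystem(
    #     endpoint_override = 'https://example-object-storage',
    #     access_key        = 'EXAMPLE_ACCESS_KEY',
    #     secret_key        = 'EXAMPLE_SECRET_KEY'
    # )
    #
    # text_id_arrow_table = pa.table(
    #     {
    #         'text_id':                   [1],
    #         'matching_tokens_frequency': [1.0],
    #         'single_token_frequency':    [0.5],
    #         'matching_tokens':           [2],
    #         'hits':                      [2]
    #     }
    # )
    #
    # text_dataframe = reteti_text_extractor(
    #     dataset_filesystem,
    #     'example-bucket',
    #     text_id_arrow_table
    # )
    # print(text_dataframe)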