Recursive_Masterize_Hospital_Entities.py
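"""Recursive entity-mastering pipeline for hospital/site records.

Overview:
1. Pre-format the raw input with PySpark and load the static file into pandas.
2. For each country, split the records into minibatches of at most
   config._MAXSIZE rows and deduplicate each minibatch via an external
   Rscript (layer zero).
3. Recursively link the resulting master csvs pairwise, depth by depth,
   until a single country-level master remains.
4. Write the final master, the raw cross-reference, and the cross-reference
   report to config._MASTER_DATA_DIRECTORY.
"""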
import os

import numpy as np
import pandas as pd

# `config` is imported by name because its attributes are referenced as
# `config._*` below; the star-imports pull in the pipeline helper functions.
import config
from config import *
from utils.util_functions import *
# Pre-format the raw input file into the working static file using PySpark.
preformat_input_using_sparksql()
print('\nFormatted the {} file into {} using PySpark successfully.'.format(config._RAW_STATIC_FILE_NAME, config._STATIC_FILE_NAME))

# Read the formatted source file into a dataframe; only csv and xlsx sources are supported.
if '.csv' in config._STATIC_FILE_NAME.lower():
    site_master_df = pd.read_csv(config._STATIC_FILE_NAME, index_col=0)
elif '.xlsx' in config._STATIC_FILE_NAME.lower():
    site_master_df = pd.read_excel(config._STATIC_FILE_NAME, index_col=0)
print('\nFinished reading the source file {}'.format(config._STATIC_FILE_NAME))

site_master_df = preprocess_dataframe(df=site_master_df)
print('\nColumns: {}\n'.format(site_master_df.columns.values))

# The pipeline masters entities one country at a time.
countries = list(site_master_df['COUNTRY'].unique())
print('\nCountries identified are: {}'.format(countries))
for curr_country in countries:
    # Two copies of the country slice: one with punctuation stripped for matching,
    # one with punctuation kept for reporting.
    entire_country_df = site_master_df[site_master_df['COUNTRY'] == curr_country]
    entire_country_df = clean_dataframe(entire_country_df, columns_to_clean=config._COLUMNS_TO_CLEAN, fields_to_concat=config._FIELDS_TO_CONCAT, replace_punctuations=True)
    entire_country_df_copy = site_master_df[site_master_df['COUNTRY'] == curr_country]
    entire_country_df_copy = clean_dataframe(entire_country_df_copy, columns_to_clean=config._COLUMNS_TO_CLEAN, fields_to_concat=config._FIELDS_TO_CONCAT, replace_punctuations=False)

    # Split the country dataset into m minibatches of at most _MAXSIZE rows each.
    nrows = entire_country_df.shape[0]
    m = int(np.ceil(np.divide(nrows, config._MAXSIZE)))
    print('\nThere will be {} batches since incoming dataset-size = {} and minibatch-size = {}'.format(m, nrows, config._MAXSIZE))

    entire_country_cross_ref_df = pd.DataFrame()
    queue_of_csvs = list()
    # Layer zero: deduplicate each minibatch independently.
    for i in range(m):
        print('\n\nStarting Batch[{}]...'.format(i))
        country_df = entire_country_df.iloc[i*config._MAXSIZE : (i+1)*config._MAXSIZE]
        country_df_copy = entire_country_df_copy.iloc[i*config._MAXSIZE : (i+1)*config._MAXSIZE]
        # Write the threshold/scoring columns for this batch to a csv.
        write_df_to_csv(df=country_df[list(config._THRESHOLDS_DICT.keys())], curr_country=curr_country, file_suffix='_country_df.csv', index_flag=True)

        _CREATE_MASTER_MINIBATCHES = (country_df.shape[0] > 1)
        if not _CREATE_MASTER_MINIBATCHES:
            print('\n\nGet the unique set of all record-ids, since Layer-zero cannot create mastered mini-batches.\n')
            # A single-record batch is its own master.
            master_record_ids = country_df.index.values.astype(list)
            country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
            cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)
            # Write the current master to a csv, queue the filename, and append the
            # generated cross-ref to the running country-level cross-ref.
            new_file_name = '_{}_Master.csv'.format(i)
            write_df_to_csv(df=country_master_df, root_dir=config._STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix=new_file_name, index_flag=True)
            new_file_name = curr_country + new_file_name
            queue_of_csvs.append(new_file_name)
            entire_country_cross_ref_df = entire_country_cross_ref_df.append(cross_ref_df)
        elif _CREATE_MASTER_MINIBATCHES:
            # Invoke the Rscript to score this minibatch and generate the Raw_score_features csv.
            # The trailing 'NA NA' stands in for the two master-file arguments that are
            # only used at the linkage depths below.
            args = '{} {} {} {} {} {} {} {} {} NA NA'.format(
                config._BINARIES_NAME, config._BINARIES_EXTENSION, config._THRESHOLD_FOR_INDIVIDUAL, config._THRESHOLD_FOR_ADDRESS_COMBINED, config._SCALING_FACTOR,
                curr_country, config._RAW_SCORES_DIRECTORY, config._TOTAL_MATCHES_THRESHOLD, config._DEDUP_METHOD
            )
            print('\n{}_{} has {} records.\n\nInvoking the Rscript now...'.format(curr_country, i, country_df.shape[0]))
            deduplicate_dataset_R(rscript_command=config._RSCRIPT_CMD, script_name=config._SCRIPT_NAME, args=args)

            # Clean and normalize the raw score features produced by the Rscript.
            normalized_duplicates = clean_score_features(curr_country=curr_country, country_df=country_df, source_dir=config._RAW_SCORES_DIRECTORY, target_dir=config._CLEANED_SCORES_DIRECTORY, verbose=False)
            if normalized_duplicates.shape[0] != 0:
                print('\n\nFound potential duplicates. Processing their master and cross-reference...\n')
                # Collapse the duplicate clusters to a unique set of master-record-ids.
                master_record_ids = get_deduplicated_master_records(normalized_duplicates=normalized_duplicates, country_df=country_df)
                country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                # Dummy cross-refs for the masters, then the full cross-ref set for the batch.
                cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)
                cross_ref_df = generate_final_cross_refs(cross_ref_df=cross_ref_df, normalized_duplicates=normalized_duplicates, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                generate_cross_ref_report(cross_ref_df=cross_ref_df, curr_country=curr_country, country_df=country_df_copy, target_dir=config._STAGING_AREA_DIRECTORY)
            else:
                print('\n\nGet the unique set of all record-ids since there aren\'t any potential duplicates.\n')
                # No duplicates found: every record in the batch is its own master.
                master_record_ids = country_df.index.values.astype(list)
                country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)

            # Write the current master to a csv, queue the filename, and append the
            # generated cross-ref to the running country-level cross-ref.
            new_file_name = '_{}_Master.csv'.format(i)
            write_df_to_csv(df=country_master_df, root_dir=config._STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix=new_file_name, index_flag=True)
            new_file_name = curr_country + new_file_name
            queue_of_csvs.append(new_file_name)
            entire_country_cross_ref_df = entire_country_cross_ref_df.append(cross_ref_df)

        # Free the per-batch dataframes before the next iteration.
        del country_df, country_df_copy, master_record_ids
        if _CREATE_MASTER_MINIBATCHES:
            del normalized_duplicates
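    # --- Recursive linkage phase ---
    # The queued layer-zero masters are merged pairwise, one depth at a time, until
    # a single master file remains (e.g. m = 8 minibatches shrink 8 -> 4 -> 2 -> 1).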
    print('{} csvs generated are: {}'.format(len(queue_of_csvs), queue_of_csvs))

    # Number of levels for the recursive computations; (m+1)//2 depths are enough
    # to pairwise-merge m files down to one.
    d = (m + 1) // 2
    print('\nMax-depth for {} will be {}'.format(curr_country, d))

    for j in range(1, d + 1):
        combined_crossref_at_depth = pd.DataFrame()
        n_csvs_to_read = len(queue_of_csvs)
        # Round the loop bound up to an even number so an odd trailing file is still visited.
        length = n_csvs_to_read if n_csvs_to_read % 2 == 0 else n_csvs_to_read + 1
        print('{} csvs need to be processed: {}, length = {}'.format(n_csvs_to_read, queue_of_csvs, length))
        # Process the queued masters in consecutive pairs.
        for i in range(0, length, 2):
            master_csv_1 = os.path.join(config._STAGING_AREA_DIRECTORY, queue_of_csvs[i])
            master_df_1 = pd.read_csv(master_csv_1, index_col=0)
            if i+1 < n_csvs_to_read:
                master_csv_2 = os.path.join(config._STAGING_AREA_DIRECTORY, queue_of_csvs[i+1])
                master_df_2 = pd.read_csv(master_csv_2, index_col=0)
                combined_df = master_df_1.append(master_df_2)
                # Invoke the Rscript on the pair of masters and generate the Raw_score_features csv.
                print('\n{} has {} records, and {} has {} records.\n\nInvoking the Rscript now...\n'.format(master_csv_1, master_df_1.shape[0], master_csv_2, master_df_2.shape[0]))
                args = '{} {} {} {} {} {} {} {} {} {} {}'.format(
                    config._BINARIES_NAME, config._BINARIES_EXTENSION, config._THRESHOLD_FOR_INDIVIDUAL, config._THRESHOLD_FOR_ADDRESS_COMBINED, config._SCALING_FACTOR,
                    curr_country, config._RAW_SCORES_DIRECTORY, config._TOTAL_MATCHES_THRESHOLD, config._LINKAGE_METHOD, master_csv_1, master_csv_2
                )
                deduplicate_dataset_R(rscript_command=config._RSCRIPT_CMD, script_name=config._SCRIPT_NAME, args=args)

                # Clean and normalize the raw score features produced by the Rscript.
                normalized_duplicates = clean_score_features(curr_country=curr_country, country_df=combined_df, source_dir=config._RAW_SCORES_DIRECTORY, target_dir=config._CLEANED_SCORES_DIRECTORY, verbose=False)
                if normalized_duplicates.shape[0] != 0:
                    print('\n\nFound potential duplicates. Processing their master and cross-reference...\n')
                    # Collapse the duplicate clusters to a unique set of master-record-ids.
                    master_record_ids = get_deduplicated_master_records(normalized_duplicates=normalized_duplicates, country_df=combined_df)
                    country_master_df = generate_deduplicated_master(country_df=combined_df, master_record_ids=list(master_record_ids), curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                    # Dummy cross-refs for the masters, then the full cross-ref set for the pair.
                    cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)
                    cross_ref_df = generate_final_cross_refs(cross_ref_df=cross_ref_df, normalized_duplicates=normalized_duplicates, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                    generate_cross_ref_report(cross_ref_df=cross_ref_df, country_df=combined_df, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY)
                else:
                    print('\n\nGet the unique set of all record-ids since there aren\'t any potential duplicates.\n')
                    # No cross-file duplicates: every record in the combined pair is its own master.
                    master_record_ids = combined_df.index.values.astype(list)
                    country_master_df = generate_deduplicated_master(country_df=combined_df, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                    cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)
            else:
                print('\n\nGet the unique set of all record-ids since there isn\'t a second file to compare.\n')
                # Odd file out: carry it forward unchanged as its own master.
                master_record_ids = master_df_1.index.values.astype(list)
                country_master_df = generate_deduplicated_master(country_df=master_df_1, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._STAGING_AREA_DIRECTORY, write_csv=False)
                cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids, curr_country=curr_country)

            # Write the current master to a csv, queue the filename, and append the
            # generated cross-ref to the cross-ref accumulated at this depth.
            combined_crossref_at_depth = combined_crossref_at_depth.append(cross_ref_df)
            new_file_name = '_d{}_{}_Master.csv'.format(j, i)
            write_df_to_csv(df=country_master_df, root_dir=config._STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix=new_file_name, index_flag=True)
            new_file_name = curr_country + new_file_name
            queue_of_csvs.append(new_file_name)

            del master_record_ids, master_df_1
            if i+1 < n_csvs_to_read:
                del normalized_duplicates, master_df_2, combined_df
        write_df_to_csv(df=combined_crossref_at_depth, root_dir=config._STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix='_d{}_Raw_Cross_Ref.csv'.format(j), index_flag=False)
        print('\n\nDepth[{}] processed successfully.'.format(j))
        update_entire_country_cross_ref(new_depth_cross_ref_df=combined_crossref_at_depth, entire_country_cross_ref_df=entire_country_cross_ref_df)
        # Drop the files consumed at this depth; only the masters written at this depth stay queued.
        queue_of_csvs = queue_of_csvs[n_csvs_to_read:]

    if len(queue_of_csvs) == 1:
        print('\n\n\n\nProcessed all {} levels. Generating the master and cross-reference at the final-layer...'.format(d))
        # The single remaining file holds the fully mastered record-ids for this country.
        master_csv_1 = os.path.join(config._STAGING_AREA_DIRECTORY, queue_of_csvs[0])
        master_df_1 = pd.read_csv(master_csv_1, index_col=0)
        master_record_ids = master_df_1.index.values.astype(list)
        country_master_df = generate_deduplicated_master(country_df=entire_country_df_copy, master_record_ids=master_record_ids, curr_country=curr_country, target_dir=config._MASTER_DATA_DIRECTORY, write_csv=True)
        # Write the final raw cross-ref to a csv and build the cross-ref report from it.
        write_df_to_csv(df=entire_country_cross_ref_df, root_dir=config._MASTER_DATA_DIRECTORY, curr_country=curr_country, file_suffix='_Raw_Cross_Ref.csv', index_flag=False)
        generate_cross_ref_report(cross_ref_df=entire_country_cross_ref_df, country_df=entire_country_df_copy, curr_country=curr_country, target_dir=config._MASTER_DATA_DIRECTORY)

print('\n\n\nPipeline completed execution...')