From 83f585e9f90d5b9ab8b90057cd814b30e47f055a Mon Sep 17 00:00:00 2001 From: yohplala Date: Wed, 30 Oct 2024 17:55:32 +0100 Subject: [PATCH] WiP. Thorough testing of overlap identification and rewrite strategy. --- oups/store/writer.py | 142 +++++++++++++-------- tests/test_store/test_writer.py | 211 ++++++++++++++++++++++++++++++-- 2 files changed, 288 insertions(+), 65 deletions(-) diff --git a/oups/store/writer.py b/oups/store/writer.py index 57bb102..84c7cbb 100644 --- a/oups/store/writer.py +++ b/oups/store/writer.py @@ -289,14 +289,18 @@ def _indexes_of_overlapping_rrgs( recorded_pf: ParquetFile, ordered_on: str, max_row_group_size: Union[int, str], - max_nirgs: int = None, -): + drop_duplicates: bool, + max_nirgs: Union[int, None], +) -> Tuple[int, int, bool]: """ Identify overlaps between recorded row groups and new data. Overlaps may occur in the middle of recorded data. It may also occur at its tail in case there are incomplete row groups. + Returns also a flag indicating if the data that will be written is within + the set of complete row groups or after. + Parameters ---------- new_data : Dataframe @@ -314,7 +318,9 @@ def _indexes_of_overlapping_rrgs( If an ``int``, define the maximum number of rows allowed. If a ``str``, it has then to be a pandas `freqstr`, to gather data by timestamp over a defined period. - max_nirgs : int, optional + drop_duplicates : bool + If duplicates have to be dropped or not. + max_nirgs : Union[int, None] Max expected number of 'incomplete' row groups. To evaluate number of 'incomplete' row groups, only those at the end of an existing dataset are accounted for. 'Incomplete' row groups in the @@ -330,23 +336,28 @@ def _indexes_of_overlapping_rrgs( Returns ------- - int, int - rrg_start_idx, rrg_end_idx, indices of recorded row group overlapping - with new data. 
+ int, int, bool + 'rrg_start_idx', 'rrg_end_idx', 'new_data_within_complete_rgs' + 'rrg_start_idx', 'rrg_end_idx' are indices of recorded row groups + overlapping with new data. If there is not overlapping row group, they are both set to ``None``. If there is overlapping with incomplete row groups (by definition of incomplete row groups, at the tail of the recorded data), then only 'rrg_end_idx' is set to ``None``. + 'new_data_within_complete_rgs' is a flag indicating if once written, the + row groups need to be sorted. """ # Case 1: assess existing overlaps. # Get 'rrg_start_idx' & 'rrg_end_idx'. + new_data_first = new_data.loc[:, ordered_on].iloc[0] new_data_last = new_data.loc[:, ordered_on].iloc[-1] + compare_greater = ">=" if drop_duplicates else ">" overlapping_rrgs_idx = filter_row_groups( recorded_pf, [ [ - (ordered_on, ">=", new_data.loc[:, ordered_on].iloc[0]), + (ordered_on, compare_greater, new_data_first), (ordered_on, "<=", new_data_last), ], ], @@ -363,18 +374,23 @@ def _indexes_of_overlapping_rrgs( # If 'rrg_end_idx' is the index of the last row group, it keeps its # default 'None' value. rrg_end_idx = overlapping_rrgs_idx[-1] + 1 + print("checking standard overlaps") + print(f"rrg_start_idx: {rrg_start_idx}") print(f"rrg_end_idx: {rrg_end_idx}") # Case 2: if incomplete row groups are allowed, and incomplete row groups # location is connected to where the new data will be written (be it at the # tail of recorded data, or within it). + full_tail_to_rewrite = False + ordered_on_recorded_max_vals = recorded_pf.statistics["max"][ordered_on] if max_nirgs is not None: - # Number of incomplete row groups at end of recorded data. - # Initialize number of rows with length of data to be written if it is - # located at the end of recorded data. 
- total_rows_in_irgs = len(new_data) if rrg_end_idx is None else 0 + new_data_connected_to_set_of_incomplete_rgs = False + last_group_boundary_exceeded = False + ordered_on_recorded_min_vals = recorded_pf.statistics["min"][ordered_on] rrg_start_idx_tmp = n_rrgs - 1 if isinstance(max_row_group_size, int): # Case 2.a: 'max_row_group_size' is an 'int'. + # Number of incomplete row groups at end of recorded data. + total_rows_in_irgs = 0 min_row_group_size = int(max_row_group_size * 0.9) while ( recorded_pf[rrg_start_idx_tmp].count() <= min_row_group_size @@ -382,52 +398,66 @@ def _indexes_of_overlapping_rrgs( ): total_rows_in_irgs += recorded_pf[rrg_start_idx_tmp].count() rrg_start_idx_tmp -= 1 - coalesce_due_to_boundary_exceeded = total_rows_in_irgs >= max_row_group_size + if new_data_last > ordered_on_recorded_max_vals[rrg_start_idx_tmp]: + # If new data is located in the set of incomplete row groups, + # add length of new data to the number of rows of incomplete row + # groups. + # In the 'if' above, 'rrg_start_idx_tmp' is the index of the + # last complete row groups. + total_rows_in_irgs += len(new_data) + new_data_connected_to_set_of_incomplete_rgs = True + if total_rows_in_irgs >= max_row_group_size: + last_group_boundary_exceeded = True print(f"total_rows_in_irgs: {total_rows_in_irgs}") - print(f"coalesce_due_to_boundary_exceeded: {coalesce_due_to_boundary_exceeded}") + print(f"last_group_boundary_exceeded: {last_group_boundary_exceeded}") else: # Case 2.b: 'max_row_group_size' is a str. - # Only the last row group is incomplete, and it is always considered - # so. - # Get the 1st timestamp allowed in the last "valid" row group (as if - # complete). + # Get the 1st timestamp allowed in the last open period. + # All row groups previous to this timestamp are considered complete. # TODO: if solved, select directly last row group in # recorded_pf.statistics["min"][ordered_on][-1] ? 
# https://github.com/dask/fastparquet/issues/938 - last_vrg_first_ts, new_rg_first_ts = date_range( - start=recorded_pf.statistics["min"][ordered_on][-1].floor(max_row_group_size), + last_period_first_ts, next_period_first_ts = date_range( + start=ordered_on_recorded_min_vals[-1].floor(max_row_group_size), freq=max_row_group_size, periods=2, - )[0] + ) while ( - recorded_pf.statistics["min"][ordered_on][rrg_start_idx_tmp] >= last_vrg_first_ts + ordered_on_recorded_min_vals[rrg_start_idx_tmp] >= last_period_first_ts and rrg_start_idx_tmp >= 0 ): rrg_start_idx_tmp -= 1 - # Coalesce if last row group is incomplete (more than 1) and the new - # data exceeds last ts of last "valid" row group. - coalesce_due_to_boundary_exceeded = (n_rrgs - rrg_start_idx_tmp > 2) and ( - new_data_last >= new_rg_first_ts - ) + if new_data_last >= next_period_first_ts and (n_rrgs - rrg_start_idx_tmp > 2): + # Coalesce if last recorded row group is incomplete (more than + # 1) and the new data exceeds first timestamp of next period. + last_group_boundary_exceeded = True + if new_data_last >= last_period_first_ts: + new_data_connected_to_set_of_incomplete_rgs = True rrg_start_idx_tmp += 1 print(f"rrg_start_idx_tmp: {rrg_start_idx_tmp}") + print( + f"new_data_connected_to_set_of_incomplete_rgs: {new_data_connected_to_set_of_incomplete_rgs}", + ) # Confirm or not coalescing of incomplete row groups. n_irgs = n_rrgs - rrg_start_idx_tmp - if coalesce_due_to_boundary_exceeded or n_irgs >= max_nirgs: - # Coalesce recorded data only it new data overlaps with it, or - # if new data is appended at the tail. - if rrg_start_idx and ( - rrg_end_idx is None or (rrg_end_idx and rrg_start_idx_tmp < rrg_end_idx) - ): - # 1st case checked: case overlap between existing data and - # new data. 
- rrg_start_idx = min(rrg_start_idx_tmp, rrg_start_idx) - elif rrg_start_idx is None: - # 2nd case checked: case no overlap between existing data - # and new data but new data is appended at the tail, and there - # does be incomplete row groups. - rrg_start_idx = rrg_start_idx_tmp - return rrg_start_idx, rrg_end_idx + if new_data_connected_to_set_of_incomplete_rgs and ( + last_group_boundary_exceeded or n_irgs >= max_nirgs + ): + # Coalesce recorded data only if new data overlaps with it, + # or if new data is appended at the tail. + rrg_start_idx = ( + rrg_start_idx_tmp + if rrg_start_idx is None + else min(rrg_start_idx_tmp, rrg_start_idx) + ) + full_tail_to_rewrite = True + # new_data_within_complete_rgs = rrg_end_idx is not None or not new_data_connected_to_set_of_incomplete_rgs + print(f"full tail to rewrite: {full_tail_to_rewrite}") + return ( + rrg_start_idx, + rrg_end_idx, + not full_tail_to_rewrite and new_data_last < ordered_on_recorded_max_vals[-1], + ) def write_ordered( @@ -567,11 +597,12 @@ def write_ordered( # 'ordered_on'. duplicates_on = [duplicates_on, ordered_on] pf = ParquetFile(dirpath) - rrg_start_idx, rrg_end_idx = _indexes_of_overlapping_rrgs( + rrg_start_idx, rrg_end_idx, sort_row_groups = _indexes_of_overlapping_rrgs( new_data=data, recorded_pf=pf, ordered_on=ordered_on, max_row_group_size=max_row_group_size, + drop_duplicates=duplicates_on is not None, max_nirgs=max_nirgs, ) if rrg_start_idx is None: @@ -612,17 +643,22 @@ def write_ordered( ) # Remove row groups of data that is overlapping. pf.remove_row_groups(overlapping_pf.row_groups, write_fmd=False) - if rrg_end_idx is not None: - # New data has been inserted in the middle of existing row groups. - # Sorting row groups based on 'max' in 'ordered_on'. - # TODO: why using 'ordered_on' index? 
- ordered_on_idx = pf.columns.index(ordered_on) - pf.fmd.row_groups = sorted( - pf.fmd.row_groups, - key=lambda rg: statistics(rg.columns[ordered_on_idx])["max"], - ) - # Rename partition files, and write fmd. - pf._sort_part_names(write_fmd=False) + if sort_row_groups: + # /!\ TODO error here /!\ + # We want to check if data is in area of incomplete row groups + # or if it is within the middle of data; + # 'rrg_end_idx' can be None and the new data being in the middle + # recorded data. + # New data has been inserted in the middle of existing row groups. + # Sorting row groups based on 'max' in 'ordered_on'. + # TODO: why using 'ordered_on' index? + ordered_on_idx = pf.columns.index(ordered_on) + pf.fmd.row_groups = sorted( + pf.fmd.row_groups, + key=lambda rg: statistics(rg.columns[ordered_on_idx])["max"], + ) + # Rename partition files. + pf._sort_part_names(write_fmd=False) else: # Case initiating a new dataset. iter_data = iter_dataframe(data, max_row_group_size) diff --git a/tests/test_store/test_writer.py b/tests/test_store/test_writer.py index 687a7bd..a64b39c 100644 --- a/tests/test_store/test_writer.py +++ b/tests/test_store/test_writer.py @@ -17,39 +17,225 @@ from numpy import arange from pandas import DataFrame from pandas import MultiIndex +from pandas import Timestamp from pandas import concat +from pandas import date_range from oups.store.writer import _indexes_of_overlapping_rrgs from oups.store.writer import to_midx from oups.store.writer import write_ordered as pswrite +REF_D = "2020/01/01 " + + @pytest.mark.parametrize( - ("df_to_record, row_group_offsets, max_row_group_size, new_data, max_nirgs, ref_rrg_indexes"), + ( + "new_data, df_to_record, row_group_offsets, max_row_group_size, drop_duplicates, max_nirgs, expected" + ), [ + ( + # Test 0 / + # Max row group size as int. + # Writing in-between recorded data, with incomplete row groups at + # the end of recorded data. 
+ # drop_duplicates = True + # row grps: 0 1 2 3 4 + # recorded: [0,1], [2, 6], [7, 8], [9], [10] + # new data: [2, 3, 4] + DataFrame({"ordered_on": [2, 3, 4]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9, 10]}), + [0, 2, 4, 6, 7], + 2, + True, + 2, + (1, 2, True), # bool: need to sort rgs after write + ), ( # Test 1 / - # Writing in-between existing data, with incomplete row groups at + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at # the end of recorded data. - # Row groups in recorded data: - # [0,1], [2,6], [7, 8], [9], [10] + # drop_duplicates = True + # row grps: 0 1 2 3 4 + # recorded: [0,1], [2, 6], [7, 8], [9], [10] + # new data: [10, 11, 12] + DataFrame({"ordered_on": [10, 11, 12]}), DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9, 10]}), [0, 2, 4, 6, 7], 2, - DataFrame({"ordered_on": [2, 3, 4]}), + True, + 2, + (3, None, False), # bool: need to sort rgs after write + ), + ( + # Test 2 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = True + # row grps: 0 1 2 + # recorded: [0,1,2],[6,7,8],[9] + # new data: [9] + DataFrame({"ordered_on": [9]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9]}), + [0, 3, 6], + 3, # max_row_group_size + True, + 2, + (2, None, False), # bool: need to sort rgs after write + ), + ( + # Test 3 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = False + # row grps: 0 1 2 + # recorded: [0,1,2],[6,7,8],[9] + # new data: [9] + DataFrame({"ordered_on": [9]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9]}), + [0, 3, 6], + 3, # max_row_group_size + False, + 2, + (None, None, False), # bool: need to sort rgs after write + ), + ( + # Test 4 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. 
+ # drop_duplicates = False + # row grps: 0 1 2 + # recorded: [0,1,2], [6,7,8],[9] + # new data: [3] + DataFrame({"ordered_on": [3]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9]}), + [0, 3, 6], + 3, # max_row_group_size + False, + 2, + (None, None, True), # bool: need to sort rgs after write + ), + ( + # Test 5 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = False + # But enough rows to rewrite all tail. + # row grps: 0 1 2 + # recorded: [0,1,2],[6,7,8], [10] + # new data: [8, 9] + DataFrame({"ordered_on": [8, 9]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 10]}), + [0, 3, 6], + 3, # max_row_group_size + False, + 2, + (2, None, False), # bool: need to sort rgs after write + ), + ( + # Test 6 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = False + # But enough rows to rewrite all tail. + # row grps: 0 1 2 + # recorded: [0,1,2,3],[6,7,8,8], [10] + # new data: [8, 9] + DataFrame({"ordered_on": [8, 9]}), + DataFrame({"ordered_on": [0, 1, 2, 3, 6, 7, 8, 8, 10]}), + [0, 4, 8], + 4, # max_row_group_size + False, + 2, + (None, None, True), # bool: need to sort rgs after write + ), + ( + # Test 7 / + # Max row group size as int. + # Writing at end of recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = True + # One-but last row group is complete, but because new data is + # overlapping with it, it has to be rewritten. + # By choice, the rewrite does not propagate till the end. + # row grps: 0 1 2 3 + # recorded: [0,1,2],[6,7,8], [10, 11, 12], [13] + # new data: [9, 10] + DataFrame({"ordered_on": [9, 10]}), + DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 10, 11, 12, 13]}), + [0, 3, 6, 9], + 3, # max_row_group_size + True, + 2, + (2, 3, True), + ), + ( + # Test 5 / + # Max row group size as pandas freqstr. 
+ # Writing in-between recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = True + # row grps: 0 1 2 3 4 + # recorded: [8h,9h], [10h, 11h], [12h, 13h], [14h], [15h] + # new data: [11h] + DataFrame({"ordered_on": [Timestamp(f"{REF_D}11:00")]}), + DataFrame( + { + "ordered_on": date_range( + Timestamp(f"{REF_D}8:00"), + freq="1h", + periods=8, + ), + }, + ), + [0, 2, 4, 6, 7], + 2, + True, 2, (1, 2), ), + ( + # Test 6 / + # Max row group size as pandas freqstr. + # Writing in-between recorded data, with incomplete row groups at + # the end of recorded data. + # drop_duplicates = False + # row grps: 0 1 2 3 4 + # recorded: [8h,9h], [10h, 11h], [12h, 13h], [14h], [15h] + # new data: [9h] + DataFrame({"ordered_on": [Timestamp(f"{REF_D}9:00")]}), + DataFrame( + { + "ordered_on": date_range( + Timestamp(f"{REF_D}8:00"), + freq="1h", + periods=8, + ), + }, + ), + [0, 2, 4, 6, 7], + 2, + False, + "2h", # max_row_group_size + (2, 2), + ), ], ) def test_indexes_of_overlapping_rrgs( tmp_path, + new_data, df_to_record, row_group_offsets, max_row_group_size, - new_data, + drop_duplicates, max_nirgs, - ref_rrg_indexes, + expected, ): fp_write( f"{tmp_path}/test", @@ -59,14 +245,15 @@ def test_indexes_of_overlapping_rrgs( write_index=False, ) recorded_pf = ParquetFile(f"{tmp_path}/test") - res_rrg_indexes = _indexes_of_overlapping_rrgs( + res = _indexes_of_overlapping_rrgs( new_data=new_data, recorded_pf=recorded_pf, ordered_on="ordered_on", max_row_group_size=max_row_group_size, + drop_duplicates=drop_duplicates, max_nirgs=max_nirgs, ) - assert res_rrg_indexes == ref_rrg_indexes + assert res == expected def test_init_and_append_std(tmp_path): @@ -519,7 +706,7 @@ def test_inserting_data_no_drop_duplicate(tmp_path): # b [0, , ,3, , ,6, , , 9, ] # c [0,0,0,0,0,0,0,0,0, 0, 0] # a (new data, ordered_on) [3,4,5, 8] - # b (new data, duplicates_on) [7,7,4, 9] + # b (new data) [7,7,4, 9] # c (new data, check last) [1,1,1, 1] # 1 duplicate (on b) 
x x # a (concat) [11,12,0,1,3,3,3,4,5,5,7,7,8,8, 9] (before rg sorting) @@ -544,11 +731,11 @@ b2 = [7, 7, 4, 9] c2 = [1] * len_a2 pdf2 = DataFrame({"a": a2, "b": b2, "c": c2}) - # ordered on 'a', duplicates on 'b' ('a' added implicitly) + # ordered on 'a', no 'duplicates_on' pswrite(dn, pdf2, max_row_group_size=max_row_group_size, ordered_on="a") pf_rec = ParquetFile(dn) len_rgs_rec = [rg.num_rows for rg in pf_rec.row_groups] - assert len_rgs_rec == [2, 4, 2, 4, 1, 2] + assert len_rgs_rec == [3, 3, 2, 4, 1, 2] a_ref = [0, 1, 3, 3, 3, 4, 5, 5, 7, 7, 8, 8, 9, 11, 12] b_ref = [0, 1, 2, 3, 7, 7, 4, 4, 5, 6, 7, 9, 8, 9, 10] c_ref = [0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0]