From 4b1c7d07ed0f078ba1fbc675965b7de70e01c6a6 Mon Sep 17 00:00:00 2001
From: yohplala
Date: Thu, 28 Nov 2024 22:59:06 +0100
Subject: [PATCH] My god... Significant debugging in incomplete row group
 analysis.

---
 oups/store/ordered_merge_info.py            |  70 +++++--
 tests/test_store/test_ordered_merge_info.py | 193 +++++++++++---------
 2 files changed, 164 insertions(+), 99 deletions(-)

diff --git a/oups/store/ordered_merge_info.py b/oups/store/ordered_merge_info.py
index 9a3c591..1e43391 100644
--- a/oups/store/ordered_merge_info.py
+++ b/oups/store/ordered_merge_info.py
@@ -47,8 +47,6 @@ def _incomplete_row_groups_start(
     ----------
     row_group_size_target : Union[int, str]
         Target row group size.
-    df_max : Union[int, float, Timestamp]
-        Value of 'ordered_on' at end of DataFrame.
     df_n_rows : int
         Number of rows in DataFrame.
     max_n_irgs : int
@@ -68,6 +66,8 @@ def _incomplete_row_groups_start(
         Minimum value of 'ordered_on' in each row group.
     rg_maxs : ndarray
         Maximum value of 'ordered_on' in each row group.
+    df_max : Union[int, float, Timestamp]
+        Value of 'ordered_on' at end of DataFrame.

     Returns
     -------
@@ -82,27 +82,39 @@ def _incomplete_row_groups_start(
       - The boundary of the last incomplete row group is exceeded by new data

     """
-    df_connected_to_set_of_incomplete_rgs = False
+    # df_end_in_or_after_set_of_incomplete_rgs = False
     last_row_group_boundary_exceeded = False
     # Walking parquet file row groups backward to identify where the set of
     # incomplete row groups starts.
     irg_idx_start = n_rgs - 1
+    print("")
+    print("starting irgs analysis")
     if isinstance(row_group_size_target, int):
         # Case 'row_group_size_target' is an 'int'.
         # Number of incomplete row groups at end of recorded data.
         total_rows_in_irgs = 0
         min_row_group_size = int(row_group_size_target * MAX_ROW_GROUP_SIZE_SCALE_FACTOR)
+        print(f"min_row_group_size: {min_row_group_size}")
         while rg_n_rows[irg_idx_start] <= min_row_group_size and irg_idx_start >= 0:
+            print(f"rg_n_rows[irg_idx_start]: {rg_n_rows[irg_idx_start]}")
             total_rows_in_irgs += rg_n_rows[irg_idx_start]
+            print(f"irg_idx_start: {irg_idx_start}")
             irg_idx_start -= 1
-        if df_max > rg_maxs[irg_idx_start]:
+        if irg_idx_start == n_rgs - 1:
+            # No incomplete row groups.
+            return None
+        irg_idx_start += 1
+        print(f"irg_idx_start: {irg_idx_start}")
+        if df_max < rg_mins[irg_idx_start]:
+            # If new data is before the set of incomplete row groups, then
+            # no coalescing.
+            return None
+        else:
             # If new data is located in the set of incomplete row groups,
             # add length of new data to the number of rows of incomplete row
             # groups.
-            # In the 'if' above, 'irg_idx_start' is the index of the
-            # last complete row groups.
             total_rows_in_irgs += df_n_rows
-            df_connected_to_set_of_incomplete_rgs = True
+            # df_end_in_or_after_set_of_incomplete_rgs = True
         if total_rows_in_irgs >= row_group_size_target:
             # Necessary condition to coalesce is that total number of rows
             # in incomplete row groups is larger than target row group size.
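A minimal standalone sketch of the backward walk the 'int' branch above
performs may help follow the fix. This is a hedged reconstruction for
illustration only: the helper name and the 0.9 scale factor are assumptions,
not the actual oups API.

    # Sketch: locate the start of the trailing run of incomplete row groups,
    # walking backward from the last row group (None if the tail is complete).
    MAX_ROW_GROUP_SIZE_SCALE_FACTOR = 0.9  # assumed value, for illustration

    def incomplete_tail_start(rg_n_rows, row_group_size_target):
        min_row_group_size = int(row_group_size_target * MAX_ROW_GROUP_SIZE_SCALE_FACTOR)
        idx = len(rg_n_rows) - 1
        # Check the bound first, so the index cannot wrap below 0.
        while idx >= 0 and rg_n_rows[idx] <= min_row_group_size:
            idx -= 1
        idx += 1
        return None if idx == len(rg_n_rows) else idx

    assert incomplete_tail_start([10, 10, 3, 2], 10) == 2  # two incomplete at tail
    assert incomplete_tail_start([10, 10], 10) is None     # tail is complete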
@@ -116,25 +128,40 @@ def _incomplete_row_groups_start(
             freq=row_group_size_target,
             periods=2,
         )
+        if df_max < last_period_first_ts:
+            return None
+        # df_end_in_or_after_set_of_incomplete_rgs = True
+        print(f"last_period_first_ts: {last_period_first_ts}")
+        print(f"next_period_first_ts: {next_period_first_ts}")
         while rg_mins[irg_idx_start] >= last_period_first_ts and irg_idx_start >= 0:
+            print(f"irg_idx_start: {irg_idx_start}")
             irg_idx_start -= 1
-        if df_max >= next_period_first_ts and (n_rgs - irg_idx_start > 2):
+        irg_idx_start += 1
+        if irg_idx_start == n_rgs - 1:
+            # If only one row group has been identified, then it is not an
+            # incomplete row group, and the function should return None.
+            # To be considered incomplete, at least two row groups need to be
+            # identified in the last period.
+            return None
+        print(f"irg_idx_start: {irg_idx_start}")
+        if df_max >= next_period_first_ts:
             # Necessary conditions to coalesce are that the last recorded row
             # group is incomplete (more than 1 row group) and the new data
             # exceeds the first timestamp of the next period.
             last_row_group_boundary_exceeded = True
-        if df_max >= last_period_first_ts:
-            df_connected_to_set_of_incomplete_rgs = True
     # Whatever the type of 'row_group_size_target', confirm or not the
     # coalescing of incomplete row groups.
-    # 'irg_idx_start' is '-1' with respect to its definition.
-    irg_idx_start += 1
     n_irgs = n_rgs - irg_idx_start
+    print(f"n_irgs: {n_irgs}")
+    print(f"max_n_irgs: {max_n_irgs}")
+    # print(f"df_end_in_or_after_set_of_incomplete_rgs: {df_end_in_or_after_set_of_incomplete_rgs}")
+    print(f"last_row_group_boundary_exceeded: {last_row_group_boundary_exceeded}")
-    if df_connected_to_set_of_incomplete_rgs and (
-        last_row_group_boundary_exceeded or n_irgs >= max_n_irgs
-    ):
+    # if df_end_in_or_after_set_of_incomplete_rgs and (
+    #     last_row_group_boundary_exceeded or n_irgs >= max_n_irgs
+    # ):
+    if last_row_group_boundary_exceeded or n_irgs >= max_n_irgs:
         # Coalesce recorded data only if new data overlaps with it,
         # or if new data is appended at the tail.
         return irg_idx_start
@@ -234,8 +261,16 @@ def analyze_chunks_to_merge(
         # df before first row group in pf.
         rg_idx_merge_start = rg_idx_merge_end_excl = 0
     else:
+        print("df within pf")
         rg_idx_merge_start = df_idx_tmrg_ends_excl.astype(bool).argmax()
-        rg_idx_merge_end_excl = df_idx_tmrg_ends_excl.argmax() + 1
+        # First case: df is within pf; the index of the last row group to
+        # merge is that of the row group preceding the first row group which
+        # starts after the end of df.
+        # Second case: df overlaps with the last row group in pf.
+        # (An illustrative sketch of this selection is appended after the
+        # patch.)
+        rg_idx_merge_end_excl = (
+            df_idx_tmrg_starts.argmax()
+            if df_idx_tmrg_starts[-1] == df_n_rows
+            else df_idx_tmrg_ends_excl.argmax() + 1
+        )

     print("")
     print("before irgs analysis")
@@ -347,6 +382,9 @@ def analyze_chunks_to_merge(
             # rg_idx_merge_end_excl = max(1, rg_idx_merge_end_excl)
             # and force the merge to start at the first row in the DataFrame.
             # df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] = 0
+            print("no leading df chunk")
+            # Make sure the last row group to merge encompasses df end.
+            df_idx_tmrg_ends_excl[rg_idx_merge_end_excl - 1] = df_n_rows

     # Trim row group related lists to the overlapping region.
     print("trimming step")
diff --git a/tests/test_store/test_ordered_merge_info.py b/tests/test_store/test_ordered_merge_info.py
index 647c924..9a9bee6 100644
--- a/tests/test_store/test_ordered_merge_info.py
+++ b/tests/test_store/test_ordered_merge_info.py
@@ -27,12 +27,11 @@
     # 'max_n_irgs' is never triggered.
     (
         # Max row group size as int.
-        # df connected to incomplete rgs.
-        # Writing after pf data, no incomplete row groups.
+        # Writing after pf data, no incomplete row group.
         # rg:  0      1
         # pf: [0,1], [2,3]
         # df:        [3]
-        "no_drop_duplicates_simple_append_int",
+        "new_rg_simple_append_int",
         [3],
         [0, 1, 2, 3],
         [0, 2],  # row_group_offsets
@@ -44,11 +43,27 @@
            "sort_rgs_after_write": False,
        },
    ),
+    (
+        # Max row group size as freqstr.
+        # Writing after pf data, no incomplete row group.
+        # rg:  0             1
+        # pf: [8h10,9h10], [10h10]
+        # df:                      [12h10]
+        "new_rg_simple_append_timestamp_not_on_boundary",
+        [Timestamp(f"{REF_D}12:10")],
+        date_range(Timestamp(f"{REF_D}08:10"), freq="1h", periods=3),
+        [0, 2],
+        "2h",  # row_group_size_target | should not merge irg
+        False,  # drop_duplicates | should not merge with preceding rg
+        3,  # max_n_irgs | should not rewrite irg
+        {
+            "chunk_counter": [1],
+            "sort_rgs_after_write": False,
+        },
+    ),
    (
        # Max row group size as int.
-        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at the end
-        # of pf data.
+        # Writing at end of pf data, merging with incomplete row group.
        # rg:  0        1        2
        # pf: [0,1,2], [6,7,8], [9]
        # df:                   [9]
@@ -66,9 +81,8 @@
    ),
    (
        # Max row group size as freqstr.
-        # df connected to incomplete rgs.
        # Values not on boundary to check 'floor()'.
-        # Writing after pf data.
+        # Writing after pf data, not merging with incomplete row group.
        # rg:  0             1
        # pf: [8h10,9h10], [10h10]
        # df:              [10h10]
@@ -86,9 +100,8 @@
    ),
    (
        # Max row group size as freqstr.
-        # df connected to incomplete rgs.
        # Values not on boundary to check 'floor()'.
-        # Writing after pf data.
+        # Writing after pf data, merging with incomplete row group.
        # rg:  0             1
        # pf: [8h10,9h10], [10h10]
        # df:              [10h10]
@@ -106,9 +119,8 @@
    ),
    (
        # Max row group size as freqstr.
-        # df connected to incomplete rgs.
        # Values on boundary.
-        # Writing after pf data, incomplete row groups.
+        # Writing after pf data, not merging with incomplete row group.
        # rg:  0             1
        # pf: [8h00,9h00], [10h00]
        # df:              [10h00]
@@ -126,9 +138,8 @@
    ),
    (
        # Max row group size as freqstr.
-        # df connected to incomplete rgs.
        # Values on boundary.
-        # Writing after pf data, incomplete row groups.
+        # Writing after pf data, merging with incomplete row group.
        # rg:  0             1
        # pf: [8h00,9h00], [10h00]
        # df:              [10h00]
@@ -144,20 +155,55 @@
            "sort_rgs_after_write": True,
        },
    ),
+    (
+        # Max row group size as int.
+        # Writing after pf data, incomplete row group to merge.
+        # rg:  0        1        2    3
+        # pf: [0,1,2], [3,4,5], [6], [7],
+        # df:                             [8]
+        "last_row_group_exceeded_merge_tail_int",
+        [8],
+        range(8),
+        [0, 3, 6, 7],  # row_group_offsets
+        3,  # row_group_size_target | should merge irgs
+        False,  # drop_duplicates | should not merge with preceding rg
+        4,  # max_n_irgs | should not rewrite irg
+        {
+            "chunk_counter": [0, 0, 0, 1],
+            "sort_rgs_after_write": True,
+        },
+    ),
+    (
+        # Max row group size as freqstr.
+        # Writing after pf data, incomplete row group should be merged.
+        # rg:  0             1        2
+        # pf: [8h00,9h00], [10h00], [11h00]
+        # df:                               [13h00]
+        "last_row_group_exceeded_merge_tail_timestamp",
+        [Timestamp(f"{REF_D}13:00")],
+        date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4),
+        [0, 2, 3],  # row_group_offsets
+        "2h",  # row_group_size_target | new period, should merge irgs
+        True,  # drop_duplicates | no duplicates to drop
+        3,  # max_n_irgs | should not rewrite irg
+        {
+            "chunk_counter": [0, 0, 0, 1],
+            "sort_rgs_after_write": True,
+        },
+    ),
    # 2/ Adding data right at the start.
    (
-        # Test 21 (5.a) /
        # Max row group size as int.
        # df at the start of pf data.
        # rg:  0       1       2    3
        # pf: [2, 6], [7, 8], [9], [10]
        # df: [0,1]
-        "drop_duplicates_insert_at_start_new_rg",
+        "no_duplicates_insert_at_start_new_rg_int",
        [0, 1],
        [2, 6, 7, 8, 9, 10],
        [0, 2, 4, 5],  # row_group_offsets
-        2,  # row_group_size_target
-        True,  # drop_duplicates
+        2,  # row_group_size_target | df enough to make complete rg, should merge.
+        True,  # drop_duplicates | no duplicates to drop
        2,  # max_n_irgs | not triggered
        {
            "chunk_counter": [2],
@@ -165,18 +211,17 @@
        },
    ),
    (
-        # Test 21 (5.a) /
        # Max row group size as int.
        # df at the start of pf data.
        # rg:  0       1       2    3
        # pf: [2, 6], [7, 8], [9], [10]
        # df: [0]
-        "drop_duplicates_insert_at_start_no_new_rg",
+        "no_duplicates_insert_at_start_no_new_rg_int",
        [0],
        [2, 6, 7, 8, 9, 10],
        [0, 2, 4, 5],  # row_group_offsets
-        2,  # row_group_size_target
-        True,  # drop_duplicates
+        2,  # row_group_size_target | df not enough to make complete rg, should not merge.
+        True,  # drop_duplicates | no duplicates to drop
        2,  # max_n_irgs | not triggered
        {
            "chunk_counter": [0, 1],
@@ -184,14 +229,13 @@
        },
    ),
    (
-        # Test 22 (5.b) /
        # Max row group size as freqstr.
-        # df at the very start.
+        # df at the start of pf data.
        # df is not overlapping with existing row groups.
        # rg:  0             1        2
        # pf: [8h00,9h00], [12h00], [13h00]
        # df: [7h30]
-        "drop_duplicates_insert_at_start_new_rg_timestamp_not_on_boundary",
+        "no_duplicates_insert_at_start_new_rg_timestamp_not_on_boundary",
        [Timestamp(f"{REF_D}7:30")],
        [
            Timestamp(f"{REF_D}08:00"),
            Timestamp(f"{REF_D}09:00"),
@@ -200,8 +244,8 @@
            Timestamp(f"{REF_D}12:00"),
            Timestamp(f"{REF_D}14:00"),
        ],
        [0, 2, 3],
-        "2h",  # row_group_size_target
-        True,  # drop_duplicates
+        "2h",  # row_group_size_target | no rg in same period to merge with
+        True,  # drop_duplicates | no duplicates to drop
        2,  # max_n_irgs | should rewrite tail
        {
            "chunk_counter": [1],
@@ -225,21 +268,19 @@
        },
    ),
    (
-        # Test 22 (5.b) /
        # Max row group size as freqstr.
-        # df at the very start.
+        # df at the start of pf data.
        # df is overlapping with existing row groups.
        # rg:  0             1        2
        # pf: [8h10,9h10], [12h10], [13h10]
        # df: [8h00]
-        "drop_duplicates_insert_at_start_no_new_rg_timestamp_on_boundary",
+        "no_duplicates_insert_at_start_no_new_rg_timestamp_on_boundary",
        [Timestamp(f"{REF_D}8:00")],
        [
            Timestamp(f"{REF_D}08:10"),
            Timestamp(f"{REF_D}09:10"),
            Timestamp(f"{REF_D}12:10"),
            Timestamp(f"{REF_D}14:10"),
        ],
        [0, 2, 3],  # row_group_offsets
-        "2h",  # row_group_size_target
-        True,  # drop_duplicates
+        "2h",  # row_group_size_target | should merge with rg in same period
+        True,  # drop_duplicates | no duplicates to drop
        2,  # max_n_irgs | should rewrite tail
        {
            "chunk_counter": [0, 1],
            "sort_rgs_after_write": True,
        },
    ),
    # 3/ Adding data at complete end, testing 'max_n_irgs'.
    (
-        # Test 5 (1.a) /
        # Max row group size as int.
        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at
-        # the end of pf data.
+        # Writing at end of pf data, with incomplete row groups.
        # rg:  0          1           2     3
        # pf: [0,1,2,6], [7,8,9,10], [11], [12]
        # df:                              [12]
@@ -247,37 +288,34 @@
        [12],
        [0, 1, 2, 6, 7, 8, 9, 10, 11, 12],
        [0, 4, 8, 9],  # row_group_offsets
-        4,  # row_group_size_target
-        False,  # drop_duplicates
-        3,  # max_n_irgs
+        4,  # row_group_size_target | should not rewrite tail
+        False,  # drop_duplicates | should not merge with preceding rg
+        3,  # max_n_irgs | should not rewrite tail
        {
            "chunk_counter": [1],
            "sort_rgs_after_write": False,
        },
    ),
    (
-        # Test 5 (1.a) /
        # Max row group size as int.
        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at
-        # the end of pf data.
+        # Writing at end of pf data, with incomplete row groups.
        # rg:  0          1           2     3
        # pf: [0,1,2,6], [7,8,9,10], [11], [12]
        # df:                              [12]
-        "max_n_irgs_reached_rewrite_tail_int",
+        "max_n_irgs_reached_tail_rewrite_int",
        [12],
        [0, 1, 2, 6, 7, 8, 9, 10, 11, 12],
        [0, 4, 8, 9],  # row_group_offsets
-        4,  # row_group_size_target
-        False,  # drop_duplicates
-        2,  # max_n_irgs
+        4,  # row_group_size_target | should not rewrite tail
+        False,  # drop_duplicates | should not merge with preceding rg
+        2,  # max_n_irgs | should rewrite tail
        {
            "chunk_counter": [0, 0, 0, 1],
            "sort_rgs_after_write": True,
        },
    ),
    (
-        # Test 6 (1.b) /
        # Max row group size as freqstr.
        # df connected to incomplete rgs.
        # Values on boundary.
@@ -289,16 +327,15 @@
        [Timestamp(f"{REF_D}11:00")],
        date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4),
        [0, 2, 3],  # row_group_offsets
-        "2h",  # row_group_size_target
-        False,  # drop_duplicates
-        3,  # max_n_irgs
+        "2h",  # row_group_size_target | should not rewrite tail
+        False,  # drop_duplicates | should not merge with preceding rg
+        3,  # max_n_irgs | should not rewrite tail
        {
            "chunk_counter": [1],
            "sort_rgs_after_write": False,
        },
    ),
    (
-        # Test 6 (1.b) /
        # Max row group size as freqstr.
        # df connected to incomplete rgs.
        # Values on boundary.
@@ -306,25 +343,22 @@
        # rg:  0             1        2
        # pf: [8h00,9h00], [10h00], [11h00]
        # df:                       [11h00]
-        "max_n_irgs_reached_rewrite_tail_timestamp",
+        "max_n_irgs_reached_tail_rewrite_timestamp",
        [Timestamp(f"{REF_D}11:00")],
        date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4),
        [0, 2, 3],  # row_group_offsets
-        "2h",  # row_group_size_target
-        False,  # drop_duplicates
-        2,  # max_n_irgs
+        "2h",  # row_group_size_target | should not merge with irg.
+        False,  # drop_duplicates | should not merge with preceding rg
+        2,  # max_n_irgs | should rewrite tail
        {
            "chunk_counter": [0, 0, 0, 1],
            "sort_rgs_after_write": True,
        },
    ),
-    # 3/ Adding data at complete tail, testing 'max_n_irgs=None'.
    (
-        # Test 7 (2.a) /
        # Max row group size as int.
        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at
-        # the end of pf data.
+        # Writing at end of pf data, with incomplete row groups.
        # rg:  0          1           2     3
        # pf: [0,1,2,6], [7,8,9,10], [11], [12]
        # df:                              [12]
@@ -332,16 +366,15 @@
        [12],
        [0, 1, 2, 6, 7, 8, 9, 10, 11, 12],
        [0, 4, 8, 9],
-        4,  # row_group_size_target
-        False,  # drop_duplicates
-        None,  # max_n_irgs
+        4,  # row_group_size_target | should not rewrite tail
+        False,  # drop_duplicates | should not merge with preceding rg
+        None,  # max_n_irgs | should not rewrite tail
        {
            "chunk_counter": [1],
            "sort_rgs_after_write": False,
        },
    ),
    (
-        # Test 8 (2.b) /
        # Max row group size as freqstr
        # df connected to incomplete rgs.
        # Values on boundary.
@@ -363,12 +396,9 @@
    ),
    # 4/ Adding data just before last incomplete row groups.
    (
-        # Test 9 (3.a) /
        # Max row group size as int.
        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at
-        # the end of pf data.
-        # Enough rows to rewrite all tail.
+        # Writing at end of pf data, with incomplete row groups.
        # rg:  0        1        2
        # pf: [0,1,2], [6,7,8], [11]
        # df:              [8, 9, 10]
@@ -376,8 +406,8 @@
        [8, 9, 10],
        [0, 1, 2, 6, 7, 8, 11],
        [0, 3, 6],
-        3,  # row_group_size_target | should not rewrite tail
-        False,  # drop_duplicates
+        3,  # row_group_size_target | no df remainder to merge with next rg
+        False,  # drop_duplicates | should not merge with preceding rg
        3,  # max_n_irgs | should not rewrite tail
        {
            "chunk_counter": [3],
@@ -385,21 +415,18 @@
        },
    ),
    (
-        # Test 9 (3.a) /
        # Max row group size as int.
        # df connected to incomplete rgs.
-        # Writing at end of pf data, with incomplete row groups at
-        # the end of pf data.
-        # Enough rows to rewrite all tail.
+        # Writing at end of pf data, with incomplete row groups.
        # rg:  0        1        2
        # pf: [0,1,2], [6,7,8], [11]
        # df:              [8, 9]
-        "insert_before_irgs_rewrite_tail_int",
+        "insert_before_irgs_tail_rewrite_int",
        [8, 9],
        [0, 1, 2, 6, 7, 8, 11],
        [0, 3, 6],
-        3,  # row_group_size_target | should rewrite tail
-        False,  # drop_duplicates
+        3,  # row_group_size_target | df remainder to merge with next rg
+        False,  # drop_duplicates | should not merge with preceding rg
        3,  # max_n_irgs | should not rewrite tail
        {
            "chunk_counter": [0, 2],
@@ -407,24 +434,24 @@
        },
    ),
    (
-        # Test 10 (3.b) /
        # Max row group size as int.
        # df connected to incomplete rgs.
        # Incomplete row groups at the end of pf data.
        # rg:  0        1        2
        # pf: [0,1,2], [6,7,8], [10]
        # df:              [8, 9]
-        "insert_before_irgs_drop_duplicates_rewrite_tail_int",
+        "insert_before_irgs_drop_duplicates_no_tail_rewrite_int",
        [8, 9],
        [0, 1, 2, 6, 7, 8, 10],
        [0, 3, 6],
-        3,  # row_group_size_target | should rewrite tail
+        3,  # row_group_size_target | because df merges with preceding rg,
+        # df remainder should not merge with next rg
        True,  # drop_duplicates | merge with preceding rg
        3,  # max_n_irgs | should not rewrite tail
        {
            # Other acceptable solution:
            # [0, 1, 1, 2]
-            "chunk_counter": [0, 1, 2, 2],
+            "chunk_counter": [0, 2],
            "sort_rgs_after_write": True,
        },
    ),
@@ -438,7 +465,7 @@
        # rg:  0        1        2
        # pf: [0,1,2], [6,7,8], [10]
        # df:              [8]
-        "insert_before_irgs_drop_duplicates_rewrite_tail_int",
+        "insert_before_irgs_drop_duplicates_tail_rewrite_int",
        [8],
        [0, 1, 2, 6, 7, 8, 10],
        [0, 3, 6],
@@ -458,7 +485,7 @@
        # rg:  0          1          2
        # pf: [0,1,2,3], [6,7,8,8], [10]
        # df:                 [8, 9]
-        "insert_before_irgs_rewrite_tail_int",
+        "insert_before_irgs_tail_rewrite_int",
        [8, 9],
        [0, 1, 2, 3, 6, 7, 8, 8, 10],
        [0, 4, 8],
@@ -479,7 +506,7 @@
        # row grps:  0             1         2
        # pf:       [8h00,9h00], [10h00], [13h00]
        # df:                        [12h00]
-        "insert_timestamp_max_n_irgs_rewrite_tail",
+        "insert_timestamp_max_n_irgs_tail_rewrite",
        DataFrame({"ordered_on": [Timestamp(f"{REF_D}12:00")]}),
        DataFrame(
            {
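Two short sketches may help follow the logic exercised by the patch. Both are
hedged reconstructions for illustration only: variable semantics are inferred
from the names in the patch, the helper names and sample values are made up,
and neither block is the actual oups implementation.

First, the period-boundary computation the freqstr branch of
_incomplete_row_groups_start relies on, including the floor() behaviour the
"not_on_boundary" test cases exercise (assuming standard pandas semantics):

    import pandas as pd

    def period_bounds(last_rg_min: pd.Timestamp, freq: str):
        # First timestamp of the last started period, and of the next period.
        # Off-boundary values are floored first, hence the 'floor()' tests.
        last_period_first_ts, next_period_first_ts = pd.date_range(
            start=last_rg_min.floor(freq),
            freq=freq,
            periods=2,
        )
        return last_period_first_ts, next_period_first_ts

    last_start, next_start = period_bounds(pd.Timestamp("2024-11-28 10:10"), "2h")
    assert last_start == pd.Timestamp("2024-11-28 10:00")
    assert next_start == pd.Timestamp("2024-11-28 12:00")

Second, the rg_idx_merge_end_excl selection fixed in analyze_chunks_to_merge,
with hypothetical overlap arrays ('df_idx_tmrg_starts[i]' and
'df_idx_tmrg_ends_excl[i]' are read here as the df-row indices at which row
group i starts and stops overlapping df):

    import numpy as np

    df_n_rows = 4
    df_idx_tmrg_starts = np.array([0, 2, 4, 4])
    df_idx_tmrg_ends_excl = np.array([2, 4, 4, 4])

    if df_idx_tmrg_starts[-1] == df_n_rows:
        # df ends within pf: stop before the first row group starting after
        # the end of df ('argmax' returns the first occurrence of the max).
        rg_idx_merge_end_excl = df_idx_tmrg_starts.argmax()
    else:
        # df overlaps the last row group in pf.
        rg_idx_merge_end_excl = df_idx_tmrg_ends_excl.argmax() + 1

    assert rg_idx_merge_end_excl == 2  # merge row groups 0 and 1 only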