My god... Significant debugging in incomplete row group analysis.
yohplala committed Nov 28, 2024
1 parent ea49098 commit 4b1c7d0
Showing 2 changed files with 164 additions and 99 deletions.
70 changes: 54 additions & 16 deletions oups/store/ordered_merge_info.py
@@ -47,8 +47,6 @@ def _incomplete_row_groups_start(
----------
row_group_size_target : Union[int, str]
Target row group size.
df_max : Union[int, float, Timestamp]
Value of 'ordered_on' at end of DataFrame.
df_n_rows : int
Number of rows in DataFrame.
max_n_irgs : int
@@ -68,6 +66,8 @@ def _incomplete_row_groups_start(
Minimum value of 'ordered_on' in each row group.
rg_maxs : ndarray
Maximum value of 'ordered_on' in each row group.
df_max : Union[int, float, Timestamp]
Value of 'ordered_on' at end of DataFrame.
Returns
-------
@@ -82,27 +82,39 @@ def _incomplete_row_groups_start(
- The boundary of the last incomplete row group is exceeded by new data
"""
df_connected_to_set_of_incomplete_rgs = False
# df_end_in_or_after_set_of_incomplete_rgs = False
last_row_group_boundary_exceeded = False
# Walking parquet file row groups backward to identify where the set of
# incomplete row groups starts.
irg_idx_start = n_rgs - 1
print("")
print("starting irgs analysis")
if isinstance(row_group_size_target, int):
# Case 'row_group_size_target' is an 'int'.
# Total number of rows in incomplete row groups at end of recorded data.
total_rows_in_irgs = 0
min_row_group_size = int(row_group_size_target * MAX_ROW_GROUP_SIZE_SCALE_FACTOR)
print(f"min_row_group_size: {min_row_group_size}")
while rg_n_rows[irg_idx_start] <= min_row_group_size and irg_idx_start >= 0:
print(f"rg_n_rows[irg_idx_start]: {rg_n_rows[irg_idx_start]}")
total_rows_in_irgs += rg_n_rows[irg_idx_start]
print(f"irg_idx_start: {irg_idx_start}")
irg_idx_start -= 1
if df_max > rg_maxs[irg_idx_start]:
if irg_idx_start == n_rgs - 1:
# No incomplete row groups.
return None
irg_idx_start += 1
print(f"irg_idx_start: {irg_idx_start}")
if df_max < rg_mins[irg_idx_start]:
# If new data is before the set of incomplete row groups, then
# no coalescing.
return None
else:
# If new data is located in the set of incomplete row groups,
# add length of new data to the number of rows of incomplete row
# groups.
# In the 'if' above, 'irg_idx_start' is the index of the
# last complete row group.
total_rows_in_irgs += df_n_rows
df_connected_to_set_of_incomplete_rgs = True
# df_end_in_or_after_set_of_incomplete_rgs = True
if total_rows_in_irgs >= row_group_size_target:
# Necessary condition to coalesce is that the total number of rows
# in incomplete row groups is larger than the target row group size.
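
Editor's note: a minimal standalone sketch (not part of the commit) of the backward scan shown in the hunk above, for the case where 'row_group_size_target' is an int. The function name, argument layout and the 0.9 scale factor are assumptions; only the scan logic mirrors the diff.

import numpy as np

def find_incomplete_rg_start(rg_n_rows, row_group_size_target, scale_factor=0.9):
    # Threshold below which a row group is treated as incomplete
    # (stands in for MAX_ROW_GROUP_SIZE_SCALE_FACTOR, assumed ~0.9).
    min_row_group_size = int(row_group_size_target * scale_factor)
    idx = len(rg_n_rows) - 1
    total_rows_in_irgs = 0
    # Walk row groups backward while they stay under the threshold.
    while idx >= 0 and rg_n_rows[idx] <= min_row_group_size:
        total_rows_in_irgs += rg_n_rows[idx]
        idx -= 1
    if idx == len(rg_n_rows) - 1:
        # Last row group is already complete: nothing to coalesce.
        return None
    # 'idx' stopped on the last complete row group; the incomplete set starts after it.
    return idx + 1, total_rows_in_irgs

print(find_incomplete_rg_start(np.array([100, 100, 40, 20]), 100))  # (2, 60)
print(find_incomplete_rg_start(np.array([100, 100]), 100))          # None
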
@@ -116,25 +128,40 @@ def _incomplete_row_groups_start(
freq=row_group_size_target,
periods=2,
)
if df_max < last_period_first_ts:
return None
# df_end_in_or_after_set_of_incomplete_rgs = True
print(f"last_period_first_ts: {last_period_first_ts}")
print(f"next_period_first_ts: {next_period_first_ts}")
while rg_mins[irg_idx_start] >= last_period_first_ts and irg_idx_start >= 0:
print(f"irg_idx_start: {irg_idx_start}")
irg_idx_start -= 1
if df_max >= next_period_first_ts and (n_rgs - irg_idx_start > 2):
irg_idx_start += 1
if irg_idx_start == n_rgs - 1:
# If only one row group has been identified, then it is not an
# incomplete row group, and the function should return None.
# To be considered incomplete, we need to identify at least two
# row groups in the last period.
return None
print(f"irg_idx_start: {irg_idx_start}")
if df_max >= next_period_first_ts:
# Necessary conditions to coalesce are that the last recorded row
# group is incomplete (more than 1 row group) and that the new data
# exceeds the first timestamp of the next period.
last_row_group_boundary_exceeded = True
if df_max >= last_period_first_ts:
df_connected_to_set_of_incomplete_rgs = True

# Whatever the type of 'row_group_size_target', confirm whether or not
# incomplete row groups should be coalesced.
# 'irg_idx_start' is off by -1 with respect to its definition.
irg_idx_start += 1
n_irgs = n_rgs - irg_idx_start

if df_connected_to_set_of_incomplete_rgs and (
last_row_group_boundary_exceeded or n_irgs >= max_n_irgs
):
print(f"n_irgs: {n_irgs}")
print(f"max_n_irgs: {max_n_irgs}")
# print(f"df_end_in_or_after_set_of_incomplete_rgs: {df_end_in_or_after_set_of_incomplete_rgs}")
print(f"last_row_group_boundary_exceeded: {last_row_group_boundary_exceeded}")
# if df_end_in_or_after_set_of_incomplete_rgs and (
# last_row_group_boundary_exceeded or n_irgs >= max_n_irgs
# ):
if last_row_group_boundary_exceeded or n_irgs >= max_n_irgs:
# Coalesce recorded data only if new data overlaps with it,
# or if new data is appended at the tail.
return irg_idx_start
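
Editor's note: for the time-based branch above, a small standalone sketch (not part of the commit) of how the two period boundaries can be obtained with pandas.date_range. The 'start' anchor and the '2h' target frequency are assumptions, since the hunk only shows the 'freq' and 'periods' arguments.

import pandas as pd

row_group_size_target = "2h"
# Assumed anchor: the period containing the last recorded 'ordered_on' value.
last_rg_max = pd.Timestamp("2024-11-28 09:17")
last_period_first_ts, next_period_first_ts = pd.date_range(
    start=last_rg_max.floor(row_group_size_target),
    freq=row_group_size_target,
    periods=2,
)
print(last_period_first_ts, next_period_first_ts)
# 2024-11-28 08:00:00 2024-11-28 10:00:00
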
@@ -234,8 +261,16 @@ def analyze_chunks_to_merge(
# df before first row group in pf.
rg_idx_merge_start = rg_idx_merge_end_excl = 0
else:
print("df within pf")
rg_idx_merge_start = df_idx_tmrg_ends_excl.astype(bool).argmax()
rg_idx_merge_end_excl = df_idx_tmrg_ends_excl.argmax() + 1
# First case: df is within pf; the index of the last row group to merge is
# the row group before the first row group starting after the end of df.
# Second case: df overlaps with the last row group in pf.
rg_idx_merge_end_excl = (
df_idx_tmrg_starts.argmax()
if df_idx_tmrg_starts[-1] == df_n_rows
else df_idx_tmrg_ends_excl.argmax() + 1
)

print("")
print("before irgs analysis")
@@ -347,6 +382,9 @@ def analyze_chunks_to_merge(
# rg_idx_merge_end_excl = max(1, rg_idx_merge_end_excl)
# and force the merge to start at the first row in the DataFrame.
# df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] = 0
print("no leading df chunk")
# Make sure the last row group to merge encompasses the df end.
df_idx_tmrg_ends_excl[rg_idx_merge_end_excl - 1] = df_n_rows

# Trim row group related lists to the overlapping region.
print("trimming step")