Skip to content

Commit

Permalink
WiP. Thorough testing of overlap identification and rewrite strategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
yohplala committed Oct 30, 2024
1 parent 044b21e commit 83f585e
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 65 deletions.
142 changes: 89 additions & 53 deletions oups/store/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,18 @@ def _indexes_of_overlapping_rrgs(
recorded_pf: ParquetFile,
ordered_on: str,
max_row_group_size: Union[int, str],
max_nirgs: int = None,
):
drop_duplicates: bool,
max_nirgs: Union[int, None],
) -> Tuple[int, int, bool]:
"""
Identify overlaps between recorded row groups and new data.
Overlaps may occur in the middle of recorded data.
It may also occur at its tail in case there are incomplete row groups.
Also returns a flag indicating whether the data that will be written is
within the set of complete row groups or after it.
Parameters
----------
new_data : Dataframe
Expand All @@ -314,7 +318,9 @@ def _indexes_of_overlapping_rrgs(
If an ``int``, define the maximum number of rows allowed.
If a ``str``, it has then to be a pandas `freqstr`, to gather data by
timestamp over a defined period.
max_nirgs : int, optional
drop_duplicates : bool
If duplicates have to be dropped or not.
max_nirgs : Union[int, None]
Max expected number of 'incomplete' row groups.
To evaluate number of 'incomplete' row groups, only those at the end of
an existing dataset are accounted for. 'Incomplete' row groups in the
Expand All @@ -330,23 +336,28 @@ def _indexes_of_overlapping_rrgs(
Returns
-------
int, int
rrg_start_idx, rrg_end_idx, indices of recorded row group overlapping
with new data.
int, int, bool
'rrg_start_idx', 'rrg_end_idx', 'new_data_within_complete_rgs'
'rrg_start_idx', 'rrg_end_idx' are indices of recorded row groups
overlapping with new data.
If there is no overlapping row group, they are both set to ``None``.
If there is an overlap with incomplete row groups (by definition of
incomplete row groups, at the tail of the recorded data), then only
'rrg_end_idx' is set to ``None``.
'new_data_within_complete_rgs' is a flag indicating if once written, the
row groups need to be sorted.
"""
# Case 1: assess existing overlaps.
# Get 'rrg_start_idx' & 'rrg_end_idx'.
new_data_first = new_data.loc[:, ordered_on].iloc[0]
new_data_last = new_data.loc[:, ordered_on].iloc[-1]
compare_greater = ">=" if drop_duplicates else ">"
overlapping_rrgs_idx = filter_row_groups(
recorded_pf,
[
[
(ordered_on, ">=", new_data.loc[:, ordered_on].iloc[0]),
(ordered_on, compare_greater, new_data_first),
(ordered_on, "<=", new_data_last),
],
],
Expand All @@ -363,71 +374,90 @@ def _indexes_of_overlapping_rrgs(
# If 'rrg_end_idx' is the index of the last row group, it keeps its
# default 'None' value.
rrg_end_idx = overlapping_rrgs_idx[-1] + 1
print("checking standard overlaps")
print(f"rrg_start_idx: {rrg_start_idx}")
print(f"rrg_end_idx: {rrg_end_idx}")
# Case 2: if incomplete row groups are allowed, and incomplete row groups
# location is connected to where the new data will be written (be it at the
# tail of recorded data, or within it).
full_tail_to_rewrite = False
ordered_on_recorded_max_vals = recorded_pf.statistics["max"][ordered_on]
if max_nirgs is not None:
# Number of incomplete row groups at end of recorded data.
# Initialize number of rows with length of data to be written if it is
# located at the end of recorded data.
total_rows_in_irgs = len(new_data) if rrg_end_idx is None else 0
new_data_connected_to_set_of_incomplete_rgs = False
last_group_boundary_exceeded = False
ordered_on_recorded_min_vals = recorded_pf.statistics["min"][ordered_on]
rrg_start_idx_tmp = n_rrgs - 1
if isinstance(max_row_group_size, int):
# Case 2.a: 'max_row_group_size' is an 'int'.
# Number of incomplete row groups at end of recorded data.
total_rows_in_irgs = 0
min_row_group_size = int(max_row_group_size * 0.9)
while (
recorded_pf[rrg_start_idx_tmp].count() <= min_row_group_size
and rrg_start_idx_tmp >= 0
):
total_rows_in_irgs += recorded_pf[rrg_start_idx_tmp].count()
rrg_start_idx_tmp -= 1
coalesce_due_to_boundary_exceeded = total_rows_in_irgs >= max_row_group_size
if new_data_last > ordered_on_recorded_max_vals[rrg_start_idx_tmp]:
# If new data is located in the set of incomplete row groups,
# add length of new data to the number of rows of incomplete row
# groups.
# In the 'if' above, 'rrg_start_idx_tmp' is the index of the
# last complete row group.
total_rows_in_irgs += len(new_data)
new_data_connected_to_set_of_incomplete_rgs = True
if total_rows_in_irgs >= max_row_group_size:
last_group_boundary_exceeded = True
print(f"total_rows_in_irgs: {total_rows_in_irgs}")
print(f"coalesce_due_to_boundary_exceeded: {coalesce_due_to_boundary_exceeded}")
print(f"last_group_boundary_exceeded: {last_group_boundary_exceeded}")
else:
# Case 2.b: 'max_row_group_size' is a str.
# Only the last row group is incomplete, and it is always considered
# so.
# Get the 1st timestamp allowed in the last "valid" row group (as if
# complete).
# Get the 1st timestamp allowed in the last open period.
# All row groups previous to this timestamp are considered complete.
# TODO: if solved, select directly last row group in
# recorded_pf.statistics["min"][ordered_on][-1] ?
# https://github.com/dask/fastparquet/issues/938
last_vrg_first_ts, new_rg_first_ts = date_range(
start=recorded_pf.statistics["min"][ordered_on][-1].floor(max_row_group_size),
last_period_first_ts, next_period_first_ts = date_range(
start=ordered_on_recorded_min_vals[-1].floor(max_row_group_size),
freq=max_row_group_size,
periods=2,
)[0]
)
while (
recorded_pf.statistics["min"][ordered_on][rrg_start_idx_tmp] >= last_vrg_first_ts
ordered_on_recorded_min_vals[rrg_start_idx_tmp] >= last_period_first_ts
and rrg_start_idx_tmp >= 0
):
rrg_start_idx_tmp -= 1
# Coalesce if last row group is incomplete (more than 1) and the new
# data exceeds last ts of last "valid" row group.
coalesce_due_to_boundary_exceeded = (n_rrgs - rrg_start_idx_tmp > 2) and (
new_data_last >= new_rg_first_ts
)
if new_data_last >= next_period_first_ts and (n_rrgs - rrg_start_idx_tmp > 2):
# Coalesce if last recorded row group is incomplete (more than
# 1) and the new data exceeds first timestamp of next period.
last_group_boundary_exceeded = True
if new_data_last >= last_period_first_ts:
new_data_connected_to_set_of_incomplete_rgs = True
rrg_start_idx_tmp += 1
print(f"rrg_start_idx_tmp: {rrg_start_idx_tmp}")
print(
f"new_data_connected_to_set_of_incomplete_rgs: {new_data_connected_to_set_of_incomplete_rgs}",
)
# Confirm or not coalescing of incomplete row groups.
n_irgs = n_rrgs - rrg_start_idx_tmp
if coalesce_due_to_boundary_exceeded or n_irgs >= max_nirgs:
# Coalesce recorded data only if new data overlaps with it, or
# if new data is appended at the tail.
if rrg_start_idx and (
rrg_end_idx is None or (rrg_end_idx and rrg_start_idx_tmp < rrg_end_idx)
):
# 1st case checked: case overlap between existing data and
# new data.
rrg_start_idx = min(rrg_start_idx_tmp, rrg_start_idx)
elif rrg_start_idx is None:
# 2nd case checked: case no overlap between existing data
# and new data but new data is appended at the tail, and there
# are incomplete row groups.
rrg_start_idx = rrg_start_idx_tmp
return rrg_start_idx, rrg_end_idx
if new_data_connected_to_set_of_incomplete_rgs and (
last_group_boundary_exceeded or n_irgs >= max_nirgs
):
# Coalesce recorded data only if new data overlaps with it,
# or if new data is appended at the tail.
rrg_start_idx = (
rrg_start_idx_tmp
if rrg_start_idx is None
else min(rrg_start_idx_tmp, rrg_start_idx)
)
full_tail_to_rewrite = True
# new_data_within_complete_rgs = rrg_end_idx is not None or not new_data_connected_to_set_of_incomplete_rgs
print(f"full tail to rewrite: {full_tail_to_rewrite}")
return (
rrg_start_idx,
rrg_end_idx,
not full_tail_to_rewrite and new_data_last < ordered_on_recorded_max_vals[-1],
)


def write_ordered(
Expand Down Expand Up @@ -567,11 +597,12 @@ def write_ordered(
# 'ordered_on'.
duplicates_on = [duplicates_on, ordered_on]
pf = ParquetFile(dirpath)
rrg_start_idx, rrg_end_idx = _indexes_of_overlapping_rrgs(
rrg_start_idx, rrg_end_idx, sort_row_groups = _indexes_of_overlapping_rrgs(
new_data=data,
recorded_pf=pf,
ordered_on=ordered_on,
max_row_group_size=max_row_group_size,
drop_duplicates=duplicates_on is not None,
max_nirgs=max_nirgs,
)
if rrg_start_idx is None:
Expand Down Expand Up @@ -612,17 +643,22 @@ def write_ordered(
)
# Remove row groups of data that is overlapping.
pf.remove_row_groups(overlapping_pf.row_groups, write_fmd=False)
if rrg_end_idx is not None:
# New data has been inserted in the middle of existing row groups.
# Sorting row groups based on 'max' in 'ordered_on'.
# TODO: why using 'ordered_on' index?
ordered_on_idx = pf.columns.index(ordered_on)
pf.fmd.row_groups = sorted(
pf.fmd.row_groups,
key=lambda rg: statistics(rg.columns[ordered_on_idx])["max"],
)
# Rename partition files, and write fmd.
pf._sort_part_names(write_fmd=False)
if sort_row_groups:
# /!\ TODO error here /!\
# We want to check if data is in area of incomplete row groups
# or if it is within the middle of data;
# 'rrg_end_idx' can be None while the new data is in the middle of
# recorded data.
# New data has been inserted in the middle of existing row groups.
# Sorting row groups based on 'max' in 'ordered_on'.
# TODO: why using 'ordered_on' index?
ordered_on_idx = pf.columns.index(ordered_on)
pf.fmd.row_groups = sorted(
pf.fmd.row_groups,
key=lambda rg: statistics(rg.columns[ordered_on_idx])["max"],
)
# Rename partition files.
pf._sort_part_names(write_fmd=False)
else:
# Case initiating a new dataset.
iter_data = iter_dataframe(data, max_row_group_size)
Expand Down
Loading

0 comments on commit 83f585e

Please sign in to comment.