WiP. More tests for overlap identifications.
yohplala committed Oct 31, 2024
1 parent 83f585e commit 2753f98
Showing 2 changed files with 417 additions and 114 deletions.
86 changes: 56 additions & 30 deletions oups/store/writer.py
@@ -17,15 +17,18 @@
from fastparquet.api import filter_row_groups
from fastparquet.api import statistics
from fastparquet.util import update_custom_metadata
from numpy import dtype
from numpy import searchsorted
from numpy import unique
from pandas import DataFrame
from pandas import Index
from pandas import MultiIndex
from pandas import Timestamp
from pandas import concat
from pandas import date_range


DTYPE_DATETIME64 = dtype("datetime64[ns]")
COMPRESSION = "SNAPPY"
MAX_ROW_GROUP_SIZE = 6_345_000
KEY_MAX_ROW_GROUP_SIZE = "max_row_group_size"
@@ -110,7 +113,7 @@ def iter_dataframe(
# Adjust bins so that they do not end in the middle of duplicate
# values in `sharp_on` column.
val_at_start = data.loc[:, sharp_on].iloc[starts]
starts = unique(searchsorted(data[sharp_on].to_numpy(), val_at_start)).tolist()
else:
# If n_rows=0
starts = [0]
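A minimal sketch of this bin adjustment, with an illustrative `sharp_on` column (the DataFrame and naive bin starts below are made up, not part of the module):

from numpy import searchsorted
from numpy import unique
from pandas import DataFrame

data = DataFrame({"ts": [1, 1, 2, 2, 2, 3, 4]})  # ordered, with duplicate runs
starts = [0, 3, 5]  # naive bin starts; 3 falls inside the run of 2s
val_at_start = data.loc[:, "ts"].iloc[starts]  # values at naive starts: 1, 2, 3
# 'searchsorted' (left side by default) snaps each start back to the first
# occurrence of its value, and 'unique' merges starts that collapse together.
starts = unique(searchsorted(data["ts"].to_numpy(), val_at_start)).tolist()
# starts is now [0, 2, 5]: no bin boundary splits a run of duplicate values.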
@@ -316,7 +319,7 @@ def _indexes_of_overlapping_rrgs(
max_row_group_size : Union[int, str]
Define how to group data, either an ``int`` or a ``str``.
If an ``int``, define the maximum number of rows allowed.
If a ``str``, it has to be a pandas `freqstr`, to gather data by
timestamp over a defined period.
drop_duplicates : bool
If duplicates have to be dropped or not.
@@ -338,20 +341,25 @@
-------
int, int, bool
'rrg_start_idx', 'rrg_end_idx', 'new_data_within_complete_rgs'
'rrg_start_idx' and 'rrg_end_idx' are indices of recorded row groups
overlapping with new data.
If there is no overlapping row group, they are both set to ``None``.
If the overlap is with incomplete row groups (which, by definition of
incomplete row groups, lie at the tail of the recorded data), then only
'rrg_end_idx' is set to ``None``.
'new_data_within_complete_rgs' is a flag indicating whether, once written,
the row groups need to be sorted.
"""
# 1: assess existing overlaps.
# Get 'rrg_start_idx' & 'rrg_end_idx'.
new_data_first = new_data.loc[:, ordered_on].iloc[0]
new_data_last = new_data.loc[:, ordered_on].iloc[-1]
ordered_on_recorded_max_vals = recorded_pf.statistics["max"][ordered_on]
ordered_on_recorded_min_vals = recorded_pf.statistics["min"][ordered_on]
# Recorded row group start and end indexes.
rrg_start_idx, rrg_end_idx = None, None
n_rrgs = len(recorded_pf.row_groups)
compare_greater = ">=" if drop_duplicates else ">"
overlapping_rrgs_idx = filter_row_groups(
recorded_pf,
@@ -363,29 +371,24 @@
],
as_idx=True,
)
if overlapping_rrgs_idx:
rrg_start_idx = overlapping_rrgs_idx[0]
if overlapping_rrgs_idx[-1] + 1 != n_rrgs:
# For slicing, 'rrg_end_idx' is increased by 1.
# If 'rrg_end_idx' is the index of the last row group, it keeps its
# default 'None' value.
rrg_end_idx = overlapping_rrgs_idx[-1] + 1
print(f"overlapping_rrgs_idx: {overlapping_rrgs_idx}")
print("checking standard overlaps")
print(f"rrg_start_idx: {rrg_start_idx}")
print(f"rrg_end_idx: {rrg_end_idx}")
# 2: if incomplete row groups are allowed, and their location is connected
# to where the new data will be written (be it at the tail of recorded
# data, or within it).
full_tail_to_rewrite = False
if max_nirgs is not None:
new_data_connected_to_set_of_incomplete_rgs = False
last_group_boundary_exceeded = False
rrg_start_idx_tmp = n_rrgs - 1
if isinstance(max_row_group_size, int):
# Case 2.a: 'max_row_group_size' is an 'int'.
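What 'Case 2.a' amounts to, per the docstring rule that a row group is 'complete' at >=90% of ``max_row_group_size``; this is a sketch under that assumption, not necessarily the actual branch body (``num_rows`` comes from fastparquet's row-group metadata):

min_complete_size = int(max_row_group_size * 0.9)
rrg_start_idx_tmp = n_rrgs
# Walk the recorded row groups backwards while they stay incomplete.
for rg in reversed(recorded_pf.row_groups):
    if rg.num_rows >= min_complete_size:
        break
    rrg_start_idx_tmp -= 1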
@@ -418,7 +421,7 @@ def _indexes_of_overlapping_rrgs(
# recorded_pf.statistics["min"][ordered_on][-1] ?
# https://github.com/dask/fastparquet/issues/938
last_period_first_ts, next_period_first_ts = date_range(
start=Timestamp(ordered_on_recorded_min_vals[-1]).floor(max_row_group_size),
freq=max_row_group_size,
periods=2,
)
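A minimal sketch of the boundary computation above, assuming ``max_row_group_size`` is the pandas freqstr '2h' (the timestamp is made up):

from pandas import Timestamp
from pandas import date_range

last_rg_min = Timestamp("2024-10-31 14:35")  # min of the last recorded row group
last_period_first_ts, next_period_first_ts = date_range(
    start=last_rg_min.floor("2h"),  # 2024-10-31 14:00
    freq="2h",
    periods=2,
)
# last_period_first_ts -> 2024-10-31 14:00, start of the period holding the
# last recorded row group; next_period_first_ts -> 2024-10-31 16:00, the
# first timestamp of the following period.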
@@ -433,13 +436,17 @@
last_group_boundary_exceeded = True
if new_data_last >= last_period_first_ts:
new_data_connected_to_set_of_incomplete_rgs = True
# Confirm or not coalescing of incomplete row groups.
# 'rrg_start_idx_tmp' is '-1' with respect to its definition.
# This accounts for the new data, which will add at least one more row
# group.
n_irgs = n_rrgs - rrg_start_idx_tmp
print(f"n_irgs: {n_irgs}")
rrg_start_idx_tmp += 1
print(f"rrg_start_idx_tmp: {rrg_start_idx_tmp}")
print(
f"new_data_connected_to_set_of_incomplete_rgs: {new_data_connected_to_set_of_incomplete_rgs}",
)
if new_data_connected_to_set_of_incomplete_rgs and (
last_group_boundary_exceeded or n_irgs >= max_nirgs
):
@@ -450,6 +457,8 @@
if rrg_start_idx is None
else min(rrg_start_idx_tmp, rrg_start_idx)
)
# Force 'rrg_end_idx' to None.
rrg_end_idx = None
full_tail_to_rewrite = True
# new_data_within_complete_rgs = rrg_end_idx is not None or not new_data_connected_to_set_of_incomplete_rgs
print(f"full tail to rewrite: {full_tail_to_rewrite}")
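A condensed sketch of the coalescing decision above, with made-up values standing in for the derived statistics (the off-by-one bookkeeping around 'rrg_start_idx_tmp' is elided):

n_rrgs = 10  # recorded row groups
rrg_start_idx_tmp = 8  # first incomplete row group at the tail
max_nirgs = 3
n_irgs = n_rrgs - rrg_start_idx_tmp  # 2 incomplete row groups at the tail
new_data_connected_to_set_of_incomplete_rgs = True
last_group_boundary_exceeded = False
if new_data_connected_to_set_of_incomplete_rgs and (
    last_group_boundary_exceeded or n_irgs >= max_nirgs
):
    full_tail_to_rewrite = True  # not reached here: 2 < 3, no boundary hit
else:
    full_tail_to_rewrite = False  # incomplete tail is left as is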
@@ -521,12 +530,16 @@ def write_ordered(
used instead of all columns names.
If not set, default to ``None``, meaning no row is dropped.
max_nirgs : int, optional
Max expected number of 'incomplete' row groups.
If 'max_row_group_size' is an ``int``, then a 'complete' row group
is one whose size is 'close to' ``max_row_group_size`` (>=90%).
If 'max_row_group_size' is a pandas `freqstr`, and if there are several
row groups in the last period defined by the `freqstr`, then these row
groups are considered incomplete.
To evaluate the number of 'incomplete' row groups, only those at the end of
an existing dataset are accounted for. 'Incomplete' row groups in the
middle of 'complete' row groups are not accounted for (they can be
created by insertion of new data in the middle of existing data).
If not set, default to ``None``.
- ``None`` value induces no coalescing of row groups. If there is no
@@ -559,28 +572,40 @@
- When ``duplicates_on`` is set, duplicate search is carried out per 'row
group to be written'. A `row group to be written` is made from the merge
between new data and the existing recorded row groups which overlap.
- As per the logic of the previous note, duplicates need to be gathered in
the same row group to be identified; they consequently need to share the
same `index`, defined by the value in ``ordered_on``. Extending this
logic, ``ordered_on`` is added to ``duplicates_on`` if not already part of
it.
- For simple data appending, i.e. without need to drop duplicates, it is
advised to keep ``ordered_on`` and ``duplicates_on`` parameters set to
``None`` as these parameters will trigger unnecessary evaluations.
- Incomplete row groups are row groups:
- either not reaching the maximum number of rows if 'max_row_group_size'
is an ``int``,
- or several row groups lying in the same time period if
'max_row_group_size' is a pandas freqstr.
- When incorporating new data within recorded data, the algorithm checking
for overlaps will not try to complete incomplete row groups that may lie
within recorded data. If there is no intersection with existing data, new
data is simply added, without merging with existing incomplete row groups.
- The algorithm will only try to complete incomplete row groups at the tail
of recorded data.
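
Examples
--------
An illustrative call, with keyword arguments and made-up values (the full
signature is not reproduced here):

>>> from pandas import DataFrame, date_range
>>> df = DataFrame({
...     "ts": date_range("2024/01/01", periods=4, freq="1h"),
...     "value": range(4),
... })
>>> write_ordered(
...     dirpath="my_dataset",
...     data=df,
...     max_row_group_size="2h",
...     ordered_on="ts",
...     max_nirgs=4,
... )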
"""
if os_path.isdir(dirpath) and any(file.endswith(".parquet") for file in os_listdir(dirpath)):
# Case updating an existing dataset.
if ordered_on not in data.columns:
# Check 'ordered_on' column is within input dataframe.
raise ValueError(f"column '{ordered_on}' does not exist in input data.")
if isinstance(max_row_group_size, str) and data.dtypes[ordered_on] != DTYPE_DATETIME64:
raise TypeError(
"if 'max_row_group_size' is a pandas freqstr, dtype"
f" of column {ordered_on} has to be 'datetime64[ns]'.",
)
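# Note: these validations only run on the update path (directory already
# holding '.parquet' files); a first write into a new directory skips them.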
if duplicates_on is not None:
# Enforce 'ordered_on' in 'duplicates_on', as per logic of
# duplicate identification restricted to the data overlap between new
@@ -703,6 +728,7 @@ def write_ordered(


# Test cases:
# - test exception ordered_on column not datetime64, but max_row_group_size is str.
# - test specific append case
# - max_row_group_size an int
# - new data with length above "max_row_group_size"
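A sketch of the first test case above, assuming pytest and an 'existing_dataset' fixture that creates a dataset with at least one '.parquet' file (the dtype check only runs on the update path); the fixture and test names are illustrative:

import pytest
from pandas import DataFrame

def test_freqstr_requires_datetime64_ordered_on(existing_dataset):
    # 'ordered_on' column is int64 while 'max_row_group_size' is a freqstr:
    # 'write_ordered' is expected to raise a TypeError.
    new_data = DataFrame({"ts": [1, 2, 3], "value": [10, 20, 30]})
    with pytest.raises(TypeError, match="datetime64"):
        write_ordered(
            dirpath=existing_dataset,
            data=new_data,
            max_row_group_size="2h",
            ordered_on="ts",
        )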
