
Commit ad336b4
Simplified 'iter_data()'.
yohplala committed Nov 22, 2024
1 parent db0ac51 commit ad336b4
Showing 2 changed files with 55 additions and 277 deletions.
oups/store/iter_data.py (211 changes: 38 additions & 173 deletions)

@@ -109,11 +109,10 @@ def _get_next_chunk(
     return df.iloc[start_idx:end_idx], end_idx
 
 
-def _iter_pandas_dataframe(
+def _iter_df(
     ordered_on: str,
     max_row_group_size: int,
-    df: Optional[DataFrame] = None,
-    start_df: Optional[DataFrame] = None,
+    df: Union[DataFrame, List[DataFrame]],
     distinct_bounds: bool = False,
     duplicates_on: Optional[Union[str, List[str]]] = None,
     yield_remainder: bool = False,
@@ -127,10 +126,9 @@ def _iter_pandas_dataframe(
         Column name by which data is ordered. Data must be in ascending order.
     max_row_group_size : int
         Maximum number of rows per row group.
-    df : DataFrame
-        Pandas DataFrame to split.
-    start_df : Optional[DataFrame]
-        Data to start the iteration with. Must be ordered by the same column.
+    df : Union[DataFrame, List[DataFrame]]
+        Pandas DataFrame to split. If a list, they are merged and sorted back by
+        'ordered_on' column.
     distinct_bounds : bool, default False
         If True, ensures that row group boundaries do not split duplicate rows.
     duplicates_on : Optional[Union[str, List[str]]], default None
@@ -154,15 +152,13 @@ def _iter_pandas_dataframe(
         than max_row_group_size.
     """
-    if start_df is not None:
-        df = concat([start_df, df])
-        del start_df
+    if isinstance(df, list):
+        df = concat(df, ignore_index=True).sort_values(ordered_on, ignore_index=True)
 
     if duplicates_on:
         df.drop_duplicates(duplicates_on, keep="last", ignore_index=True, inplace=True)
 
     start_idx = 0
-    print(f"len(df): {len(df)}")
     while len(df) - start_idx >= max_row_group_size:
         chunk, next_idx = _get_next_chunk(
             df=df,
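
Note: a quick illustration of the new list input path in '_iter_df' (a toy
sketch; the frames and the 'ts' column are invented for the example, not part
of the commit):

    from pandas import DataFrame, concat

    df1 = DataFrame({"ts": [1, 3], "val": ["a", "b"]})
    df2 = DataFrame({"ts": [2, 4], "val": ["c", "d"]})
    # What the new 'isinstance(df, list)' branch does:
    merged = concat([df1, df2], ignore_index=True).sort_values("ts", ignore_index=True)
    # merged["ts"] -> [1, 2, 3, 4]: row order follows 'ordered_on', not input order.

This lets callers hand over '[remainder, chunk, ...]' lists directly instead of
concatenating and sorting at every call site.
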
@@ -173,7 +169,6 @@ def _iter_pandas_dataframe(
         )
         yield chunk
         start_idx = next_idx
-        print(f"start_idx: {start_idx}")
 
     if start_idx < len(df):
         chunk = df.iloc[start_idx:].copy(deep=True)
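
Note: when 'yield_remainder' is False, '_iter_df' returns its undersized tail
rather than yielding it, and callers capture it with 'remainder = yield from
...'. A minimal standalone model of that protocol (names invented):

    def chunker(values, size):
        # Yield full chunks; return the undersized tail instead of yielding it.
        start = 0
        while len(values) - start >= size:
            yield values[start:start + size]
            start += size
        return values[start:]

    def pipeline(values, size):
        # 'yield from' re-yields the chunks and captures the returned tail.
        tail = yield from chunker(values, size)
        if tail:
            yield tail

    assert list(pipeline(list(range(7)), 3)) == [[0, 1, 2], [3, 4, 5], [6]]
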
@@ -184,100 +179,7 @@ def _iter_pandas_dataframe(
         return chunk
 
 
-def _iter_resized_parquet_file(
-    ordered_on: str,
-    max_row_group_size: int,
-    pf: ParquetFile,
-    start_df: Optional[DataFrame] = None,
-    distinct_bounds: bool = False,
-    yield_remainder: bool = False,
-):
-    """
-    Yield resized row groups from ParquetFile.
-
-    Reads row groups from ParquetFile and yields chunks of data that respect
-    the maximum row group size. If a starter DataFrame is provided, it will be
-    concatenated with the first chunk. Handles distinct bounds to prevent
-    splitting of duplicate values.
-
-    Parameters
-    ----------
-    ordered_on : str
-        Column name by which data is ordered. Data must be in ascending order.
-    max_row_group_size : int
-        Maximum number of rows per row group.
-    pf : ParquetFile
-        The ParquetFile to iterate over.
-    start_df : Optional[DataFrame], default None
-        Data to start the iteration with. Must be ordered by the same column.
-    distinct_bounds : bool, default False
-        If True, ensures that row group boundaries do not split duplicate rows
-        in the ordered_on column.
-    yield_remainder : bool, default False
-        If True, yields the last chunk of data even if it is smaller than
-        max_row_group_size.
-
-    Yields
-    ------
-    DataFrame
-        Chunks of data, each with size <= max_row_group_size, except if
-        distinct_bounds is True and there are more duplicates in the
-        'ordered_on' column than max_row_group_size.
-
-    Returns
-    -------
-    Optional[DataFrame]
-        Remaining data if yield_remainder is False and final chunk is smaller
-        than max_row_group_size.
-
-    Notes
-    -----
-    The function maintains the ordering of data throughout the chunking process
-    and ensures that duplicate values in the ordered_on column stay together
-    when distinct_bounds is True.
-
-    """
-    start_rg_idx = 0
-    if start_df is None:
-        buffer_num_rows = 0
-    else:
-        buffer_num_rows = len(start_df)
-        remainder = start_df
-
-    for rg_idx, rg in enumerate(pf.row_groups, start=1):
-        buffer_num_rows += rg.num_rows
-        if buffer_num_rows >= max_row_group_size or rg_idx == len(pf):
-            # Data from last row group will be loaded no matter what.
-            data = pf[start_rg_idx:rg_idx].to_pandas()
-            if remainder is not None:
-                data = concat([remainder, data], ignore_index=True)
-                del remainder
-            if buffer_num_rows >= max_row_group_size:
-                chunk, end_idx = _get_next_chunk(
-                    df=data,
-                    start_idx=0,
-                    size=max_row_group_size,
-                    distinct_bounds=distinct_bounds,
-                    ordered_on=ordered_on,
-                )
-                yield chunk
-                if buffer_num_rows > end_idx:
-                    remainder = data.iloc[end_idx:]
-                    buffer_num_rows = len(remainder)
-                else:
-                    buffer_num_rows = 0
-                    remainder = None
-                start_rg_idx = rg_idx
-            else:
-                remainder = data
-
-    if yield_remainder and remainder is not None:
-        yield remainder
-    else:
-        return remainder
-
-
-def iter_merged_pandas_parquet_file(
+def iter_merged_pf_df(
     df: DataFrame,
     pf: ParquetFile,
     ordered_on: str,
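
Note: with '_iter_resized_parquet_file' removed, 'iter_merged_pf_df' is the
single entry point left in this module. A hedged usage sketch (the file name,
'ts' column, toy frame and the write step are placeholders, not part of the
commit):

    from fastparquet import ParquetFile
    from pandas import DataFrame

    from oups.store.iter_data import iter_merged_pf_df

    pf = ParquetFile("existing.parquet")  # hypothetical on-disk dataset
    new_rows = DataFrame({"ts": [5, 6, 7], "val": [0.1, 0.2, 0.3]})

    for chunk in iter_merged_pf_df(
        df=new_rows,
        pf=pf,
        ordered_on="ts",
        max_row_group_size=100_000,
    ):
        ...  # each chunk is sized to be written out as one row group
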
@@ -354,24 +256,15 @@ def iter_merged_pandas_parquet_file(
         max_row_group_size=max_row_group_size,
     )
 
-    # Handle data before the first overlaps.
+    # Handle data in 'df' before the loop over row groups in 'pf'.
     remainder = None
     df_idx_start = 0
-    if overlap_info.has_pf_head:
-        # Case there is parquet file data before the first overlapping row group.
-        remainder = yield from _iter_resized_parquet_file(
-            pf=pf[: overlap_info.rg_idx_overlap_start],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            distinct_bounds=distinct_bounds,
-            yield_remainder=False,
-        )
-    elif overlap_info.has_df_head:
+    if overlap_info.has_df_head:
        # Case there is sufficient data in the pandas DataFrame to start a new
         # row group.
         # If 'duplicates_on' is provided, duplicates have been removed already,
         # so no need to remove them again.
-        remainder = yield from _iter_pandas_dataframe(
+        remainder = yield from _iter_df(
             df=df.iloc[: overlap_info.df_idx_overlap_start],
             ordered_on=ordered_on,
             max_row_group_size=max_row_group_size,
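
Note: the head branch yields the 'df' rows that sit entirely before the first
overlapping row group. Illustrative only, one common way such a boundary is
derived (not necessarily how 'overlap_info' computes it; values invented):

    from pandas import DataFrame

    df = DataFrame({"ts": [1, 2, 5, 8]})
    first_rg_min = 4  # stand-in for the first row group's minimum of 'ts'
    df_idx_overlap_start = int(df["ts"].searchsorted(first_rg_min, side="left"))
    # df.iloc[:df_idx_overlap_start] -> rows with ts 1 and 2: safe to yield first.
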
@@ -381,78 +274,50 @@ def iter_merged_pandas_parquet_file(
         )
         # Correct then 'df_idx_start' to account for the dataframe head already
         # yielded.
-        df_idx_start = overlap_info.df_idx_overlap_start
-
-    rg_idx_overlap_end_excl = overlap_info.rg_idx_overlap_end_excl
-    if overlap_info.has_overlap:
-        # Merge overlapping data.
-        rg_idx_start = overlap_info.rg_idx_overlap_start
-        buffer_num_rows = 0 if remainder is None else len(remainder)
-        for rg_idx, (_df_idx_start, df_idx_end) in enumerate(
-            zip(
-                overlap_info.df_idx_rg_starts[rg_idx_start:rg_idx_overlap_end_excl],
-                overlap_info.df_idx_rg_ends_excl[rg_idx_start:rg_idx_overlap_end_excl],
-            ),
-            start=rg_idx_start,
-        ):
-            n_data_rows = df_idx_end - _df_idx_start
-            buffer_num_rows += pf.row_groups[rg_idx].num_rows + n_data_rows
+    df_idx_start = overlap_info.df_idx_overlap_start if overlap_info.has_overlap else len(df)
+
+    # Merge possibly overlapping data (full loop over 'pf' row groups).
+    rg_idx_start = 0
+    buffer_num_rows = 0 if remainder is None else len(remainder)
+    for rg_idx_1, (_df_idx_start, df_idx_end) in enumerate(
+        zip(overlap_info.df_idx_rg_starts, overlap_info.df_idx_rg_ends_excl),
+        start=1,
+    ):
+        n_data_rows = df_idx_end - _df_idx_start
+        buffer_num_rows += pf.row_groups[rg_idx_1 - 1].num_rows + n_data_rows
+        if buffer_num_rows >= max_row_group_size or rg_idx_1 == len(pf):
+            chunk = pf[rg_idx_start:rg_idx_1].to_pandas()
+            if df_idx_start != df_idx_end:
+                # Merge with pandas DataFrame chunk.
+                # DataFrame chunk is added last in concat, to preserve
+                # its values in case of duplicates found with values in
+                # ParquetFile chunk.
+                chunk = [remainder, chunk, df.iloc[df_idx_start:df_idx_end]]
+            elif remainder is not None:
+                chunk = [remainder, chunk]
             if buffer_num_rows >= max_row_group_size:
-                rg_idx_1 = rg_idx + 1
-                chunk = (
-                    [remainder, pf[rg_idx_start:rg_idx_1].to_pandas()]
-                    if remainder is not None
-                    else pf[rg_idx_start:rg_idx_1].to_pandas()
-                )
-                if df_idx_start != df_idx_end:
-                    # Merge with pandas DataFrame chunk.
-                    # DataFrame chunk is added last in concat, to preserve
-                    # its values in case of duplicates found with values in
-                    # ParquetFile chunk.
-                    if isinstance(chunk, list):
-                        chunk.append(df.iloc[df_idx_start:df_idx_end])
-                    else:
-                        chunk = [chunk, df.iloc[df_idx_start:df_idx_end]]
-                if isinstance(chunk, list):
-                    chunk = concat(
-                        chunk,
-                        ignore_index=True,
-                    )
-                chunk.sort_values(ordered_on, inplace=True, ignore_index=True)
-
-                remainder = yield from _iter_pandas_dataframe(
+                remainder = yield from _iter_df(
                     df=chunk,
                     ordered_on=ordered_on,
                     max_row_group_size=max_row_group_size,
-                    start_df=None,
                     distinct_bounds=distinct_bounds,
                     duplicates_on=duplicates_on,
                     yield_remainder=False,
                 )
 
                 df_idx_start = df_idx_end
                 rg_idx_start = rg_idx_1
                 buffer_num_rows = 0 if remainder is None else len(remainder)
+            else:
+                remainder = chunk
 
-    # Handle data after the last overlaps.
-    if overlap_info.has_pf_tail:
-        # Case there is parquet file data after the last overlapping row group.
-        yield from _iter_resized_parquet_file(
-            pf=pf[rg_idx_overlap_end_excl:],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            start_df=remainder,
-            distinct_bounds=distinct_bounds,
-            yield_remainder=True,
-        )
-    elif overlap_info.has_df_tail:
+    if overlap_info.has_df_tail:
         # Case there is remaining data in pandas DataFrame.
         df_idx_overlap_end_excl = overlap_info.df_idx_overlap_end_excl
-        yield from _iter_pandas_dataframe(
-            df=df.iloc[df_idx_overlap_end_excl:],
+        yield from _iter_df(
+            df=[remainder, df.iloc[df_idx_overlap_end_excl:]],
             ordered_on=ordered_on,
             max_row_group_size=max_row_group_size,
-            start_df=remainder,
             distinct_bounds=distinct_bounds,
             duplicates_on=None if remainder is None else duplicates_on,
             yield_remainder=True,
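
Note: the unified merge loop never holds more than one buffered chunk; it only
counts rows ('buffer_num_rows') until the threshold is reached or the file
ends, and only then loads the spanned row groups. A stripped-down model of that
accounting (sizes invented; the real loop also adds the overlapping 'df' rows
and carries the remainder's length forward):

    row_group_sizes = [40, 30, 50, 10]  # stand-ins for pf row-group row counts
    max_row_group_size = 60

    buffered, start = 0, 0
    for idx_1, n_rows in enumerate(row_group_sizes, start=1):
        buffered += n_rows
        if buffered >= max_row_group_size or idx_1 == len(row_group_sizes):
            print(f"load and yield row groups [{start}:{idx_1}] ({buffered} rows)")
            start, buffered = idx_1, 0
    # -> loads [0:2] (70 rows), then [2:4] (60 rows)
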
