From ad336b4968ee50983716a3d588040a748b0aaf45 Mon Sep 17 00:00:00 2001
From: yohplala
Date: Fri, 22 Nov 2024 14:13:16 +0100
Subject: [PATCH] Simplified 'iter_data()'.

---
 oups/store/iter_data.py            | 211 ++++++-----------------------
 tests/test_store/test_iter_data.py | 121 +++--------------
 2 files changed, 55 insertions(+), 277 deletions(-)

diff --git a/oups/store/iter_data.py b/oups/store/iter_data.py
index 9c5da31..dc2ba52 100644
--- a/oups/store/iter_data.py
+++ b/oups/store/iter_data.py
@@ -109,11 +109,10 @@ def _get_next_chunk(
     return df.iloc[start_idx:end_idx], end_idx
 
 
-def _iter_pandas_dataframe(
+def _iter_df(
     ordered_on: str,
     max_row_group_size: int,
-    df: Optional[DataFrame] = None,
-    start_df: Optional[DataFrame] = None,
+    df: Union[DataFrame, List[DataFrame]],
     distinct_bounds: bool = False,
     duplicates_on: Optional[Union[str, List[str]]] = None,
     yield_remainder: bool = False,
@@ -127,10 +126,9 @@ def _iter_pandas_dataframe(
         Column name by which data is ordered. Data must be in ascending order.
     max_row_group_size : int
         Maximum number of rows per row group.
-    df : DataFrame
-        Pandas DataFrame to split.
-    start_df : Optional[DataFrame]
-        Data to start the iteration with. Must be ordered by the same column.
+    df : Union[DataFrame, List[DataFrame]]
+        Pandas DataFrame to split. If a list, the DataFrames are concatenated
+        and sorted by the 'ordered_on' column.
     distinct_bounds : bool, default False
         If True, ensures that row group boundaries do not split duplicate rows.
     duplicates_on : Optional[Union[str, List[str]]], default None
@@ -154,15 +152,13 @@
         than max_row_group_size.
 
     """
-    if start_df is not None:
-        df = concat([start_df, df])
-        del start_df
+    if isinstance(df, list):
+        df = concat(df, ignore_index=True).sort_values(ordered_on, ignore_index=True)
 
     if duplicates_on:
         df.drop_duplicates(duplicates_on, keep="last", ignore_index=True, inplace=True)
 
     start_idx = 0
-    print(f"len(df): {len(df)}")
     while len(df) - start_idx >= max_row_group_size:
         chunk, next_idx = _get_next_chunk(
             df=df,
@@ -173,7 +169,6 @@ def _iter_pandas_dataframe(
         )
         yield chunk
         start_idx = next_idx
-        print(f"start_idx: {start_idx}")
 
     if start_idx < len(df):
         chunk = df.iloc[start_idx:].copy(deep=True)
@@ -184,100 +179,7 @@ def _iter_pandas_dataframe(
     return chunk
 
 
-def _iter_resized_parquet_file(
-    ordered_on: str,
-    max_row_group_size: int,
-    pf: ParquetFile,
-    start_df: Optional[DataFrame] = None,
-    distinct_bounds: bool = False,
-    yield_remainder: bool = False,
-):
-    """
-    Yield resized row groups from ParquetFile.
-
-    Reads row groups from ParquetFile and yields chunks of data that respect
-    the maximum row group size. If a starter DataFrame is provided, it will be
-    concatenated with the first chunk. Handles distinct bounds to prevent
-    splitting of duplicate values.
-
-    Parameters
-    ----------
-    ordered_on : str
-        Column name by which data is ordered. Data must be in ascending order.
-    max_row_group_size : int
-        Maximum number of rows per row group.
-    pf : ParquetFile
-        The ParquetFile to iterate over.
-    start_df : Optional[DataFrame], default None
-        Data to start the iteration with. Must be ordered by the same column.
-    distinct_bounds : bool, default False
-        If True, ensures that row group boundaries do not split duplicate rows
-        in the ordered_on column.
-    yield_remainder : bool, default False
-        If True, yields the last chunk of data even if it is smaller than
-        max_row_group_size.
-
-    Yields
-    ------
-    DataFrame
-        Chunks of data, each with size <= max_row_group_size, except if
-        distinct_bounds is True and there are more duplicates in the
-        'ordered_on' column than max_row_group_size.
-
-    Returns
-    -------
-    Optional[DataFrame]
-        Remaining data if yield_remainder is False and final chunk is smaller
-        than max_row_group_size.
-
-    Notes
-    -----
-    The function maintains the ordering of data throughout the chunking process
-    and ensures that duplicate values in the ordered_on column stay together
-    when distinct_bounds is True.
-
-    """
-    start_rg_idx = 0
-    if start_df is None:
-        buffer_num_rows = 0
-    else:
-        buffer_num_rows = len(start_df)
-    remainder = start_df
-
-    for rg_idx, rg in enumerate(pf.row_groups, start=1):
-        buffer_num_rows += rg.num_rows
-        if buffer_num_rows >= max_row_group_size or rg_idx == len(pf):
-            # Data from last row group will be loaded no matter what.
-            data = pf[start_rg_idx:rg_idx].to_pandas()
-            if remainder is not None:
-                data = concat([remainder, data], ignore_index=True)
-                del remainder
-            if buffer_num_rows >= max_row_group_size:
-                chunk, end_idx = _get_next_chunk(
-                    df=data,
-                    start_idx=0,
-                    size=max_row_group_size,
-                    distinct_bounds=distinct_bounds,
-                    ordered_on=ordered_on,
-                )
-                yield chunk
-                if buffer_num_rows > end_idx:
-                    remainder = data.iloc[end_idx:]
-                    buffer_num_rows = len(remainder)
-                else:
-                    buffer_num_rows = 0
-                    remainder = None
-                start_rg_idx = rg_idx
-            else:
-                remainder = data
-
-    if yield_remainder and remainder is not None:
-        yield remainder
-    else:
-        return remainder
-
-
-def iter_merged_pandas_parquet_file(
+def iter_merged_pf_df(
     df: DataFrame,
     pf: ParquetFile,
     ordered_on: str,
@@ -354,24 +256,15 @@
         max_row_group_size=max_row_group_size,
     )
 
-    # Handle data before the first overlaps.
+    # Handle data in 'df' before the loop over row groups in 'pf'.
     remainder = None
     df_idx_start = 0
-    if overlap_info.has_pf_head:
-        # Case there is parquet file data before the first overlapping row group.
-        remainder = yield from _iter_resized_parquet_file(
-            pf=pf[: overlap_info.rg_idx_overlap_start],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            distinct_bounds=distinct_bounds,
-            yield_remainder=False,
-        )
-    elif overlap_info.has_df_head:
+    if overlap_info.has_df_head:
         # Case there is sufficient data in the pandas DataFrame to start a new
         # row group.
         # If 'duplicates_on' is provided, duplicates have been removed already,
         # so no need to remove them again.
-        remainder = yield from _iter_pandas_dataframe(
+        remainder = yield from _iter_df(
             df=df.iloc[: overlap_info.df_idx_overlap_start],
             ordered_on=ordered_on,
             max_row_group_size=max_row_group_size,
@@ -381,78 +274,50 @@
         )
         # Correct then 'df_idx_start' to account for the dataframe head already
        # yielded.
-        df_idx_start = overlap_info.df_idx_overlap_start
-
-    rg_idx_overlap_end_excl = overlap_info.rg_idx_overlap_end_excl
-    if overlap_info.has_overlap:
-        # Merge overlapping data.
-        rg_idx_start = overlap_info.rg_idx_overlap_start
-        buffer_num_rows = 0 if remainder is None else len(remainder)
-        for rg_idx, (_df_idx_start, df_idx_end) in enumerate(
-            zip(
-                overlap_info.df_idx_rg_starts[rg_idx_start:rg_idx_overlap_end_excl],
-                overlap_info.df_idx_rg_ends_excl[rg_idx_start:rg_idx_overlap_end_excl],
-            ),
-            start=rg_idx_start,
-        ):
-            n_data_rows = df_idx_end - _df_idx_start
-            buffer_num_rows += pf.row_groups[rg_idx].num_rows + n_data_rows
+    df_idx_start = overlap_info.df_idx_overlap_start if overlap_info.has_overlap else len(df)
+
+    # Merge possibly overlapping data (full loop over 'pf' row groups).
+    rg_idx_start = 0
+    buffer_num_rows = 0 if remainder is None else len(remainder)
+    for rg_idx_1, (_df_idx_start, df_idx_end) in enumerate(
+        zip(overlap_info.df_idx_rg_starts, overlap_info.df_idx_rg_ends_excl),
+        start=1,
+    ):
+        n_data_rows = df_idx_end - _df_idx_start
+        buffer_num_rows += pf.row_groups[rg_idx_1 - 1].num_rows + n_data_rows
+        if buffer_num_rows >= max_row_group_size or rg_idx_1 == len(pf):
+            chunk = pf[rg_idx_start:rg_idx_1].to_pandas()
+            if df_idx_start != df_idx_end:
+                # Merge with pandas DataFrame chunk.
+                # The DataFrame chunk is concatenated last so that, in case
+                # of duplicates, its values take precedence over values from
+                # the ParquetFile chunk.
+                chunk = [remainder, chunk, df.iloc[df_idx_start:df_idx_end]]
+            elif remainder is not None:
+                chunk = [remainder, chunk]
             if buffer_num_rows >= max_row_group_size:
-                rg_idx_1 = rg_idx + 1
-                chunk = (
-                    [remainder, pf[rg_idx_start:rg_idx_1].to_pandas()]
-                    if remainder is not None
-                    else pf[rg_idx_start:rg_idx_1].to_pandas()
-                )
-                if df_idx_start != df_idx_end:
-                    # Merge with pandas DataFrame chunk.
-                    # DataFrame chunk is added last in concat, to preserve
-                    # its values in case of duplicates found with values in
-                    # ParquetFile chunk.
-                    if isinstance(chunk, list):
-                        chunk.append(df.iloc[df_idx_start:df_idx_end])
-                    else:
-                        chunk = [chunk, df.iloc[df_idx_start:df_idx_end]]
-                if isinstance(chunk, list):
-                    chunk = concat(
-                        chunk,
-                        ignore_index=True,
-                    )
-                chunk.sort_values(ordered_on, inplace=True, ignore_index=True)
-
-                remainder = yield from _iter_pandas_dataframe(
+                remainder = yield from _iter_df(
                     df=chunk,
                     ordered_on=ordered_on,
                     max_row_group_size=max_row_group_size,
-                    start_df=None,
                     distinct_bounds=distinct_bounds,
                     duplicates_on=duplicates_on,
                     yield_remainder=False,
                 )
-
                 df_idx_start = df_idx_end
                 rg_idx_start = rg_idx_1
                 buffer_num_rows = 0 if remainder is None else len(remainder)
+            else:
+                remainder = concat(chunk, ignore_index=True).sort_values(ordered_on, ignore_index=True) if isinstance(chunk, list) else chunk
 
     # Handle data after the last overlaps.
-    if overlap_info.has_pf_tail:
-        # Case there is parquet file data after the last overlapping row group.
-        yield from _iter_resized_parquet_file(
-            pf=pf[rg_idx_overlap_end_excl:],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            start_df=remainder,
-            distinct_bounds=distinct_bounds,
-            yield_remainder=True,
-        )
-    elif overlap_info.has_df_tail:
+    if overlap_info.has_df_tail:
         # Case there is remaining data in pandas DataFrame.
         df_idx_overlap_end_excl = overlap_info.df_idx_overlap_end_excl
-        yield from _iter_pandas_dataframe(
-            df=df.iloc[df_idx_overlap_end_excl:],
+        yield from _iter_df(
+            df=[remainder, df.iloc[df_idx_overlap_end_excl:]],
             ordered_on=ordered_on,
             max_row_group_size=max_row_group_size,
-            start_df=remainder,
             distinct_bounds=distinct_bounds,
             duplicates_on=None if remainder is None else duplicates_on,
             yield_remainder=True,
diff --git a/tests/test_store/test_iter_data.py b/tests/test_store/test_iter_data.py
index a79c7b2..6d14fcc 100644
--- a/tests/test_store/test_iter_data.py
+++ b/tests/test_store/test_iter_data.py
@@ -15,9 +15,8 @@
 from pandas.testing import assert_frame_equal
 
 from oups.store.iter_data import _get_next_chunk
-from oups.store.iter_data import _iter_pandas_dataframe
-from oups.store.iter_data import _iter_resized_parquet_file
-from oups.store.iter_data import iter_merged_pandas_parquet_file
+from oups.store.iter_data import _iter_df
+from oups.store.iter_data import iter_merged_pf_df
 from tests.test_store.conftest import create_parquet_file
 
 
@@ -117,14 +116,14 @@ def yield_all(iterator: Iterable[DataFrame]) -> Iterable[DataFrame]:
         (None, False, True),  # Return remainder
     ],
 )
-def test_iter_pandas_dataframe(
+def test_iter_df(
     sample_df,
     start_df,
     duplicates_on,
     yield_remainder,
 ):
     """
-    Test _iter_pandas_dataframe with various configurations.
+    Test _iter_df with various configurations.
 
     Parameters
     ----------
@@ -139,11 +138,10 @@
 
     """
     row_group_size = 3
-    iterator = _iter_pandas_dataframe(
+    iterator = _iter_df(
         ordered_on="ordered",
         max_row_group_size=row_group_size,
-        df=sample_df,
-        start_df=start_df.copy(deep=True) if start_df is not None else None,
+        df=[start_df, sample_df] if start_df is not None else sample_df,
         distinct_bounds=bool(duplicates_on),
         duplicates_on=duplicates_on,
         yield_remainder=yield_remainder,
@@ -165,11 +163,10 @@
     else:
         all_chunks = list(yield_all(iterator))
         # Do a 2nd time to check only yielded chunks.
-        iterator2 = _iter_pandas_dataframe(
+        iterator2 = _iter_df(
             ordered_on="ordered",
             max_row_group_size=row_group_size,
-            df=sample_df,
-            start_df=start_df.copy(deep=True) if start_df is not None else None,
+            df=[start_df, sample_df] if start_df is not None else sample_df,
             distinct_bounds=bool(duplicates_on),
             duplicates_on=duplicates_on,
             yield_remainder=yield_remainder,
@@ -194,87 +191,6 @@
         assert_frame_equal(result, expected_without_remainder)
 
 
-@pytest.mark.parametrize(
-    "start_df,yield_remainder",
-    [
-        (None, True),  # Basic case
-        (DataFrame({"ordered": [0], "values": ["z"]}), True),  # With start_df
-        (None, False),  # Return remainder
-        (DataFrame({"ordered": [0], "values": ["z"]}), False),  # With start_df
-    ],
-)
-def test_iter_resized_parquet_file(
-    sample_df,
-    create_parquet_file,
-    start_df,
-    yield_remainder,
-):
-    """
-    Test _iter_resized_parquet_file with various configurations.
-
-    Parameters
-    ----------
-    sample_df : DataFrame
-        Test DataFrame fixture.
-    create_parquet_file : callable
-        Fixture to create temporary parquet files.
-    start_df : DataFrame or None
-        Optional starter DataFrame.
-    yield_remainder : bool
-        Whether to yield the last chunk.
- - """ - sample_pf = create_parquet_file(sample_df, row_group_offsets=3) - row_group_size = 4 - expected = ( - concat([start_df, sample_df], ignore_index=True) if start_df is not None else sample_df - ) - has_remainder = len(expected) % row_group_size > 0 - - iterator = _iter_resized_parquet_file( - ordered_on="ordered", - max_row_group_size=row_group_size, - pf=sample_pf, - start_df=start_df, - distinct_bounds=False, - yield_remainder=yield_remainder, - ) - - # Collect all chunks - if yield_remainder: - all_chunks = list(iterator) - yielded_chunks = all_chunks - else: - all_chunks = list(yield_all(iterator)) - # Do a 2nd time to check only yielded chunks. - iterator2 = _iter_resized_parquet_file( - ordered_on="ordered", - max_row_group_size=row_group_size, - pf=sample_pf, - start_df=start_df.copy(deep=True) if start_df is not None else None, - distinct_bounds=False, - yield_remainder=yield_remainder, - ) - yielded_chunks = list(iterator2) - - # Verify chunk sizes - complete_chunks = all_chunks[:-1] if has_remainder else all_chunks - for chunk in complete_chunks: - assert len(chunk) == row_group_size - - # Verify yielded data - result = concat(yielded_chunks, ignore_index=True) - - if yield_remainder: - assert_frame_equal(result, expected) - else: - # When not yielding last chunk, expected data should exclude remainder - expected_without_remainder = expected.iloc[ - : (len(expected) // row_group_size) * row_group_size - ] - assert_frame_equal(result, expected_without_remainder) - - @pytest.mark.parametrize( "df_data,pf_data,expected_chunks,max_row_group_size,distinct_bounds,duplicates_on", [ @@ -390,7 +306,7 @@ def test_iter_resized_parquet_file( ), ], ) -def test_iter_merged_pandas_parquet_file( +def test_iter_merged_pf_df( df_data, pf_data, expected_chunks, @@ -400,7 +316,7 @@ def test_iter_merged_pandas_parquet_file( create_parquet_file, ): """ - Test iter_merged_pandas_parquet_file with various overlap scenarios. + Test iter_merged_pf_df with various overlap scenarios. Parameters ---------- @@ -420,7 +336,7 @@ def test_iter_merged_pandas_parquet_file( pf = create_parquet_file(DataFrame(pf_data), row_group_offsets=max_row_group_size) chunks = list( - iter_merged_pandas_parquet_file( + iter_merged_pf_df( ordered_on="ordered", df=df, pf=pf, @@ -435,9 +351,6 @@ def test_iter_merged_pandas_parquet_file( assert_frame_equal(chunk.reset_index(drop=True), expected.reset_index(drop=True)) -" Add test case with values between df and pf the same / overlaps in bounds" - - @pytest.mark.parametrize( "duplicates_on,distinct_bounds,error_expected", [ @@ -447,14 +360,14 @@ def test_iter_merged_pandas_parquet_file( ("invalid_col", True, True), # Invalid column name ], ) -def test_iter_merged_pandas_parquet_file_exceptions( +def test_iter_merged_pf_df_exceptions( duplicates_on, distinct_bounds, error_expected, create_parquet_file, ): """ - Test validation in iter_merged_pandas_parquet_file. + Test validation in iter_merged_pf_df. 
 
     Parameters
     ----------
@@ -477,7 +390,7 @@
     if error_expected:
         with pytest.raises(ValueError):
             list(
-                iter_merged_pandas_parquet_file(
+                iter_merged_pf_df(
                     df,
                     pf,
                     ordered_on="ordered",
@@ -488,7 +401,7 @@
             )
     else:
         chunks = list(
-            iter_merged_pandas_parquet_file(
+            iter_merged_pf_df(
                 df,
                 pf,
                 ordered_on="ordered",
@@ -500,7 +413,7 @@
         assert len(chunks) > 0
 
 
-def test_iter_merged_pandas_parquet_file_empty_df(create_parquet_file):
+def test_iter_merged_pf_df_empty_df(create_parquet_file):
     """
     Test handling of empty DataFrame input.
     """
@@ -511,7 +424,7 @@
     )
 
     chunks = list(
-        iter_merged_pandas_parquet_file(
+        iter_merged_pf_df(
             df,
             pf,
             ordered_on="ordered",
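
A minimal usage sketch of the renamed 'iter_merged_pf_df' generator, for
reviewers who want to exercise the new API. Only the function, its
parameters, and the 'oups.store.iter_data' import path come from the diff
above; the parquet file name and the sample data are illustrative
assumptions.

    from fastparquet import ParquetFile, write
    from pandas import DataFrame

    from oups.store.iter_data import iter_merged_pf_df

    # Existing data, sorted on 'ordered', stored as 4-row row groups.
    # 'sample.parquet' is an illustrative path.
    write(
        "sample.parquet",
        DataFrame({"ordered": list(range(10)), "values": list("abcdefghij")}),
        row_group_offsets=4,
    )
    pf = ParquetFile("sample.parquet")
    # New rows overlapping the stored 'ordered' range.
    df = DataFrame({"ordered": [3, 5, 12], "values": ["x", "y", "z"]})

    # Chunks come out merged and sorted on 'ordered', each holding at most
    # 'max_row_group_size' rows (duplicate runs may exceed that bound when
    # 'distinct_bounds' is True).
    for chunk in iter_merged_pf_df(
        df=df,
        pf=pf,
        ordered_on="ordered",
        max_row_group_size=4,
    ):
        print(len(chunk), chunk["ordered"].tolist())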