Skip to content

Commit

Permalink
Reverting some changes in 'iter_dataframe()'.
Browse files Browse the repository at this point in the history
  • Loading branch information
yohplala committed Nov 18, 2024
1 parent 1d2da90 commit 510b9e3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 38 deletions.
28 changes: 10 additions & 18 deletions oups/store/iter_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def _iter_pandas_dataframe(
Column name by which data is ordered. Data must be in ascending order.
max_row_group_size : int
Maximum number of rows per row group.
df : Optional[DataFrame]
df : DataFrame
Pandas DataFrame to split.
start_df : Optional[DataFrame]
Data to start the iteration with. Must be ordered by the same column.
Expand All @@ -154,16 +154,9 @@ def _iter_pandas_dataframe(
than max_row_group_size.
"""
if df is None and start_df is None:
raise ValueError("either 'df' or 'start_df' must be provided.")

if df is not None and start_df is not None:
# Case both 'df' and 'start_df' are provided.
if start_df is not None:
df = concat([start_df, df])
del start_df
elif start_df is not None:
# Case only 'start_df' is provided.
df = start_df

if duplicates_on:
df.drop_duplicates(duplicates_on, keep="last", ignore_index=True, inplace=True)
Expand Down Expand Up @@ -276,20 +269,17 @@ def _iter_resized_parquet_file(
else:
remainder = data

print("remainder")
print(remainder)

if yield_remainder and remainder is not None:
yield remainder
else:
return remainder


def iter_merged_pandas_parquet_file(
ordered_on: str,
max_row_group_size: int,
df: DataFrame,
pf: ParquetFile,
ordered_on: str,
max_row_group_size: int,
distinct_bounds: Optional[bool] = False,
duplicates_on: Optional[Union[str, Iterable[str]]] = None,
):
Expand Down Expand Up @@ -448,17 +438,19 @@ def iter_merged_pandas_parquet_file(
distinct_bounds=distinct_bounds,
yield_remainder=True,
)
elif overlap_info.has_df_tail or remainder:
# Case there is remaining data in pandas DataFrame and/or there is a
# remainder from previous iterations.
elif overlap_info.has_df_tail:
# Case there is remaining data in pandas DataFrame.
print("is ending with yielding df only")
df_idx_overlap_end_excl = overlap_info.df_idx_overlap_end_excl
yield from _iter_pandas_dataframe(
df=df.iloc[df_idx_overlap_end_excl:] if overlap_info.has_df_tail else None,
df=df.iloc[df_idx_overlap_end_excl:],
ordered_on=ordered_on,
max_row_group_size=max_row_group_size,
start_df=remainder,
distinct_bounds=distinct_bounds,
duplicates_on=None if remainder is None else duplicates_on,
yield_remainder=True,
)
elif remainder:
# Case there only is a remainder from previous iterations.
yield remainder
29 changes: 9 additions & 20 deletions tests/test_store/test_iter_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,34 +99,25 @@ def yield_all(iterator: Iterable[DataFrame]) -> Iterable[DataFrame]:


@pytest.mark.parametrize(
"use_sample_df, start_df, duplicates_on, yield_remainder",
"start_df, duplicates_on, yield_remainder",
[
(True, None, None, True), # Basic case, no remainder
(True, DataFrame({"ordered": [0], "values": ["z"]}), None, True), # With start_df
(None, None, True), # Basic case, no remainder
(DataFrame({"ordered": [0], "values": ["z"]}), None, True), # With start_df
(
True,
DataFrame({"ordered": [-1, 0], "values": ["w", "z"]}),
"ordered",
False,
), # With start_df
(
True,
DataFrame({"ordered": [-1, 0, 1], "values": ["w", "z", "u"]}),
"ordered",
False,
), # With start_df
(True, None, "ordered", True), # With duplicates
(True, None, False, True), # Return remainder
(
False,
DataFrame({"ordered": [-1, 0], "values": ["w", "z"]}),
"ordered",
True,
), # No sample_df
(None, "ordered", True), # With duplicates
(None, False, True), # Return remainder
],
)
def test_iter_pandas_dataframe(
use_sample_df,
sample_df,
start_df,
duplicates_on,
Expand All @@ -151,19 +142,17 @@ def test_iter_pandas_dataframe(
iterator = _iter_pandas_dataframe(
ordered_on="ordered",
max_row_group_size=row_group_size,
df=sample_df if use_sample_df else None,
df=sample_df,
start_df=start_df.copy(deep=True) if start_df is not None else None,
distinct_bounds=bool(duplicates_on),
duplicates_on=duplicates_on,
yield_remainder=yield_remainder,
)

if start_df is not None and use_sample_df:
if start_df is not None:
expected = concat([start_df, sample_df], ignore_index=True)
elif use_sample_df:
expected = sample_df
else:
expected = start_df
expected = sample_df

if duplicates_on:
expected = expected.drop_duplicates(duplicates_on, keep="last", ignore_index=True)
Expand All @@ -179,7 +168,7 @@ def test_iter_pandas_dataframe(
iterator2 = _iter_pandas_dataframe(
ordered_on="ordered",
max_row_group_size=row_group_size,
df=sample_df if use_sample_df else None,
df=sample_df,
start_df=start_df.copy(deep=True) if start_df is not None else None,
distinct_bounds=bool(duplicates_on),
duplicates_on=duplicates_on,
Expand Down

0 comments on commit 510b9e3

Please sign in to comment.