From 727e7e165cd50427a325f90f7a0848274724642f Mon Sep 17 00:00:00 2001 From: yohplala Date: Thu, 25 Jul 2024 21:45:02 +0200 Subject: [PATCH 1/3] Adjusting the code so that metadata can be written without actual data in 'aggstream()'. --- oups/aggstream/aggstream.py | 147 ++++++++++-------- oups/aggstream/cumsegagg.py | 1 + oups/store/writer.py | 16 +- tests/test_aggstream/test_aggstream_simple.py | 104 +++++++++++-- 4 files changed, 188 insertions(+), 80 deletions(-) diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index 40e8dc9..a289618 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -15,7 +15,7 @@ from joblib import Parallel from joblib import delayed from numpy import ones -from pandas import DataFrame as pDataFrame +from pandas import DataFrame from pandas import DatetimeIndex from pandas import Series from pandas import Timestamp as pTimestamp @@ -342,11 +342,11 @@ def _reset_agg_buffers(agg_buffers: Optional[dict] = None) -> Optional[dict]: - n_rows : int, number of rows in main aggregation results (snapshots is snapshots are quested, or bins otherwise). It is reset here after writing. - - agg_res : pDataFrame, last aggregation results, to reset to None + - agg_res : DataFrame, last aggregation results, to reset to None after writing. - - bin_res : pDataFrame, last aggregation results, to reset to None + - bin_res : DataFrame, last aggregation results, to reset to None after writing. - - agg_res_buffer : List[pDataFrame], list of chunks resulting from + - agg_res_buffer : List[DataFrame], list of chunks resulting from aggregation (pandas DataFrame), either from bins if only bins requested, or from snapshots if bins and snapshots requested. - bin_res_buffer : List[pandas.DataFrame], list of bins resulting from @@ -396,7 +396,7 @@ def __init__(self, message: str = None): def _iter_data( - seed: Iterable[pDataFrame], + seed: Iterable[DataFrame], ordered_on: str, restart_index: Union[int, float, pTimestamp, None], pre: Union[Callable, None], @@ -404,7 +404,7 @@ def _iter_data( filters: Union[dict, None], trim_start: bool, discard_last: bool, -) -> Tuple[str, pDataFrame]: +) -> Tuple[str, DataFrame]: """ Iterate provided seed, applying sequentially (optionally) filters. @@ -413,7 +413,7 @@ def _iter_data( Parameters ---------- - seed : Iterable[pDataFrame] + seed : Iterable[DataFrame] Iterable of pandas Dataframe. ordered_on : str Name of column with respect to which seed data is in ascending @@ -465,7 +465,7 @@ def _iter_data( chunk. - 'filter_id', str, indicating which set of filters has been applied for the seed chunk provided. - - 'filtered_chunk', pDataFrame, from the seed Iterable, with + - 'filtered_chunk', DataFrame, from the seed Iterable, with optionally filters applied. Notes @@ -553,8 +553,8 @@ def _iter_data( def _concat_agg_res( - agg_res_buffers: List[pDataFrame], - agg_res: pDataFrame, + agg_res_buffers: List[DataFrame], + agg_res: DataFrame, append_last_res: bool, index_name: str, ): @@ -563,9 +563,9 @@ def _concat_agg_res( Parameters ---------- - agg_res_buffers : List[pDataFrame] + agg_res_buffers : List[DataFrame] List of aggregation results to concatenate. - agg_res : pDataFrame + agg_res : DataFrame Last aggregation results (all rows from last iteration). append_last_res : bool If 'agg_res' should be appended to 'agg_res_buffer' and if 'bin_res' @@ -576,7 +576,7 @@ def _concat_agg_res( Returns ------- - pDataFrame + DataFrame List of aggregation results concatenated in a single DataFrame. 
""" @@ -617,11 +617,11 @@ def _post_n_write_agg_chunks( - n_rows : int, number of rows in main aggregation results (snapshots is snapshots are quested, or bins otherwise). It is reset here after writing. - - agg_res : pDataFrame, last aggregation results, to reset to None + - agg_res : DataFrame, last aggregation results, to reset to None after writing. - - bin_res : pDataFrame, last aggregation results, to reset to None + - bin_res : DataFrame, last aggregation results, to reset to None after writing. - - agg_res_buffer : List[pDataFrame], list of chunks resulting from + - agg_res_buffer : List[DataFrame], list of chunks resulting from aggregation (pandas DataFrame), either from bins if only bins requested, or from snapshots if bins and snapshots requested. It contains 'agg_res' (last aggregation results),but without last @@ -685,41 +685,49 @@ def _post_n_write_agg_chunks( proceeding with preprocessing of individual seed chunk. """ - if (agg_res := agg_buffers[KEY_AGG_RES]) is None: - # No iteration has been achieved, as no data. - if last_seed_index: - # If 'last_seed_index', at least set it in oups metadata. - # It is possible new seed data has been streamed and taken into - # account, but used for this key, because having been filtered out. - # Also set 'pre_buffer' if not null. - OUPS_METADATA[key] = { - KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer}, - } - write_metadata(pf=store[key].pf, md_key=key) - return - # Concat list of aggregation results. - agg_res = _concat_agg_res(agg_buffers[KEY_AGG_RES_BUFFER], agg_res, append_last_res, index_name) - # Same if needed with 'bin_res_buffer' - bin_res = agg_buffers[KEY_BIN_RES] - if bin_res is not None: - bin_res = _concat_agg_res( - agg_buffers[KEY_BIN_RES_BUFFER], - bin_res, + post_buffer = agg_buffers[KEY_POST_BUFFER] + # When there is no result, 'agg_res' is None. + if isinstance((agg_res := agg_buffers[KEY_AGG_RES]), DataFrame): + # To keep track there has been res in the 1st place. + not_null_res = True + # Concat list of aggregation results. + agg_res = _concat_agg_res( + agg_buffers[KEY_AGG_RES_BUFFER], + agg_res, append_last_res, index_name, ) - post_buffer = agg_buffers[KEY_POST_BUFFER] - if post: - # Post processing if any. - # 'post_buffer' has to be modified in-place. - agg_res = ( - post(buffer=post_buffer, bin_res=agg_res) - if bin_res is None - else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res) - ) + # Same if needed with 'bin_res_buffer' + bin_res = agg_buffers[KEY_BIN_RES] + if bin_res is not None: + bin_res = _concat_agg_res( + agg_buffers[KEY_BIN_RES_BUFFER], + bin_res, + append_last_res, + index_name, + ) + if post: + # Post processing if any. + # 'post_buffer' has to be modified in-place. + # It is possible 'agg_res' is None, if 'post' needs a minimal + # number of rows before outputting results (warm-up). + agg_res = ( + post(buffer=post_buffer, bin_res=agg_res) + if bin_res is None + else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res) + ) + else: + not_null_res = False if last_seed_index: # If 'last_seed_index', set oups metadata. - # Also set 'pre_buffer' if not null. + # It is possible there is no result yet to write for different reasons: + # - new seed data has been streamed and needs to be taken into account, + # but there is no result for this key, because all related seed data + # has been filtered out. + # - or maybe 'post' has a wamr up period and has not released results + # yet. 
+ # But 'last_seed_index' has to be recorded, and so do possibly + # 'pre_buffer', 'segagg_buffer' and 'post_buffer'. OUPS_METADATA[key] = { KEY_AGGSTREAM: { KEY_RESTART_INDEX: last_seed_index, @@ -728,14 +736,29 @@ def _post_n_write_agg_chunks( KEY_POST_BUFFER: post_buffer, }, } - # Record data. - store[key] = write_config, agg_res - # Reset aggregation buffers and counters. - _reset_agg_buffers(agg_buffers) + # When there is no result, 'agg_res' is None. + if isinstance(agg_res, DataFrame): + # Record data (with metadata possibly updated). + store[key] = write_config, agg_res + elif last_seed_index: + # If no result, metadata is possibly to be written, as this is the + # flag indicating the last 'aggstream' local iteration. + try: + write_metadata(pf=store[key].pf, md_key=key) + except FileNotFoundError: + # In case no Parquet file exist yet, need to initiate one to start + # storing metadata. + store[key] = DataFrame() + if not_null_res: + # If there have been results, they have been processed (either written + # directly or through 'post()'). Time to reset aggregation buffers and + # counters. + _reset_agg_buffers(agg_buffers) + return def agg_iter( - seed_chunk: pDataFrame, + seed_chunk: DataFrame, store: ParquetSet, key: dataclass, keys_config: dict, @@ -747,7 +770,7 @@ def agg_iter( Parameters ---------- - seed_chunk : pDataFrame + seed_chunk : DataFrame Chunk of seed data. store : ParquetSet ParquetSet to which recording aggregation results. @@ -776,7 +799,7 @@ def agg_iter( agg_res_buffer = agg_buffers[KEY_AGG_RES_BUFFER] if agg_res_len > 1: # Add 'agg_res' to 'agg_res_buffer' ignoring last row. - # It is incimplete, so useless to write it to results while + # It is incomplete, so useless to write it to results while # aggregation iterations are on-going. agg_res_buffer.append(agg_res.iloc[:-1]) # Remove last row that is not recorded from total number of rows. @@ -863,14 +886,14 @@ class AggStream: intermediate results. ``{key: {'agg_n_rows' : int, number of rows in aggregation results, for bins (if snapshots not requested) or snapshots. - 'agg_res' : None or pDataFrame, last aggregation results, + 'agg_res' : None or DataFrame, last aggregation results, for bins (if snapshots not requested) or snapshots, - 'bin_res' : None or pDataFrame, last aggregation results, + 'bin_res' : None or DataFrame, last aggregation results, for bins (if snapshots requested), - 'agg_res_buffer' : list of pDataFrame, buffer to keep + 'agg_res_buffer' : list of DataFrame, buffer to keep aggregagation results, bins (if snapshots not requested) or snapshots, - 'bin_res_buffer' : list of pDataFrame, buffer to keep bin + 'bin_res_buffer' : list of DataFrame, buffer to keep bin aggregagation results (if snapshots requested) 'segagg_buffer' : dict, possibly empty, keeping track of segmentation and aggregation intermediate @@ -1238,7 +1261,7 @@ def __init__( # Store attribute. self.store = store - def _init_agg_cs(self, seed: Iterable[pDataFrame]): + def _init_agg_cs(self, seed: Iterable[DataFrame]): """ Initialize ``self.agg_cs``. @@ -1248,7 +1271,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]): Parameters ---------- - seed : Iterable[pDataFrame] + seed : Iterable[DataFrame] Seed data, from which getting pandas DataFrame dtypes. 
Returns @@ -1271,7 +1294,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]): def agg( self, - seed: Union[pDataFrame, Iterable[pDataFrame]] = None, + seed: Union[DataFrame, Iterable[DataFrame]] = None, trim_start: Optional[bool] = True, discard_last: Optional[bool] = True, final_write: Optional[bool] = True, @@ -1285,7 +1308,7 @@ def agg( Parameters ---------- - seed : Union[pDataFrame, Iterable[pDataFrame]] + seed : Union[DataFrame, Iterable[DataFrame]] Seed data over which conducting streamed aggregations. trim_start : bool, default True If ``True``, and if aggregated results already exist, then @@ -1352,7 +1375,7 @@ def agg( # snapshots results will be easily merged. # TODO: change default settings: # discard_last = trim_start = final_write = False - if isinstance(seed, pDataFrame): + if isinstance(seed, DataFrame): # Make the seed an iterable. seed = [seed] # Seed can be an empty list or None. diff --git a/oups/aggstream/cumsegagg.py b/oups/aggstream/cumsegagg.py index 93b9743..f6150b5 100644 --- a/oups/aggstream/cumsegagg.py +++ b/oups/aggstream/cumsegagg.py @@ -565,6 +565,7 @@ def cumsegagg( "at least one null value exists in 'snap_res' which is likely to hint a bug.", ) if bin_res.eq(0).any().any(): + print(bin_res) raise ValueError( "at least one null value exists in 'bin_res' which is likely to hint a bug.", ) diff --git a/oups/store/writer.py b/oups/store/writer.py index 834b08f..6871185 100644 --- a/oups/store/writer.py +++ b/oups/store/writer.py @@ -117,12 +117,16 @@ def iter_dataframe( # Case 'duplicates_on' is a single column name, but not # 'sharp_on'. duplicates_on = [duplicates_on, sharp_on] - # Define bins to split into row groups. - # Acknowledging this piece of code to be an extract from fastparquet. n_rows = len(data) - n_parts = (n_rows - 1) // max_row_group_size + 1 - row_group_size = min((n_rows - 1) // n_parts + 1, n_rows) - starts = list(range(0, n_rows, row_group_size)) + if n_rows: + # Define bins to split into row groups. + # Acknowledging this piece of code to be an extract from fastparquet. + n_parts = (n_rows - 1) // max_row_group_size + 1 + row_group_size = min((n_rows - 1) // n_parts + 1, n_rows) + starts = list(range(0, n_rows, row_group_size)) + else: + # If n_rows=0 + starts = [0] if sharp_on: # Adjust bins so that they do not end in the middle of duplicate values # in `sharp_on` column. @@ -424,7 +428,7 @@ def write( all_cols = data.get_column_names() if ordered_on not in all_cols: raise ValueError(f"column '{ordered_on}' does not exist in input data.") - if os_path.isdir(dirpath) and os_listdir(dirpath): + if os_path.isdir(dirpath) and any(file.endswith(".parquet") for file in os_listdir(dirpath)): # Case updating an existing dataset. # Identify overlaps in row groups between new data and recorded data. # Recorded row group start and end indexes. 
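Note on the oups/store/writer.py hunk above: the row-group splitting follows fastparquet's even-split heuristic, and the new branch keeps a single start offset when the incoming DataFrame is empty, so a file can still be initiated (e.g. to record metadata only). Below is a minimal standalone sketch of that arithmetic, outside the patch itself; the function name `split_row_group_starts` and the asserted values are illustrative only.

```python
from typing import List


def split_row_group_starts(n_rows: int, max_row_group_size: int) -> List[int]:
    """
    Return start offsets of row groups, mirroring the heuristic in the hunk above.
    """
    if n_rows:
        # Smallest number of parts so that no part exceeds the cap.
        n_parts = (n_rows - 1) // max_row_group_size + 1
        # Even part size; the last group may end up shorter.
        row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
        return list(range(0, n_rows, row_group_size))
    # Empty data: keep a single (empty) row group so that writing can
    # proceed and metadata can be attached to a freshly created file.
    return [0]


assert split_row_group_starts(10, 4) == [0, 4, 8]  # 3 groups: 4 + 4 + 2 rows
assert split_row_group_starts(8, 4) == [0, 4]      # 2 groups of 4 rows
assert split_row_group_starts(0, 4) == [0]         # empty data still yields one start
```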
diff --git a/tests/test_aggstream/test_aggstream_simple.py b/tests/test_aggstream/test_aggstream_simple.py index b91223e..ff170d8 100644 --- a/tests/test_aggstream/test_aggstream_simple.py +++ b/tests/test_aggstream/test_aggstream_simple.py @@ -28,7 +28,8 @@ from pandas import Series as pSeries from pandas import Timedelta from pandas import Timestamp -from pandas import concat as pconcat +from pandas import concat +from pandas import date_range from pandas.core.resample import TimeGrouper from oups import AggStream @@ -299,7 +300,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path): discard_last=True, ) # Test results - ref_res = pconcat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() + ref_res = concat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() ref_res[[FIRST, LAST, MIN, MAX]] = ref_res[[FIRST, LAST, MIN, MAX]].astype(DTYPE_NULLABLE_INT64) rec_res = store[key].pdf assert rec_res.equals(ref_res) @@ -322,7 +323,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path): ) # Test results ref_res = ( - pconcat([seed_df, seed_df2, seed_df3]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() + concat([seed_df, seed_df2, seed_df3]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() ) ref_res[[FIRST, LAST, MIN, MAX]] = ref_res[[FIRST, LAST, MIN, MAX]].astype(DTYPE_NULLABLE_INT64) rec_res = store[key] @@ -484,7 +485,7 @@ def post(buffer: dict, bin_res: DataFrame): discard_last=True, ) # Test results - ref_res_agg = pconcat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() + ref_res_agg = concat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index() ref_res_post = post({}, ref_res_agg) rec_res = store[key].pdf assert rec_res.equals(ref_res_post) @@ -559,7 +560,7 @@ def test_seed_time_grouper_bin_on_as_tuple(store, seed_path): discard_last=True, ) # Test results - ref_res = pconcat([seed_pdf, seed_pdf2]).iloc[:-1].groupby(bin_by).agg(**agg) + ref_res = concat([seed_pdf, seed_pdf2]).iloc[:-1].groupby(bin_by).agg(**agg) ref_res.index.name = ts_open ref_res.reset_index(inplace=True) rec_res = store[key].pdf @@ -688,7 +689,7 @@ def test_by_callable_wo_bin_on(store, seed_path): seed_pdf2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) # Forcing dtype of 'seed_pdf' to float. seed_pdf2["val"] = seed_pdf2["val"].astype("float64") - seed_pdf2 = pconcat([seed_pdf, seed_pdf2], ignore_index=True) + seed_pdf2 = concat([seed_pdf, seed_pdf2], ignore_index=True) fp_write(seed_path, seed_pdf2, row_group_offsets=13, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation. @@ -787,7 +788,7 @@ def by_1val(on: DataFrame, buffer: dict): else: # 1st bin is one that was started before. ncs = np.append(ncs, len(on)) - group_keys = pconcat([pSeries([first_lab]), group_keys]) + group_keys = concat([pSeries([first_lab]), group_keys]) buffer["last_key"] = group_keys.iloc[-1] return ncs, group_keys, 0, "left", ncs, True @@ -913,7 +914,7 @@ def agg_with_same_bin_labels(seed_pdf): ) val = np.arange(1, len(ts2) + 1) val[3] = 1 - seed_pdf = pconcat([seed_pdf, DataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True) + seed_pdf = concat([seed_pdf, DataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True) fp_write(seed_path, seed_pdf, row_group_offsets=13, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Setup streamed aggregation. @@ -981,7 +982,7 @@ def test_time_grouper_trim_start(store, seed_path): discard_last=True, ) # Test results. 
- seed_pdf_ref = pconcat([seed_pdf.iloc[:-1], seed_pdf2]) + seed_pdf_ref = concat([seed_pdf.iloc[:-1], seed_pdf2]) ref_res = seed_pdf_ref.iloc[:-1].groupby(bin_by).agg(**agg).reset_index() rec_res = store[key].pdf assert rec_res.equals(ref_res) @@ -1020,7 +1021,7 @@ def test_time_grouper_agg_first(store): # 1st append, starting a new bin. ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"]) seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) - seed2 = pconcat([seed, seed2]) + seed2 = concat([seed, seed2]) # Streamed aggregation. as_.agg( seed=seed2, @@ -1290,7 +1291,7 @@ def test_bin_on_col_sum_agg(store): ], ) seed2 = DataFrame({ordered_on: ts, "val": [1, 2]}) - seed = pconcat([seed, seed2]) + seed = concat([seed, seed2]) # Setup streamed aggregation. as_.agg( seed=seed, @@ -1345,7 +1346,7 @@ def test_time_grouper_agg_first_filters_and_no_filter(store): # 1st append, starting a new bin. ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"]) seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) - seed2 = pconcat([seed, seed2]) + seed2 = concat([seed, seed2]) # Streamed aggregation. as_.agg( seed=seed2, @@ -1493,3 +1494,82 @@ def test_exception_unordered_seed(store, seed_path): assert streamagg_md[KEY_RESTART_INDEX] == ts[ref_idx - 1] assert not streamagg_md[KEY_PRE_BUFFER] assert not streamagg_md[KEY_POST_BUFFER] + + +def test_post_with_warm_up(store): + # Test a 'post' with a warm-up period and check 'post_buffer' is correctly + # recorded even if 'post' does not output result yet. + # No binning so to say: keeping each value in 'val'. + # + # Setup aggregation. + agg_on = "val" + ordered_on = "ts" + ts_period = "2min" + + def post(buffer: dict, bin_res: DataFrame): + """ + Rolling sum of last ten values. + + Warm-up period is then 10 rows. + + """ + print("post is running") + if buffer: + prev_bin = buffer["prev_bin"] + print("buffer with data / showing bin_res") + print(bin_res) + last_idx = ( + -1 + if bin_res.loc[:, ordered_on].iloc[0] != prev_bin.loc[:, ordered_on].iloc[-1] + else -2 + ) + bin_res = concat([prev_bin.iloc[:last_idx], bin_res], ignore_index=True) + # Keep in buffer last 10 rows. + buffer["prev_bin"] = bin_res[-10:].reset_index(drop=True) + if len(bin_res) >= 10: + return DataFrame( + { + agg_on: bin_res[agg_on].rolling(10).sum().dropna().reset_index(drop=True), + ordered_on: bin_res[ordered_on].iloc[9:].reset_index(drop=True), + }, + ) + + max_row_group_size = 10 + agg = {agg_on: (agg_on, FIRST)} + bin_by = TimeGrouper(key=ordered_on, freq=ts_period, closed="left", label="left") + as_ = AggStream( + ordered_on=ordered_on, + agg=agg, + store=store, + keys={ + key: { + "ordered_on": ordered_on, + "bin_by": bin_by, + }, + }, + max_row_group_size=max_row_group_size, + post=post, + ) + # Setup seed data. + n_values = 20 + ts = date_range("2020/01/01 08:00", freq=ts_period, periods=n_values) + seed = DataFrame({ordered_on: ts, agg_on: range(1, n_values + 1)}) + # 1st chunk of data, not reaching the required number of warm-up rows. + as_.agg( + seed=seed.iloc[:5], + discard_last=False, + final_write=True, + ) + # Check 'post_buffer'. + post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER] + assert post_buffer["prev_bin"].equals(seed.iloc[:5]) + # 2nd chunk of data, starting to output actual data. 
+ as_.agg( + seed=[seed.iloc[5:8], seed.iloc[8:14]], + discard_last=False, + final_write=True, + ) + # Check /!\ not working /!\ rec_res != ref_res + ref_res = post({}, seed.iloc[:14]) + rec_res = store[key].pdf + assert rec_res.equals(ref_res) From 888bbefc807fea842f022a53e4ded15a20131cfe Mon Sep 17 00:00:00 2001 From: yohplala Date: Thu, 25 Jul 2024 21:48:53 +0200 Subject: [PATCH 2/3] Forgotten 'print'. --- oups/aggstream/cumsegagg.py | 1 - 1 file changed, 1 deletion(-) diff --git a/oups/aggstream/cumsegagg.py b/oups/aggstream/cumsegagg.py index f6150b5..93b9743 100644 --- a/oups/aggstream/cumsegagg.py +++ b/oups/aggstream/cumsegagg.py @@ -565,7 +565,6 @@ def cumsegagg( "at least one null value exists in 'snap_res' which is likely to hint a bug.", ) if bin_res.eq(0).any().any(): - print(bin_res) raise ValueError( "at least one null value exists in 'bin_res' which is likely to hint a bug.", ) From b18b33b18636406af18de3a08e6eb964000dfad7 Mon Sep 17 00:00:00 2001 From: yohplala Date: Fri, 26 Jul 2024 08:23:38 +0200 Subject: [PATCH 3/3] Test completed with a 'post()' having a warm-up period (not producing results yet, but producing a 'post_buffer' that needs to be recorded. --- tests/test_aggstream/test_aggstream_simple.py | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/tests/test_aggstream/test_aggstream_simple.py b/tests/test_aggstream/test_aggstream_simple.py index ff170d8..fc81878 100644 --- a/tests/test_aggstream/test_aggstream_simple.py +++ b/tests/test_aggstream/test_aggstream_simple.py @@ -1513,15 +1513,12 @@ def post(buffer: dict, bin_res: DataFrame): Warm-up period is then 10 rows. """ - print("post is running") if buffer: prev_bin = buffer["prev_bin"] - print("buffer with data / showing bin_res") - print(bin_res) last_idx = ( -1 - if bin_res.loc[:, ordered_on].iloc[0] != prev_bin.loc[:, ordered_on].iloc[-1] - else -2 + if bin_res.loc[:, ordered_on].iloc[0] == prev_bin.loc[:, ordered_on].iloc[-1] + else len(prev_bin) ) bin_res = concat([prev_bin.iloc[:last_idx], bin_res], ignore_index=True) # Keep in buffer last 10 rows. @@ -1537,7 +1534,7 @@ def post(buffer: dict, bin_res: DataFrame): max_row_group_size = 10 agg = {agg_on: (agg_on, FIRST)} bin_by = TimeGrouper(key=ordered_on, freq=ts_period, closed="left", label="left") - as_ = AggStream( + as_1 = AggStream( ordered_on=ordered_on, agg=agg, store=store, @@ -1555,7 +1552,7 @@ def post(buffer: dict, bin_res: DataFrame): ts = date_range("2020/01/01 08:00", freq=ts_period, periods=n_values) seed = DataFrame({ordered_on: ts, agg_on: range(1, n_values + 1)}) # 1st chunk of data, not reaching the required number of warm-up rows. - as_.agg( + as_1.agg( seed=seed.iloc[:5], discard_last=False, final_write=True, @@ -1564,12 +1561,38 @@ def post(buffer: dict, bin_res: DataFrame): post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER] assert post_buffer["prev_bin"].equals(seed.iloc[:5]) # 2nd chunk of data, starting to output actual data. - as_.agg( + as_1.agg( seed=[seed.iloc[5:8], seed.iloc[8:14]], discard_last=False, final_write=True, ) - # Check /!\ not working /!\ rec_res != ref_res + # Check. ref_res = post({}, seed.iloc[:14]) rec_res = store[key].pdf assert rec_res.equals(ref_res) + post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER] + assert post_buffer["prev_bin"].equals(seed.iloc[4:14].reset_index(drop=True)) + # 3rd chunk, cold start. 
+ as_2 = AggStream( + ordered_on=ordered_on, + agg=agg, + store=store, + keys={ + key: { + "ordered_on": ordered_on, + "bin_by": bin_by, + }, + }, + max_row_group_size=max_row_group_size, + post=post, + ) + as_2.agg( + seed=seed.iloc[14:], + discard_last=False, + final_write=True, + ) + ref_res = post({}, seed) + rec_res = store[key].pdf + assert rec_res.equals(ref_res) + post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER] + assert post_buffer["prev_bin"].equals(seed.iloc[10:].reset_index(drop=True))
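Taken together, the three patches let 'aggstream()' persist 'last_seed_index', 'pre_buffer', 'segagg_buffer' and 'post_buffer' even when no aggregation result is available yet, which is what the warm-up 'post()' in the new test relies on. Below is a minimal, oups-independent sketch of that warm-up pattern, assuming non-overlapping result chunks (unlike the test, which also trims the re-emitted last bin); `rolling_post`, `WINDOW` and the 'val' column are illustrative names, not part of the library.

```python
from typing import Optional

from pandas import DataFrame, concat

WINDOW = 10  # warm-up length: no output until 10 rows have been seen


def rolling_post(buffer: dict, bin_res: DataFrame) -> Optional[DataFrame]:
    # Prepend rows carried over from previous calls, then keep the last
    # WINDOW - 1 rows for the next call so no window is emitted twice.
    if "prev" in buffer:
        bin_res = concat([buffer["prev"], bin_res], ignore_index=True)
    buffer["prev"] = bin_res.iloc[-(WINDOW - 1):].reset_index(drop=True)
    if len(bin_res) < WINDOW:
        # Warm-up: no result yet, but 'buffer' still has to be persisted
        # by the caller, which is what the metadata-only write enables.
        return None
    return DataFrame(
        {"val": bin_res["val"].rolling(WINDOW).sum().dropna().reset_index(drop=True)},
    )


buf: dict = {}
assert rolling_post(buf, DataFrame({"val": range(5)})) is None  # warm-up, buffer kept
out = rolling_post(buf, DataFrame({"val": range(5, 15)}))       # 15 rows seen in total
assert out is not None and len(out) == 6                        # windows ending on rows 10 to 15
```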