From 5360c498a73e23de932057d1d60fdcf06d9440ce Mon Sep 17 00:00:00 2001 From: yohplala Date: Sun, 21 Apr 2024 21:08:36 +0200 Subject: [PATCH] Minor corrections. --- oups/aggstream/aggstream.py | 1 + .../test_aggstream/test_aggstream_advanced.py | 61 ++++++++++--------- tests/test_aggstream/test_aggstream_init.py | 28 +++++---- 3 files changed, 48 insertions(+), 42 deletions(-) diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index cff01fd..7244dc5 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -424,6 +424,7 @@ def _iter_data( for storing temporary results from one chunk processing to the next. Its initial value is that provided by `check.buffer`. + In-place modifications of seed dataframe can be carried out here. check_buffer : dict or None Buffer to keep track of intermediate data that can be required for proceeding with check of individual seed item. diff --git a/tests/test_aggstream/test_aggstream_advanced.py b/tests/test_aggstream/test_aggstream_advanced.py index 69c53dc..2ab1957 100644 --- a/tests/test_aggstream/test_aggstream_advanced.py +++ b/tests/test_aggstream/test_aggstream_advanced.py @@ -35,6 +35,7 @@ from oups import AggStream from oups import ParquetSet from oups import toplevel +from oups.aggstream.aggstream import KEY_AGG from oups.aggstream.aggstream import KEY_AGGSTREAM from oups.aggstream.aggstream import KEY_RESTART_INDEX from oups.aggstream.aggstream import SeedCheckException @@ -44,6 +45,8 @@ from oups.aggstream.jcumsegagg import LAST from oups.aggstream.jcumsegagg import MAX from oups.aggstream.jcumsegagg import MIN +from oups.aggstream.segmentby import KEY_BIN_BY +from oups.aggstream.segmentby import KEY_SNAP_BY from oups.aggstream.segmentby import by_x_rows @@ -77,16 +80,16 @@ def test_3_keys_only_bins(store, seed_path): key2 = Indexer("agg_13T") key3 = Indexer("agg_4rows") key1_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)}, } key2_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)}, } key3_cf = { - "bin_by": by_x_rows, - "agg": {MIN: ("val", MIN), MAX: ("val", MAX)}, + KEY_BIN_BY: by_x_rows, + KEY_AGG: {MIN: ("val", MIN), MAX: ("val", MAX)}, } max_row_group_size = 6 key_configs = { @@ -130,15 +133,15 @@ def get_ref_results(seed_df): key3: deepcopy(key3_cf), } k1_res = ( - seed_df.groupby(key_configs[key1]["bin_by"]) - .agg(**key_configs[key1]["agg"]) + seed_df.groupby(key_configs[key1][KEY_BIN_BY]) + .agg(**key_configs[key1][KEY_AGG]) .reset_index() ) k1_res[FIRST] = k1_res[FIRST].astype(DTYPE_NULLABLE_INT64) k1_res[LAST] = k1_res[LAST].astype(DTYPE_NULLABLE_INT64) k2_res = ( - seed_df.groupby(key_configs[key2]["bin_by"]) - .agg(**key_configs[key2]["agg"]) + seed_df.groupby(key_configs[key2][KEY_BIN_BY]) + .agg(**key_configs[key2][KEY_AGG]) .reset_index() ) key3_bins = by_x_rows(on=seed_df[ordered_on], buffer={}) @@ -146,7 +149,7 @@ def get_ref_results(seed_df): key3_bin_starts = np.arange(0, len(seed_df), 4) key3_bins.iloc[key3_bin_starts] = seed_df.iloc[key3_bin_starts].loc[:, ordered_on] key3_bins.ffill(inplace=True) - k3_res = seed_df.groupby(key3_bins).agg(**key_configs[key3]["agg"]) + k3_res = seed_df.groupby(key3_bins).agg(**key_configs[key3][KEY_AGG]) k3_res.index.name = ordered_on k3_res.reset_index(inplace=True) return k1_res, k2_res, k3_res @@ -231,8 +234,8 @@ def test_exception_different_indexes_at_restart(store, seed_path): # Setup a 1st separate streamed aggregations (awkward...). key1 = Indexer("agg_2T") key1_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)}, } max_row_group_size = 6 as1 = AggStream( @@ -259,8 +262,8 @@ def test_exception_different_indexes_at_restart(store, seed_path): # Setup a 2nd separate streamed aggregation. key2 = Indexer("agg_13T") key2_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)}, } as2 = AggStream( ordered_on=ordered_on, @@ -322,13 +325,13 @@ def check(seed_chunk, check_buffer=None): key1 = Indexer("agg_2T") key1_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)}, } key2 = Indexer("agg_60T") key2_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="60T", closed="left", label="left"), - "agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)}, + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="60T", closed="left", label="left"), + KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)}, } filter1 = "filter1" filter2 = "filter2" @@ -525,20 +528,20 @@ def test_3_keys_bins_snaps_filters(store, seed_path): max_row_group_size = 5 snap_duration = "5T" common_key_params = { - "snap_by": TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"), - "agg": {FIRST: (val, FIRST), LAST: (val, LAST)}, + KEY_SNAP_BY: TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"), + KEY_AGG: {FIRST: (val, FIRST), LAST: (val, LAST)}, } key1 = Indexer("agg_10T") key1_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"), } key2 = Indexer("agg_20T") key2_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"), } key3 = Indexer("agg_2T") key3_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"), } filter1 = "filter1" filter2 = "filter2" @@ -967,20 +970,20 @@ def test_3_keys_bins_snaps_filters_restart(store, seed_path): max_row_group_size = 5 snap_duration = "5T" common_key_params = { - "snap_by": TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"), - "agg": {FIRST: (val, FIRST), LAST: (val, LAST)}, + KEY_SNAP_BY: TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"), + KEY_AGG: {FIRST: (val, FIRST), LAST: (val, LAST)}, } key1 = Indexer("agg_10T") key1_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"), } key2 = Indexer("agg_20T") key2_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"), } key3 = Indexer("agg_2T") key3_cf = { - "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"), + KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"), } filter1 = "filter1" filter2 = "filter2" diff --git a/tests/test_aggstream/test_aggstream_init.py b/tests/test_aggstream/test_aggstream_init.py index 69d4923..6882fb0 100644 --- a/tests/test_aggstream/test_aggstream_init.py +++ b/tests/test_aggstream/test_aggstream_init.py @@ -41,7 +41,9 @@ from oups.aggstream.jcumsegagg import FIRST from oups.aggstream.jcumsegagg import LAST from oups.aggstream.jcumsegagg import SUM +from oups.aggstream.segmentby import KEY_BIN_BY from oups.aggstream.segmentby import KEY_ORDERED_ON +from oups.aggstream.segmentby import KEY_SNAP_BY from oups.store.writer import KEY_DUPLICATES_ON from oups.store.writer import KEY_MAX_ROW_GROUP_SIZE @@ -81,7 +83,7 @@ def always_false(**kwargs): KEY_MAX_ROW_GROUP_SIZE: 6, KEY_ORDERED_ON: "ts", "keys": Indexer("key1"), - "bin_by": TimeGrouper(key="ts", freq="1H", closed="left", label="left"), + KEY_BIN_BY: TimeGrouper(key="ts", freq="1H", closed="left", label="left"), KEY_AGG: {"agg_out": ("val", SUM)}, }, # ref_seed_config @@ -131,22 +133,22 @@ def always_false(**kwargs): "keys": { Indexer("key1_some_default"): { KEY_AGG: {"out_spec": ("in_spec", FIRST)}, - "bin_by": TimeGrouper(key="ts_dflt", freq="1H"), + KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"), KEY_POST: always_false, }, Indexer("key2_only_specific"): { KEY_AGG: {"out_spec": ("in_spec", FIRST)}, - "bin_by": always_true, + KEY_BIN_BY: always_true, KEY_POST: None, KEY_MAX_ROW_GROUP_SIZE: 3000, KEY_ORDERED_ON: "ts_spec", }, Indexer("key3_only_default"): { - "bin_by": always_false, + KEY_BIN_BY: always_false, "bin_on": ("bin_on_spec", "bin_out_spec"), }, Indexer("key4_most_default"): { - "bin_by": TimeGrouper(key="ts_dflt", freq="1H"), + KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"), KEY_ORDERED_ON: "ts_spec", }, }, @@ -235,29 +237,29 @@ def always_false(**kwargs): KEY_CHECK: always_true, KEY_AGG: {"out_dflt": ("in_dflt", LAST)}, KEY_POST: always_true, - "snap_by": TimeGrouper(key="ts_dflt", freq="30T"), + KEY_SNAP_BY: TimeGrouper(key="ts_dflt", freq="30T"), "keys": { "filter1": { Indexer("key1_some_default"): { KEY_AGG: {"out_spec": ("in_spec", FIRST)}, - "bin_by": TimeGrouper(key="ts_dflt", freq="1H"), + KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"), KEY_POST: always_false, }, Indexer("key2_only_specific"): { KEY_AGG: {"out_spec": ("in_spec", FIRST)}, - "bin_by": always_true, + KEY_BIN_BY: always_true, KEY_POST: None, KEY_MAX_ROW_GROUP_SIZE: 3000, KEY_ORDERED_ON: "ts_spec", }, Indexer("key3_only_default"): { - "bin_by": always_false, + KEY_BIN_BY: always_false, "bin_on": ("bin_on_spec", "bin_out_spec"), }, }, "filter2": { Indexer("key4_most_default"): { - "bin_by": TimeGrouper(key="ts_dflt", freq="1H"), + KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"), KEY_ORDERED_ON: "ts_spec", }, }, @@ -357,9 +359,9 @@ def test_aggstream_init( assert as_.seed_config == ref_seed_config # Do not check 'seg_config' in 'keys_config'. res_keys_config = deepcopy(as_.keys_config) - if "snap_by" in root_parameters: + if KEY_SNAP_BY in root_parameters: # Check 'snap_by' is initialized in 'seg_config': - ref_grouper = root_parameters["snap_by"] + ref_grouper = root_parameters[KEY_SNAP_BY] ref_grouper_attr = { "key": ref_grouper.key, "freq": ref_grouper.freq, @@ -373,7 +375,7 @@ def test_aggstream_init( "origin": ref_grouper.origin, } for key in ref_keys_config: - res_grouper = res_keys_config[key]["seg_config"]["snap_by"] + res_grouper = res_keys_config[key]["seg_config"][KEY_SNAP_BY] for attr in ref_grouper_attr: assert getattr(res_grouper, attr) == ref_grouper_attr[attr] for key, ref in ref_keys_config.items():