diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index 0b94764..d698c6d 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -56,8 +56,8 @@ KEY_BIN_ON_OUT = "bin_on_out" KEY_AGG_RES_BUFFER = "agg_res_buffer" KEY_BIN_RES_BUFFER = "bin_res_buffer" -KEY_CHECK = "check" -KEY_CHECK_BUFFER = "check_buffer" +KEY_PRE = "pre" +KEY_PRE_BUFFER = "pre_buffer" KEY_FILTERS = "filters" KEY_RESTART_INDEX = "restart_index" KEY_AGG_RES = "agg_res" @@ -373,7 +373,7 @@ def _reset_agg_buffers(agg_buffers: Optional[dict] = None) -> Optional[dict]: agg_buffers |= init_values -class SeedCheckException(Exception): +class SeedPreException(Exception): """ Exception related to user-defined checks on seed chunk. """ @@ -392,8 +392,8 @@ def _iter_data( seed: Iterable[pDataFrame], ordered_on: str, restart_index: Union[int, float, pTimestamp, None], - check: Union[Callable, None], - check_buffer: dict, + pre: Union[Callable, None], + pre_buffer: dict, filters: Union[dict, None], trim_start: bool, discard_last: bool, @@ -414,7 +414,7 @@ def _iter_data( restart_index : int, float, pTimestamp or None Index (excluded) in `ordered_on` column before which rows in seed will be trimmed. - check : Callable or None + pre : Callable or None Used-defined Callable to proceed checks over each item of the seed Iterable, accepting 2 parameters: - An ``on`` parameter, a pandas dataframe, the current seed item (before any filter is applied). - A ``buffer`` parameter, a dict that can be used as a buffer for storing temporary results from one chunk processing to - the next. Its initial value is that provided by `check.buffer`. + the next. Its initial value is that provided by `pre_buffer`. - In-place modifications of seed dataframe can be carried out here. - check_buffer : dict or None + In-place modifications of seed dataframe have to be carried out here. + pre_buffer : dict or None Buffer to keep track of intermediate data that can be required for - proceeding with check of individual seed item. 
+ proceeding with pre-processing of individual seed item. filters : dict or None Dict in the form ``{"filter_id":[[("col", op, val), ...], ...]}`` @@ -478,22 +478,22 @@ def _iter_data( for seed_chunk in seed: # Check seed chunk is ordered on 'ordered_on'. # This re-ordering is made because for 'trim_start' and - # 'discard_last', this ordereding is required. + # 'discard_last', this ordering is required. if not seed_chunk[ordered_on].is_monotonic_increasing: # Currently un-eased to silently modify seed data without knowing # if it makes sense, so leaving this row commented. # seed_chunk.sort_values(by=ordered_on, inplace=True) # Instead, raise an exception. - raise SeedCheckException("seed data is not in ascending order.") - # Step 1 / Seed check by user. - if check: + raise SeedPreException("seed data is not in ascending order.") + # Step 1 / Seed pre-processing by user. + if pre: # Apply user checks. try: - check(seed_chunk, check_buffer) + pre(seed_chunk, pre_buffer) except Exception as e: - # Stop iteration in case of failing check. + # Stop iteration in case of failing pre-processing. # Aggregation has been run up to the last valid chunk. - raise SeedCheckException(str(e)) + raise SeedPreException(str(e)) # Step 2 / If a previous remainder, concatenate it to give current # DataFrame its 'final' length. if not (seed_remainder is None or seed_remainder.empty): @@ -814,9 +814,9 @@ class AggStream: 'restart_index' : int, float or pTimestamp, the index from which (included) should be restarted the next aggregation iteration. - 'check' : Callable, to apply user-defined check on each seed item. - 'check_buffer' : dict, to keep track of intermediate values for - `check` function. + 'pre' : Callable, to apply user-defined pre-processing on seed. + 'pre_buffer' : dict, to keep track of intermediate values for + `pre` function. 'filters' : dict, as per `filters` parameter. }`` - ``self.store``, oups store, as per `store` parameter. 
@@ -871,8 +871,8 @@ def __init__( ordered_on: str, store: ParquetSet, keys: Union[dataclass, dict], - check: Optional[Callable] = None, - check_buffer: Optional[dict] = None, + pre: Optional[Callable] = None, + pre_buffer: Optional[dict] = None, filters: Optional[dict] = None, agg: Optional[dict] = None, bin_by: Optional[Union[TimeGrouper, Callable[[Series, dict], tuple]]] = None, @@ -943,25 +943,27 @@ def __init__( For keys deriving from unfiltered data, use the `NO_FILTER_ID` ``"_"``. - check : Callable, default None - Used-defined Callable to proceed checks over each item of the seed - Iterable, accepting 2 parameters: + pre : Callable, default None + User-defined Callable to proceed with pre-processing of each chunk + of the seed Iterable, accepting 2 parameters: - An ``on`` parameter, a pandas dataframe, the current seed item (before any filter is applied). - A ``buffer`` parameter, a dict that can be used as a buffer for storing temporary results from one chunk processing to - the next. Its initial value is that provided by `check.buffer`. + the next. Its initial value is that provided by `pre_buffer`. - If running ``check`` raises an exception (whichever type it is), a - ``SeedCheckException`` will subsequently be raised. + If running ``pre`` raises an exception (whichever type it is), a + ``SeedPreException`` will subsequently be raised. + Modification of seed chunk, if any, has to be realized in-place. + No DataFrame returned by this function is expected. - check_buffer : dict, default None + pre_buffer : dict, default None Buffer to keep track of intermediate data that can be required for - proceeding with check of individual seed item. + proceeding with pre-processing of individual seed item. Once aggregation stream is over, its value is not recorded. User has to take care of this if needed. Its value can be - retrieved with ``self.check_buffer`` object attribute. + retrieved with ``self.pre_buffer`` object attribute. 
filters : Union[dict, None], default None Dict in the form ``{"filter_id":[[("col", op, val), ...], ...]}`` @@ -1102,7 +1104,7 @@ def __init__( the user omit a column name, it means that this is a voluntary choice from the user. - - If an exception is raised by ``check`` function on seed data, then, + - If an exception is raised by ``pre`` function on seed data, then, last good results are still written to disk with correct metadata. If an exception is raised at some other point of the aggregation process, results are not written. @@ -1211,8 +1213,8 @@ def __init__( ) = _init_agg_buffers(store, keys) self.seed_config = { KEY_ORDERED_ON: ordered_on, - KEY_CHECK: check, - KEY_CHECK_BUFFER: {} if check_buffer is None else check_buffer, + KEY_PRE: pre, + KEY_PRE_BUFFER: {} if pre_buffer is None else pre_buffer, KEY_FILTERS: filters, KEY_RESTART_INDEX: restart_index, } @@ -1372,7 +1374,7 @@ def agg( # Set 'seed_index_restart' to the 'last_seed_index' with # which restarting the next aggregation iteration. 
self.seed_config[KEY_RESTART_INDEX] = _last_seed_index - except SeedCheckException as sce: + except SeedPreException as sce: seed_check_exception = True exception_message = str(sce) if final_write: @@ -1393,4 +1395,4 @@ def agg( for key, agg_res in self.agg_buffers.items() ) if seed and seed_check_exception: - raise SeedCheckException(exception_message) + raise SeedPreException(exception_message) diff --git a/tests/test_aggstream/test_aggstream_advanced.py b/tests/test_aggstream/test_aggstream_advanced.py index 2ab1957..8332277 100644 --- a/tests/test_aggstream/test_aggstream_advanced.py +++ b/tests/test_aggstream/test_aggstream_advanced.py @@ -38,7 +38,7 @@ from oups.aggstream.aggstream import KEY_AGG from oups.aggstream.aggstream import KEY_AGGSTREAM from oups.aggstream.aggstream import KEY_RESTART_INDEX -from oups.aggstream.aggstream import SeedCheckException +from oups.aggstream.aggstream import SeedPreException from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64 from oups.aggstream.cumsegagg import cumsegagg from oups.aggstream.jcumsegagg import FIRST @@ -349,7 +349,7 @@ def check(seed_chunk, check_buffer=None): filter2: [(filter_on, "==", False)], }, max_row_group_size=max_row_group_size, - check=check, + pre=check, ) # Seed data. filter_val = np.ones(len(ts), dtype=bool) @@ -357,7 +357,7 @@ def check(seed_chunk, check_buffer=None): seed = pDataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val}) # Streamed aggregation, raising an exception, but 1st chunk should be # written. 
- with pytest.raises(SeedCheckException, match="^not possible to have"): + with pytest.raises(SeedPreException, match="^not possible to have"): as_.agg( seed=[seed[:ref_idx], seed[ref_idx:]], trim_start=False, diff --git a/tests/test_aggstream/test_aggstream_init.py b/tests/test_aggstream/test_aggstream_init.py index 6882fb0..2ab28a2 100644 --- a/tests/test_aggstream/test_aggstream_init.py +++ b/tests/test_aggstream/test_aggstream_init.py @@ -29,11 +29,11 @@ from oups.aggstream.aggstream import KEY_AGG_RES_BUFFER from oups.aggstream.aggstream import KEY_BIN_ON_OUT from oups.aggstream.aggstream import KEY_BIN_RES_BUFFER -from oups.aggstream.aggstream import KEY_CHECK -from oups.aggstream.aggstream import KEY_CHECK_BUFFER from oups.aggstream.aggstream import KEY_FILTERS from oups.aggstream.aggstream import KEY_POST from oups.aggstream.aggstream import KEY_POST_BUFFER +from oups.aggstream.aggstream import KEY_PRE +from oups.aggstream.aggstream import KEY_PRE_BUFFER from oups.aggstream.aggstream import KEY_RESTART_INDEX from oups.aggstream.aggstream import KEY_SEGAGG_BUFFER from oups.aggstream.aggstream import NO_FILTER_ID @@ -89,8 +89,8 @@ def always_false(**kwargs): # ref_seed_config { KEY_ORDERED_ON: "ts", - KEY_CHECK: None, - KEY_CHECK_BUFFER: {}, + KEY_PRE: None, + KEY_PRE_BUFFER: {}, KEY_FILTERS: {NO_FILTER_ID: None}, KEY_RESTART_INDEX: None, }, @@ -127,7 +127,7 @@ def always_false(**kwargs): KEY_ORDERED_ON: "ts_dflt", KEY_MAX_ROW_GROUP_SIZE: 1000, "max_nirgs": 4, - KEY_CHECK: always_true, + KEY_PRE: always_true, KEY_AGG: {"out_dflt": ("in_dflt", LAST)}, KEY_POST: always_true, "keys": { @@ -156,8 +156,8 @@ def always_false(**kwargs): # ref_seed_config { KEY_ORDERED_ON: "ts_dflt", - KEY_CHECK: always_true, - KEY_CHECK_BUFFER: {}, + KEY_PRE: always_true, + KEY_PRE_BUFFER: {}, KEY_FILTERS: {NO_FILTER_ID: None}, KEY_RESTART_INDEX: None, }, @@ -234,7 +234,7 @@ def always_false(**kwargs): KEY_ORDERED_ON: "ts_dflt", KEY_MAX_ROW_GROUP_SIZE: 1000, "max_nirgs": 4, - 
KEY_CHECK: always_true, + KEY_PRE: always_true, KEY_AGG: {"out_dflt": ("in_dflt", LAST)}, KEY_POST: always_true, KEY_SNAP_BY: TimeGrouper(key="ts_dflt", freq="30T"), @@ -272,8 +272,8 @@ def always_false(**kwargs): # ref_seed_config { KEY_ORDERED_ON: "ts_dflt", - KEY_CHECK: always_true, - KEY_CHECK_BUFFER: {}, + KEY_PRE: always_true, + KEY_PRE_BUFFER: {}, KEY_FILTERS: { "filter1": [[("ts_dflt", ">=", 4)]], "filter2": [[("in_spec", "<=", 10)]], diff --git a/tests/test_aggstream/test_aggstream_simple.py b/tests/test_aggstream/test_aggstream_simple.py index 8ef3c20..372d6ad 100644 --- a/tests/test_aggstream/test_aggstream_simple.py +++ b/tests/test_aggstream/test_aggstream_simple.py @@ -38,7 +38,7 @@ from oups.aggstream.aggstream import KEY_POST_BUFFER from oups.aggstream.aggstream import KEY_RESTART_INDEX from oups.aggstream.aggstream import NO_FILTER_ID -from oups.aggstream.aggstream import SeedCheckException +from oups.aggstream.aggstream import SeedPreException from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64 from oups.aggstream.jcumsegagg import FIRST from oups.aggstream.jcumsegagg import LAST @@ -1462,7 +1462,7 @@ def test_exception_unordered_seed(store, seed_path): seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = pNaT # Streamed aggregation, raising an exception, but 1st chunk should be # written. - with pytest.raises(SeedCheckException, match="^seed data is not in"): + with pytest.raises(SeedPreException, match="^seed data is not in"): as_.agg( seed=[seed[:ref_idx], seed[ref_idx:]], trim_start=False,