
Commit

Changing parameter 'check' to parameter 'pre'.
yohplala committed Jun 7, 2024
1 parent a02dd1c commit 9650db3
Showing 4 changed files with 53 additions and 51 deletions.
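For orientation, a hypothetical migration sketch for user code affected by this rename; `store`, `keys` and `my_callable` are placeholders, not part of this commit:

```python
# Before this commit: a user-defined callable was passed via 'check'.
as_ = AggStream(
    ordered_on="ts",
    store=store,
    keys=keys,
    check=my_callable,
    check_buffer={},
)

# After this commit: the same callable is passed via the renamed 'pre'
# parameter, with its buffer renamed accordingly.
as_ = AggStream(
    ordered_on="ts",
    store=store,
    keys=keys,
    pre=my_callable,
    pre_buffer={},
)
```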
74 changes: 38 additions & 36 deletions oups/aggstream/aggstream.py
@@ -56,8 +56,8 @@
KEY_BIN_ON_OUT = "bin_on_out"
KEY_AGG_RES_BUFFER = "agg_res_buffer"
KEY_BIN_RES_BUFFER = "bin_res_buffer"
-KEY_CHECK = "check"
-KEY_CHECK_BUFFER = "check_buffer"
+KEY_PRE = "pre"
+KEY_PRE_BUFFER = "pre_buffer"
KEY_FILTERS = "filters"
KEY_RESTART_INDEX = "restart_index"
KEY_AGG_RES = "agg_res"
@@ -373,7 +373,7 @@ def _reset_agg_buffers(agg_buffers: Optional[dict] = None) -> Optional[dict]:
agg_buffers |= init_values


-class SeedCheckException(Exception):
+class SeedPreException(Exception):
"""
Exception related to user-defined checks on seed chunk.
"""
@@ -392,8 +392,8 @@ def _iter_data(
seed: Iterable[pDataFrame],
ordered_on: str,
restart_index: Union[int, float, pTimestamp, None],
-check: Union[Callable, None],
-check_buffer: dict,
+pre: Union[Callable, None],
+pre_buffer: dict,
filters: Union[dict, None],
trim_start: bool,
discard_last: bool,
@@ -414,20 +414,20 @@
restart_index : int, float, pTimestamp or None
Index (excluded) in `ordered_on` column before which rows in seed
will be trimmed.
-check : Callable or None
+pre : Callable or None
User-defined Callable to proceed with checks over each item of the seed
Iterable, accepting 2 parameters:
- An ``on`` parameter, a pandas dataframe, the current seed item
(before any filter is applied).
- A ``buffer`` parameter, a dict that can be used as a buffer
for storing temporary results from one chunk processing to
-the next. Its initial value is that provided by `check.buffer`.
+the next. Its initial value is that provided by `pre_buffer`.
-In-place modifications of seed dataframe can be carried out here.
-check_buffer : dict or None
+In-place modifications of seed dataframe have to be carried out here.
+pre_buffer : dict or None
Buffer to keep track of intermediate data that can be required for
-proceeding with check of individual seed item.
+proceeding with pre-processing of individual seed item.
filters : dict or None
Dict in the form
``{"filter_id":[[("col", op, val), ...], ...]}``
@@ -478,22 +478,22 @@
for seed_chunk in seed:
# Check seed chunk is ordered on 'ordered_on'.
# This re-ordering is made because for 'trim_start' and
-# 'discard_last', this ordereding is required.
+# 'discard_last', this ordering is required.
if not seed_chunk[ordered_on].is_monotonic_increasing:
# Currently uneasy about silently modifying seed data without knowing
# if it makes sense, so leaving this row commented.
# seed_chunk.sort_values(by=ordered_on, inplace=True)
# Instead, raise an exception.
raise SeedCheckException("seed data is not in ascending order.")
# Step 1 / Seed check by user.
if check:
raise SeedPreException("seed data is not in ascending order.")
# Step 1 / Seed pre-processing by user.
if pre:
# Apply user checks.
try:
-check(seed_chunk, check_buffer)
+pre(seed_chunk, pre_buffer)
except Exception as e:
-# Stop iteration in case of failing check.
+# Stop iteration in case of failing pre.
# Aggregation has been run up to the last valid chunk.
-raise SeedCheckException(str(e))
+raise SeedPreException(str(e))
# Step 2 / If a previous remainder, concatenate it to give current
# DataFrame its 'final' length.
if not (seed_remainder is None or seed_remainder.empty):
@@ -814,9 +814,9 @@ class AggStream:
'restart_index' : int, float or pTimestamp, the index from which
(included) should be restarted the next
aggregation iteration.
-'check' : Callable, to apply user-defined check on each seed item.
-'check_buffer' : dict, to keep track of intermediate values for
-`check` function.
+'pre' : Callable, to apply user-defined pre-processing on seed.
+'pre_buffer' : dict, to keep track of intermediate values for
+`pre` function.
'filters' : dict, as per `filters` parameter.
}``
- ``self.store``, oups store, as per `store` parameter.
@@ -871,8 +871,8 @@ def __init__(
ordered_on: str,
store: ParquetSet,
keys: Union[dataclass, dict],
-check: Optional[Callable] = None,
-check_buffer: Optional[dict] = None,
+pre: Optional[Callable] = None,
+pre_buffer: Optional[dict] = None,
filters: Optional[dict] = None,
agg: Optional[dict] = None,
bin_by: Optional[Union[TimeGrouper, Callable[[Series, dict], tuple]]] = None,
@@ -943,25 +943,27 @@ def __init__(
For keys deriving from unfiltered data, use the `NO_FILTER_ID`
``"_"``.
-check : Callable, default None
-Used-defined Callable to proceed checks over each item of the seed
-Iterable, accepting 2 parameters:
+pre : Callable, default None
+User-defined Callable to proceed with pre-processing of each chunk
+of the seed Iterable, accepting 2 parameters:
- An ``on`` parameter, a pandas dataframe, the current seed item
(before any filter is applied).
- A ``buffer`` parameter, a dict that can be used as a buffer
for storing temporary results from one chunk processing to
-the next. Its initial value is that provided by `check.buffer`.
+the next. Its initial value is that provided by `pre_buffer`.
-If running ``check`` raises an exception (whichever type it is), a
-``SeedCheckException`` will subsequently be raised.
+If running ``pre`` raises an exception (whatever its type), a
+``SeedPreException`` will subsequently be raised.
+Modification of the seed chunk, if any, has to be carried out in-place.
+No DataFrame returned by this function is expected.
-check_buffer : dict, default None
+pre_buffer : dict, default None
Buffer to keep track of intermediate data that can be required for
-proceeding with check of individual seed item.
+proceeding with pre-processing of individual seed item.
Once aggregation stream is over, its value is not recorded.
User has to take care of this if needed. Its value can be
-retrieved with ``self.check_buffer`` object attribute.
+retrieved with ``self.pre_buffer`` object attribute.
filters : Union[dict, None], default None
Dict in the form
``{"filter_id":[[("col", op, val), ...], ...]}``
@@ -1102,7 +1104,7 @@ def __init__(
the user omit a column name, it means that this is a voluntary
choice from the user.
-- If an exception is raised by ``check`` function on seed data, then,
+- If an exception is raised by ``pre`` function on seed data, then,
last good results are still written to disk with correct metadata. If
an exception is raised at some other point of the aggregation
process, results are not written.
@@ -1211,8 +1213,8 @@ def __init__(
) = _init_agg_buffers(store, keys)
self.seed_config = {
KEY_ORDERED_ON: ordered_on,
-KEY_CHECK: check,
-KEY_CHECK_BUFFER: {} if check_buffer is None else check_buffer,
+KEY_PRE: pre,
+KEY_PRE_BUFFER: {} if pre_buffer is None else pre_buffer,
KEY_FILTERS: filters,
KEY_RESTART_INDEX: restart_index,
}
@@ -1372,7 +1374,7 @@ def agg(
# Set 'seed_index_restart' to the 'last_seed_index' with
# which restarting the next aggregation iteration.
self.seed_config[KEY_RESTART_INDEX] = _last_seed_index
-except SeedCheckException as sce:
+except SeedPreException as sce:
seed_check_exception = True
exception_message = str(sce)
if final_write:
@@ -1393,4 +1395,4 @@
for key, agg_res in self.agg_buffers.items()
)
if seed and seed_check_exception:
-raise SeedCheckException(exception_message)
+raise SeedPreException(exception_message)
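Downstream, user code now catches `SeedPreException` where it previously caught `SeedCheckException`; a usage sketch mirroring the tests below, with placeholder seed chunks:

```python
from oups.aggstream.aggstream import SeedPreException

try:
    as_.agg(
        seed=[seed_chunk_1, seed_chunk_2],
        trim_start=False,
        discard_last=False,
    )
except SeedPreException as spe:
    # Results up to the last valid chunk have been written to the store;
    # the message carries the error originally raised by 'pre'.
    print(f"seed pre-processing failed: {spe}")
```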
6 changes: 3 additions & 3 deletions tests/test_aggstream/test_aggstream_advanced.py
@@ -38,7 +38,7 @@
from oups.aggstream.aggstream import KEY_AGG
from oups.aggstream.aggstream import KEY_AGGSTREAM
from oups.aggstream.aggstream import KEY_RESTART_INDEX
-from oups.aggstream.aggstream import SeedCheckException
+from oups.aggstream.aggstream import SeedPreException
from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64
from oups.aggstream.cumsegagg import cumsegagg
from oups.aggstream.jcumsegagg import FIRST
@@ -349,15 +349,15 @@ def check(seed_chunk, check_buffer=None):
filter2: [(filter_on, "==", False)],
},
max_row_group_size=max_row_group_size,
-check=check,
+pre=check,
)
# Seed data.
filter_val = np.ones(len(ts), dtype=bool)
filter_val[::2] = False
seed = pDataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val})
# Streamed aggregation, raising an exception, but 1st chunk should be
# written.
-with pytest.raises(SeedCheckException, match="^not possible to have"):
+with pytest.raises(SeedPreException, match="^not possible to have"):
as_.agg(
seed=[seed[:ref_idx], seed[ref_idx:]],
trim_start=False,
20 changes: 10 additions & 10 deletions tests/test_aggstream/test_aggstream_init.py
@@ -29,11 +29,11 @@
from oups.aggstream.aggstream import KEY_AGG_RES_BUFFER
from oups.aggstream.aggstream import KEY_BIN_ON_OUT
from oups.aggstream.aggstream import KEY_BIN_RES_BUFFER
-from oups.aggstream.aggstream import KEY_CHECK
-from oups.aggstream.aggstream import KEY_CHECK_BUFFER
from oups.aggstream.aggstream import KEY_FILTERS
from oups.aggstream.aggstream import KEY_POST
from oups.aggstream.aggstream import KEY_POST_BUFFER
+from oups.aggstream.aggstream import KEY_PRE
+from oups.aggstream.aggstream import KEY_PRE_BUFFER
from oups.aggstream.aggstream import KEY_RESTART_INDEX
from oups.aggstream.aggstream import KEY_SEGAGG_BUFFER
from oups.aggstream.aggstream import NO_FILTER_ID
@@ -89,8 +89,8 @@ def always_false(**kwargs):
# ref_seed_config
{
KEY_ORDERED_ON: "ts",
-KEY_CHECK: None,
-KEY_CHECK_BUFFER: {},
+KEY_PRE: None,
+KEY_PRE_BUFFER: {},
KEY_FILTERS: {NO_FILTER_ID: None},
KEY_RESTART_INDEX: None,
},
@@ -127,7 +127,7 @@ def always_false(**kwargs):
KEY_ORDERED_ON: "ts_dflt",
KEY_MAX_ROW_GROUP_SIZE: 1000,
"max_nirgs": 4,
-KEY_CHECK: always_true,
+KEY_PRE: always_true,
KEY_AGG: {"out_dflt": ("in_dflt", LAST)},
KEY_POST: always_true,
"keys": {
@@ -156,8 +156,8 @@ def always_false(**kwargs):
# ref_seed_config
{
KEY_ORDERED_ON: "ts_dflt",
-KEY_CHECK: always_true,
-KEY_CHECK_BUFFER: {},
+KEY_PRE: always_true,
+KEY_PRE_BUFFER: {},
KEY_FILTERS: {NO_FILTER_ID: None},
KEY_RESTART_INDEX: None,
},
@@ -234,7 +234,7 @@ def always_false(**kwargs):
KEY_ORDERED_ON: "ts_dflt",
KEY_MAX_ROW_GROUP_SIZE: 1000,
"max_nirgs": 4,
-KEY_CHECK: always_true,
+KEY_PRE: always_true,
KEY_AGG: {"out_dflt": ("in_dflt", LAST)},
KEY_POST: always_true,
KEY_SNAP_BY: TimeGrouper(key="ts_dflt", freq="30T"),
@@ -272,8 +272,8 @@ def always_false(**kwargs):
# ref_seed_config
{
KEY_ORDERED_ON: "ts_dflt",
-KEY_CHECK: always_true,
-KEY_CHECK_BUFFER: {},
+KEY_PRE: always_true,
+KEY_PRE_BUFFER: {},
KEY_FILTERS: {
"filter1": [[("ts_dflt", ">=", 4)]],
"filter2": [[("in_spec", "<=", 10)]],
4 changes: 2 additions & 2 deletions tests/test_aggstream/test_aggstream_simple.py
@@ -38,7 +38,7 @@
from oups.aggstream.aggstream import KEY_POST_BUFFER
from oups.aggstream.aggstream import KEY_RESTART_INDEX
from oups.aggstream.aggstream import NO_FILTER_ID
-from oups.aggstream.aggstream import SeedCheckException
+from oups.aggstream.aggstream import SeedPreException
from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64
from oups.aggstream.jcumsegagg import FIRST
from oups.aggstream.jcumsegagg import LAST
@@ -1462,7 +1462,7 @@ def test_exception_unordered_seed(store, seed_path):
seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = pNaT
# Streamed aggregation, raising an exception, but 1st chunk should be
# written.
-with pytest.raises(SeedCheckException, match="^seed data is not in"):
+with pytest.raises(SeedPreException, match="^seed data is not in"):
as_.agg(
seed=[seed[:ref_idx], seed[ref_idx:]],
trim_start=False,
