Fixing a bug arising from non-initialized 'snap_by' parameter in cumsegagg.
yohplala committed Mar 7, 2024
1 parent b1702d0 commit 72e43e5
Showing 7 changed files with 171 additions and 24 deletions.
24 changes: 15 additions & 9 deletions oups/aggstream/aggstream.py
@@ -438,7 +438,8 @@ def _iter_data(
A single list of tuples can also be used, meaning that no `OR`
operation between set of filters is to be conducted.
trim_start : bool
Flag to indicate if seed head has to be trimmed.
Flag to indicate if the seed head has to be trimmed up to the value of
'restart_index' (last seed index of the previous aggregation sequence).
discard_last : bool
If ``True``, last row group in seed data (sharing the same value in
`ordered_on` column) is removed from the aggregation step.
@@ -493,7 +494,7 @@ def _iter_data(
# Step 3 / Prepare filter to trim seed head and tail if requested.
if trim_start:
if seed_chunk.loc[:, ordered_on].iloc[-1] < restart_index:
# Do not process this chunk.
# This full chunk is to be discarded. Go to the next.
continue
else:
filter_array = seed_chunk[ordered_on] >= restart_index
@@ -1214,9 +1215,11 @@ def agg(
Seed data over which conducting streamed aggregations.
trim_start : bool, default True
If ``True``, and if aggregated results already exist, then
retrieves the first index from seed data not processed yet
(recorded in metadata of existing aggregated results), and trim all
seed data before this index (index excluded from trim).
retrieves the last index present in seed data (recorded in metadata
of existing aggregated results), and trims all seed data before this
index (index excluded from the trim, so it will be in the new
aggregation results). This trimming makes sense if the previous
aggregation iteration was run with ``discard_last`` set ``True``.
discard_last : bool, default True
If ``True``, last row group in seed data (sharing the same value in
`ordered_on` column) is removed from the aggregation step. See
@@ -1234,11 +1237,11 @@
- Aggregation is by default processed up to the last index excluded,
and subsequent aggregation will start from this last index included,
assumed to be that of an incomplete row group.
If `discard_last` is set `False`, then aggregation is process up to the
last data.
If `discard_last` is set `False`, then aggregation is processed up to
the last data.
- By default, with parameter ``discard_last`` set ``True``, the last row
group (composed from rows sharing the same value in `ordered_on` column),
is discarded.
group (composed of rows sharing the same value in `ordered_on`
column) is discarded.
- It may be for instance that this row group is not complete yet
and should therefore not be accounted for. More precisely, new
@@ -1256,6 +1259,8 @@
when it becomes the one-but-last row, as a new row is added).
"""
# TODO: add 'snap_by' parameter to 'agg()' to allow using list of
# timestamps. 'cumsegagg()' is already compatible.
if isinstance(seed, pDataFrame):
# Make the seed an iterable.
seed = [seed]
@@ -1346,3 +1351,4 @@ def agg(
# check when some chunks are empty for a given filter id
# - A test with first seed generator like Parquet File, then 1 dataframe each time.
# - Test with different 'ordered_on' values for a key vs seed
# - Test with a final_write, then create a new AggStream instance and have a new aggregation iteration
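
As a reading aid for the ``trim_start`` / ``discard_last`` notes above, here is a minimal pandas-only sketch of the intended trimming of the last row group; it illustrates the docstring, it is not oups code, and the column names are made up.

import pandas as pd

# Seed chunk ordered on 'ts'; the two trailing rows share the last 'ts' value
# and form the "last row group" that 'discard_last=True' leaves out.
seed = pd.DataFrame(
    {
        "ts": pd.to_datetime(
            ["2020-01-01 08:00", "2020-01-01 08:05", "2020-01-01 08:05"]
        ),
        "val": [1, 2, 3],
    }
)
complete = seed[seed["ts"] < seed["ts"].iloc[-1]]
# 'complete' keeps only the 08:00 row; the 08:05 rows are assumed to belong to
# a possibly incomplete row group and will be aggregated at a later iteration,
# which is also where 'trim_start=True' picks the restart index up again.
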
25 changes: 17 additions & 8 deletions oups/aggstream/cumsegagg.py
@@ -23,9 +23,12 @@

from oups.aggstream.jcumsegagg import AGG_FUNCS
from oups.aggstream.jcumsegagg import jcsagg
from oups.aggstream.segmentby import KEY_BIN_ON
from oups.aggstream.segmentby import KEY_LAST_BIN_LABEL
from oups.aggstream.segmentby import KEY_ORDERED_ON
from oups.aggstream.segmentby import KEY_SNAP_BY
from oups.aggstream.segmentby import segmentby
from oups.aggstream.segmentby import setup_segmentby


# Some constants.
@@ -376,6 +379,9 @@ def cumsegagg(
# without snapshot in 1st iteration, an empty 'snap_res' gets returned
# nonetheless and that concatenation can be managed with subsequent
# 'snap_res' from next iterations.
# TODO: if requesting snapshots, bin aggregation results are not necessary.
# Consider just outputting labels of bins in snapshot results, without
# aggregation results for bins? (memory savings).
len_data = len(data)
if not len_data:
# 'data' is empty. Simply return.
@@ -392,6 +398,16 @@
# is started.
preserve_res = True
prev_last_bin_label = buffer[KEY_LAST_BIN_LABEL] if KEY_LAST_BIN_LABEL in buffer else None
if not isinstance(bin_by, dict):
bin_by = setup_segmentby(bin_by, bin_on, ordered_on, snap_by)
# Following 'setup_segmentby', parameters 'ordered_on' and 'bin_on' have
# to be retrieved from it.
ordered_on = bin_by[KEY_ORDERED_ON]
# 'bin_by' as a dict may contain 'snap_by' if it is a TimeGrouper.
if bin_by[KEY_SNAP_BY] is not None:
# 'bin_by[KEY_SNAP_BY]' is not None if 'snap_by' is a TimeGrouper.
# Otherwise, it can be a DatetimeIndex or a Series.
snap_by = bin_by[KEY_SNAP_BY]
# In case of restart, 'n_max_null_bins' is a max because 1st null bin may
# well be continuation of last in-progress bin, without result in current
# iteration, but with results from previous iteration.
@@ -405,20 +421,13 @@
) = segmentby(
data=data,
bin_by=bin_by,
bin_on=bin_on,
ordered_on=ordered_on,
snap_by=snap_by,
buffer=buffer,
)
if preserve_res and prev_last_bin_label != bin_labels.iloc[0]:
# A new bin has been started. Do not preserve past results.
# This behavior is only possible in case no snapshot is used.
preserve_res = False
if isinstance(bin_by, dict):
# If 'bin_by' is a dict, then setup has been managed separately, and
# 'cumsegagg' may be running without 'bin_on' and 'ordered_on'
# parameters. Force 'ordered_on' as it is re-used below.
ordered_on = bin_by[KEY_ORDERED_ON]
# Initiate dict of result columns.
# Setup 'chunk_res'.
chunk_res_prev = (
@@ -503,7 +512,7 @@ def cumsegagg(
buffer[KEY_LAST_CHUNK_RES] = pDataFrame(chunk_res, copy=False)
# Assemble 'bin_res' as a pandas DataFrame.
bin_res = pDataFrame(bin_res, index=bin_labels, copy=False)
bin_res.index.name = ordered_on if ordered_on else bin_on
bin_res.index.name = ordered_on if ordered_on else bin_by[KEY_BIN_ON]
# Set null values.
if n_max_null_bins != 0:
null_bin_labels = bin_labels.iloc[null_bin_indices[~nisin(null_bin_indices, -1)]]
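
To make the restored code path concrete, here is a short sketch of the setup-once / restart flow that the added block supports; it mirrors the new test_cumsegagg_bin_snap_time_grouper test further below, and the import paths, plain-string aggregation spec and small seed frame are assumptions made for illustration only.

import pandas as pd
from pandas.core.resample import TimeGrouper

from oups.aggstream.cumsegagg import cumsegagg, setup_cumsegagg  # assumed path
from oups.aggstream.segmentby import setup_segmentby

ordered_on, val = "ts", "val"
seed = pd.DataFrame(
    {
        ordered_on: pd.date_range("2020-01-01 08:00", periods=6, freq="3T"),
        val: range(6),
    }
)
agg = setup_cumsegagg(
    {"first": (val, "first"), "last": (val, "last")},
    seed.dtypes.to_dict(),
)
# 'bin_by' is compiled once into a dict; 'ordered_on' and 'snap_by' are then
# pulled back out of it by 'cumsegagg()' at every call, which is what the
# block added above makes possible.
bin_by = setup_segmentby(
    bin_by=TimeGrouper(key=ordered_on, freq="10T", closed="left", label="left"),
    ordered_on=ordered_on,
    snap_by=TimeGrouper(key=ordered_on, freq="5T", closed="left", label="left"),
)
buffer = {}
for chunk in (seed[:3], seed[3:]):
    bin_res, snap_res = cumsegagg(data=chunk, agg=agg, bin_by=bin_by, buffer=buffer)
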
3 changes: 3 additions & 0 deletions oups/aggstream/utils.py
@@ -62,5 +62,8 @@ def dataframe_filter(df, filters):
and_part &= ops[op](df[name].values, val)
elif op == "~":
and_part &= ~df[name].values
else:
# Unknown operator.
raise ValueError(f"operator '{op}' is not supported.")
out |= and_part
return out
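
Below, a minimal sketch of dataframe_filter() exercising the new unknown-operator check; the list-of-lists filter layout (an OR of AND-groups of (column, operator, value) tuples) is assumed from the AggStream docstring rather than read from this file.

import pandas as pd

from oups.aggstream.utils import dataframe_filter

df = pd.DataFrame({"val": [1, 2, 3], "keep": [True, False, True]})
# Boolean mask: rows where val == 2, OR-ed with rows where keep == True.
mask = dataframe_filter(df, [[("val", "==", 2)], [("keep", "==", True)]])
print(df[mask])
# An operator outside the supported set now raises a ValueError.
try:
    dataframe_filter(df, [[("val", "%%", 2)]])
except ValueError as exc:
    print(exc)  # operator '%%' is not supported.
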
80 changes: 78 additions & 2 deletions tests/test_aggstream/test_aggstream_advanced.py
@@ -77,7 +77,7 @@ def seed_path(tmp_path):


def test_3_keys_only_bins(store, seed_path):
# Test with 4 keys, no snapshots, parallel iterations.
# Test with 3 keys, no snapshots, parallel iterations.
# - key 1: time grouper '2T', agg 'first', and 'last',
# - key 2: time grouper '13T', agg 'first', and 'max',
# - key 3: 'by' as callable, every 4 rows, agg 'min', 'max',
@@ -225,7 +225,6 @@ def test_exception_different_indexes_at_restart(store, seed_path):
#
# Setup a 1st separate streamed aggregations (awkward...).
max_row_group_size = 6
ordered_on = "ts"
as1 = AggStream(
ordered_on=ordered_on,
store=store,
@@ -282,3 +281,80 @@
keys={key1: deepcopy(key1_cf), key2: deepcopy(key2_cf)},
max_row_group_size=max_row_group_size,
)


def test_3_keys_bins_snaps_filters(store, seed_path):
# Test with 3 keys, bins and snapshots, filters and parallel iterations.
# - filter 'True' : key 1: time grouper '10T', agg 'first' (bin),
# with snap '5T', agg 'last'.
# - filter 'False' : key 2: time grouper '20T', agg 'first' (bin),
# with snap '5T', agg 'last'.
# No head or tail trimming.
#
# Setup streamed aggregation.
max_row_group_size = 5
val = "val"
key1 = Indexer("agg_10T")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="10T", closed="left", label="left"),
"agg": {FIRST: (val, FIRST), LAST: (val, LAST)},
"snap_by": TimeGrouper(key=ordered_on, freq="5T", closed="left", label="left"),
}
key2 = Indexer("agg_20T")
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="20T", closed="left", label="left"),
"agg": {FIRST: (val, FIRST), LAST: (val, LAST)},
"snap_by": TimeGrouper(key=ordered_on, freq="5T", closed="left", label="left"),
}
filter1 = "filter1"
filter2 = "filter2"

# Setup 'post'.
def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame):
"""
Aggregate previous and current bin aggregation results.
Keep per row 'first' value of previous and current bin, and 'last' value from
current snapshot.
"""
print("bin_res")
print(bin_res)
print("snap_res")
print(snap_res)
return snap_res

filter_on = "filter_on"
as_ = AggStream(
ordered_on=ordered_on,
store=store,
keys={
filter1: {key1: deepcopy(key1_cf)},
filter2: {key2: deepcopy(key2_cf)},
},
filters={
filter1: [(filter_on, "==", True)],
filter2: [(filter_on, "==", False)],
},
max_row_group_size=max_row_group_size,
parallel=True,
post=post,
)
# Seed data.
start = Timestamp("2020/01/01")
rr = np.random.default_rng(1)
N = 50
rand_ints = rr.integers(120, size=N)
rand_ints.sort()
ts = [start + Timedelta(f"{mn}T") for mn in rand_ints]
filter_val = np.ones(len(ts), dtype=bool)
filter_val[::2] = False
seed_df = pDataFrame({ordered_on: ts, val: rand_ints, filter_on: filter_val})
print(seed_df)
print(as_)
# seed_li = [seed_df.loc[:28], seed_df.loc[28:]]
# Streamed aggregation.
# as_.agg(seed=seed_df,
# trim_start = False,
# discard_last=False,
# final_write=False)
3 changes: 1 addition & 2 deletions tests/test_aggstream/test_cumsegagg.py
@@ -40,7 +40,6 @@


# from pandas.testing import assert_frame_equal
# tmp_path = os_path.expanduser('~/Documents/code/data/oups')


def test_setup_csagg():
@@ -1122,7 +1121,7 @@ def test_exception_error_on_0():
)


def test_exception_unknown_agg_function(tmp_path):
def test_exception_unknown_agg_function():
# Test exception when agg func is unknown.
values = array([0.0], dtype=DTYPE_FLOAT64)
dtidx = array(["2020-01-01T08:00"], dtype=DTYPE_DATETIME64)
59 changes: 57 additions & 2 deletions tests/test_aggstream/test_cumsegagg_restart.py
@@ -9,9 +9,11 @@

import pytest
from numpy import array
from numpy import random as nrandom
from pandas import NA as pNA
from pandas import DataFrame as pDataFrame
from pandas import DatetimeIndex
from pandas import Timedelta
from pandas import Timestamp as pTimestamp
from pandas import concat as pconcat
from pandas.core.resample import TimeGrouper
@@ -32,7 +34,6 @@


# from pandas.testing import assert_frame_equal
# tmp_path = os_path.expanduser('~/Documents/code/data/oups')


@pytest.mark.parametrize(
@@ -239,6 +240,7 @@ def test_cumsegagg_bin_only(
# 10-9:44
by_x_rows,
"dti",
# snap_by
[
DatetimeIndex(["2020-01-01 08:12", "2020-01-01 08:15"]),
DatetimeIndex(
@@ -558,7 +560,6 @@ def test_cumsegagg_bin_snap(
agg=agg,
bin_by=bin_by,
buffer=buffer,
ordered_on=ordered_on,
snap_by=snap_by[i] if isinstance(snap_by, list) else snap_by,
)
assert buffer[KEY_LAST_CHUNK_RES].equals(last_chunk_res_ref[i])
@@ -573,3 +574,57 @@
snap_res = pconcat(snap_res_to_concatenate)
snap_res = snap_res[~snap_res.index.duplicated(keep="last")]
assert snap_res.equals(snap_ref)


def test_cumsegagg_bin_snap_time_grouper():
# Testing bins and snapshots, both as time grouper.
# This test case has shown the trouble that transforming input data
# within 'setup_segmentby()' may lead to: the transformed 'snap_by' and
# 'bin_on' values were then not retrieved in 'cumsegagg()', leading to a
# segmentation fault.
ordered_on = "ts"
val = "val"
bin_by = TimeGrouper(key=ordered_on, freq="10T", closed="left", label="left")
agg = {FIRST: (val, FIRST), LAST: (val, LAST)}
snap_by = TimeGrouper(key=ordered_on, freq="5T", closed="left", label="left")
# Seed data.
start = pTimestamp("2020/01/01")
rr = nrandom.default_rng(1)
N = 50
rand_ints = rr.integers(120, size=N)
rand_ints.sort()
ts = [start + Timedelta(f"{mn}T") for mn in rand_ints]
seed = pDataFrame({ordered_on: ts, val: rand_ints})
# Setup for restart
agg_as_list = setup_cumsegagg(agg, seed.dtypes.to_dict())
bin_by_as_dict = setup_segmentby(bin_by=bin_by, ordered_on=ordered_on, snap_by=snap_by)
seed1 = seed[:28]
seed2 = seed[28:]
buffer = {}
# Aggregation
bin_res1, snap_res1 = cumsegagg(
data=seed1,
agg=agg_as_list,
bin_by=bin_by_as_dict,
buffer=buffer,
)
bin_res2, snap_res2 = cumsegagg(
data=seed2,
agg=agg_as_list,
bin_by=bin_by_as_dict,
buffer=buffer,
)
bin_res = pconcat([bin_res1, bin_res2])
bin_res = bin_res[~bin_res.index.duplicated(keep="last")]
snap_res = pconcat([snap_res1, snap_res2])
snap_res = snap_res[~snap_res.index.duplicated(keep="last")]
# Reference results obtained by a straight execution.
bin_res_ref, snap_res_ref = cumsegagg(
data=seed,
agg=agg,
bin_by=bin_by,
ordered_on=ordered_on,
snap_by=snap_by,
)
assert bin_res.equals(bin_res_ref)
assert snap_res.equals(snap_res_ref)
1 change: 0 additions & 1 deletion tests/test_aggstream/test_segmentby.py
@@ -35,7 +35,6 @@


# from pandas.testing import assert_frame_equal
# tmp_path = os_path.expanduser('~/Documents/code/data/oups')


@pytest.mark.parametrize(
