On-going representative test case for snaps+bins. Difficult...
yohplala committed Apr 5, 2024
1 parent 2f1449d commit 4e84063
Showing 1 changed file with 176 additions and 30 deletions.
206 changes: 176 additions & 30 deletions tests/test_aggstream/test_aggstream_advanced.py
@@ -321,54 +321,72 @@ def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame):
test case...
"""
print("bin_res")
print(bin_res)
print("snap_res")
print(snap_res)
bin_res_last_ts = bin_res.loc[:, ordered_on].iloc[-1]
snap_res_last_ts = snap_res.loc[:, ordered_on].iloc[-1]
# Keep a copy in case 'bin_res' is trimmed.
bin_res_tail = bin_res.iloc[-2:].copy(deep=False)
if bin_res_last_ts > snap_res_last_ts:
# Trim bin extending past the last snapshot.
bin_res = bin_res.loc[bin_res.loc[:, ordered_on] <= snap_res_last_ts]
# Update last ts in 'bin_res'.
bin_res_last_ts = bin_res.loc[:, ordered_on].iloc[-1]
print("bin_res_last_ts")
print(bin_res_last_ts)
print("snap_res_last_ts")
print(snap_res_last_ts)
# if bin_res_last_ts > snap_res_last_ts and len(bin_res)>1:
# Trim bin extending past the last snapshot.
# bin_res = bin_res.loc[bin_res.loc[:, ordered_on] <= snap_res_last_ts]
# print("bin_res")
# print(bin_res)
# Update last ts in 'bin_res'.
# bin_res_last_ts = bin_res.loc[:, ordered_on].iloc[-1]
# Retrieve previous results, concat to new, and shift.
if buffer:
print("prev_bin_res")
print(buffer["prev_bin_res"])
# Not first iteration: 'smart' shift.
if (
buffer["prev_bin_res"].loc[:, ordered_on].iloc[-1]
== bin_res.loc[:, ordered_on].iloc[0]
):
prev_bin_res = buffer["prev_bin_res"].iloc[:-1].drop(columns=ordered_on)
print("equal ts in bin_res vs prev_bin_res")
prev_bin_res = buffer["prev_bin_res"].iloc[:-1]
print("prev_bin_res")
print(prev_bin_res)
else:
prev_bin_res = buffer["prev_bin_res"].iloc[1:].drop(columns=ordered_on)
shifted_bin_res = pconcat(
[prev_bin_res, bin_res.iloc[:-1].drop(columns=ordered_on)],
ignore_index=True,
)
shifted_bin_res = shifted_bin_res.rename(
columns={FIRST: "prev_first", LAST: "prev_last"},
)
print("unequal ts in bin_res vs prev_bin_res")
prev_bin_res = buffer["prev_bin_res"].iloc[1:]
print("prev_bin_res")
print(prev_bin_res)
shifted_bin_res = pconcat([prev_bin_res, bin_res], ignore_index=True)
# Forced to keep one more row in case the last row is a duplicate
# index with the 1st row of the next iteration.
buffer["prev_bin_res"] = shifted_bin_res.iloc[-2:]
# Remove row added by past concat.
shifted_bin_res = shifted_bin_res.iloc[:-1]
else:
# 1st iteration.
shifted_bin_res = (
bin_res.drop(ordered_on, axis=1)
.shift(1)
.rename(columns={FIRST: "prev_first", LAST: "prev_last"})
)
# Forced to keep one more row in case its last row is a duplicate
# index with the 1st row of the next iteration.
buffer["prev_bin_res"] = bin_res_tail
# Forced to keep one more row in case the last row is a duplicate
# index with the 1st row of the next iteration.
buffer["prev_bin_res"] = bin_res.iloc[-2:]
shifted_bin_res = bin_res.shift(1)
print("shifted_bin_res")
print(shifted_bin_res)
shifted_bin_res = shifted_bin_res.rename(columns={FIRST: "prev_first", LAST: "prev_last"})
# Align 'bin_res' on 2 columns.
# Keep track of existing NA values by filling with '-1', as others will
# be created by 'merge_ordered' and have to be managed differently.
shifted_bin_res = (
pconcat([shifted_bin_res, bin_res], axis=1)
pconcat([shifted_bin_res.drop(ordered_on, axis=1), bin_res], axis=1)
.rename(
columns={FIRST: "current_first", LAST: "current_last"},
)
.fillna(-1)
)
print("merged_bin_res")
print(shifted_bin_res)
if bin_res_last_ts < snap_res_last_ts:
# Shift horizontally last row and keep it apart for reuse later.
print("bin_res_last_ts")
print(bin_res_last_ts)
hshifted_bin_res_last_row = (
shifted_bin_res.set_index(ordered_on)
.loc[bin_res_last_ts]
@@ -400,6 +418,8 @@ def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame):
merged_res = merged_res.reindex(
columns=[ordered_on, "prev_first", FIRST, "prev_last", LAST],
)
print("merged_res before dropna")
print(merged_res)
# 'bin_res' columns.
bin_res_cols = ["prev_first", "prev_last"]
if bin_res_last_ts < snap_res_last_ts:
@@ -414,6 +434,8 @@ def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame):
# be it because there are more bins than snaps, or snap indexes with
# values different from bin indexes.
merged_res = merged_res.dropna(subset=FIRST, ignore_index=True)
print("merged_res after dropna")
print(merged_res)
return merged_res
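# Minimal standalone sketch of the shift-and-align pattern used in 'post'
# above, on toy data. The names 'toy_*', 'ts', 'first', 'last' and 'snap'
# are hypothetical; only pandas is assumed, not the AggStream machinery.
import pandas as pd

# Toy bin results: one row per bin, ordered on 'ts'.
toy_bin_res = pd.DataFrame(
    {"ts": [1.0, 2.0, 3.0], "first": [10, 20, 30], "last": [19, 29, 39]},
)
# Pair each bin with the values of the previous bin.
toy_shifted = (
    toy_bin_res.drop(columns="ts")
    .shift(1)
    .rename(columns={"first": "prev_first", "last": "prev_last"})
)
# Fill NA created by the shift with '-1', so they can be told apart from
# NA created below by 'merge_ordered'.
toy_aligned = pd.concat([toy_shifted, toy_bin_res], axis=1).fillna(-1)
# Toy snapshot results, with timestamps absent from 'toy_bin_res'.
toy_snap_res = pd.DataFrame(
    {"ts": [1.0, 1.5, 2.0, 2.5, 3.0], "snap": [1, 2, 3, 4, 5]},
)
# Rows existing only in 'toy_snap_res' get NA in the bin columns,
# distinguishable from the '-1' fill above.
toy_merged = pd.merge_ordered(toy_aligned, toy_snap_res, on="ts")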

# Setup streamed aggregation.
@@ -560,6 +582,9 @@ def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame):
# 01:54, 114, F, 46
# 01:57, 117, F, 48
seed_list = [seed_df.loc[:27], seed_df.loc[27:]]
# ---------------#
# Data stream 1 #
# ---------------#
as_.agg(
seed=seed_list,
trim_start=False,
@@ -639,6 +664,9 @@ def reference_results(seed: pDataFrame, key_conf: dict):
seed_df.loc[~seed_df[filter_on], :],
key2_cf | common_key_params,
)
# ---------------#
# Data stream 2 #
# ---------------#
# Seed data & streamed aggregation, not writing final results, with seed
# data of two rows in 2 different snaps,
# - one at the same timestamp as the last one.
@@ -648,7 +676,7 @@ def reference_results(seed: pDataFrame, key_conf: dict):
{
ordered_on: [ts[-1], ts[-1] + Timedelta(snap_duration)],
val: [rand_ints[-1] + 1, rand_ints[-1] + 10],
filter_on: [filter_val[-1]] * 2,
filter_on: [True] * 2,
},
)
as_.agg(
Expand All @@ -664,6 +692,9 @@ def reference_results(seed: pDataFrame, key_conf: dict):
assert key2_res.equals(key2_res_ref)
key3_res = store[key3].pdf
assert key3_res.equals(key3_res_ref)
# ---------------#
# Data stream 3 #
# ---------------#
# Write and check 'last_seed_index' has been correctly updated even for
# 'key2'.
as_.agg(
@@ -688,18 +719,133 @@ def reference_results(seed: pDataFrame, key_conf: dict):
# For 'key2', results have not changed, as seed data has been filtered out.
key2_res = store[key2].pdf
assert key2_res.equals(key2_res_ref)
# Even if no new seed data for key2, check that "last_seed_index" has been
# updated.
assert (
store[key2]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == seed_df[ordered_on].iloc[-1]
)
# ---------------#
# Data stream 4 #
# ---------------#
# Append last data, a single seed row with the same timestamp as the
# previous one, and 'final_write', as a last concatenation check with
# snapshots.
seed_df = pDataFrame(
{
ordered_on: [seed_df.loc[:, ordered_on].iloc[-1]],
val: [rand_ints[-1] + 50],
filter_on: [True],
},
)
as_.agg(
seed=seed_df,
trim_start=False,
discard_last=False,
final_write=True,
)
# Reference results.
seed_list.append(seed_df)
seed_df = pconcat(seed_list)
key1_res_ref = reference_results(
seed_df.loc[seed_df[filter_on], :],
key1_cf | common_key_params,
)
key3_res_ref = reference_results(
seed_df.loc[seed_df[filter_on], :],
key3_cf | common_key_params,
)
key1_res = store[key1].pdf
assert key1_res.equals(key1_res_ref)
key2_res = store[key2].pdf
assert key2_res.equals(key2_res_ref)
key3_res = store[key3].pdf
assert key3_res.equals(key3_res_ref)
# ---------------#
# Data stream 5 #
# ---------------#
# Empty snapshots are generated between 2 row groups in key2.
seed_df = pDataFrame(
{
ordered_on: [
seed_df.loc[:, ordered_on].iloc[-1],
seed_df.loc[:, ordered_on].iloc[-1] + 10 * Timedelta(snap_duration),
],
val: [rand_ints[-1] + 50, rand_ints[-1] + 100],
filter_on: [False] * 2,
},
)
seed = [seed_df.iloc[:1], seed_df.iloc[1:]]
as_.agg(
seed=seed,
trim_start=False,
discard_last=False,
final_write=True,
)
# Reference results.
seed_list.extend(seed)
seed_df = pconcat(seed_list)
key2_res_ref = reference_results(
seed_df.loc[~seed_df[filter_on], :],
key2_cf | common_key_params,
)
key1_res = store[key1].pdf
assert key1_res.equals(key1_res_ref)
key2_res = store[key2].pdf
assert key2_res.equals(key2_res_ref)
key3_res = store[key3].pdf
assert key3_res.equals(key3_res_ref)
# Even if no new seed data for key1 & key3, check that "last_seed_index"
# has been updated.
assert (
store[key1]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == seed_df[ordered_on].iloc[-1]
)
assert (
store[key3]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == seed_df[ordered_on].iloc[-1]
)
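# Side illustration of the behaviour this data stream relies on (toy data,
# pandas only; the 50-minute gap mirrors the '10 * Timedelta(snap_duration)'
# jump above): a time Grouper yields every interval in the span, including
# the empty ones in the gap.
import pandas as pd

toy_seed = pd.DataFrame(
    {
        "ts": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:50"]),
        "val": [1, 2],
    },
)
# Every 5-minute interval in the span appears, the in-between ones empty.
toy_counts = toy_seed.groupby(pd.Grouper(key="ts", freq="5min"))["val"].count()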
# ---------------#
# Data stream 6 #
# ---------------#
# Several seed chunks in which neither bins nor snaps end.
seed_df = pDataFrame(
{
ordered_on: [
seed_df.loc[:, ordered_on].iloc[-1],
seed_df.loc[:, ordered_on].iloc[-1],
seed_df.loc[:, ordered_on].iloc[-1],
],
val: [
seed_df.loc[:, val].iloc[-1] + 10,
seed_df.loc[:, val].iloc[-1] + 20,
seed_df.loc[:, val].iloc[-1] + 30,
],
filter_on: [True] * 3,
},
)
seed = [seed_df.iloc[:1], seed_df.iloc[1:2], seed_df.iloc[2:]]
as_.agg(
seed=seed,
trim_start=False,
discard_last=False,
final_write=True,
)
# Reference results.
seed_list.extend(seed)
seed_df = pconcat(seed_list)
key1_res_ref = reference_results(
seed_df.loc[seed_df[filter_on], :],
key1_cf | common_key_params,
)
key3_res_ref = reference_results(
seed_df.loc[seed_df[filter_on], :],
key3_cf | common_key_params,
)
key1_res = store[key1].pdf
assert key1_res.equals(key1_res_ref)
key2_res = store[key2].pdf
assert key2_res.equals(key2_res_ref)
key3_res = store[key3].pdf
assert key3_res.equals(key3_res_ref)
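# Minimal sketch of the carry-over idea this data stream exercises: when a
# chunk closes neither bin nor snap, partial aggregates must be merged with
# the next chunk's. Toy data and names, pandas only; not the AggStream code.
import pandas as pd

toy_chunks = [pd.DataFrame({"ts": [5], "val": [v]}) for v in (10, 20, 30)]
toy_agg = None
for toy_chunk in toy_chunks:
    part = toy_chunk.groupby("ts")["val"].agg(["first", "last"]).reset_index()
    if toy_agg is None:
        toy_agg = part
    else:
        # Same 'ts' as the pending bin: keep earliest 'first', newest 'last'.
        toy_agg["last"] = part["last"]
# toy_agg holds a single row: ts=5, first=10, last=30.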


# - in test case with snapshot: when snapshot is a TimeGrouper, make sure that stitching
#   works the same as for bins: that empty snapshots are generated between 2 row groups.
# - a snap + bin test with two or three chunks in which neither a snap nor a bin ends.
# - add a snapshot test case where the 2nd seed chunk starts straight away on a new bin:
#   this causes problems when it is bin only, so with bin+snapshot, chances are it does
#   not work either.
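# Minimal sketch of the boundary case targeted by the last TODO above, the
# one the two-row tail buffered in 'post' is meant to cover. Toy data and
# names, pandas only; not the AggStream implementation.
import pandas as pd

prev_tail = pd.DataFrame({"ts": [3, 4], "last": [39, 41]})  # kept from chunk N
new_res = pd.DataFrame({"ts": [4, 5], "last": [49, 59]})  # results of chunk N+1
if prev_tail["ts"].iloc[-1] == new_res["ts"].iloc[0]:
    # Same bin spans both chunks: drop the stale tail row before concat.
    stitched = pd.concat([prev_tail.iloc[:-1], new_res], ignore_index=True)
else:
    # Chunk N+1 starts straight on a new bin: a single row of history is
    # enough, drop the oldest tail row.
    stitched = pd.concat([prev_tail.iloc[1:], new_res], ignore_index=True)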
