Only metadata #18

Merged: 3 commits, Jul 26, 2024
147 changes: 85 additions & 62 deletions oups/aggstream/aggstream.py
@@ -15,7 +15,7 @@
from joblib import Parallel
from joblib import delayed
from numpy import ones
from pandas import DataFrame as pDataFrame
from pandas import DataFrame
from pandas import DatetimeIndex
from pandas import Series
from pandas import Timestamp as pTimestamp
@@ -342,11 +342,11 @@ def _reset_agg_buffers(agg_buffers: Optional[dict] = None) -> Optional[dict]:
- n_rows : int, number of rows in main aggregation results (snapshots
if snapshots are requested, or bins otherwise). It is reset here after
writing.
- agg_res : pDataFrame, last aggregation results, to reset to None
- agg_res : DataFrame, last aggregation results, to reset to None
after writing.
- bin_res : pDataFrame, last aggregation results, to reset to None
- bin_res : DataFrame, last aggregation results, to reset to None
after writing.
- agg_res_buffer : List[pDataFrame], list of chunks resulting from
- agg_res_buffer : List[DataFrame], list of chunks resulting from
aggregation (pandas DataFrame), either from bins if only bins
requested, or from snapshots if bins and snapshots requested.
- bin_res_buffer : List[pandas.DataFrame], list of bins resulting from
@@ -396,15 +396,15 @@ def __init__(self, message: str = None):


def _iter_data(
seed: Iterable[pDataFrame],
seed: Iterable[DataFrame],
ordered_on: str,
restart_index: Union[int, float, pTimestamp, None],
pre: Union[Callable, None],
pre_buffer: dict,
filters: Union[dict, None],
trim_start: bool,
discard_last: bool,
) -> Tuple[str, pDataFrame]:
) -> Tuple[str, DataFrame]:
"""
Iterate over provided seed, sequentially applying optional filters.

@@ -413,7 +413,7 @@ def _iter_data(

Parameters
----------
seed : Iterable[pDataFrame]
seed : Iterable[DataFrame]
Iterable of pandas DataFrame.
ordered_on : str
Name of column with respect to which seed data is in ascending
@@ -465,7 +465,7 @@ def _iter_data(
chunk.
- 'filter_id', str, indicating which set of filters has been
applied for the seed chunk provided.
- 'filtered_chunk', pDataFrame, from the seed Iterable, with
- 'filtered_chunk', DataFrame, from the seed Iterable, with
optionally filters applied.

Notes
@@ -553,8 +553,8 @@ def _iter_data(


def _concat_agg_res(
agg_res_buffers: List[pDataFrame],
agg_res: pDataFrame,
agg_res_buffers: List[DataFrame],
agg_res: DataFrame,
append_last_res: bool,
index_name: str,
):
@@ -563,9 +563,9 @@ def _concat_agg_res(

Parameters
----------
agg_res_buffers : List[pDataFrame]
agg_res_buffers : List[DataFrame]
List of aggregation results to concatenate.
agg_res : pDataFrame
agg_res : DataFrame
Last aggregation results (all rows from last iteration).
append_last_res : bool
If 'agg_res' should be appended to 'agg_res_buffer' and if 'bin_res'
@@ -576,7 +576,7 @@ def _concat_agg_res(

Returns
-------
pDataFrame
DataFrame
List of aggregation results concatenated in a single DataFrame.

"""
@@ -617,11 +617,11 @@ def _post_n_write_agg_chunks(
- n_rows : int, number of rows in main aggregation results (snapshots
if snapshots are requested, or bins otherwise). It is reset here after
writing.
- agg_res : pDataFrame, last aggregation results, to reset to None
- agg_res : DataFrame, last aggregation results, to reset to None
after writing.
- bin_res : pDataFrame, last aggregation results, to reset to None
- bin_res : DataFrame, last aggregation results, to reset to None
after writing.
- agg_res_buffer : List[pDataFrame], list of chunks resulting from
- agg_res_buffer : List[DataFrame], list of chunks resulting from
aggregation (pandas DataFrame), either from bins if only bins
requested, or from snapshots if bins and snapshots requested.
It contains 'agg_res' (last aggregation results), but without last
@@ -685,41 +685,49 @@ def _post_n_write_agg_chunks(
proceeding with preprocessing of individual seed chunk.

"""
if (agg_res := agg_buffers[KEY_AGG_RES]) is None:
# No iteration has been achieved, as no data.
if last_seed_index:
# If 'last_seed_index', at least set it in oups metadata.
# It is possible new seed data has been streamed and taken into
# account, but used for this key, because having been filtered out.
# Also set 'pre_buffer' if not null.
OUPS_METADATA[key] = {
KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer},
}
write_metadata(pf=store[key].pf, md_key=key)
return
# Concat list of aggregation results.
agg_res = _concat_agg_res(agg_buffers[KEY_AGG_RES_BUFFER], agg_res, append_last_res, index_name)
# Same if needed with 'bin_res_buffer'
bin_res = agg_buffers[KEY_BIN_RES]
if bin_res is not None:
bin_res = _concat_agg_res(
agg_buffers[KEY_BIN_RES_BUFFER],
bin_res,
post_buffer = agg_buffers[KEY_POST_BUFFER]
# When there is no result, 'agg_res' is None.
if isinstance((agg_res := agg_buffers[KEY_AGG_RES]), DataFrame):
# Keep track that there were results in the first place.
not_null_res = True
# Concat list of aggregation results.
agg_res = _concat_agg_res(
agg_buffers[KEY_AGG_RES_BUFFER],
agg_res,
append_last_res,
index_name,
)
post_buffer = agg_buffers[KEY_POST_BUFFER]
if post:
# Post processing if any.
# 'post_buffer' has to be modified in-place.
agg_res = (
post(buffer=post_buffer, bin_res=agg_res)
if bin_res is None
else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res)
)
# Same if needed with 'bin_res_buffer'
bin_res = agg_buffers[KEY_BIN_RES]
if bin_res is not None:
bin_res = _concat_agg_res(
agg_buffers[KEY_BIN_RES_BUFFER],
bin_res,
append_last_res,
index_name,
)
if post:
# Post processing if any.
# 'post_buffer' has to be modified in-place.
# It is possible 'agg_res' is None, if 'post' needs a minimal
# number of rows before outputting results (warm-up).
agg_res = (
post(buffer=post_buffer, bin_res=agg_res)
if bin_res is None
else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res)
)
else:
not_null_res = False
if last_seed_index:
# If 'last_seed_index', set oups metadata.
# Also set 'pre_buffer' if not null.
# It is possible there is no result yet to write for different reasons:
# - new seed data has been streamed and needs to be taken into account,
# but there is no result for this key, because all related seed data
# has been filtered out.
# - or maybe 'post' has a warm-up period and has not released results
# yet.
# But 'last_seed_index' has to be recorded, and so do possibly
# 'pre_buffer', 'segagg_buffer' and 'post_buffer'.
OUPS_METADATA[key] = {
KEY_AGGSTREAM: {
KEY_RESTART_INDEX: last_seed_index,
@@ -728,14 +736,29 @@ def _post_n_write_agg_chunks(
KEY_POST_BUFFER: post_buffer,
},
}
# Record data.
store[key] = write_config, agg_res
# Reset aggregation buffers and counters.
_reset_agg_buffers(agg_buffers)
# When there is no result, 'agg_res' is None.
if isinstance(agg_res, DataFrame):
# Record data (with metadata possibly updated).
store[key] = write_config, agg_res
elif last_seed_index:
# If no result, metadata may still have to be written, as
# 'last_seed_index' flags the last 'aggstream' local iteration.
try:
write_metadata(pf=store[key].pf, md_key=key)
except FileNotFoundError:
# In case no Parquet file exists yet, need to initiate one to start
# storing metadata.
store[key] = DataFrame()
if not_null_res:
# If there have been results, they have been processed (either written
# directly or through 'post()'). Time to reset aggregation buffers and
# counters.
_reset_agg_buffers(agg_buffers)
return
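
To make the reorganized control flow above easier to follow, here is a minimal, standalone sketch of the decision it implements (the helper name and return strings are illustrative, not part of oups): when aggregation produced rows, data is written together with metadata; when nothing was produced but 'last_seed_index' is set, metadata alone is recorded, initiating the dataset with an empty DataFrame if no Parquet file exists yet.

from typing import Optional

from pandas import DataFrame


def flush_decision(
    agg_res: Optional[DataFrame],
    last_seed_index,
    parquet_file_exists: bool,
) -> str:
    # Hypothetical helper mirroring the branch structure above.
    if isinstance(agg_res, DataFrame):
        # Results exist: write them; oups metadata is updated alongside.
        return "write data and metadata"
    if last_seed_index is not None:
        # No results (seed filtered out, or 'post' still in warm-up), but the
        # restart index and buffers still have to be recorded.
        if not parquet_file_exists:
            # Initiate the dataset with an empty DataFrame so metadata has a
            # file to attach to.
            return "write empty DataFrame, then metadata"
        return "write metadata only"
    return "write nothing"


# Example: warm-up iteration on a brand new key.
print(flush_decision(None, last_seed_index=42, parquet_file_exists=False))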


def agg_iter(
seed_chunk: pDataFrame,
seed_chunk: DataFrame,
store: ParquetSet,
key: dataclass,
keys_config: dict,
@@ -747,7 +770,7 @@ def agg_iter(

Parameters
----------
seed_chunk : pDataFrame
seed_chunk : DataFrame
Chunk of seed data.
store : ParquetSet
ParquetSet to which aggregation results are recorded.
@@ -776,7 +799,7 @@ def agg_iter(
agg_res_buffer = agg_buffers[KEY_AGG_RES_BUFFER]
if agg_res_len > 1:
# Add 'agg_res' to 'agg_res_buffer' ignoring last row.
# It is incimplete, so useless to write it to results while
# It is incomplete, so useless to write it to results while
# aggregation iterations are on-going.
agg_res_buffer.append(agg_res.iloc[:-1])
# Remove last row that is not recorded from total number of rows.
@@ -863,14 +886,14 @@ class AggStream:
intermediate results.
``{key: {'agg_n_rows' : int, number of rows in aggregation results,
for bins (if snapshots not requested) or snapshots.
'agg_res' : None or pDataFrame, last aggregation results,
'agg_res' : None or DataFrame, last aggregation results,
for bins (if snapshots not requested) or snapshots,
'bin_res' : None or pDataFrame, last aggregation results,
'bin_res' : None or DataFrame, last aggregation results,
for bins (if snapshots requested),
'agg_res_buffer' : list of pDataFrame, buffer to keep
'agg_res_buffer' : list of DataFrame, buffer to keep
aggregation results, bins (if snapshots not
requested) or snapshots,
'bin_res_buffer' : list of pDataFrame, buffer to keep bin
'bin_res_buffer' : list of DataFrame, buffer to keep bin
aggregation results (if snapshots requested)
'segagg_buffer' : dict, possibly empty, keeping track of
segmentation and aggregation intermediate
@@ -1238,7 +1261,7 @@ def __init__(
# Store attribute.
self.store = store

def _init_agg_cs(self, seed: Iterable[pDataFrame]):
def _init_agg_cs(self, seed: Iterable[DataFrame]):
"""
Initialize ``self.agg_cs``.

@@ -1248,7 +1271,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]):

Parameters
----------
seed : Iterable[pDataFrame]
seed : Iterable[DataFrame]
Seed data, from which pandas DataFrame dtypes are retrieved.

Returns
@@ -1271,7 +1294,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]):

def agg(
self,
seed: Union[pDataFrame, Iterable[pDataFrame]] = None,
seed: Union[DataFrame, Iterable[DataFrame]] = None,
trim_start: Optional[bool] = True,
discard_last: Optional[bool] = True,
final_write: Optional[bool] = True,
@@ -1285,7 +1308,7 @@ def agg(

Parameters
----------
seed : Union[pDataFrame, Iterable[pDataFrame]]
seed : Union[DataFrame, Iterable[DataFrame]]
Seed data over which conducting streamed aggregations.
trim_start : bool, default True
If ``True``, and if aggregated results already exist, then
@@ -1352,7 +1375,7 @@ def agg(
# snapshots results will be easily merged.
# TODO: change default settings:
# discard_last = trim_start = final_write = False
if isinstance(seed, pDataFrame):
if isinstance(seed, DataFrame):
# Make the seed an iterable.
seed = [seed]
# Seed can be an empty list or None.
16 changes: 10 additions & 6 deletions oups/store/writer.py
@@ -117,12 +117,16 @@ def iter_dataframe(
# Case 'duplicates_on' is a single column name, but not
# 'sharp_on'.
duplicates_on = [duplicates_on, sharp_on]
# Define bins to split into row groups.
# Acknowledging this piece of code to be an extract from fastparquet.
n_rows = len(data)
n_parts = (n_rows - 1) // max_row_group_size + 1
row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
starts = list(range(0, n_rows, row_group_size))
if n_rows:
# Define bins to split into row groups.
# Acknowledging this piece of code to be an extract from fastparquet.
n_parts = (n_rows - 1) // max_row_group_size + 1
row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
starts = list(range(0, n_rows, row_group_size))
else:
# If n_rows=0
starts = [0]
if sharp_on:
# Adjust bins so that they do not end in the middle of duplicate values
# in `sharp_on` column.
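
As a standalone reference for the binning above, a small sketch of the same row-group computation (formula adapted from the fastparquet-derived code; the function name is illustrative), showing how the new empty-data branch yields a single start index:

def row_group_starts(n_rows: int, max_row_group_size: int) -> list:
    # Split 'n_rows' rows into roughly equal row groups, none larger than
    # 'max_row_group_size'; empty data maps to a single start at 0.
    if n_rows:
        n_parts = (n_rows - 1) // max_row_group_size + 1
        row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
        return list(range(0, n_rows, row_group_size))
    return [0]


assert row_group_starts(25, 10) == [0, 9, 18]  # 3 balanced groups: 9, 9 and 7 rows.
assert row_group_starts(0, 10) == [0]          # Empty data: one empty row group.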
@@ -424,7 +428,7 @@ def write(
all_cols = data.get_column_names()
if ordered_on not in all_cols:
raise ValueError(f"column '{ordered_on}' does not exist in input data.")
if os_path.isdir(dirpath) and os_listdir(dirpath):
if os_path.isdir(dirpath) and any(file.endswith(".parquet") for file in os_listdir(dirpath)):
# Case updating an existing dataset.
# Identify overlaps in row groups between new data and recorded data.
# Recorded row group start and end indexes.
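
A minimal illustration of the sharpened check above (the directory path below is illustrative): a directory is now treated as an existing dataset only if it already holds at least one '.parquet' file, so a folder containing only metadata or stray files no longer takes the update path.

from os import listdir as os_listdir
from os import path as os_path


def is_existing_dataset(dirpath: str) -> bool:
    # True only if the directory exists and already contains Parquet data files.
    return os_path.isdir(dirpath) and any(
        file.endswith(".parquet") for file in os_listdir(dirpath)
    )


print(is_existing_dataset("/tmp/some_oups_dataset"))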
127 changes: 115 additions & 12 deletions tests/test_aggstream/test_aggstream_simple.py
@@ -28,7 +28,8 @@
from pandas import Series as pSeries
from pandas import Timedelta
from pandas import Timestamp
from pandas import concat as pconcat
from pandas import concat
from pandas import date_range
from pandas.core.resample import TimeGrouper

from oups import AggStream
@@ -299,7 +300,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path):
discard_last=True,
)
# Test results
ref_res = pconcat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
ref_res = concat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
ref_res[[FIRST, LAST, MIN, MAX]] = ref_res[[FIRST, LAST, MIN, MAX]].astype(DTYPE_NULLABLE_INT64)
rec_res = store[key].pdf
assert rec_res.equals(ref_res)
@@ -322,7 +323,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path):
)
# Test results
ref_res = (
pconcat([seed_df, seed_df2, seed_df3]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
concat([seed_df, seed_df2, seed_df3]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
)
ref_res[[FIRST, LAST, MIN, MAX]] = ref_res[[FIRST, LAST, MIN, MAX]].astype(DTYPE_NULLABLE_INT64)
rec_res = store[key]
@@ -484,7 +485,7 @@ def post(buffer: dict, bin_res: DataFrame):
discard_last=True,
)
# Test results
ref_res_agg = pconcat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
ref_res_agg = concat([seed_df, seed_df2]).iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
ref_res_post = post({}, ref_res_agg)
rec_res = store[key].pdf
assert rec_res.equals(ref_res_post)
@@ -559,7 +560,7 @@ def test_seed_time_grouper_bin_on_as_tuple(store, seed_path):
discard_last=True,
)
# Test results
ref_res = pconcat([seed_pdf, seed_pdf2]).iloc[:-1].groupby(bin_by).agg(**agg)
ref_res = concat([seed_pdf, seed_pdf2]).iloc[:-1].groupby(bin_by).agg(**agg)
ref_res.index.name = ts_open
ref_res.reset_index(inplace=True)
rec_res = store[key].pdf
@@ -688,7 +689,7 @@ def test_by_callable_wo_bin_on(store, seed_path):
seed_pdf2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)})
# Forcing dtype of 'seed_pdf' to float.
seed_pdf2["val"] = seed_pdf2["val"].astype("float64")
seed_pdf2 = pconcat([seed_pdf, seed_pdf2], ignore_index=True)
seed_pdf2 = concat([seed_pdf, seed_pdf2], ignore_index=True)
fp_write(seed_path, seed_pdf2, row_group_offsets=13, file_scheme="hive")
seed = ParquetFile(seed_path).iter_row_groups()
# Streamed aggregation.
@@ -787,7 +788,7 @@ def by_1val(on: DataFrame, buffer: dict):
else:
# 1st bin is one that was started before.
ncs = np.append(ncs, len(on))
group_keys = pconcat([pSeries([first_lab]), group_keys])
group_keys = concat([pSeries([first_lab]), group_keys])
buffer["last_key"] = group_keys.iloc[-1]
return ncs, group_keys, 0, "left", ncs, True

@@ -913,7 +914,7 @@ def agg_with_same_bin_labels(seed_pdf):
)
val = np.arange(1, len(ts2) + 1)
val[3] = 1
seed_pdf = pconcat([seed_pdf, DataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True)
seed_pdf = concat([seed_pdf, DataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True)
fp_write(seed_path, seed_pdf, row_group_offsets=13, file_scheme="hive")
seed = ParquetFile(seed_path).iter_row_groups()
# Setup streamed aggregation.
@@ -981,7 +982,7 @@ def test_time_grouper_trim_start(store, seed_path):
discard_last=True,
)
# Test results.
seed_pdf_ref = pconcat([seed_pdf.iloc[:-1], seed_pdf2])
seed_pdf_ref = concat([seed_pdf.iloc[:-1], seed_pdf2])
ref_res = seed_pdf_ref.iloc[:-1].groupby(bin_by).agg(**agg).reset_index()
rec_res = store[key].pdf
assert rec_res.equals(ref_res)
@@ -1020,7 +1021,7 @@ def test_time_grouper_agg_first(store):
# 1st append, starting a new bin.
ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"])
seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)})
seed2 = pconcat([seed, seed2])
seed2 = concat([seed, seed2])
# Streamed aggregation.
as_.agg(
seed=seed2,
@@ -1290,7 +1291,7 @@ def test_bin_on_col_sum_agg(store):
],
)
seed2 = DataFrame({ordered_on: ts, "val": [1, 2]})
seed = pconcat([seed, seed2])
seed = concat([seed, seed2])
# Setup streamed aggregation.
as_.agg(
seed=seed,
@@ -1345,7 +1346,7 @@ def test_time_grouper_agg_first_filters_and_no_filter(store):
# 1st append, starting a new bin.
ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"])
seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)})
seed2 = pconcat([seed, seed2])
seed2 = concat([seed, seed2])
# Streamed aggregation.
as_.agg(
seed=seed2,
@@ -1493,3 +1494,105 @@ def test_exception_unordered_seed(store, seed_path):
assert streamagg_md[KEY_RESTART_INDEX] == ts[ref_idx - 1]
assert not streamagg_md[KEY_PRE_BUFFER]
assert not streamagg_md[KEY_POST_BUFFER]


def test_post_with_warm_up(store):
# Test a 'post' with a warm-up period and check 'post_buffer' is correctly
# recorded even if 'post' does not output results yet.
# No binning as such: each value in 'val' is kept.
#
# Setup aggregation.
agg_on = "val"
ordered_on = "ts"
ts_period = "2min"

def post(buffer: dict, bin_res: DataFrame):
"""
Rolling sum of last ten values.
Warm-up period is then 10 rows.
"""
if buffer:
prev_bin = buffer["prev_bin"]
last_idx = (
-1
if bin_res.loc[:, ordered_on].iloc[0] == prev_bin.loc[:, ordered_on].iloc[-1]
else len(prev_bin)
)
bin_res = concat([prev_bin.iloc[:last_idx], bin_res], ignore_index=True)
# Keep in buffer last 10 rows.
buffer["prev_bin"] = bin_res[-10:].reset_index(drop=True)
if len(bin_res) >= 10:
return DataFrame(
{
agg_on: bin_res[agg_on].rolling(10).sum().dropna().reset_index(drop=True),
ordered_on: bin_res[ordered_on].iloc[9:].reset_index(drop=True),
},
)

max_row_group_size = 10
agg = {agg_on: (agg_on, FIRST)}
bin_by = TimeGrouper(key=ordered_on, freq=ts_period, closed="left", label="left")
as_1 = AggStream(
ordered_on=ordered_on,
agg=agg,
store=store,
keys={
key: {
"ordered_on": ordered_on,
"bin_by": bin_by,
},
},
max_row_group_size=max_row_group_size,
post=post,
)
# Setup seed data.
n_values = 20
ts = date_range("2020/01/01 08:00", freq=ts_period, periods=n_values)
seed = DataFrame({ordered_on: ts, agg_on: range(1, n_values + 1)})
# 1st chunk of data, not reaching the required number of warm-up rows.
as_1.agg(
seed=seed.iloc[:5],
discard_last=False,
final_write=True,
)
# Check 'post_buffer'.
post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER]
assert post_buffer["prev_bin"].equals(seed.iloc[:5])
# 2nd chunk of data, starting to output actual data.
as_1.agg(
seed=[seed.iloc[5:8], seed.iloc[8:14]],
discard_last=False,
final_write=True,
)
# Check.
ref_res = post({}, seed.iloc[:14])
rec_res = store[key].pdf
assert rec_res.equals(ref_res)
post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER]
assert post_buffer["prev_bin"].equals(seed.iloc[4:14].reset_index(drop=True))
# 3rd chunk, cold start.
as_2 = AggStream(
ordered_on=ordered_on,
agg=agg,
store=store,
keys={
key: {
"ordered_on": ordered_on,
"bin_by": bin_by,
},
},
max_row_group_size=max_row_group_size,
post=post,
)
as_2.agg(
seed=seed.iloc[14:],
discard_last=False,
final_write=True,
)
ref_res = post({}, seed)
rec_res = store[key].pdf
assert rec_res.equals(ref_res)
post_buffer = store[key]._oups_metadata[KEY_AGGSTREAM][KEY_POST_BUFFER]
assert post_buffer["prev_bin"].equals(seed.iloc[10:].reset_index(drop=True))
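
As a plain-pandas illustration of the warm-up behaviour this test exercises (independent of AggStream): with a 10-row rolling sum, fewer than 10 accumulated rows produce no output, and the first value only appears once the window is full.

from pandas import DataFrame

df = DataFrame({"val": range(1, 15)})  # 14 rows in total.
rolled = df["val"].rolling(10).sum().dropna().reset_index(drop=True)
print(len(rolled))      # 5: one value per full window ending at rows 10 to 14.
print(rolled.iloc[0])   # 55.0: sum of values 1 through 10.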