Merge pull request #19 from yohplala/bin_snap_recording
Bin snap recording
yohplala authored Sep 4, 2024
2 parents 742c2e7 + 3e1bbaf commit d176151
Showing 15 changed files with 644 additions and 396 deletions.
1 change: 1 addition & 0 deletions oups/__init__.py
@@ -8,6 +8,7 @@
from .aggstream import AggStream
from .aggstream import by_x_rows
from .store import ParquetSet
+from .store import conform_cmidx
from .store import is_toplevel
from .store import sublevel
from .store import toplevel
219 changes: 128 additions & 91 deletions oups/aggstream/aggstream.py

Large diffs are not rendered by default.

32 changes: 20 additions & 12 deletions oups/aggstream/cumsegagg.py
@@ -518,16 +518,20 @@ def cumsegagg(
    # Assemble 'bin_res' as a pandas DataFrame.
    bin_res = pDataFrame(bin_res, index=bin_labels, copy=False)
    bin_res.index.name = ordered_on if ordered_on else bin_by[KEY_BIN_ON]
+    if DTYPE_INT64 in agg:
+        # As of pandas 1.5.3, use "Int64" dtype to work with nullable 'int'
+        # (it is a pandas dtype, not a numpy one, which is why it is set only
+        # on pandas results, not on numpy inputs to 'cumsegagg()').
+        # Force 'int64' to pandas nullable 'Int64', even if there is no null
+        # value in results at the moment. Indeed, null values can appear in a
+        # later aggregation step (use of 'restart' feature).
+        bin_res[agg[DTYPE_INT64][1]] = bin_res[agg[DTYPE_INT64][1]].astype(
+            DTYPE_NULLABLE_INT64,
+        )
    # Set null values.
    if n_max_null_bins != 0:
        null_bin_labels = bin_labels.iloc[null_bin_indices[~nisin(null_bin_indices, -1)]]
        if not null_bin_labels.empty:
-            if DTYPE_INT64 in agg:
-                # As of pandas 1.5.3, use "Int64" dtype to work with nullable 'int'
-                # (it is a pandas dtype, not a numpy one).
-                bin_res[agg[DTYPE_INT64][1]] = bin_res[agg[DTYPE_INT64][1]].astype(
-                    DTYPE_NULLABLE_INT64,
-                )
            for dtype_, (
                _,
                cols_name_in_res,
@@ -538,6 +542,16 @@
    if snap_by is not None:
        snap_res = pDataFrame(snap_res, index=snap_labels, copy=False)
        snap_res.index.name = ordered_on
+        if DTYPE_INT64 in agg:
+            # As of pandas 1.5.3, use "Int64" dtype to work with nullable 'int'
+            # (it is a pandas dtype, not a numpy one, which is why it is set
+            # only on pandas results, not on numpy inputs to 'cumsegagg()').
+            # Force 'int64' to pandas nullable 'Int64', even if there is no
+            # null value in results at the moment. Indeed, null values can
+            # appear in a later aggregation step (use of 'restart' feature).
+            snap_res[agg[DTYPE_INT64][1]] = snap_res[agg[DTYPE_INT64][1]].astype(
+                DTYPE_NULLABLE_INT64,
+            )
        # Set null values.
        if n_max_null_snaps != 0:
            # Remove -1 indices.
@@ -546,12 +560,6 @@
            # Alternatively, output number of empty snaps from 'jcumsegagg()'?
            null_snap_labels = snap_labels[null_snap_indices[~nisin(null_snap_indices, -1)]]
            if not null_snap_labels.empty:
-                if DTYPE_INT64 in agg:
-                    # As of pandas 1.5.3, use "Int64" dtype to work with nullable 'int'
-                    # (it is a pandas dtype, not a numpy one).
-                    snap_res[agg[DTYPE_INT64][1]] = snap_res[agg[DTYPE_INT64][1]].astype(
-                        DTYPE_NULLABLE_INT64,
-                    )
                for dtype_, (
                    _,
                    cols_name_in_res,
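For context on the relocated 'Int64' cast: a numpy-backed 'int64' column cannot hold missing values, while the pandas nullable 'Int64' extension dtype can. A minimal standalone sketch in plain pandas (not oups code):

import pandas as pd

df = pd.DataFrame({"qty": [1, 2, 3]})   # numpy-backed 'int64' column
df["qty"] = df["qty"].astype("Int64")   # pandas nullable extension dtype
df.loc[1, "qty"] = pd.NA                # <NA> is stored without upcasting
print(df["qty"].dtype)                  # Int64
print(df["qty"].tolist())               # [1, <NA>, 3]

Casting before any nulls exist is what lets a later 'restart' introduce nulls into 'bin_res' or 'snap_res' without a dtype change, which is why the cast now happens unconditionally instead of only when null labels are found.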
4 changes: 3 additions & 1 deletion oups/aggstream/utils.py
@@ -7,8 +7,10 @@
"""
import operator

from numpy import ndarray
from numpy import ones
from numpy import zeros
from pandas import DataFrame


ops = {
@@ -22,7 +24,7 @@
}


-def dataframe_filter(df, filters):
+def dataframe_filter(df: DataFrame, filters) -> ndarray:
    """
    Produce a column filter of the input dataframe.
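Only the signature gains type hints in this hunk; the function body is not part of the diff. As a rough sketch of how the 'ops' mapping above and the 'ndarray' return type could fit together, assuming fastparquet-style (column, operator, value) filter tuples (the actual oups implementation may differ):

import operator

from numpy import ndarray
from numpy import ones
from pandas import DataFrame

ops = {"==": operator.eq, ">=": operator.ge, "<": operator.lt}  # abridged

def dataframe_filter(df: DataFrame, filters) -> ndarray:
    # AND together one boolean condition per (column, op, value) tuple.
    mask = ones(len(df), dtype=bool)
    for col, op, value in filters:
        mask &= ops[op](df[col], value).to_numpy()
    return mask

df = DataFrame({"qty": [1, 2, 3], "side": ["buy", "buy", "sell"]})
print(dataframe_filter(df, [("qty", ">=", 2), ("side", "==", "buy")]))  # [False  True False]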
1 change: 1 addition & 0 deletions oups/store/__init__.py
@@ -9,3 +9,4 @@
from .indexer import is_toplevel
from .indexer import sublevel
from .indexer import toplevel
+from .utils import conform_cmidx
45 changes: 45 additions & 0 deletions oups/store/utils.py
@@ -8,6 +8,8 @@
from os import scandir
from typing import Iterator, List, Tuple

+from pandas import DataFrame
+
from oups.store.defines import DIR_SEP


@@ -73,3 +75,46 @@ def strip_path_tail(dirpath: str) -> str:
"""
if DIR_SEP in dirpath:
return dirpath.rsplit("/", 1)[0]


+def conform_cmidx(df: DataFrame):
+    """
+    Conform pandas column multi-index.
+
+    Library fastparquet has several requirements to handle a column MultiIndex.
+    - It requires a name for each level in a MultiIndex. If these are not
+      set, they are set to '', an empty string.
+    - It requires column names to be tuples of strings. If an element is not
+      a string (for instance a float or an int), it is turned into a string.
+    The DataFrame is modified in-place.
+
+    Parameters
+    ----------
+    df : DataFrame
+        DataFrame with a column multi-index to check and possibly adjust.
+
+    Returns
+    -------
+    None
+
+    """
+    # If a name is 'None', set it to '' instead.
+    cmidx = df.columns
+    if None in cmidx.names:
+        level_updated_idx = [i for i, name in enumerate(cmidx.names) if name is None]
+        cmidx.set_names([""] * len(level_updated_idx), level=level_updated_idx, inplace=True)
+    # If an item of the column name is not a string, turn it into a string.
+    # Using 'set_levels()' instead of reconstructing a MultiIndex to preserve
+    # index names directly.
+    level_updated_idx = []
+    level_updated = []
+    for i, level in enumerate(cmidx.levels):
+        str_level = [name if isinstance(name, str) else str(name) for name in level]
+        if level.to_list() != str_level:
+            level_updated_idx.append(i)
+            level_updated.append(str_level)
+    if level_updated:
+        df.columns = df.columns.set_levels(level_updated, level=level_updated_idx)
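A quick illustration of the new helper on a toy frame (the column labels are invented for the example):

from pandas import DataFrame, MultiIndex

from oups import conform_cmidx

# Unnamed levels and a non-string label (10) in the column multi-index.
df = DataFrame(columns=MultiIndex.from_tuples([("a", 10), ("b", "x")]))
conform_cmidx(df)
print(df.columns.names)      # ['', '']: None level names replaced by ''
print(df.columns.to_list())  # [('a', '10'), ('b', 'x')]: 10 turned into '10'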
46 changes: 16 additions & 30 deletions oups/store/writer.py
@@ -223,43 +223,29 @@ def to_midx(idx: Index, levels: List[str] = None) -> MultiIndex:
    return MultiIndex.from_tuples(tuples, names=levels)


-def check_cmidx(chunk):
+def check_cmidx(cmidx: MultiIndex):
    """
-    Check cmidx to have it comply with fastparquet limitations.
+    Check if column multi-index complies with fastparquet requirements.

-    Most notably, fastparquet requires names for each level in a MultiIndex.
-    If these are not set, they are set to '', an empty string.
+    Library fastparquet requires a name for each level in a MultiIndex.
+    Also, column names have to be tuples of strings.
-    DataFrame is modified in-place.

    Parameters
    ----------
-    chunk : DataFrame
-        DataFrame with a column multi-index to check and possibly adjust.
+    cmidx : MultiIndex
+        MultiIndex to check.

-    Returns
-    -------
-    None

    """
-    # If a name is 'None', set it to '' instead.
-    cmidx = chunk.columns
-    for i, name in enumerate(cmidx.names):
-        if name is None:
-            cmidx.set_names("", level=i, inplace=True)
-    # If an item of the column name is not a string, turn it into a string.
-    # Using 'set_levels()' instead of reconstructing a MultiIndex to preserve
-    # index names directly.
-    level_updated_idx = []
-    level_updated = []
-    for i, level in enumerate(cmidx.levels):
-        str_level = [name if isinstance(name, str) else str(name) for name in level]
-        if level.to_list() != str_level:
-            level_updated_idx.append(i)
-            level_updated.append(str_level)
-    if level_updated:
-        chunk.columns = chunk.columns.set_levels(level_updated, level=level_updated_idx)
+    # Check level names.
+    if None in cmidx.names:
+        raise ValueError(
+            "not possible to have level name set to None.",
+        )
+    # Check column names.
+    for level in cmidx.levels:
+        for name in level:
+            if not isinstance(name, str):
+                raise TypeError(f"name {name} has to be of type 'string', not '{type(name)}'.")


def write_metadata(
@@ -605,7 +591,7 @@ def write(
    # In case multi-index is used, check that it complies with fastparquet
    # limitations.
    if isinstance(chunk.columns, MultiIndex):
-        check_cmidx(chunk)
+        check_cmidx(chunk.columns)
    fp_write(
        dirpath,
        chunk,
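The behavioral change deserves a note: 'check_cmidx' no longer repairs the index in-place (that job moved to 'conform_cmidx' in oups/store/utils.py above); it now only validates the 'MultiIndex' it receives and raises on the first problem. A toy illustration on an invented index:

from pandas import MultiIndex

from oups.store.writer import check_cmidx

cmidx = MultiIndex.from_tuples([("a", 1), ("b", 2)])  # unnamed levels, int labels
try:
    check_cmidx(cmidx)
except ValueError as exc:
    print(exc)  # not possible to have level name set to None.

The level-name check fires before the string check, so the int labels would only be reported once names are set.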