[FEA] Clean up rolling window interfaces #17578

wence- opened this issue Dec 11, 2024 · 2 comments


wence- commented Dec 11, 2024

Is your feature request related to a problem? Please describe.

I have been exploring extending the rolling window interfaces we have in libcudf to support the use cases that cudf-polars (and cudf-classic) need.

In doing so, I think there is some opportunity for both cleanup and performance improvement in the construction of the inputs to cudf::detail::rolling_window (which is where every frontend caller eventually ends up).

[Aside: This perf improvement is separate from improving the algorithmic complexity of some rolling window operations when the window itself is large.]

To do so, we first need to classify the required types of rolling windows, and how the implementation proceeds.

libcudf low-level API

To perform a rolling (or grouped rolling) aggregation, libcudf provides cudf::detail::rolling_window with signature:

template <typename PrecedingWindowIterator, typename FollowingWindowIterator>
std::unique_ptr<column> rolling_window(column_view const& input,
                                       column_view const& default_outputs,
                                       PrecedingWindowIterator preceding_window_begin,
                                       FollowingWindowIterator following_window_begin,
                                       size_type min_periods,
                                       rolling_aggregation const& agg,
                                       rmm::cuda_stream_view stream,
                                       rmm::device_async_resource_ref mr)

Different window types can be controlled by providing appropriate iterators to preceding_window_begin ($P$) and following_window_begin ($F$). For this API, the window begin/end are always row-based integer offsets. That is, for each row $i$ in the input, the window is defined by all rows $j$ such that

$$ j \in [i - P_i + 1, \dots, i + F_i] \cap [0, \dots, \text{input.size()}). $$
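For illustration, a fixed window covering the current row, the two rows before it, and the one row after it could be expressed with constant iterators. This is a minimal sketch: input, default_outputs, agg, stream, and mr are assumed to be in scope, and it relies on the clamping described below to keep the endpoints in range.

#include <thrust/iterator/constant_iterator.h>

// Sketch: P_i = 3 and F_i = 1 for every row, i.e. window = [i - 2, i + 1].
auto preceding_begin = thrust::make_constant_iterator<cudf::size_type>(3);
auto following_begin = thrust::make_constant_iterator<cudf::size_type>(1);
auto result = cudf::detail::rolling_window(
  input, default_outputs, preceding_begin, following_begin,
  /*min_periods=*/1, agg, stream, mr);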

Currently, the kernel that performs the windowed aggregation clamps the window bounds to be in range for the column; additionally, various public interfaces appear to pre-clamp.

polars, pandas, spark window specifications

In all cases (polars, pandas, spark) windows can be described either as row-based (using integer offsets) or range-based (using a delta from the current row value to select rows in the window). As we can see above, to interface with libcudf we eventually need to turn these into row-based offsets in all cases.

All three also offer both grouped and ungrouped rolling windows. A grouped rolling window partitions the input column by some grouping table and then applies normal rolling windows group-wise. An ungrouped rolling window can be treated as a grouped window with a single group.

In terms of constructing the $P$ and $F$ iterators, grouping keys merely provide extra constraints on the offset values: offsets may never result in a window crossing a group boundary.
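For example, a group-aware preceding iterator only needs an extra clamp against the group start. A sketch, assuming group_labels maps each row to its group index and group_offsets holds each group's first row (as in libcudf's groupby internals):

// Clamp a fixed preceding offset so that no window crosses the start of the
// row's group; the analogous following iterator clamps against the group end.
auto preceding_begin = cudf::detail::make_counting_transform_iterator(
  0,
  cuda::proclaim_return_type<cudf::size_type>(
    [preceding_window,
     labels  = group_labels.begin(),
     offsets = group_offsets.begin()] __device__(cudf::size_type i) {
      auto const group_start = offsets[labels[i]];
      return thrust::min(i - group_start + 1, preceding_window);
    }));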

Row-based window specifications

spark: A window is specified by a pair of (preceding_offset, following_offset). Each offset is either an integer, subtracted from the current row to produce the left endpoint (preceding) or added to the current row to produce the right endpoint (following), or the magic flag UNBOUNDED, which is equivalent to setting the row offset to $\infty$. Note that in spark the window intervals defined by these offsets are always closed, that is, they include the endpoints. This specification maps, pretty much directly, onto what libcudf requires.

pandas: A window is specified by a pair of (length, is_centered=True/False). The length is an integer; windows always trail the current row unless is_centered=True (in which case the window is centered over the current row). In pandas one can specify whether the window interval is closed, left-half-open, right-half-open, or open. To match up with libcudf, we just need to convert the length/is_centered pair plus interval bounds to appropriate integers. Note: pandas supports the equivalent of left-UNBOUNDED windows via DataFrame.expanding.

polars: A window is specified by a pair of (length, offset). Both offset and length are integers, the window is defined by [i + offset, ..., i + offset + length]. One can specify, like pandas, whether the interval is closed, left-half-open, right-half-open, or open. To match up with libcudf, we again just need to map the length and offset + interval bounds to appropriate integers. Note: polars doesn't AFAICT support the equivalent of UNBOUNDED windows (though one can achieve the result by setting length=len(dataframe)).

All three libraries also support grouped versions of the above.
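To make these mappings concrete, here is a sketch of converting a polars-style (length, offset) pair into the preceding/following integers libcudf expects (hypothetical helper; closed-interval case only, the half-open and open variants just shift an endpoint by one):

struct row_window {
  cudf::size_type preceding;  // P in the libcudf formulation
  cudf::size_type following;  // F in the libcudf formulation
};

// polars (closed): window = [i + offset, i + offset + length]
// libcudf:         window = [i - P + 1, i + F]
// Equating left endpoints gives P = 1 - offset; equating right endpoints
// gives F = offset + length.
row_window from_polars_closed(cudf::size_type length, cudf::size_type offset)
{
  return row_window{1 - offset, offset + length};
}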

Range-based window specifications

As well as row-based (integer offset) windows, all libraries support window specifications (on some subset of types) that are range-based. In libcudf this code is, as far as I can tell, implemented with spark semantics only. That is, given a preceding value $P$, following value $F$, and an orderby column $O$, for a row $i$ with value $O_i$, the rows of window $i$ are the ordered rows $j$ such that

$$ O_i - P \le O_j \le O_i + F. $$

Note, this is the spark definition (windows are closed at the endpoints). One can provide the magic value UNBOUNDED to mean $\infty$, and the magic value CURRENT_ROW equivalent to $P$ (respectively $F$) being equal to zero.
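The underlying row-offset computation is essentially a pair of binary searches per row. A sketch for an ascending, null-free orderby column of int64 values, with fixed deltas prec_delta and foll_delta standing in for $P$ and $F$ (all names hypothetical):

// For row i, the window is the rows j with
//   O[i] - prec_delta <= O[j] <= O[i] + foll_delta.
auto preceding_begin = cudf::detail::make_counting_transform_iterator(
  0,
  cuda::proclaim_return_type<cudf::size_type>(
    [o = orderby.begin<int64_t>(), n = orderby.size(), prec_delta] __device__(cudf::size_type i) {
      // Index of the first row whose value is >= O[i] - prec_delta.
      auto first = thrust::lower_bound(thrust::seq, o, o + n, o[i] - prec_delta);
      return i - static_cast<cudf::size_type>(thrust::distance(o, first)) + 1;
    }));
auto following_begin = cudf::detail::make_counting_transform_iterator(
  0,
  cuda::proclaim_return_type<cudf::size_type>(
    [o = orderby.begin<int64_t>(), n = orderby.size(), foll_delta] __device__(cudf::size_type i) {
      // One past the index of the last row whose value is <= O[i] + foll_delta.
      auto last = thrust::upper_bound(thrust::seq, o, o + n, o[i] + foll_delta);
      return static_cast<cudf::size_type>(thrust::distance(o, last)) - i - 1;
    }));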

It is not documented in the API, but both $P$ and $F$ must be non-negative.

Polars and pandas also support specifying range-based windows in their APIs. However, they do not restrict the preceding and following values to be non-negative. They also allow (as above) both open and half-open intervals (as well as closed intervals like spark), but do not have magic sigils for UNBOUNDED and CURRENT_ROW windows.

In all cases, the orderby column must be in sorted order (spark and pandas allow sorted in ascending and descending orders, polars only in ascending). Polars and pandas do not permit nulls in the orderby column, spark does.

All libraries permit providing grouping keys in which case the orderby column must be sorted within each group (polars does not require that the group keys are sorted, and does not guarantee that group key order is maintained).

Implementation issues

Needless work

Depending on exactly how one calls the full-column rolling window code, the libcudf code path clamps the preceding/following bounds to be in bounds at least once, often twice, sometimes materialising two columns of integers along the way.

Specifically:

  • Fixed size rolling windows: cudf::rolling_window(input, size_type preceding, size_type following, ...) calls cudf::detail::rolling_window(input, default_output, size_type, size_type, ...), which materialises the preceding and following integers into columns via a transform_iterator:

    auto const preceding_calc = cuda::proclaim_return_type<cudf::size_type>(
      [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); });
    auto const following_calc = cuda::proclaim_return_type<cudf::size_type>(
      [col_size = input.size(), following_window] __device__(size_type i) {
        return thrust::min(col_size - i - 1, following_window);
      });
    auto const preceding_column = expand_to_column(preceding_calc, input.size(), stream);
    auto const following_column = expand_to_column(following_calc, input.size(), stream);
    return cudf::detail::rolling_window(input, ...);

    This eventually calls the rolling kernel, which also ensures the start/stop are in bounds:

    auto const start = static_cast<size_type>(
      min(static_cast<int64_t>(input.size()), max(int64_t{0}, i - preceding_window + 1)));
    auto const end = static_cast<size_type>(
      min(static_cast<int64_t>(input.size()), max(int64_t{0}, i + following_window + 1)));
    auto const start_index = min(start, end);

    (Note: if the rolling aggregation is a UDF [CUDA or PTX], then we only clamp in the kernel.)
  • Variable size rolling windows: cudf::rolling_window(input, column_view preceding, column_view following, ...) calls cudf::detail::rolling_window(input, column_view, column_view, ...), which clamps the preceding and following iterators using a transform iterator:

    auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator(
      0,
      cuda::proclaim_return_type<size_type>(
        [preceding = preceding_window.begin<size_type>()] __device__(size_type i) {
          return thrust::min(i + 1, preceding[i]);
        }));
    auto const following_window_begin = cudf::detail::make_counting_transform_iterator(
      0,
      cuda::proclaim_return_type<size_type>(
        [col_size = input.size(), following = following_window.begin<size_type>()] __device__(
          size_type i) { return thrust::min(col_size - i - 1, following[i]); }));

    This eventually calls the same rolling kernel, which again ensures that the start/stop are in bounds (note, once again, if the rolling aggregation is a UDF [CUDA or PTX], then we only clamp in the kernel).

There are other code paths (used by spark, I believe) for grouped_rolling_window; what happens here again depends on whether the rolling aggregation is a UDF. If it is, then the preceding/following integers are clamped to the group bounds by a transform_iterator (really, a struct that provides operator[]) and no additional materialisation is done.

If it is not, then the preceding/following integers are expanded into columns (clamped to the group boundaries):

auto const preceding_column =
  make_preceding_column(group_offsets, group_labels, preceding_window, input.size(), stream);
auto const following_column =
  make_following_column(group_offsets, group_labels, following_window, input.size(), stream);
return cudf::detail::rolling_window(input,
                                    default_outputs,
                                    ...);

In both cases, the rolling kernel clamps the start/stop to the bounds of the input column.

As a consequence of these divergences, the UDF-based aggregations do less work (and require less memory), but do not support (because they were not modified) all the cases that non-UDF aggregations support: specifically, UDF-based aggregations require that both preceding and following are non-negative (and hence that the window covers the current row), whereas non-UDF aggregations allow preceding and following to be negative (so the window need not cover the current row).

spark-specific semantics for some interfaces

There are APIs to do grouped range-based and grouped row-based rolling windows, but they conform to spark requirements. The range-based rolling window API cudf::grouped_range_rolling_window does lots of work, turning the range-based bounds into columns of preceding and following windows (for passing to cudf::detail::rolling_window) and then doing a single rolling aggregation.

This precludes computing the window bounds once and reusing them for multiple aggregations (an API usage supported by both polars and pandas). Again, these functions guarantee that they produce in-bounds window offsets, but the kernel that does rolling windows does not know this and does extra work.

I would like it if the computation of the window offsets from range bounds were more flexible (to support polars/pandas requirements), and separate from the calculation of the rolling aggregation (so that I can reuse it).
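In other words, the usage pattern I would like to support looks something like this (a sketch: compute_range_window_bounds is the hypothetical API proposed below, and whether the public rolling_window overload takes a stream is incidental to the point):

// Compute the per-row window bounds once...
auto [preceding, following] = compute_range_window_bounds(
  group_keys, orderby, cudf::order::ASCENDING, cudf::null_order::BEFORE,
  prec_bound, foll_bound, stream, mr);
// ...then run several aggregations over the same bounds.
auto sums  = cudf::rolling_window(input, preceding->view(), following->view(), min_periods,
                                  *cudf::make_sum_aggregation<cudf::rolling_aggregation>(),
                                  stream, mr);
auto means = cudf::rolling_window(input, preceding->view(), following->view(), min_periods,
                                  *cudf::make_mean_aggregation<cudf::rolling_aggregation>(),
                                  stream, mr);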

Concrete suggestions/requests

Bounds checking

I think clamping the provided windows to be in bounds should not be the responsibility of the rolling aggregation kernel; instead, the kernel should require iterators for the preceding/following windows that always produce in-bounds values.

For the cudf::rolling_window API that takes columns of preceding/following, we should document and require that they always produce in-bounds values.

Remove need for materialisation of intermediates where possible

For row-based windows (both grouped and ungrouped), I think we should always use unmaterialised transform_iterators to turn the input row offsets into clamped (grouped) row offsets.
That is, the APIs that take size_type for preceding and following should not need to materialise a column.
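Concretely, the fixed-size path could hand its existing transform iterators straight to the detail API instead of calling expand_to_column (a sketch reusing the preceding_calc/following_calc lambdas quoted above):

// Skip materialisation: the detail API already accepts arbitrary iterators,
// and the lambdas already produce in-bounds values for fixed windows.
auto preceding_begin = cudf::detail::make_counting_transform_iterator(0, preceding_calc);
auto following_begin = cudf::detail::make_counting_transform_iterator(0, following_calc);
return cudf::detail::rolling_window(input, default_outputs, preceding_begin,
                                    following_begin, min_periods, agg, stream, mr);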

Provide API to construct window offset bounds from ranges

Extend the existing (spark-specific) detail API for constructing window bounds from ranges to a public one, and expand it to support the use cases that polars and pandas need: I have a PoC for this that does everything except handle nulls.

Remove/deprecate dead code

Originally range-based windows were only for timestamp order-by columns (hence cudf::grouped_time_range_rolling_window). But this (since 2021) has just called into cudf::grouped_range_rolling_window, so we can probably deprecate it.

Rationalise API

I think we want:

  • construction of window offsets from range bounds (grouped and ungrouped). Suggestion: std::pair<std::unique_ptr<column>, std::unique_ptr<column>> cudf::compute_range_window_bounds(table_view const &group_keys, column_view &orderby, order column_order, null_order null_order, range_window const& preceding, std::optional<range_window> const& following, stream, mr)
  • computing row-based rolling windows on a std::vector of aggregation requests (like groupby requests). Suggestion: std::unique_ptr<table> cudf::rolling_window(table_view const &group_keys, size_type preceding, size_type following, std::vector<size_type> min_periods, std::vector<RollingAggRequests>, stream, mr)
  • computing row-based rolling windows on a std::vector of aggregation requests given columns of window bounds (same as above but column_view for preceding and following).

I think we can build all the pieces we want out of these three things.
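Put together as a header sketch (every name and shape here is tentative, just following the bullets above):

// Tentative shapes for the three proposed entry points (sketch only).
struct range_window;                 // UNBOUNDED / CURRENT_ROW / typed delta
struct rolling_aggregation_request;  // analogous to groupby's aggregation_request

// 1. Window offsets from range bounds (grouped; pass an empty table_view
//    for the ungrouped case).
std::pair<std::unique_ptr<cudf::column>, std::unique_ptr<cudf::column>>
compute_range_window_bounds(cudf::table_view const& group_keys,
                            cudf::column_view const& orderby,
                            cudf::order column_order,
                            cudf::null_order null_precedence,
                            range_window const& preceding,
                            std::optional<range_window> const& following,
                            rmm::cuda_stream_view stream,
                            rmm::device_async_resource_ref mr);

// 2. Row-based rolling windows over a vector of aggregation requests.
std::unique_ptr<cudf::table>
rolling_window(cudf::table_view const& group_keys,
               cudf::size_type preceding,
               cudf::size_type following,
               std::vector<cudf::size_type> const& min_periods,
               std::vector<rolling_aggregation_request> const& requests,
               rmm::cuda_stream_view stream,
               rmm::device_async_resource_ref mr);

// 3. As (2), but with per-row window bounds, e.g. produced by (1).
std::unique_ptr<cudf::table>
rolling_window(cudf::table_view const& group_keys,
               cudf::column_view const& preceding,
               cudf::column_view const& following,
               std::vector<cudf::size_type> const& min_periods,
               std::vector<rolling_aggregation_request> const& requests,
               rmm::cuda_stream_view stream,
               rmm::device_async_resource_ref mr);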

wence- added the feature request label Dec 11, 2024

wence- commented Dec 11, 2024

Note that I have a bunch of ideas about how to improve the performance of computing the windows (for large bounded windows) for some subset of the aggregations that we care about, but it will be easier if the current code is a bit more straightforward to follow and provides stronger guarantees and preconditions.

davidwendt commented

@mythrocks
