Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[timeseries] Part-3: Add Time Series Exchange Operator, Plan Node and Serde #14611

Merged
merged 5 commits into from
Dec 10, 2024

Conversation

ankitsultana
Copy link
Contributor

@ankitsultana ankitsultana commented Dec 6, 2024

Adds Exchange Operator, Plan Node, Plan Fragmenter and Serde. None of this code is hooked into the query execution yet, so this doesn't change the query behavior yet. In the next PR I'll wire all of this in and that PR would likely complete the Broker Reduce support.

Design Principle

Our goal is to be as minimal as possible to enable multi-server time-series queries. Our next step would be to hook all of this in with the MSE and remove the concept of the Time Series "Engine", but before getting there we'll propose a few MSE optimizations like Broker-Reduce mode and improved Collocated Join support.

Plan Fragmenter and Broker Reduce Overview

To enable multi-server queries, our approach is to take the logical plan generated by the TimeSeriesLogicalPlanner, and fragment the plan at every leaf node. We will then add a TimeSeriesExchangeNode to the upper fragment, and we'll leave the lower fragment as is.

The TimeSeriesExchangeNode will be converted to a TimeSeriesExchangeReceivePlanNode during physical plan generation, which will create TimeSeriesExchangeReceiveOperator.

This operator will receive data from all servers and merge them. Merging has two modes: aggregation mode where duplicate series received from different servers will be aggregated, and non-aggregation. mode where they will be appended to the List<TimeSeries>.

Also, the broker will run its plan fragment in a PreOrder traversal, and each MailboxReceiveOperator is blocking and does not yield.

Block Serde

We convert the TimeSeriesBlock to/from a Row DataBlock using TransferableBlock as an intermediary. In #14558, I am also planning to eliminate the TimeSeriesBlock from the leaf stage completely, which will make it cohesively tie in with the serde. There are many optimizations possible here, but for now I have left a TODO.

Test Plan

I have added UTs for this PR, and have also run Quickstart to verify that the queries are working E2E.

@ankitsultana ankitsultana added the timeseries-engine Tracking tag for generic time-series engine work label Dec 6, 2024
@ankitsultana ankitsultana changed the title Part-3: Add Time Series Exchange Operator, Plan Node and Serde [timeseries] Part-3: Add Time Series Exchange Operator, Plan Node and Serde Dec 6, 2024
@codecov-commenter
Copy link

codecov-commenter commented Dec 6, 2024

Codecov Report

Attention: Patch coverage is 86.24339% with 26 lines in your changes missing coverage. Please review.

Project coverage is 64.11%. Comparing base (59551e4) to head (a8cc6cb).
Report is 1449 commits behind head on master.

Files with missing lines Patch % Lines
.../timeseries/TimeSeriesExchangeReceivePlanNode.java 0.00% 12 Missing ⚠️
...runtime/timeseries/serde/TimeSeriesBlockSerde.java 93.42% 1 Missing and 4 partials ⚠️
...che/pinot/tsdb/planner/TimeSeriesExchangeNode.java 37.50% 5 Missing ⚠️
.../timeseries/TimeSeriesExchangeReceiveOperator.java 94.59% 1 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14611      +/-   ##
============================================
+ Coverage     61.75%   64.11%   +2.36%     
- Complexity      207     1579    +1372     
============================================
  Files          2436     2692     +256     
  Lines        133233   148074   +14841     
  Branches      20636    22703    +2067     
============================================
+ Hits          82274    94945   +12671     
- Misses        44911    46157    +1246     
- Partials       6048     6972     +924     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 64.03% <86.24%> (+2.32%) ⬆️
java-21 64.01% <86.24%> (+2.39%) ⬆️
skip-bytebuffers-false 64.11% <86.24%> (+2.36%) ⬆️
skip-bytebuffers-true 63.92% <86.24%> (+36.19%) ⬆️
temurin 64.11% <86.24%> (+2.36%) ⬆️
unittests 64.11% <86.24%> (+2.36%) ⬆️
unittests1 56.19% <74.60%> (+9.30%) ⬆️
unittests2 34.61% <11.64%> (+6.88%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

timeSeriesMap.computeIfAbsent(seriesHash, (x) -> new ArrayList<>()).addAll(timeSeriesList);
}
}
Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange receive operator");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can move this before for loop for timeseriesMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted this to act as an exit condition hence kept it here.

Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
"Found unequal time buckets from server response");
}
for (var entry : blockToMerge.getSeriesMap().entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the union of series looks like below?

SeriesHash -----List of [seriesBlockServer1, seriesBlockServer2, ....]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's say we have two servers, they are sending:

server-1

seriesHash0 ==> [TimeSeries-1, TimeSeries-2]
seriesHash1 ==> [TimeSeries-3]
server-2:

seriesHash0 ==> [TimeSeries-4]

Then if getNextBlockNoAggregation is called, then we'll have:

seriesHash0 ==> [TimeSeries-1, TimeSeries-2, TimeSeries-4]
seriesHash1 ==> [TimeSeries-3]

You can also refer to the UTs which have some examples.

* +-------------+------------+-------------+---------------------------------+
* | San Fran. | 94107 | ... | [value-0, value-1, ... value-x] |
* +-------------+------------+-------------+---------------------------------+
* </pre>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Prometheus we are storing tags in json Column. As you mentioned json columns may have different series tags and levels. Looks like this scheme does not handle this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using json for storage is fine. This serde is only applicable for queries.

If I had to guess, you are using some JSON Transform function in the groupByExpressions to extract out the appropriate keys and values. That case is handled by this Serde, because each Group By Expression will become a column.

return Optional.of(timeSeriesList.get(0));
}

private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make more sense to move these methods timeBucketsToRow, timeBucketsFromRow, timeSeriesToRow , timeSeriesFromRow in an utils class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now all of these methods are private hence keeping them here. For the foreseeable future we shouldn't need them outside of this class.

Copy link
Contributor

@tibrewalpratik17 tibrewalpratik17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few nits. Overall LGTM!

@tibrewalpratik17 tibrewalpratik17 merged commit 1b5858b into apache:master Dec 10, 2024
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
timeseries-engine Tracking tag for generic time-series engine work
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants