[timeseries] Part-3: Add Time Series Exchange Operator, Plan Node and Serde #14611
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #14611     +/-   ##
============================================
+ Coverage     61.75%    64.11%    +2.36%
- Complexity      207      1579     +1372
============================================
  Files          2436      2692      +256
  Lines        133233    148074    +14841
  Branches      20636     22703     +2067
============================================
+ Hits          82274     94945    +12671
- Misses        44911     46157     +1246
- Partials       6048      6972      +924
        timeSeriesMap.computeIfAbsent(seriesHash, (x) -> new ArrayList<>()).addAll(timeSeriesList);
      }
    }
    Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange receive operator");
nit: can move this before the for loop for timeSeriesMap
I wanted this to act as an exit condition, hence kept it here.
        Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
            "Found unequal time buckets from server response");
      }
      for (var entry : blockToMerge.getSeriesMap().entrySet()) {
Will the union of series look like the below?
SeriesHash -----> List of [seriesBlockServer1, seriesBlockServer2, ...]
Let's say we have two servers, and they are sending:
server-1:
seriesHash0 ==> [TimeSeries-1, TimeSeries-2]
seriesHash1 ==> [TimeSeries-3]
server-2:
seriesHash0 ==> [TimeSeries-4]
Then if getNextBlockNoAggregation is called, we'll have:
seriesHash0 ==> [TimeSeries-1, TimeSeries-2, TimeSeries-4]
seriesHash1 ==> [TimeSeries-3]
You can also refer to the UTs, which have some examples.
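For a concrete picture of this non-aggregation union, here is a minimal, self-contained sketch; the string values stand in for actual TimeSeries objects and the types below are simplified placeholders, not the PR's real classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the non-aggregation merge described above.
public class NonAggregationMergeSketch {
  public static void main(String[] args) {
    // Response from server-1: seriesHash -> list of series.
    Map<Long, List<String>> server1 = Map.of(
        0L, List.of("TimeSeries-1", "TimeSeries-2"),
        1L, List.of("TimeSeries-3"));
    // Response from server-2.
    Map<Long, List<String>> server2 = Map.of(
        0L, List.of("TimeSeries-4"));

    // Union: lists for the same seriesHash are simply appended, never aggregated.
    Map<Long, List<String>> merged = new HashMap<>();
    for (Map<Long, List<String>> response : List.of(server1, server2)) {
      for (Map.Entry<Long, List<String>> entry : response.entrySet()) {
        merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).addAll(entry.getValue());
      }
    }
    // merged => {0=[TimeSeries-1, TimeSeries-2, TimeSeries-4], 1=[TimeSeries-3]}
    System.out.println(merged);
  }
}
```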
 * +-------------+------------+-------------+---------------------------------+
 * | San Fran.   | 94107      | ...         | [value-0, value-1, ... value-x] |
 * +-------------+------------+-------------+---------------------------------+
 * </pre>
For Prometheus we are storing tags in a JSON column. As you mentioned, JSON columns may have different series tags and levels. It looks like this scheme does not handle that case.
Using JSON for storage is fine. This serde is only applicable to queries.
If I had to guess, you are using some JSON transform function in the groupByExpressions to extract the appropriate keys and values. That case is handled by this serde, because each group-by expression will become a column.
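To illustrate the "each group-by expression becomes a column" point, here is a small hypothetical sketch; the jsonExtractScalar expressions and the __values column name are made-up examples for illustration, not taken from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: each group-by expression contributes one column to the
// serialized block, even when the tag values come out of a JSON column at query time.
public class GroupByColumnsSketch {
  public static void main(String[] args) {
    // Made-up group-by expressions that pull tags out of a JSON "tags" column.
    List<String> groupByExpressions = List.of(
        "jsonExtractScalar(tags, '$.city', 'STRING')",
        "jsonExtractScalar(tags, '$.zip', 'STRING')");

    // One column per group-by expression, plus a (made-up) column for the bucketed values.
    List<String> columns = new ArrayList<>(groupByExpressions);
    columns.add("__values");
    System.out.println("columns = " + columns);

    // A sample row: one extracted tag value per group-by column, then the series values.
    Object[] row = {"San Fran.", "94107", new double[]{1.0, 2.0, 3.0}};
    System.out.println("row = " + Arrays.deepToString(row));
  }
}
```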
    return Optional.of(timeSeriesList.get(0));
  }

  private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) {
Might make more sense to move these methods (timeBucketsToRow, timeBucketsFromRow, timeSeriesToRow, timeSeriesFromRow) into a utils class.
Right now all of these methods are private, hence keeping them here. For the foreseeable future we shouldn't need them outside of this class.
Few nits. Overall LGTM!
Adds the Exchange Operator, Plan Node, Plan Fragmenter and Serde. None of this code is hooked into query execution yet, so this PR doesn't change query behavior. In the next PR I'll wire all of this in, and that PR will likely complete the Broker Reduce support.
Design Principle
Our goal is to be as minimal as possible to enable multi-server time-series queries. Our next step would be to hook all of this in with the MSE and remove the concept of the Time Series "Engine", but before getting there we'll propose a few MSE optimizations like Broker-Reduce mode and improved Collocated Join support.
Plan Fragmenter and Broker Reduce Overview
To enable multi-server queries, our approach is to take the logical plan generated by the TimeSeriesLogicalPlanner and fragment it at every leaf node. We then add a TimeSeriesExchangeNode to the upper fragment, and leave the lower fragment as is. The TimeSeriesExchangeNode will be converted to a TimeSeriesExchangeReceivePlanNode during physical plan generation, which will create the TimeSeriesExchangeReceiveOperator. This operator will receive data from all servers and merge them. Merging has two modes: aggregation mode, where duplicate series received from different servers are aggregated, and non-aggregation mode, where they are appended to the List<TimeSeries>. Also, the broker will run its plan fragment in a pre-order traversal, and each MailboxReceiveOperator is blocking and does not yield.
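To make the fragmentation step more concrete, here is a minimal, self-contained sketch of the idea under simplified assumptions; PlanNode, OpNode and ExchangeNode below are hypothetical stand-ins, not the actual plan-node classes added in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: cut the logical plan at every leaf node; each leaf becomes its own lower fragment,
// and an exchange node takes its place in the upper (broker) fragment.
public class PlanFragmenterSketch {
  interface PlanNode {
    List<PlanNode> getChildren();
    boolean isLeaf();
  }

  // Generic operator node; a node with no children plays the role of a leaf (server-side) node.
  record OpNode(String name, List<PlanNode> children) implements PlanNode {
    public List<PlanNode> getChildren() { return children; }
    public boolean isLeaf() { return children.isEmpty(); }
  }

  // Stand-in for the exchange node inserted into the upper fragment.
  record ExchangeNode(List<PlanNode> children) implements PlanNode {
    public List<PlanNode> getChildren() { return children; }
    public boolean isLeaf() { return false; }
  }

  static PlanNode fragment(PlanNode node, List<PlanNode> lowerFragments) {
    if (node.isLeaf()) {
      lowerFragments.add(node);            // lower fragment is left as is
      return new ExchangeNode(List.of());  // exchange node replaces it in the upper fragment
    }
    List<PlanNode> newChildren = new ArrayList<>();
    for (PlanNode child : node.getChildren()) {
      newChildren.add(fragment(child, lowerFragments));
    }
    // Input plans in this sketch only contain OpNodes, so the cast is safe here.
    return new OpNode(((OpNode) node).name(), newChildren);
  }

  public static void main(String[] args) {
    PlanNode logicalPlan = new OpNode("Aggregate", List.of(
        new OpNode("LeafScan-1", List.of()),
        new OpNode("LeafScan-2", List.of())));
    List<PlanNode> lowerFragments = new ArrayList<>();
    PlanNode upperFragment = fragment(logicalPlan, lowerFragments);
    // upperFragment = Aggregate over two ExchangeNodes; lowerFragments = the two leaf scans.
    System.out.println(upperFragment + " | lower fragments: " + lowerFragments);
  }
}
```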
Block Serde
We convert the TimeSeriesBlock to/from a row DataBlock, using TransferableBlock as an intermediary. In #14558, I am also planning to eliminate the TimeSeriesBlock from the leaf stage completely, which will make it tie in cohesively with the serde. There are many optimizations possible here, but for now I have left a TODO.
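For intuition only, here is a tiny sketch of the row-oriented layout described above, using plain Java arrays; the column layout and names are assumptions for illustration and not necessarily what TimeSeriesBlockSerde actually emits.

```java
import java.util.Arrays;

// Illustrative sketch only: one row carries the time-bucket boundaries, and each subsequent
// row carries one series (its tag/group-by values followed by its bucketed values). The
// actual serde goes through Pinot's DataBlock/TransferableBlock machinery.
public class SerdeRowSketch {
  public static void main(String[] args) {
    // Hypothetical schema: [cityName, zipCode, __values]
    // Row 0: time-bucket boundaries, with tag columns padded by nulls so all rows share a schema.
    Object[] timeBucketsRow = {null, null, new double[]{60, 120, 180}};

    // Rows 1..N: one row per series, e.g. the "San Fran." series from the Javadoc table above.
    Object[] seriesRow = {"San Fran.", "94107", new double[]{1.0, 2.0, 3.0}};

    System.out.println(Arrays.deepToString(timeBucketsRow));
    System.out.println(Arrays.deepToString(seriesRow));
  }
}
```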
Test Plan
I have added UTs for this PR, and have also run Quickstart to verify that the queries are working E2E.