[timeseries] Remove Time Series Specific Code from V1 Engine #14841
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14841 +/- ##
============================================
+ Coverage 61.75% 63.76% +2.01%
- Complexity 207 1611 +1404
============================================
Files 2436 2703 +267
Lines 133233 151188 +17955
Branches 20636 23341 +2705
============================================
+ Hits 82274 96402 +14128
- Misses 44911 47551 +2640
- Partials 6048 7235 +1187
/**
 * Used by the leaf stage, because the leaf stage materializes tag values very late.
 */
public abstract TimeSeries buildWithTagOverrides(List<String> tagNames, Object[] tagValues);
@raghavyadav01: this is the one change that will be required in series builders. The method should do the build and use the provided tag names and values for the new series. See other builders in this PR for reference.
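For builder authors, here is a minimal sketch of what such an override could look like. Everything in it is an assumption for illustration: `SketchTimeSeries` and `SketchSumSeriesBuilder` are hypothetical, simplified stand-ins, not the real `TimeSeries` or `BaseTimeSeriesBuilder` classes, and the `Double[]` value layout (with `null` for empty buckets) is only one possible choice.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the real TimeSeries class.
final class SketchTimeSeries {
  final List<String> tagNames;
  final Object[] tagValues;
  final Double[] values; // one slot per time bucket; null means "no data" (assumption)

  SketchTimeSeries(List<String> tagNames, Object[] tagValues, Double[] values) {
    this.tagNames = tagNames;
    this.tagValues = tagValues;
    this.values = values;
  }
}

// Hypothetical SUM builder; the real builders extend BaseTimeSeriesBuilder.
final class SketchSumSeriesBuilder {
  private final List<String> tagNames;
  private final Object[] tagValues;
  private final Double[] sums;

  SketchSumSeriesBuilder(int numBuckets, List<String> tagNames, Object[] tagValues) {
    this.tagNames = tagNames;
    this.tagValues = tagValues;
    this.sums = new Double[numBuckets];
  }

  // Accumulates a value into the bucket that the record maps to.
  void addValueAtIndex(int bucketIndex, double value) {
    sums[bucketIndex] = sums[bucketIndex] == null ? value : sums[bucketIndex] + value;
  }

  // Regular build: uses the tags the builder was created with.
  SketchTimeSeries build() {
    return buildWithTagOverrides(tagNames, tagValues);
  }

  // Same aggregation result, but tagged with the names and values supplied by
  // the caller, which only knows them once the group keys are materialized.
  SketchTimeSeries buildWithTagOverrides(List<String> overrideNames, Object[] overrideValues) {
    return new SketchTimeSeries(overrideNames, overrideValues, sums.clone());
  }
}
```

The point of the design is that the builder can be created and fed values before the tag values are known; only the final build call needs them.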
@@ -220,7 +220,8 @@ public enum AggregationFunctionType {
 SqlTypeName.OTHER),
 PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR,
 OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), i -> i == 2),
-SqlTypeName.OTHER);
+SqlTypeName.OTHER),
+TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.VARCHAR);
self-review: third arg (varchar) is wrong.
pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
}

@Override
public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
Should this also be implemented? Values are stored as double.
This is for the array that represents the index in the TimeBuckets that a given record maps to, so only int is supported so far.
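To illustrate the kind of int array described here, a rough sketch follows. It is not the transform function from this PR: `BucketIndexSketch`, its parameters, the half-open bucket convention `[start, start + size)`, and the use of `-1` for out-of-range records are all assumptions made for the example.

```java
// Hypothetical helper; not the actual transform function in this PR.
final class BucketIndexSketch {
  private BucketIndexSketch() {
  }

  // Maps each record timestamp to the index of the time bucket it falls into.
  // Assumes bucket i covers [firstBucketStart + i * bucketSize,
  // firstBucketStart + (i + 1) * bucketSize). Records outside all buckets get -1.
  static int[] toBucketIndexes(long[] timestamps, long firstBucketStart, long bucketSize,
      int numBuckets) {
    int[] indexes = new int[timestamps.length];
    for (int i = 0; i < timestamps.length; i++) {
      // floorDiv keeps timestamps before the first bucket negative instead of rounding to 0.
      long index = Math.floorDiv(timestamps[i] - firstBucketStart, bucketSize);
      indexes[i] = (index >= 0 && index < numBuckets) ? (int) index : -1;
    }
    return indexes;
  }
}
```

Because the output is an index into a bounded bucket array, an int-valued result is enough, which is consistent with only the int code path being supported.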
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
if (groupByResultsBlock.isNumGroupsLimitReached()) {
We don't want to allow partial results here, do we?
Good catch. I am updating this in the next PR. Scope of next PR is:
- Take in API parameter to control series limit. We will integrate that with V1 Engine via the numGroupsLimit config.
- Don't throw error on group limit reached.
Note that throwing on series limit is the same as the behavior before this PR. I didn't want to change it in this PR because I wanted to test it separately.
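The difference between the current behavior (throw) and the planned one (return partial results and flag them) can be sketched as follows. This is only an illustration under stated assumptions: `SeriesLimitSketch`, `Result`, and the `failOnLimit` switch are hypothetical names, and the real change may be structured differently.

```java
import java.util.Map;

// Hypothetical sketch of the two series-limit policies discussed above.
final class SeriesLimitSketch {
  private SeriesLimitSketch() {
  }

  // Carries the series plus a flag so the caller can surface a warning.
  static final class Result {
    final Map<String, double[]> seriesByTagKey;
    final boolean limitReached;

    Result(Map<String, double[]> seriesByTagKey, boolean limitReached) {
      this.seriesByTagKey = seriesByTagKey;
      this.limitReached = limitReached;
    }
  }

  // failOnLimit = true mirrors the behavior kept in this PR (throw);
  // failOnLimit = false mirrors the planned behavior (partial result + flag).
  static Result collect(Map<String, double[]> groups, boolean numGroupsLimitReached,
      boolean failOnLimit) {
    if (numGroupsLimitReached && failOnLimit) {
      throw new IllegalStateException("Series limit reached");
    }
    return new Result(groups, numGroupsLimitReached);
  }
}
```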
while (recordIterator.hasNext()) {
Record record = recordIterator.next();
Object[] recordValues = record.getValues();
Object[] tagValues = new Object[recordValues.length - 1];
nit: can we keep this as a String array?
Good catch again! The issue is that I had made tag values Object[] in TimeSeries. I wanted to change that to String[], which will lead to a change everywhere. Will take it up in the next few PRs. (also need to remove TimeSeries#id)
}

@Override
public long[] transformToLongValuesSV(ValueBlock valueBlock) {
Do we need TODOs to support these in the future?
For now I don't want to support this since this function is not exactly intended for SQL, and I want to make sure that we use the int based code-path throughout.
Object[] recordValues = record.getValues();
Object[] tagValues = new Object[recordValues.length - 1];
for (int index = 0; index + 1 < recordValues.length; index++) {
tagValues[index] = recordValues[index] == null ? "null" : recordValues[index].toString();
Is "null" correct here?
Yeah. For tag values we don't want actual nulls. Also just a heads up that I'll change Object[] tagValues to String[] tagValues in a future PR.
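As a sketch of where that could end up, the conversion with a `String[]` might look like the following. `TagValuesSketch` and `toTagValues` are hypothetical names, and the example assumes (as the loop in this diff suggests) that the last element of the record is the aggregated series rather than a tag.

```java
// Hypothetical helper showing the String[] variant of the tag-value loop.
final class TagValuesSketch {
  private TagValuesSketch() {
  }

  // Converts all record values except the last one into tag strings.
  // Assumes the last element holds the aggregated series (not a tag).
  static String[] toTagValues(Object[] recordValues) {
    if (recordValues.length == 0) {
      return new String[0];
    }
    String[] tagValues = new String[recordValues.length - 1];
    for (int index = 0; index < tagValues.length; index++) {
      // String.valueOf(Object) yields the literal "null" for null references,
      // so tag values never contain actual nulls.
      tagValues[index] = String.valueOf(recordValues[index]);
    }
    return tagValues;
  }
}
```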
 * Aggregation function used by the Time Series Engine.
 * TODO: This can't be used with SQL because the Object Serde is not implemented.
 */
public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTimeSeriesBuilder, DoubleArrayList> {
We discussed some time back that we would also provide the raw timestamp for each value. Do we have that change?
Will add it in the PR after this.
Description
Removes Time Series specific code from Pinot's V1 Engine, and instead leverages a Transform function and an Aggregation function to implement the leaf stage for Time Series queries.
This lets us take advantage of the speed of GroupByOperator and AggregationOperator, while still allowing Time Series languages to define how they want their series to be built.
This also greatly simplifies the Time Series code, as can be seen from the large amount of code this PR removes.
Series Limit Handling
Series limits are now handled via the standard Group By limits, as defined here.
API Changes
BaseTimeSeriesBuilder has a new method called buildWithTagOverrides. This is because the GroupByOperator materializes group values lazily to avoid excessive allocations.
Testing
Updated existing unit tests, tested on Quickstart, and also tested on our cluster. Long-range dashboards are finally loading with V1 Engine speeds.
Future Work
TimeSeriesBlock, and propagate the group limit reached warning to the caller.