From 4f3cfbbe0258356b0948319d1b61adbdf8dfadf6 Mon Sep 17 00:00:00 2001
From: ankitsultana
Date: Tue, 26 Nov 2024 23:19:00 +0000
Subject: [PATCH 1/2] Squashed Commit for Transform/Agg Based Combine

---
 .../function/TransformFunctionType.java       |   3 +-
 .../request/context/TimeSeriesContext.java    |  73 -----
 .../blocks/TimeSeriesBuilderBlock.java        |  61 ----
 .../blocks/results/ResultsBlockUtils.java     |  13 -
 .../results/TimeSeriesResultsBlock.java       |  70 -----
 .../combine/TimeSeriesCombineOperator.java    |  43 ---
 .../TimeSeriesAggResultsBlockMerger.java      |  65 -----
 .../TimeSeriesAggregationOperator.java        | 272 -----------------
 .../TimeSeriesSelectionOperator.java          |  59 ----
 .../function/TimeSeriesTransformFunction.java |  81 ++++++
 .../function/TransformFunctionFactory.java    |   3 +
 .../pinot/core/plan/CombinePlanNode.java      |   9 +-
 .../pinot/core/plan/TimeSeriesPlanNode.java   |  94 ------
 .../plan/maker/InstancePlanMakerImplV2.java   |   5 +-
 .../function/AggregationFunctionFactory.java  |   5 +
 .../TimeSeriesAggregationFunction.java        | 275 ++++++++++++++++++
 .../query/request/context/QueryContext.java   |  22 +-
 .../context/utils/QueryContextUtils.java      |  11 +-
 .../TimeSeriesAggregationOperatorTest.java    | 195 -------------
 .../query/executor/QueryExecutorTest.java     | 116 --------
 .../pinot/tsdb/m3ql/parser/Tokenizer.java     |   5 +-
 .../timeseries/LeafTimeSeriesOperator.java    |  92 +++++-
 .../PhysicalTimeSeriesPlanVisitor.java        |  55 +++-
 .../TimeSeriesPhysicalTableScan.java          |   5 +-
 .../PhysicalTimeSeriesPlanVisitorTest.java    |  10 -
 .../segment/spi/AggregationFunctionType.java  |   5 +-
 26 files changed, 515 insertions(+), 1132 deletions(-)
 delete mode 100644 pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java
 create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
 create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
 delete mode 100644 pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index cf8b018e8a3a..5fb1e1a3cf44 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -234,7 +234,8 @@ public enum TransformFunctionType {
   RADIANS("radians"),
 
   // Complex type handling
-  ITEM("item");
+  ITEM("item"),
+  TIMESERIES_BUCKET_INDEX("timeSeriesBucketIndex");
 
   private final String _name;
   private final List<String> _names;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
deleted file mode 100644
index ba7858ea1185..000000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.request.context;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.tsdb.spi.AggInfo;
-import org.apache.pinot.tsdb.spi.TimeBuckets;
-
-
-public class TimeSeriesContext {
-  private final String _language;
-  private final String _timeColumn;
-  private final TimeUnit _timeUnit;
-  private final TimeBuckets _timeBuckets;
-  private final Long _offsetSeconds;
-  private final ExpressionContext _valueExpression;
-  private final AggInfo _aggInfo;
-
-  public TimeSeriesContext(String language, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets,
-      Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) {
-    _language = language;
-    _timeColumn = timeColumn;
-    _timeUnit = timeUnit;
-    _timeBuckets = timeBuckets;
-    _offsetSeconds = offsetSeconds;
-    _valueExpression = valueExpression;
-    _aggInfo = aggInfo;
-  }
-
-  public String getLanguage() {
-    return _language;
-  }
-
-  public String getTimeColumn() {
-    return _timeColumn;
-  }
-
-  public TimeUnit getTimeUnit() {
-    return _timeUnit;
-  }
-
-  public TimeBuckets getTimeBuckets() {
-    return _timeBuckets;
-  }
-
-  public Long getOffsetSeconds() {
-    return _offsetSeconds;
-  }
-
-  public ExpressionContext getValueExpression() {
-    return _valueExpression;
-  }
-
-  public AggInfo getAggInfo() {
-    return _aggInfo;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
deleted file mode 100644
index 0f9eda897d6e..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.blocks;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger;
-import org.apache.pinot.tsdb.spi.TimeBuckets;
-import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
-import org.apache.pinot.tsdb.spi.series.TimeSeries;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
-
-
-/**
- * Block used by the {@link TimeSeriesAggResultsBlockMerger}.
- */
-public class TimeSeriesBuilderBlock {
-  private final TimeBuckets _timeBuckets;
-  private final Map<Long, BaseTimeSeriesBuilder> _seriesBuilderMap;
-
-  public TimeSeriesBuilderBlock(TimeBuckets timeBuckets, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap) {
-    _timeBuckets = timeBuckets;
-    _seriesBuilderMap = seriesBuilderMap;
-  }
-
-  public TimeBuckets getTimeBuckets() {
-    return _timeBuckets;
-  }
-
-  public Map<Long, BaseTimeSeriesBuilder> getSeriesBuilderMap() {
-    return _seriesBuilderMap;
-  }
-
-  public TimeSeriesBlock build() {
-    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
-    for (var entry : _seriesBuilderMap.entrySet()) {
-      List<TimeSeries> result = new ArrayList<>(1);
-      result.add(entry.getValue().build());
-      seriesMap.put(entry.getKey(), result);
-    }
-    return new TimeSeriesBlock(_timeBuckets, seriesMap);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
index 5969053755f3..9775d04d1cb1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,10 +25,8 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
-import org.apache.pinot.common.request.context.TimeSeriesContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -43,9 +40,6 @@ private ResultsBlockUtils() {
   }
 
   public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) {
-    if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
-      return buildEmptyTimeSeriesResults(queryContext);
-    }
     if (QueryContextUtils.isSelectionQuery(queryContext)) {
       return buildEmptySelectionQueryResults(queryContext);
     }
@@ -123,11 +117,4 @@ private static DistinctResultsBlock buildEmptyDistinctQueryResults(QueryContext
         queryContext.isNullHandlingEnabled());
     return new DistinctResultsBlock(distinctTable, queryContext);
   }
-
-  private static TimeSeriesResultsBlock buildEmptyTimeSeriesResults(QueryContext queryContext) {
-    TimeSeriesContext timeSeriesContext = queryContext.getTimeSeriesContext();
-    Preconditions.checkNotNull(timeSeriesContext);
-    return new TimeSeriesResultsBlock(
-        new TimeSeriesBuilderBlock(timeSeriesContext.getTimeBuckets(), Collections.emptyMap()));
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
deleted file mode 100644
index f8e7fac944c5..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.blocks.results;
-
-import java.io.IOException;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-// TODO(timeseries): Implement unsupported functions when merging with MSE.
-public class TimeSeriesResultsBlock extends BaseResultsBlock {
-  private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock;
-
-  public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) {
-    _timeSeriesBuilderBlock = timeSeriesBuilderBlock;
-  }
-
-  @Override
-  public int getNumRows() {
-    return _timeSeriesBuilderBlock.getSeriesBuilderMap().size();
-  }
-
-  @Nullable
-  @Override
-  public QueryContext getQueryContext() {
-    throw new UnsupportedOperationException("Time series results block does not support getting QueryContext yet");
-  }
-
-  @Nullable
-  @Override
-  public DataSchema getDataSchema() {
-    throw new UnsupportedOperationException("Time series results block does not support getting DataSchema yet");
-  }
-
-  @Nullable
-  @Override
-  public List<Object[]> getRows() {
-    throw new UnsupportedOperationException("Time series results block does not support getRows yet");
-  }
-
-  @Override
-  public DataTable getDataTable()
-      throws IOException {
-    throw new UnsupportedOperationException("Time series results block does not support returning DataTable");
-  }
-
-  public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() {
-    return _timeSeriesBuilderBlock;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java
deleted file mode 100644
index 3f410b58cb5e..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.combine;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import javax.annotation.Nullable;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-public class TimeSeriesCombineOperator extends BaseSingleBlockCombineOperator<TimeSeriesResultsBlock> {
-  private static final String EXPLAIN_NAME = "TIME_SERIES_COMBINE";
-
-  public TimeSeriesCombineOperator(ResultsBlockMerger<TimeSeriesResultsBlock> resultsBlockMerger,
-      List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
-    super(resultsBlockMerger, operators, queryContext, executorService);
-  }
-
-  @Nullable
-  @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
deleted file mode 100644
index 428cfde55511..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.combine.merger;
-
-import com.google.common.base.Preconditions;
-import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.tsdb.spi.AggInfo;
-import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
-
-
-public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeSeriesResultsBlock> {
-  private final TimeSeriesBuilderFactory _seriesBuilderFactory;
-  private final AggInfo _aggInfo;
-  private final int _maxSeriesLimit;
-  private final long _maxDataPointsLimit;
-
-  public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) {
-    _seriesBuilderFactory = seriesBuilderFactory;
-    _aggInfo = aggInfo;
-    _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
-    _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
-  }
-
-  @Override
-  public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesResultsBlock blockToMerge) {
-    TimeSeriesBuilderBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBuilderBlock();
-    TimeSeriesBuilderBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBuilderBlock();
-    for (var entry : seriesBlockToMerge.getSeriesBuilderMap().entrySet()) {
-      long seriesHash = entry.getKey();
-      BaseTimeSeriesBuilder currentTimeSeriesBuilder = currentTimeSeriesBlock.getSeriesBuilderMap().get(seriesHash);
-      BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
-      if (currentTimeSeriesBuilder == null) {
-        currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge);
-        final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size();
-        final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets();
-        Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit,
-            "Max data points limit reached in combine operator. Limit: %s. Current count: %s",
-            _maxDataPointsLimit, currentUniqueSeries * numBuckets);
-        Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit,
-            "Max series limit reached in combine operator. Limit: %s. Current Size: %s",
-            _maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size());
-      } else {
-        currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge);
-      }
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
deleted file mode 100644
index a39c996f4fb7..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.timeseries;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.BaseProjectOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.tsdb.spi.AggInfo;
-import org.apache.pinot.tsdb.spi.TimeBuckets;
-import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
-import org.apache.pinot.tsdb.spi.series.TimeSeries;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
-
-
-/**
- * Segment level operator which converts data in relational model to a time-series model, by aggregating on a
- * configured aggregation expression.
- */
-public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResultsBlock> {
-  private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION";
-  private final String _timeColumn;
-  private final TimeUnit _storedTimeUnit;
-  private final long _timeOffset;
-  private final AggInfo _aggInfo;
-  private final ExpressionContext _valueExpression;
-  private final List<String> _groupByExpressions;
-  private final BaseProjectOperator<? extends ValueBlock> _projectOperator;
-  private final TimeBuckets _timeBuckets;
-  private final TimeSeriesBuilderFactory _seriesBuilderFactory;
-  private final int _maxSeriesLimit;
-  private final long _maxDataPointsLimit;
-  private final long _numTotalDocs;
-  private long _numDocsScanned = 0;
-
-  public TimeSeriesAggregationOperator(
-      String timeColumn,
-      TimeUnit timeUnit,
-      @Nullable Long timeOffsetSeconds,
-      AggInfo aggInfo,
-      ExpressionContext valueExpression,
-      List<String> groupByExpressions,
-      TimeBuckets timeBuckets,
-      BaseProjectOperator<? extends ValueBlock> projectOperator,
-      TimeSeriesBuilderFactory seriesBuilderFactory,
-      SegmentMetadata segmentMetadata) {
-    _timeColumn = timeColumn;
-    _storedTimeUnit = timeUnit;
-    _timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
-    _aggInfo = aggInfo;
-    _valueExpression = valueExpression;
-    _groupByExpressions = groupByExpressions;
-    _projectOperator = projectOperator;
-    _timeBuckets = timeBuckets;
-    _seriesBuilderFactory = seriesBuilderFactory;
-    _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
-    _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
-    _numTotalDocs = segmentMetadata.getTotalDocs();
-  }
-
-  @Override
-  protected TimeSeriesResultsBlock getNextBlock() {
-    ValueBlock valueBlock;
-    Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
-    while ((valueBlock = _projectOperator.nextBlock()) != null) {
-      int numDocs = valueBlock.getNumDocs();
-      _numDocsScanned += numDocs;
-      // TODO: This is quite unoptimized and allocates liberally
-      BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
-      long[] timeValues = blockValSet.getLongValuesSV();
-      applyTimeOffset(timeValues, numDocs);
-      int[] timeValueIndexes = getTimeValueIndex(timeValues, numDocs);
-      Object[][] tagValues = new Object[_groupByExpressions.size()][];
-      for (int i = 0; i < _groupByExpressions.size(); i++) {
-        blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
-        switch (blockValSet.getValueType()) {
-          case JSON:
-          case STRING:
-            tagValues[i] = blockValSet.getStringValuesSV();
-            break;
-          case LONG:
-            tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
-            break;
-          case INT:
-            tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
-            break;
-          default:
-            throw new NotImplementedException("Can't handle types other than string and long");
-        }
-      }
-      BlockValSet valueExpressionBlockValSet = valueBlock.getBlockValueSet(_valueExpression);
-      switch (valueExpressionBlockValSet.getValueType()) {
-        case LONG:
-          processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
-          break;
-        case INT:
-          processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
-          break;
-        case DOUBLE:
-          processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
-          break;
-        case STRING:
-          processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
-          break;
-        default:
-          // TODO: Support other types?
-          throw new IllegalStateException(
-              "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
-      }
-      Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit,
-          "Exceeded max data point limit per server. Limit: %s. Data points in current segment so far: %s",
-          _maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets());
-      Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit,
-          "Exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s",
-          _maxSeriesLimit, seriesBuilderMap.size());
-    }
-    return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap));
-  }
-
-  @Override
-  @SuppressWarnings("rawtypes")
-  public List<Operator> getChildOperators() {
-    return ImmutableList.of(_projectOperator);
-  }
-
-  @Nullable
-  @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
-  }
-
-  @Override
-  public ExecutionStatistics getExecutionStatistics() {
-    long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = _numDocsScanned * _projectOperator.getNumColumnsProjected();
-    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
-        _numTotalDocs);
-  }
-
-  @VisibleForTesting
-  protected int[] getTimeValueIndex(long[] actualTimeValues, int numDocs) {
-    if (_storedTimeUnit == TimeUnit.MILLISECONDS) {
-      return getTimeValueIndexMillis(actualTimeValues, numDocs);
-    }
-    int[] timeIndexes = new int[numDocs];
-    final long reference = _timeBuckets.getTimeRangeStartExclusive();
-    final long divisor = _timeBuckets.getBucketSize().getSeconds();
-    for (int index = 0; index < numDocs; index++) {
-      timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor);
-    }
-    return timeIndexes;
-  }
-
-  private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) {
-    int[] timeIndexes = new int[numDocs];
-    final long reference = _timeBuckets.getTimeRangeStartExclusive() * 1000L;
-    final long divisor = _timeBuckets.getBucketSize().toMillis();
-    for (int index = 0; index < numDocs; index++) {
-      timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor);
-    }
-    return timeIndexes;
-  }
-
-  public void processLongExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
-    long[] valueColumnValues = blockValSet.getLongValuesSV();
-    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
-      Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
-      for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
-        tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
-      }
-      long hash = TimeSeries.hash(tagValuesForDoc);
-      seriesBuilderMap.computeIfAbsent(hash,
-          k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets,
-              _groupByExpressions, tagValuesForDoc))
-          .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]);
-    }
-  }
-
-  public void processIntExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
-    int[] valueColumnValues = blockValSet.getIntValuesSV();
-    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
-      Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
-      for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
-        tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
-      }
-      long hash = TimeSeries.hash(tagValuesForDoc);
-      seriesBuilderMap.computeIfAbsent(hash,
-          k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets,
-              _groupByExpressions, tagValuesForDoc))
-          .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]);
-    }
-  }
-
-  public void processDoubleExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
-    double[] valueColumnValues = blockValSet.getDoubleValuesSV();
-    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
-      Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
-      for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
-        tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
-      }
-      long hash = TimeSeries.hash(tagValuesForDoc);
-      seriesBuilderMap.computeIfAbsent(hash,
-          k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets,
-              _groupByExpressions, tagValuesForDoc))
-          .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]);
-    }
-  }
-
-  public void processStringExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
-    String[] valueColumnValues = blockValSet.getStringValuesSV();
-    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
-      Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
-      for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
-        tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
-      }
-      long hash = TimeSeries.hash(tagValuesForDoc);
-      seriesBuilderMap.computeIfAbsent(hash,
-          k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets,
-              _groupByExpressions, tagValuesForDoc))
-          .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]);
-    }
-  }
-
-  private void applyTimeOffset(long[] timeValues, int numDocs) {
-    if (_timeOffset == 0L) {
-      return;
-    }
-    for (int index = 0; index < numDocs; index++) {
-      timeValues[index] = timeValues[index] + _timeOffset;
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java
deleted file mode 100644
index 62b069bb576f..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.timeseries;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.core.operator.transform.TransformOperator;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
-
-
-public class TimeSeriesSelectionOperator extends BaseOperator<TimeSeriesResultsBlock> {
-  private static final String EXPLAIN_NAME = "TIME_SERIES_SELECTION_OPERATOR";
-  private final Long _evaluationTimestamp;
-  private final TransformOperator _transformOperator;
-  private final TimeSeriesBuilderFactory _seriesBuilderFactory;
-
-  public TimeSeriesSelectionOperator(Long evaluationTimestamp,
-      TransformOperator transformOperator, TimeSeriesBuilderFactory seriesBuilderFactory) {
-    _evaluationTimestamp = evaluationTimestamp;
-    _transformOperator = transformOperator;
-    _seriesBuilderFactory = seriesBuilderFactory;
-  }
-
-  @Override
-  protected TimeSeriesResultsBlock getNextBlock() {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  @Override
-  public List<Operator> getChildOperators() {
-    return ImmutableList.of(_transformOperator);
-  }
-
-  @Nullable
-  @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java
new file mode 100644
index 000000000000..396376a254c8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Usage:
+ * <pre>
+ *   args: time column/expression, time-unit, first time bucket value, bucket size in seconds, offset in seconds
+ *   timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)
+ * </pre>
+ */
+public class TimeSeriesTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "timeSeriesBucketIndex";
+  private TimeUnit _timeUnit;
+  private long _reference = -1;
+  private long _divisor = -1;
+  private long _offsetSeconds = 0;
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
+    super.init(arguments, columnContextMap);
+    _timeUnit = TimeUnit.valueOf(((LiteralTransformFunction) arguments.get(1)).getStringLiteral().toUpperCase(
+        Locale.ENGLISH));
+    _offsetSeconds = ((LiteralTransformFunction) arguments.get(4)).getLongLiteral();
+    final long startSeconds = ((LiteralTransformFunction) arguments.get(2)).getLongLiteral();
+    final long bucketSizeSeconds = ((LiteralTransformFunction) arguments.get(3)).getLongLiteral();
+    _reference = (startSeconds - bucketSizeSeconds);
+    _divisor = bucketSizeSeconds;
+    if (_timeUnit == TimeUnit.MILLISECONDS) {
+      _reference *= 1000;
+      _divisor *= 1000;
+    }
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ValueBlock valueBlock) {
+    int length = valueBlock.getNumDocs();
+    initIntValuesSV(length);
+    long[] inputValues = _arguments.get(0).transformToLongValuesSV(valueBlock);
+    for (int docIndex = 0; docIndex < length; docIndex++) {
+      _intValuesSV[docIndex] = (int) (((inputValues[docIndex] + _offsetSeconds) - _reference - 1) / _divisor);
+    }
+    return _intValuesSV;
+  }
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return new TransformResultMetadata(DataType.INT, true, false);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 3709d08f2b99..e9cd80197c5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -251,6 +251,9 @@ private static Map<String, Class<? extends TransformFunction>> createRegistry()
     // Item functions
     typeToImplementation.put(TransformFunctionType.ITEM, ItemTransformFunction.class);
 
+    // Timeseries functions
+    typeToImplementation.put(TransformFunctionType.TIMESERIES_BUCKET_INDEX, TimeSeriesTransformFunction.class);
+
     Map<String, Class<? extends TransformFunction>> registry =
         new HashMap<>(HashUtil.getHashMapCapacity(typeToImplementation.size()));
     for (Map.Entry<TransformFunctionType, Class<? extends TransformFunction>> entry : typeToImplementation.entrySet()) {
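With the transform registered above, the bucket-index computation can be exercised end to end. As a quick aid for reviewers, here is a minimal, self-contained sketch of the same arithmetic that TimeSeriesTransformFunction.init() and transformToIntValuesSV() perform; the class name and all sample values are illustrative only and are not part of this patch:

import java.util.concurrent.TimeUnit;

// Mirrors the bucket-index arithmetic of timeSeriesBucketIndex: bucket i covers the
// half-open interval (reference + i * divisor, reference + (i + 1) * divisor].
public class BucketIndexSketch {
  public static void main(String[] args) {
    long firstBucketValueSeconds = 123;  // arg 3: first time bucket value
    long bucketSizeSeconds = 10;         // arg 4: bucket size in seconds
    long offsetSeconds = 0;              // arg 5: offset in seconds
    TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS;  // arg 2: unit of the stored time column

    // Same derivation as init(): reference is the exclusive start of the first bucket.
    long reference = firstBucketValueSeconds - bucketSizeSeconds;
    long divisor = bucketSizeSeconds;
    if (storedTimeUnit == TimeUnit.MILLISECONDS) {
      reference *= 1000;
      divisor *= 1000;
    }

    // Same expression as transformToIntValuesSV() for a single stored timestamp.
    long storedTimestamp = 130_000L;  // milliseconds since epoch
    int bucketIndex = (int) (((storedTimestamp + offsetSeconds) - reference - 1) / divisor);
    System.out.println(bucketIndex);  // 1, i.e. (130000 + 0 - 113000 - 1) / 10000
  }
}

Note that the offset is added to the raw stored value as-is, exactly as in the transform itself.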
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 26a92082259f..3cd14b95dff7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -32,8 +32,6 @@
 import org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator;
 import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
 import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
-import org.apache.pinot.core.operator.combine.TimeSeriesCombineOperator;
-import org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger;
 import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -44,7 +42,6 @@
 import org.apache.pinot.spi.trace.InvocationRecording;
 import org.apache.pinot.spi.trace.InvocationScope;
 import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
 
 
 /**
@@ -125,11 +122,7 @@ private BaseCombineOperator getCombineOperator() {
       }, _executorService, _queryContext.getEndTimeMs());
     }
-    if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) {
-      return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger(
-          TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()),
-          _queryContext.getTimeSeriesContext().getAggInfo()), operators, _queryContext, _executorService);
-    } else if (_streamer != null
+    if (_streamer != null
         && QueryContextUtils.isSelectionOnlyQuery(_queryContext) && _queryContext.getLimit() != 0) {
       // Use streaming operator only for non-empty selection-only query
       return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
deleted file mode 100644
index dae0479ebbab..000000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.plan;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.TimeSeriesContext;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseProjectOperator;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.core.operator.timeseries.TimeSeriesAggregationOperator;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
-
-
-public class TimeSeriesPlanNode implements PlanNode {
-  private final SegmentContext _segmentContext;
-  private final QueryContext _queryContext;
-  private final TimeSeriesContext _timeSeriesContext;
-  private final TimeSeriesBuilderFactory _seriesBuilderFactory;
-
-  public TimeSeriesPlanNode(SegmentContext segmentContext, QueryContext queryContext) {
-    _segmentContext = segmentContext;
-    _queryContext = queryContext;
-    _timeSeriesContext = Objects.requireNonNull(queryContext.getTimeSeriesContext(),
-        "Missing time-series context in TimeSeriesPlanNode");
-    _seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage());
-  }
-
-  @Override
-  public Operator<TimeSeriesResultsBlock> run() {
-    FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext);
-    ProjectPlanNode projectPlanNode = new ProjectPlanNode(
-        _segmentContext, _queryContext, getProjectPlanNodeExpressions(), DocIdSetPlanNode.MAX_DOC_PER_CALL,
-        filterPlanNode.run());
-    BaseProjectOperator<? extends ValueBlock> projectionOperator = projectPlanNode.run();
-    return new TimeSeriesAggregationOperator(
-        _timeSeriesContext.getTimeColumn(),
-        _timeSeriesContext.getTimeUnit(),
-        _timeSeriesContext.getOffsetSeconds(),
-        _timeSeriesContext.getAggInfo(),
-        _timeSeriesContext.getValueExpression(),
-        getGroupByColumns(),
-        _timeSeriesContext.getTimeBuckets(),
-        projectionOperator,
-        _seriesBuilderFactory,
-        _segmentContext.getIndexSegment().getSegmentMetadata());
-  }
-
-  private List<ExpressionContext> getProjectPlanNodeExpressions() {
-    List<ExpressionContext> result = new ArrayList<>(_queryContext.getSelectExpressions());
-    if (CollectionUtils.isNotEmpty(_queryContext.getGroupByExpressions())) {
-      result.addAll(_queryContext.getGroupByExpressions());
-    }
-    result.add(_queryContext.getTimeSeriesContext().getValueExpression());
-    result.add(ExpressionContext.forIdentifier(_queryContext.getTimeSeriesContext().getTimeColumn()));
-    return result;
-  }
-
-  private List<String> getGroupByColumns() {
-    if (_queryContext.getGroupByExpressions() == null) {
-      return new ArrayList<>();
-    }
-    List<String> groupByColumns = new ArrayList<>();
-    for (ExpressionContext expression : _queryContext.getGroupByExpressions()) {
-      Preconditions.checkState(expression.getType() == ExpressionContext.Type.IDENTIFIER);
-      groupByColumns.add(expression.getIdentifier());
-    }
-    return groupByColumns;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index bd023911120b..70b4fc8961be 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -45,7 +45,6 @@
 import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode;
 import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
-import org.apache.pinot.core.plan.TimeSeriesPlanNode;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
 import org.apache.pinot.core.query.prefetch.FetchPlanner;
 import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
@@ -234,9 +233,7 @@ private void applyQueryOptions(QueryContext queryContext) {
   @Override
   public PlanNode makeSegmentPlanNode(SegmentContext segmentContext, QueryContext queryContext) {
     rewriteQueryContextWithHints(queryContext, segmentContext.getIndexSegment());
-    if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
-      return new TimeSeriesPlanNode(segmentContext, queryContext);
-    } else if (QueryContextUtils.isAggregationQuery(queryContext)) {
+    if (QueryContextUtils.isAggregationQuery(queryContext)) {
       List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
       if (groupByExpressions != null) {
         // Group-by query
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 96491acbd47e..d5c47e6cdbe1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -479,6 +479,11 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
         return new DistinctCountULLAggregationFunction(arguments);
       case DISTINCTCOUNTRAWULL:
         return new DistinctCountRawULLAggregationFunction(arguments);
+      case TIMESERIESMIN:
+      case TIMESERIESMAX:
+      case TIMESERIESSUM:
+        return new TimeSeriesAggregationFunction(functionType, arguments.get(0), arguments.get(1),
+            arguments.get(2));
       default:
         throw new IllegalArgumentException("Unsupported aggregation function type: " + functionType);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
new file mode 100644
index 000000000000..7cf502d9dcbb
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class TimeSeriesAggregationFunction implements AggregationFunction { + private final AggregationFunctionType _aggregationFunctionType; + private final ExpressionContext _valueExpression; + private final ExpressionContext _timeExpression; + private final int _numTimeBuckets; + + public TimeSeriesAggregationFunction(AggregationFunctionType aggregationFunctionType, + ExpressionContext valueExpression, ExpressionContext timeExpression, ExpressionContext numTimeBuckets) { + _aggregationFunctionType = aggregationFunctionType; + _valueExpression = valueExpression; + _timeExpression = timeExpression; + _numTimeBuckets = numTimeBuckets.getLiteral().getIntValue(); + } + + @Override + public AggregationFunctionType getType() { + return _aggregationFunctionType; + } + + @Override + public String getResultColumnName() { + return _aggregationFunctionType.getName(); + } + + @Override + public List getInputExpressions() { + return List.of(_valueExpression, _timeExpression); + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map blockValSetMap) { + int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); + Double[] currentValues = aggregationResultHolder.getResult(); + if (currentValues == null) { + currentValues = new Double[_numTimeBuckets]; + aggregationResultHolder.setValue(currentValues); + } + switch (_aggregationFunctionType) { + case TIMESERIESMAX: + aggregateTimeSeriesMax(length, aggregationResultHolder, timeIndexes, values); + break; + case TIMESERIESMIN: + aggregateTimeSeriesMin(length, aggregationResultHolder, timeIndexes, values); + break; + case TIMESERIESSUM: + aggregateTimeSeriesSum(length, aggregationResultHolder, timeIndexes, values); + break; + default: + throw new UnsupportedOperationException("Unknown aggregation function: " + _aggregationFunctionType); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + final double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); + switch (_aggregationFunctionType) { + case TIMESERIESMAX: + aggregateGroupBySVTimeSeriesMax(length, groupKeyArray, groupByResultHolder, timeIndexes, values); + break; + case TIMESERIESMIN: + aggregateGroupBySVTimeSeriesMin(length, groupKeyArray, 
groupByResultHolder, timeIndexes, values); + break; + case TIMESERIESSUM: + aggregateGroupBySVTimeSeriesSum(length, groupKeyArray, groupByResultHolder, timeIndexes, values); + break; + default: + throw new UnsupportedOperationException("Unknown aggregation function: " + _aggregationFunctionType); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + throw new UnsupportedOperationException(""); + } + + @Override + public Double[] extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return aggregationResultHolder.getResult(); + } + + @Override + public Double[] extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return groupByResultHolder.getResult(groupKey); + } + + @Override + public Double[] merge(Double[] intermediateResult1, Double[] intermediateResult2) { + switch (_aggregationFunctionType) { + case TIMESERIESMAX: + mergeMax(intermediateResult1, intermediateResult2); + break; + case TIMESERIESMIN: + mergeMin(intermediateResult1, intermediateResult2); + break; + case TIMESERIESSUM: + mergeSum(intermediateResult1, intermediateResult2); + break; + default: + throw new UnsupportedOperationException("Found unsupported time series aggregation: " + + _aggregationFunctionType); + } + return intermediateResult1; + } + + @Override + public DataSchema.ColumnDataType getIntermediateResultColumnType() { + return DataSchema.ColumnDataType.DOUBLE_ARRAY; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + throw new UnsupportedOperationException("getFinalResult not implemented yet"); + } + + @Override + public DoubleArrayList extractFinalResult(Double[] seriesBuilder) { + // TODO: Why does final result type required + throw new UnsupportedOperationException("Extract final result not supported"); + } + + @Override + public String toExplainString() { + return "TIME_SERIES"; + } + + private void aggregateTimeSeriesMax(int length, AggregationResultHolder resultHolder, int[] timeIndexes, + double[] values) { + Double[] currentValues = resultHolder.getResult(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int timeIndex = timeIndexes[docIndex]; + currentValues[timeIndex] = + currentValues[timeIndex] == null ? values[docIndex] : Math.max(currentValues[timeIndex], values[docIndex]); + } + } + + private void aggregateTimeSeriesMin(int length, AggregationResultHolder resultHolder, int[] timeIndexes, + double[] values) { + Double[] currentValues = resultHolder.getResult(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int timeIndex = timeIndexes[docIndex]; + currentValues[timeIndex] = + currentValues[timeIndex] == null ? values[docIndex] : Math.min(currentValues[timeIndex], values[docIndex]); + } + } + + private void aggregateTimeSeriesSum(int length, AggregationResultHolder resultHolder, int[] timeIndexes, + double[] values) { + Double[] currentValues = resultHolder.getResult(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int timeIndex = timeIndexes[docIndex]; + currentValues[timeIndex] = + currentValues[timeIndex] == null ? 
values[docIndex] : (currentValues[timeIndex] + values[docIndex]); + } + } + + private void aggregateGroupBySVTimeSeriesMax(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + int[] timeIndexes, double[] values) { + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + int timeIndex = timeIndexes[docIndex]; + double valueToAdd = values[docIndex]; + Double[] currentValues = groupByResultHolder.getResult(groupId); + if (currentValues == null) { + currentValues = new Double[_numTimeBuckets]; + groupByResultHolder.setValueForKey(groupId, currentValues); + } + currentValues[timeIndex] = + currentValues[timeIndex] == null ? valueToAdd : Math.max(currentValues[timeIndex], valueToAdd); + } + } + + private void aggregateGroupBySVTimeSeriesMin(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + int[] timeIndexes, double[] values) { + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + int timeIndex = timeIndexes[docIndex]; + double valueToAdd = values[docIndex]; + Double[] currentValues = groupByResultHolder.getResult(groupId); + if (currentValues == null) { + currentValues = new Double[_numTimeBuckets]; + groupByResultHolder.setValueForKey(groupId, currentValues); + } + currentValues[timeIndex] = + currentValues[timeIndex] == null ? valueToAdd : Math.min(currentValues[timeIndex], valueToAdd); + } + } + + private void aggregateGroupBySVTimeSeriesSum(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + int[] timeIndexes, double[] values) { + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + int timeIndex = timeIndexes[docIndex]; + double valueToAdd = values[docIndex]; + Double[] currentValues = groupByResultHolder.getResult(groupId); + if (currentValues == null) { + currentValues = new Double[_numTimeBuckets]; + groupByResultHolder.setValueForKey(groupId, currentValues); + } + currentValues[timeIndex] = + currentValues[timeIndex] == null ? valueToAdd : (currentValues[timeIndex] + valueToAdd); + } + } + + private void mergeSum(Double[] ir1, Double[] ir2) { + for (int index = 0; index < _numTimeBuckets; index++) { + if (ir2[index] != null) { + ir1[index] = ir1[index] == null ? ir2[index] : (ir1[index] + ir2[index]); + } + } + } + + private void mergeMin(Double[] ir1, Double[] ir2) { + for (int index = 0; index < _numTimeBuckets; index++) { + if (ir2[index] != null) { + ir1[index] = ir1[index] == null ? ir2[index] : Math.min(ir1[index], ir2[index]); + } + } + } + + private void mergeMax(Double[] ir1, Double[] ir2) { + for (int index = 0; index < _numTimeBuckets; index++) { + if (ir2[index] != null) { + ir1[index] = ir1[index] == null ? 
ir2[index] : Math.max(ir1[index], ir2[index]); + } + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 0aa233b43ea7..41667c639b11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -35,7 +35,6 @@ import org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -75,7 +74,6 @@ public class QueryContext { private final String _tableName; private final QueryContext _subquery; - private final TimeSeriesContext _timeSeriesContext; private final List<ExpressionContext> _selectExpressions; private final boolean _distinct; private final List<String> _aliasList; @@ -132,14 +130,13 @@ public class QueryContext { private Map<String, Set<FieldConfig.IndexType>> _skipIndexes; private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery, - @Nullable TimeSeriesContext timeSeriesContext, List<ExpressionContext> selectExpressions, boolean distinct, - List<String> aliasList, @Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions, + List<ExpressionContext> selectExpressions, boolean distinct, List<String> aliasList, + @Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions, @Nullable FilterContext havingFilter, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit, int offset, Map<String, String> queryOptions, @Nullable Map<ExpressionContext, ExpressionContext> expressionOverrideHints, ExplainMode explain) { _tableName = tableName; _subquery = subquery; - _timeSeriesContext = timeSeriesContext; _selectExpressions = selectExpressions; _distinct = distinct; _aliasList = Collections.unmodifiableList(aliasList); @@ -170,11 +167,6 @@ public QueryContext getSubquery() { return _subquery; } - @Nullable - public TimeSeriesContext getTimeSeriesContext() { - return _timeSeriesContext; - } - /** * Returns a list of expressions in the SELECT clause.
*/ @@ -468,7 +460,6 @@ public boolean isIndexUseAllowed(DataSource dataSource, FieldConfig.IndexType in public static class Builder { private String _tableName; private QueryContext _subquery; - private TimeSeriesContext _timeSeriesContext; private List<ExpressionContext> _selectExpressions; private boolean _distinct; private List<String> _aliasList; @@ -492,11 +483,6 @@ public Builder setSubquery(QueryContext subquery) { return this; } - public Builder setTimeSeriesContext(TimeSeriesContext timeSeriesContext) { - _timeSeriesContext = timeSeriesContext; - return this; - } - public Builder setSelectExpressions(List<ExpressionContext> selectExpressions) { _selectExpressions = selectExpressions; return this; } @@ -573,8 +559,8 @@ public QueryContext build() { _queryOptions = Collections.emptyMap(); } QueryContext queryContext = - new QueryContext(_tableName, _subquery, _timeSeriesContext, _selectExpressions, _distinct, _aliasList, - _filter, _groupByExpressions, _havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, + new QueryContext(_tableName, _subquery, _selectExpressions, _distinct, _aliasList, _filter, + _groupByExpressions, _havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, _expressionOverrideHints, _explain); queryContext.setNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(_queryOptions)); queryContext.setServerReturnFinalResult(QueryOptionsUtils.isServerReturnFinalResult(_queryOptions)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java index fa7f53e31485..b5beee1a4013 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java @@ -35,7 +35,7 @@ private QueryContextUtils() { * Returns {@code true} if the given query is a selection query, {@code false} otherwise. */ public static boolean isSelectionQuery(QueryContext query) { - return !query.isDistinct() && query.getAggregationFunctions() == null && !isTimeSeriesQuery(query); + return !query.isDistinct() && query.getAggregationFunctions() == null; } /** @@ -51,14 +51,7 @@ public static boolean isSelectionOnlyQuery(QueryContext query) { * Returns {@code true} if the given query is an aggregation query, {@code false} otherwise. */ public static boolean isAggregationQuery(QueryContext query) { - return query.getAggregationFunctions() != null && !isTimeSeriesQuery(query); - } - - /** - * Returns {@code true} if the given query is a time series query, {@code false} otherwise. - */ - public static boolean isTimeSeriesQuery(QueryContext query) { - return query.getTimeSeriesContext() != null; + return query.getAggregationFunctions() != null; } /** diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java deleted file mode 100644 index eea81a4ba164..000000000000 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.timeseries; - -import java.time.Duration; -import java.util.Collections; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.operator.BaseProjectOperator; -import org.apache.pinot.core.operator.blocks.ValueBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.plan.DocIdSetPlanNode; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.TimeBuckets; -import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.*; -import static org.testng.Assert.*; - - -public class TimeSeriesAggregationOperatorTest { - private static final Random RANDOM = new Random(); - private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; - private static final String GROUP_BY_COLUMN = "city"; - private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap()); - private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); - private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10); - private static final int NUM_DOCS_IN_DUMMY_DATA = 1000; - - @Test - public void testTimeSeriesAggregationOperator() { - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory()); - TimeSeriesResultsBlock resultsBlock = timeSeriesAggregationOperator.getNextBlock(); - // Expect 2 series: Chicago and San Francisco - assertNotNull(resultsBlock); - assertEquals(2, resultsBlock.getNumRows()); - } - - @Test - public void testTimeSeriesAggregationOperatorWhenSeriesLimit() { - // Since we test with 2 series, use 1 as the limit. - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory(1, 100_000_000L)); - try { - timeSeriesAggregationOperator.getNextBlock(); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains("Limit: 1. Series in current")); - } - } - - @Test - public void testTimeSeriesAggregationOperatorWhenDataPoints() { - // Since we test with 2 series, use 1 as the limit. - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory(1000, 11)); - try { - timeSeriesAggregationOperator.getNextBlock(); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains("Limit: 11. 
Data points in current")); - } - } - - @Test - public void testGetTimeValueIndexForSeconds() { - /* - * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] - * storedTimeValues: [9_999, 10_000, 9_999, 9_901, 10_100, 10_899, 10_900] - * expected indexes: [0, 0, 0, 0, 1, 9, 9] - */ - final int[] expectedIndexes = new int[]{0, 0, 0, 0, 1, 9, 9}; - final TimeUnit storedTimeUnit = TimeUnit.SECONDS; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - long[] storedTimeValues = new long[]{9_999L, 10_000L, 9_999L, 9_901L, 10_100L, 10_899L, 10_900L}; - int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertEquals(indexes, expectedIndexes); - } - - @Test - public void testGetTimeValueIndexForMillis() { - /* - * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] - * storedTimeValues: [9_999_000, 10_000_000, 10_500_000, 10_899_999, 10_800_001, 10_900_000] - * expected indexes: [0, 0, 5, 9, 9, 9] - */ - final int[] expectedIndexes = new int[]{0, 0, 5, 9, 9, 9}; - final TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - long[] storedTimeValues = new long[]{9_999_000L, 10_000_000L, 10_500_000L, 10_899_999L, 10_800_001L, 10_900_000L}; - int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertEquals(indexes, expectedIndexes); - } - - @Test - public void testGetTimeValueIndexOutOfBounds() { - final TimeUnit storedTimeUnit = TimeUnit.SECONDS; - final int numTimeBuckets = 10; - final int windowSeconds = 100; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(windowSeconds), numTimeBuckets); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - testOutOfBoundsTimeValueIndex(new long[]{8_000}, numTimeBuckets, aggregationOperator); - testOutOfBoundsTimeValueIndex(new long[]{timeBuckets.getTimeRangeEndInclusive() + 1}, numTimeBuckets, - aggregationOperator); - } - - private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int numTimeBuckets, - TimeSeriesAggregationOperator aggOperator) { - assertEquals(storedTimeValues.length, 1, "Misconfigured test: pass single stored time value"); - int[] indexes = aggOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range"); - } - - private TimeSeriesAggregationOperator buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) { - BaseProjectOperator mockProjectOperator = mock(BaseProjectOperator.class); - ValueBlock valueBlock = buildValueBlockForProjectOperator(); - when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock) null); - return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN, - TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.singletonList(GROUP_BY_COLUMN), - TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory, mock(SegmentMetadata.class)); - } - - private static ValueBlock buildValueBlockForProjectOperator() { - ValueBlock valueBlock = mock(ValueBlock.class); - doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs(); - 
doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN); - doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION); - doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN); - return valueBlock; - } - - private static BlockValSet buildBlockValSetForGroupByColumns() { - BlockValSet blockValSet = mock(BlockValSet.class); - String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco"; - } - doReturn(stringArray).when(blockValSet).getStringValuesSV(); - doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType(); - return blockValSet; - } - - private static BlockValSet buildBlockValSetForValues() { - BlockValSet blockValSet = mock(BlockValSet.class); - long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - valuesArray[index] = index; - } - doReturn(valuesArray).when(blockValSet).getLongValuesSV(); - doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType(); - return blockValSet; - } - - private static BlockValSet buildBlockValSetForTime() { - BlockValSet blockValSet = mock(BlockValSet.class); - long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - timeValueArray[index] = 901 + RANDOM.nextInt(1000); - } - doReturn(timeValueArray).when(blockValSet).getLongValuesSV(); - return blockValSet; - } - - private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) { - return new TimeSeriesAggregationOperator( - DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(), - timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class), - mock(SegmentMetadata.class)); - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 4a171128c813..4e4a7fc6f511 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -20,33 +20,21 @@ import java.io.File; import java.net.URL; -import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import 
org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.core.query.request.ServerQueryRequest; -import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; @@ -68,11 +56,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; -import org.apache.pinot.tsdb.spi.series.TimeSeries; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -82,8 +66,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -217,89 +199,6 @@ public void testMinQuery() { assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 0.0); } - @Test - public void testTimeSeriesSumQuery() { - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 2); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null)); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList()); - ServerQueryRequest serverQueryRequest = - new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); - InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); - assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); - assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]); - assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1], 29885544.0); - } - - @Test - public void testTimeSeriesMaxQuery() { - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null)); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); - ServerQueryRequest serverQueryRequest = - new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), 
ServerMetrics.get()); - InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); - assertEquals(5, timeSeriesBlock.getSeriesMap().size()); - // For any city, say "New York", the max order item count should be 4 - boolean foundNewYork = false; - for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) { - assertEquals(listOfTimeSeries.size(), 1); - TimeSeries timeSeries = listOfTimeSeries.get(0); - if (timeSeries.getTagValues()[0].equals("New York")) { - assertFalse(foundNewYork, "Found multiple time-series for New York"); - foundNewYork = true; - Optional maxValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); - assertTrue(maxValue.isPresent()); - assertEquals(maxValue.get().longValue(), 4L); - } - } - assertTrue(foundNewYork, "Did not find the expected time-series"); - } - - @Test - public void testTimeSeriesMinQuery() { - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null)); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); - ServerQueryRequest serverQueryRequest = - new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); - InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); - assertEquals(5, timeSeriesBlock.getSeriesMap().size()); - // For any city, say "Chicago", the min order item count should be 0 - boolean foundChicago = false; - for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) { - assertEquals(listOfTimeSeries.size(), 1); - TimeSeries timeSeries = listOfTimeSeries.get(0); - if (timeSeries.getTagValues()[0].equals("Chicago")) { - assertFalse(foundChicago, "Found multiple time-series for Chicago"); - foundChicago = true; - Optional minValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); - assertTrue(minValue.isPresent()); - assertEquals(minValue.get().longValue(), 0L); - } - } - assertTrue(foundChicago, "Did not find the expected time-series"); - } - @AfterClass public void tearDown() { for (IndexSegment segment : _indexSegments) { @@ -311,19 +210,4 @@ public void tearDown() { private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) { return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), System.currentTimeMillis()); } - - private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context) { - return getQueryContextForTimeSeries(context, Collections.singletonList( - ExpressionContext.forIdentifier("cityName"))); - } - - private QueryContext 
getQueryContextForTimeSeries(TimeSeriesContext context, List groupBy) { - QueryContext.Builder builder = new QueryContext.Builder(); - builder.setTableName(OFFLINE_TABLE_NAME); - builder.setTimeSeriesContext(context); - builder.setAliasList(Collections.emptyList()); - builder.setSelectExpressions(Collections.emptyList()); - builder.setGroupByExpressions(groupBy); - return builder.build(); - } } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java index cf38b501b00c..fec5db4e1fc4 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java @@ -81,7 +81,10 @@ private List consumeGeneric(String pipeline) { int indexOfOpenBracket = pipeline.indexOf("{"); int indexOfClosedBracket = pipeline.indexOf("}"); result.add(pipeline.substring(0, indexOfOpenBracket)); - result.add(pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket)); + String arg = pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket); + if (!arg.isEmpty()) { + result.add(arg); + } return result; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java index 8b577105d3fd..d53ce8e29b8e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -19,27 +19,39 @@ package org.apache.pinot.query.runtime.timeseries; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutorService; -import org.apache.commons.collections.MapUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.logger.ServerQueryLogger; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.series.TimeSeries; import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator { + private final TimeSeriesExecutionContext _context; private final ServerQueryRequest _request; private final QueryExecutor _queryExecutor; private final ExecutorService _executorService; private final ServerQueryLogger _queryLogger; - public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, - ExecutorService executorService) { + public 
LeafTimeSeriesOperator(TimeSeriesExecutionContext context, ServerQueryRequest serverQueryRequest, + QueryExecutor queryExecutor, ExecutorService executorService) { super(Collections.emptyList()); + _context = context; _request = serverQueryRequest; _queryExecutor = queryExecutor; _executorService = executorService; @@ -50,20 +62,78 @@ public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecut public TimeSeriesBlock getNextBlock() { Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized"); InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService); - assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock; _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries"); if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) { - // TODO: Return error in the TimeSeriesBlock instead? - String oneException = instanceResponseBlock.getExceptions().values().iterator().next(); - throw new RuntimeException(oneException); + String message = instanceResponseBlock.getExceptions().values().iterator().next(); + throw new RuntimeException("Error running time-series query: " + message); + } + if (instanceResponseBlock.getResultsBlock() instanceof GroupByResultsBlock) { + return handleGroupByResultsBlock((GroupByResultsBlock) instanceResponseBlock.getResultsBlock()); + } else if (instanceResponseBlock.getResultsBlock() instanceof AggregationResultsBlock) { + return handleAggregationResultsBlock((AggregationResultsBlock) instanceResponseBlock.getResultsBlock()); + } else if (instanceResponseBlock.getResultsBlock() == null) { + throw new IllegalStateException("Found null results block in time-series query"); + } else { + throw new UnsupportedOperationException(String.format("Unknown results block: %s", + instanceResponseBlock.getResultsBlock().getClass().getName())); } - TimeSeriesResultsBlock timeSeriesResultsBlock = - ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()); - return timeSeriesResultsBlock.getTimeSeriesBuilderBlock().build(); } @Override public String getExplainName() { return "TIME_SERIES_LEAF_STAGE_OPERATOR"; } + + private TimeSeriesBlock handleGroupByResultsBlock(GroupByResultsBlock groupByResultsBlock) { + if (groupByResultsBlock.getNumRows() == 0) { + return new TimeSeriesBlock(_context.getInitialTimeBuckets(), new HashMap<>()); + } + if (groupByResultsBlock.isNumGroupsLimitReached()) { + throw new IllegalStateException(String.format("Series limit reached. Number of series: %s", + groupByResultsBlock.getNumRows())); + } + Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>(groupByResultsBlock.getNumRows()); + List<String> tagNames = getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(), + "DataSchema is null in leaf stage of time-series query")); + Iterator<Record> recordIterator = groupByResultsBlock.getTable().iterator(); + while (recordIterator.hasNext()) { + Record record = recordIterator.next(); + Object[] recordValues = record.getValues(); + Object[] tagValues = new Object[recordValues.length - 1]; + for (int index = 0; index + 1 < recordValues.length; index++) { + tagValues[index] = recordValues[index] == null ?
"null" : recordValues[index].toString(); + } + Double[] doubleValues = (Double[]) recordValues[recordValues.length - 1]; + long seriesHash = TimeSeries.hash(tagValues); + List<TimeSeries> timeSeriesList = new ArrayList<>(1); + timeSeriesList.add(new TimeSeries(Long.toString(seriesHash), null, _context.getInitialTimeBuckets(), + doubleValues, tagNames, tagValues)); + timeSeriesMap.put(seriesHash, timeSeriesList); + } + return new TimeSeriesBlock(_context.getInitialTimeBuckets(), timeSeriesMap); + } + + private TimeSeriesBlock handleAggregationResultsBlock(AggregationResultsBlock aggregationResultsBlock) { + if (aggregationResultsBlock.getResults() == null) { + return new TimeSeriesBlock(_context.getInitialTimeBuckets(), new HashMap<>()); + } + Double[] doubleValues = (Double[]) aggregationResultsBlock.getResults().get(0); + long seriesHash = TimeSeries.hash(new Object[0]); + List<TimeSeries> timeSeriesList = new ArrayList<>(1); + timeSeriesList.add(new TimeSeries(Long.toString(seriesHash), null, _context.getInitialTimeBuckets(), + doubleValues, Collections.emptyList(), new Object[0])); + Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>(); + timeSeriesMap.put(seriesHash, timeSeriesList); + return new TimeSeriesBlock(_context.getInitialTimeBuckets(), timeSeriesMap); + } + + private List<String> getTagNamesFromDataSchema(DataSchema dataSchema) { + String[] columnNames = dataSchema.getColumnNames(); + int numTags = columnNames.length - 1; + List<String> tagNames = new ArrayList<>(numTags); + for (int index = 0; index < numTags; index++) { + tagNames.add(columnNames[index]); + } + return tagNames; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java index d300232625ea..6bb749cdb3fa 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java @@ -20,23 +20,30 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.pinot.common.function.TransformFunctionType; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.request.Literal; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; import 
org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; @@ -72,7 +79,7 @@ public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutio LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode; List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId()); ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context); - TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(), + TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(context, childNode.getId(), serverQueryRequest, _queryExecutor, _executorService); planNode.getChildren().set(index, physicalTableScan); } else { @@ -95,17 +102,17 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExec List<ExpressionContext> groupByExpressions = leafNode.getGroupByExpressions().stream() .map(RequestContextUtils::getExpression).collect(Collectors.toList()); ExpressionContext valueExpression = RequestContextUtils.getExpression(leafNode.getValueExpression()); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(), - leafNode.getTimeColumn(), leafNode.getTimeUnit(), context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(), - valueExpression, leafNode.getAggInfo()); + ExpressionContext timeExpression = buildTimeTransform(leafNode.getTimeColumn(), leafNode.getTimeUnit(), + context.getInitialTimeBuckets(), leafNode.getOffsetSeconds() == null ? 0L : leafNode.getOffsetSeconds()); + ExpressionContext aggFunctionExpr = buildAggregationExpr(toPinotCoreAggregation(leafNode.getAggInfo()), + valueExpression, timeExpression, context.getInitialTimeBuckets().getNumBuckets()); return new QueryContext.Builder() .setTableName(leafNode.getTableName()) .setFilter(filterContext) .setGroupByExpressions(groupByExpressions) - .setSelectExpressions(Collections.emptyList()) + .setSelectExpressions(List.of(aggFunctionExpr)) .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getTimeoutMs()))) .setAliasList(Collections.emptyList()) - .setTimeSeriesContext(timeSeriesContext) .setLimit(Integer.MAX_VALUE) .build(); } @@ -116,4 +123,38 @@ Map<String, String> getServerQueryRequestMetadataMap(TimeSeriesExecutionContext result.put(MetadataKeys.REQUEST_ID, context.getMetadataMap().get(MetadataKeys.REQUEST_ID)); return result; } + + private ExpressionContext buildAggregationExpr(AggregationFunctionType functionType, + ExpressionContext valueExpression, ExpressionContext timeExpression, int numBuckets) { + ExpressionContext literalExpr = ExpressionContext.forLiteral(Literal.intValue(numBuckets)); + FunctionContext aggFunction = new FunctionContext(FunctionContext.Type.AGGREGATION, + functionType.getName(), List.of(valueExpression, timeExpression, literalExpr)); + return ExpressionContext.forFunction(aggFunction); + } + + private ExpressionContext buildTimeTransform(String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets, + long offsetSeconds) { + final String functionName = TransformFunctionType.TIMESERIES_BUCKET_INDEX.name(); + final List<ExpressionContext> arguments = new ArrayList<>(5); + arguments.add(RequestContextUtils.getExpression(timeColumn)); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(timeUnit.toString()))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0]))); 
arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds()))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(offsetSeconds))); + return ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, functionName, arguments)); + } + + private AggregationFunctionType toPinotCoreAggregation(AggInfo aggInfo) { + // TODO(timeseries): This is hacky. + switch (aggInfo.getAggFunction().toUpperCase()) { + case "SUM": + return AggregationFunctionType.TIMESERIESSUM; + case "MIN": + return AggregationFunctionType.TIMESERIESMIN; + case "MAX": + return AggregationFunctionType.TIMESERIESMAX; + default: + throw new UnsupportedOperationException("Unsupported agg function type: " + aggInfo.getAggFunction()); + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java index 272e55649839..da2ebdf53a87 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java @@ -27,16 +27,19 @@ public class TimeSeriesPhysicalTableScan extends BaseTimeSeriesPlanNode { + private final TimeSeriesExecutionContext _context; private final ServerQueryRequest _request; private final QueryExecutor _queryExecutor; private final ExecutorService _executorService; public TimeSeriesPhysicalTableScan( + TimeSeriesExecutionContext context, String id, ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, ExecutorService executorService) { super(id, Collections.emptyList()); + _context = context; _request = serverQueryRequest; _queryExecutor = queryExecutor; _executorService = executorService; @@ -65,6 +68,6 @@ public String getExplainName() { @Override public BaseTimeSeriesOperator run() { - return new LeafTimeSeriesOperator(_request, _queryExecutor, _executorService); + return new LeafTimeSeriesOperator(_context, _request, _queryExecutor, _executorService); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java index fabfde682923..c5ed3499b12f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java @@ -51,11 +51,6 @@ public void testCompileQueryContext() { new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L, filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName")); QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context); - assertNotNull(queryContext.getTimeSeriesContext()); - assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql"); - assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L); - assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); - assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount"); assertEquals(queryContext.getFilter().toString(), "(cityName = 'Chicago' AND orderTime > '990' 
AND orderTime <= '1990')"); assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)), DUMMY_TIMEOUT_MS); @@ -72,11 +67,6 @@ public void testCompileQueryContext() { assertNotNull(queryContext); assertNotNull(queryContext.getGroupByExpressions()); assertEquals("concat(cityName,stateName,'-')", queryContext.getGroupByExpressions().get(0).toString()); - assertNotNull(queryContext.getTimeSeriesContext()); - assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql"); - assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 10L); - assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); - assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(), "times(orderCount,'2')"); assertNotNull(queryContext.getFilter()); assertEquals(queryContext.getFilter().toString(), "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')"); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index e92a60865dce..455aa7f61933 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -218,7 +218,10 @@ public enum AggregationFunctionType { SqlTypeName.OTHER), PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR, OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), i -> i == 2), - SqlTypeName.OTHER); + SqlTypeName.OTHER), + TIMESERIESSUM("timeSeriesSum", SqlTypeName.OTHER), + TIMESERIESMIN("timeSeriesMin", SqlTypeName.OTHER), + TIMESERIESMAX("timeSeriesMax", SqlTypeName.OTHER); private static final Set<String> NAMES = Arrays.stream(values()).flatMap(func -> Stream.of(func.name(), func.getName(), func.getName().toLowerCase())) From c93c55a3ebd4bc2351c0837f7baa4bd08f298fb9 Mon Sep 17 00:00:00 2001 From: ankitsultana Date: Tue, 3 Dec 2024 00:51:27 +0000 Subject: [PATCH 2/2] fix bug --- .../function/TimeSeriesTransformFunction.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java index 396376a254c8..6db61af0ccc9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesTransformFunction.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.transform.function; +import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Map; @@ -40,22 +41,19 @@ public class TimeSeriesTransformFunction extends BaseTransformFunction { private TimeUnit _timeUnit; private long _reference = -1; private long _divisor = -1; - private long _offsetSeconds = 0; + private long _offset = 0; @Override public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) { super.init(arguments, columnContextMap); _timeUnit = TimeUnit.valueOf(((LiteralTransformFunction) arguments.get(1)).getStringLiteral().toUpperCase( Locale.ENGLISH)); - _offsetSeconds = ((LiteralTransformFunction) arguments.get(4)).getLongLiteral(); final long startSeconds = ((LiteralTransformFunction) 
arguments.get(2)).getLongLiteral(); final long bucketSizeSeconds = ((LiteralTransformFunction) arguments.get(3)).getLongLiteral(); - _reference = (startSeconds - bucketSizeSeconds); - _divisor = bucketSizeSeconds; - if (_timeUnit == TimeUnit.MILLISECONDS) { - _reference *= 1000; - _divisor *= 1000; - } + _offset = _timeUnit.convert(Duration.ofSeconds( + ((LiteralTransformFunction) arguments.get(4)).getLongLiteral())); + _reference = _timeUnit.convert(Duration.ofSeconds(startSeconds - bucketSizeSeconds)); + _divisor = _timeUnit.convert(Duration.ofSeconds(bucketSizeSeconds)); } @Override @@ -64,7 +62,7 @@ public int[] transformToIntValuesSV(ValueBlock valueBlock) { initIntValuesSV(length); long[] inputValues = _arguments.get(0).transformToLongValuesSV(valueBlock); for (int docIndex = 0; docIndex < length; docIndex++) { - _intValuesSV[docIndex] = (int) (((inputValues[docIndex] + _offsetSeconds) - _reference - 1) / _divisor); + _intValuesSV[docIndex] = (int) (((inputValues[docIndex] + _offset) - _reference - 1) / _divisor); } return _intValuesSV; }
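
For reviewers, the end-to-end effect of this change: a leaf time-series plan node no longer carries a TimeSeriesContext; it compiles into an ordinary Pinot aggregation query, with the time-to-bucket mapping pushed into the new timeSeriesBucketIndex transform and the per-bucket aggregation into the new Double[]-valued aggregation functions. The snippet below is a hand-written sketch of the equivalent query shape, not output produced by the patch; the table (orders) and columns (orderAmount, orderTime, cityName) are hypothetical, the bucket literals assume 10 buckets of 100 seconds with first bucket boundary 10,000 and no offset, and the leaf node's filter is omitted:

```java
// Hand-written sketch: PhysicalTimeSeriesPlanVisitor actually builds ExpressionContext
// objects directly rather than SQL text, but the compiled query has this shape.
public class CompiledLeafQuerySketch {
  // timeSeriesSum(valueExpr, bucketIndexExpr, numBuckets), where bucketIndexExpr is
  // timeSeriesBucketIndex(timeColumn, storedTimeUnit, firstBucketValue, bucketSizeSeconds, offsetSeconds).
  static final String EQUIVALENT_SQL = "SELECT timeSeriesSum(orderAmount, "
      + "timeSeriesBucketIndex(orderTime, 'SECONDS', 10000, 100, 0), 10) "
      + "FROM orders GROUP BY cityName LIMIT 2147483647"; // limit is Integer.MAX_VALUE, as in compileQueryContext
}
```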
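The bucket-index arithmetic fixed in the second commit can be sanity-checked against the expectations preserved in the deleted TimeSeriesAggregationOperatorTest. A self-contained sketch, assuming seconds-granularity storage and the test's buckets (10 buckets of 100 seconds, first bucket boundary at 10,000); the subtraction of 1 makes each bucket left-open and right-closed:

```java
public class BucketIndexSketch {
  public static void main(String[] args) {
    // Mirrors TimeSeriesTransformFunction for SECONDS storage:
    // reference = firstBucketValue - bucketSize, divisor = bucketSize.
    long reference = 10_000L - 100L;
    long divisor = 100L;
    long offset = 0L;
    long[] storedTimes = {9_999L, 10_000L, 9_901L, 10_100L, 10_899L, 10_900L};
    int[] expected = {0, 0, 0, 1, 9, 9}; // expectations from the deleted operator test
    for (int i = 0; i < storedTimes.length; i++) {
      int index = (int) (((storedTimes[i] + offset) - reference - 1) / divisor);
      System.out.printf("t=%d -> bucket %d (expected %d)%n", storedTimes[i], index, expected[i]);
    }
  }
}
```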
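This is also why the dedicated TimeSeriesCombineOperator could be deleted: the Double[] intermediate results merge bucket-by-bucket with null meaning "no data", so segment-level results flow through the standard aggregation combine path. A standalone sketch of the SUM case, mirroring mergeSum above:

```java
import java.util.Arrays;

public class MergeSumSketch {
  // Null buckets neither contribute zero nor overwrite the other side's value.
  static Double[] mergeSum(Double[] ir1, Double[] ir2) {
    for (int i = 0; i < ir1.length; i++) {
      if (ir2[i] != null) {
        ir1[i] = ir1[i] == null ? ir2[i] : ir1[i] + ir2[i];
      }
    }
    return ir1;
  }

  public static void main(String[] args) {
    Double[] segmentA = {null, 5.0, null, 2.0};
    Double[] segmentB = {3.0, null, null, 4.0};
    // Prints [3.0, 5.0, null, 6.0]: empty buckets stay null instead of becoming 0.
    System.out.println(Arrays.toString(mergeSum(segmentA, segmentB)));
  }
}
```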