diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index cf8b018e8a3a..6358d0b9ece2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -232,9 +232,10 @@ public enum TransformFunctionType { TANH("tanh"), DEGREES("degrees"), RADIANS("radians"), - // Complex type handling - ITEM("item"); + ITEM("item"), + // Time series functions + TIME_SERIES_BUCKET("timeSeriesBucket"); private final String _name; private final List _names; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java deleted file mode 100644 index ba7858ea1185..000000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.request.context; - -import java.util.concurrent.TimeUnit; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.TimeBuckets; - - -public class TimeSeriesContext { - private final String _language; - private final String _timeColumn; - private final TimeUnit _timeUnit; - private final TimeBuckets _timeBuckets; - private final Long _offsetSeconds; - private final ExpressionContext _valueExpression; - private final AggInfo _aggInfo; - - public TimeSeriesContext(String language, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets, - Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) { - _language = language; - _timeColumn = timeColumn; - _timeUnit = timeUnit; - _timeBuckets = timeBuckets; - _offsetSeconds = offsetSeconds; - _valueExpression = valueExpression; - _aggInfo = aggInfo; - } - - public String getLanguage() { - return _language; - } - - public String getTimeColumn() { - return _timeColumn; - } - - public TimeUnit getTimeUnit() { - return _timeUnit; - } - - public TimeBuckets getTimeBuckets() { - return _timeBuckets; - } - - public Long getOffsetSeconds() { - return _offsetSeconds; - } - - public ExpressionContext getValueExpression() { - return _valueExpression; - } - - public AggInfo getAggInfo() { - return _aggInfo; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java deleted file mode 100644 index 0f9eda897d6e..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.blocks; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger; -import org.apache.pinot.tsdb.spi.TimeBuckets; -import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; -import org.apache.pinot.tsdb.spi.series.TimeSeries; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; - - -/** - * Block used by the {@link TimeSeriesAggResultsBlockMerger}. - */ -public class TimeSeriesBuilderBlock { - private final TimeBuckets _timeBuckets; - private final Map _seriesBuilderMap; - - public TimeSeriesBuilderBlock(TimeBuckets timeBuckets, Map seriesBuilderMap) { - _timeBuckets = timeBuckets; - _seriesBuilderMap = seriesBuilderMap; - } - - public TimeBuckets getTimeBuckets() { - return _timeBuckets; - } - - public Map getSeriesBuilderMap() { - return _seriesBuilderMap; - } - - public TimeSeriesBlock build() { - Map> seriesMap = new HashMap<>(); - for (var entry : _seriesBuilderMap.entrySet()) { - List result = new ArrayList<>(1); - result.add(entry.getValue().build()); - seriesMap.put(entry.getKey(), result); - } - return new TimeSeriesBlock(_timeBuckets, seriesMap); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java index 402e89c93a0e..168e582cfaf6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.operator.blocks.results; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -26,10 +25,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; -import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.distinct.table.DistinctTable; @@ -44,9 +41,6 @@ private ResultsBlockUtils() { } public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) { - if (QueryContextUtils.isTimeSeriesQuery(queryContext)) { - return buildEmptyTimeSeriesResults(queryContext); - } if (QueryContextUtils.isSelectionQuery(queryContext)) { return buildEmptySelectionQueryResults(queryContext); } @@ -125,11 +119,4 @@ private static DistinctResultsBlock buildEmptyDistinctQueryResults(QueryContext queryContext.isNullHandlingEnabled()); return new DistinctResultsBlock(distinctTable, queryContext); } - - private static TimeSeriesResultsBlock buildEmptyTimeSeriesResults(QueryContext queryContext) { - TimeSeriesContext timeSeriesContext = queryContext.getTimeSeriesContext(); - Preconditions.checkNotNull(timeSeriesContext); - return new TimeSeriesResultsBlock( - new TimeSeriesBuilderBlock(timeSeriesContext.getTimeBuckets(), Collections.emptyMap())); - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java deleted file mode 100644 index f8e7fac944c5..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.blocks.results; - -import java.io.IOException; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.pinot.common.datatable.DataTable; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; -import org.apache.pinot.core.query.request.context.QueryContext; - - -// TODO(timeseries): Implement unsupported functions when merging with MSE. -public class TimeSeriesResultsBlock extends BaseResultsBlock { - private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock; - - public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) { - _timeSeriesBuilderBlock = timeSeriesBuilderBlock; - } - - @Override - public int getNumRows() { - return _timeSeriesBuilderBlock.getSeriesBuilderMap().size(); - } - - @Nullable - @Override - public QueryContext getQueryContext() { - throw new UnsupportedOperationException("Time series results block does not support getting QueryContext yet"); - } - - @Nullable - @Override - public DataSchema getDataSchema() { - throw new UnsupportedOperationException("Time series results block does not support getting DataSchema yet"); - } - - @Nullable - @Override - public List getRows() { - throw new UnsupportedOperationException("Time series results block does not support getRows yet"); - } - - @Override - public DataTable getDataTable() - throws IOException { - throw new UnsupportedOperationException("Time series results block does not support returning DataTable"); - } - - public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() { - return _timeSeriesBuilderBlock; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java deleted file mode 100644 index 3f410b58cb5e..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/TimeSeriesCombineOperator.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.combine; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import javax.annotation.Nullable; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger; -import org.apache.pinot.core.query.request.context.QueryContext; - - -public class TimeSeriesCombineOperator extends BaseSingleBlockCombineOperator { - private static final String EXPLAIN_NAME = "TIME_SERIES_COMBINE"; - - public TimeSeriesCombineOperator(ResultsBlockMerger resultsBlockMerger, - List operators, QueryContext queryContext, ExecutorService executorService) { - super(resultsBlockMerger, operators, queryContext, executorService); - } - - @Nullable - @Override - public String toExplainString() { - return EXPLAIN_NAME; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java deleted file mode 100644 index 428cfde55511..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.combine.merger; - -import com.google.common.base.Preconditions; -import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; - - -public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger { - private final TimeSeriesBuilderFactory _seriesBuilderFactory; - private final AggInfo _aggInfo; - private final int _maxSeriesLimit; - private final long _maxDataPointsLimit; - - public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) { - _seriesBuilderFactory = seriesBuilderFactory; - _aggInfo = aggInfo; - _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit(); - _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit(); - } - - @Override - public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesResultsBlock blockToMerge) { - TimeSeriesBuilderBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBuilderBlock(); - TimeSeriesBuilderBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBuilderBlock(); - for (var entry : seriesBlockToMerge.getSeriesBuilderMap().entrySet()) { - long seriesHash = entry.getKey(); - BaseTimeSeriesBuilder currentTimeSeriesBuilder = currentTimeSeriesBlock.getSeriesBuilderMap().get(seriesHash); - BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue(); - if (currentTimeSeriesBuilder == null) { - currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge); - final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size(); - final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets(); - Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit, - "Max data points limit reached in combine operator. Limit: %s. Current count: %s", - _maxDataPointsLimit, currentUniqueSeries * numBuckets); - Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit, - "Max series limit reached in combine operator. Limit: %s. Current Size: %s", - _maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size()); - } else { - currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge); - } - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java deleted file mode 100644 index a39c996f4fb7..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.timeseries; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.NotImplementedException; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; -import org.apache.pinot.core.operator.BaseProjectOperator; -import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; -import org.apache.pinot.core.operator.blocks.ValueBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.TimeBuckets; -import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; -import org.apache.pinot.tsdb.spi.series.TimeSeries; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; - - -/** - * Segment level operator which converts data in relational model to a time-series model, by aggregating on a - * configured aggregation expression. - */ -public class TimeSeriesAggregationOperator extends BaseOperator { - private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; - private final String _timeColumn; - private final TimeUnit _storedTimeUnit; - private final long _timeOffset; - private final AggInfo _aggInfo; - private final ExpressionContext _valueExpression; - private final List _groupByExpressions; - private final BaseProjectOperator _projectOperator; - private final TimeBuckets _timeBuckets; - private final TimeSeriesBuilderFactory _seriesBuilderFactory; - private final int _maxSeriesLimit; - private final long _maxDataPointsLimit; - private final long _numTotalDocs; - private long _numDocsScanned = 0; - - public TimeSeriesAggregationOperator( - String timeColumn, - TimeUnit timeUnit, - @Nullable Long timeOffsetSeconds, - AggInfo aggInfo, - ExpressionContext valueExpression, - List groupByExpressions, - TimeBuckets timeBuckets, - BaseProjectOperator projectOperator, - TimeSeriesBuilderFactory seriesBuilderFactory, - SegmentMetadata segmentMetadata) { - _timeColumn = timeColumn; - _storedTimeUnit = timeUnit; - _timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds)); - _aggInfo = aggInfo; - _valueExpression = valueExpression; - _groupByExpressions = groupByExpressions; - _projectOperator = projectOperator; - _timeBuckets = timeBuckets; - _seriesBuilderFactory = seriesBuilderFactory; - _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit(); - _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit(); - _numTotalDocs = segmentMetadata.getTotalDocs(); - } - - @Override - protected TimeSeriesResultsBlock getNextBlock() { - ValueBlock valueBlock; - Map seriesBuilderMap = new HashMap<>(1024); - while ((valueBlock = _projectOperator.nextBlock()) != null) { - int numDocs = valueBlock.getNumDocs(); - _numDocsScanned += numDocs; - // TODO: This is quite unoptimized and allocates liberally - BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn); - long[] timeValues = blockValSet.getLongValuesSV(); - applyTimeOffset(timeValues, numDocs); - int[] timeValueIndexes = getTimeValueIndex(timeValues, numDocs); - Object[][] tagValues = new Object[_groupByExpressions.size()][]; - for (int i = 0; i < _groupByExpressions.size(); i++) { - blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i)); - switch (blockValSet.getValueType()) { - case JSON: - case STRING: - tagValues[i] = blockValSet.getStringValuesSV(); - break; - case LONG: - tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV()); - break; - case INT: - tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV()); - break; - default: - throw new NotImplementedException("Can't handle types other than string and long"); - } - } - BlockValSet valueExpressionBlockValSet = valueBlock.getBlockValueSet(_valueExpression); - switch (valueExpressionBlockValSet.getValueType()) { - case LONG: - processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs); - break; - case INT: - processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs); - break; - case DOUBLE: - processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs); - break; - case STRING: - processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues, numDocs); - break; - default: - // TODO: Support other types? - throw new IllegalStateException( - "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType()); - } - Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit, - "Exceeded max data point limit per server. Limit: %s. Data points in current segment so far: %s", - _maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets()); - Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit, - "Exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s", - _maxSeriesLimit, seriesBuilderMap.size()); - } - return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap)); - } - - @Override - @SuppressWarnings("rawtypes") - public List getChildOperators() { - return ImmutableList.of(_projectOperator); - } - - @Nullable - @Override - public String toExplainString() { - return EXPLAIN_NAME; - } - - @Override - public ExecutionStatistics getExecutionStatistics() { - long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - long numEntriesScannedPostFilter = _numDocsScanned * _projectOperator.getNumColumnsProjected(); - return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, - _numTotalDocs); - } - - @VisibleForTesting - protected int[] getTimeValueIndex(long[] actualTimeValues, int numDocs) { - if (_storedTimeUnit == TimeUnit.MILLISECONDS) { - return getTimeValueIndexMillis(actualTimeValues, numDocs); - } - int[] timeIndexes = new int[numDocs]; - final long reference = _timeBuckets.getTimeRangeStartExclusive(); - final long divisor = _timeBuckets.getBucketSize().getSeconds(); - for (int index = 0; index < numDocs; index++) { - timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor); - } - return timeIndexes; - } - - private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) { - int[] timeIndexes = new int[numDocs]; - final long reference = _timeBuckets.getTimeRangeStartExclusive() * 1000L; - final long divisor = _timeBuckets.getBucketSize().toMillis(); - for (int index = 0; index < numDocs; index++) { - timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor); - } - return timeIndexes; - } - - public void processLongExpression(BlockValSet blockValSet, Map seriesBuilderMap, - int[] timeValueIndexes, Object[][] tagValues, int numDocs) { - long[] valueColumnValues = blockValSet.getLongValuesSV(); - for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) { - Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; - for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { - tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; - } - long hash = TimeSeries.hash(tagValuesForDoc); - seriesBuilderMap.computeIfAbsent(hash, - k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, - _groupByExpressions, - tagValuesForDoc)) - .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); - } - } - - public void processIntExpression(BlockValSet blockValSet, Map seriesBuilderMap, - int[] timeValueIndexes, Object[][] tagValues, int numDocs) { - int[] valueColumnValues = blockValSet.getIntValuesSV(); - for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) { - Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; - for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { - tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; - } - long hash = TimeSeries.hash(tagValuesForDoc); - seriesBuilderMap.computeIfAbsent(hash, - k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, - _groupByExpressions, - tagValuesForDoc)) - .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); - } - } - - public void processDoubleExpression(BlockValSet blockValSet, Map seriesBuilderMap, - int[] timeValueIndexes, Object[][] tagValues, int numDocs) { - double[] valueColumnValues = blockValSet.getDoubleValuesSV(); - for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) { - Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; - for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { - tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; - } - long hash = TimeSeries.hash(tagValuesForDoc); - seriesBuilderMap.computeIfAbsent(hash, - k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, - _groupByExpressions, - tagValuesForDoc)) - .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]); - } - } - - public void processStringExpression(BlockValSet blockValSet, Map seriesBuilderMap, - int[] timeValueIndexes, Object[][] tagValues, int numDocs) { - String[] valueColumnValues = blockValSet.getStringValuesSV(); - for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) { - Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; - for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { - tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; - } - long hash = TimeSeries.hash(tagValuesForDoc); - seriesBuilderMap.computeIfAbsent(hash, - k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, - _groupByExpressions, tagValuesForDoc)) - .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]); - } - } - - private void applyTimeOffset(long[] timeValues, int numDocs) { - if (_timeOffset == 0L) { - return; - } - for (int index = 0; index < numDocs; index++) { - timeValues[index] = timeValues[index] + _timeOffset; - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java new file mode 100644 index 000000000000..2e014e728e6b --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.timeseries; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class TimeSeriesOperatorUtils { + private TimeSeriesOperatorUtils() { + } + + public static TimeSeriesBlock handleGroupByResultsBlock(TimeBuckets timeBuckets, + GroupByResultsBlock groupByResultsBlock) { + if (groupByResultsBlock.getNumRows() == 0) { + return new TimeSeriesBlock(timeBuckets, new HashMap<>()); + } + if (groupByResultsBlock.isNumGroupsLimitReached()) { + throw new IllegalStateException(String.format("Series limit reached. Number of series: %s", + groupByResultsBlock.getNumRows())); + } + Map> timeSeriesMap = new HashMap<>(groupByResultsBlock.getNumRows()); + List tagNames = getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(), + "DataSchema is null in leaf stage of time-series query")); + Iterator recordIterator = groupByResultsBlock.getTable().iterator(); + while (recordIterator.hasNext()) { + Record record = recordIterator.next(); + Object[] recordValues = record.getValues(); + Object[] tagValues = new Object[recordValues.length - 1]; + for (int index = 0; index + 1 < recordValues.length; index++) { + tagValues[index] = recordValues[index] == null ? "null" : recordValues[index].toString(); + } + BaseTimeSeriesBuilder seriesBuilder = (BaseTimeSeriesBuilder) recordValues[recordValues.length - 1]; + long seriesHash = TimeSeries.hash(tagValues); + List timeSeriesList = new ArrayList<>(1); + timeSeriesList.add(seriesBuilder.buildWithTagOverrides(tagNames, tagValues)); + timeSeriesMap.put(seriesHash, timeSeriesList); + } + return new TimeSeriesBlock(timeBuckets, timeSeriesMap); + } + + public static TimeSeriesBlock handleAggregationResultsBlock(TimeBuckets timeBuckets, + AggregationResultsBlock aggregationResultsBlock) { + if (aggregationResultsBlock.getResults() == null) { + return new TimeSeriesBlock(timeBuckets, new HashMap<>()); + } + BaseTimeSeriesBuilder seriesBuilder = (BaseTimeSeriesBuilder) aggregationResultsBlock.getResults().get(0); + long seriesHash = TimeSeries.hash(new Object[0]); + List timeSeriesList = new ArrayList<>(1); + timeSeriesList.add(seriesBuilder.buildWithTagOverrides(Collections.emptyList(), new Object[]{})); + Map> timeSeriesMap = new HashMap<>(); + timeSeriesMap.put(seriesHash, timeSeriesList); + return new TimeSeriesBlock(timeBuckets, timeSeriesMap); + } + + private static List getTagNamesFromDataSchema(DataSchema dataSchema) { + String[] columnNames = dataSchema.getColumnNames(); + int numTags = columnNames.length - 1; + List tagNames = new ArrayList<>(numTags); + for (int index = 0; index < numTags; index++) { + tagNames.add(columnNames[index]); + } + return tagNames; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java deleted file mode 100644 index 62b069bb576f..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesSelectionOperator.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.timeseries; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; - - -public class TimeSeriesSelectionOperator extends BaseOperator { - private static final String EXPLAIN_NAME = "TIME_SERIES_SELECTION_OPERATOR"; - private final Long _evaluationTimestamp; - private final TransformOperator _transformOperator; - private final TimeSeriesBuilderFactory _seriesBuilderFactory; - - public TimeSeriesSelectionOperator(Long evaluationTimestamp, - TransformOperator transformOperator, TimeSeriesBuilderFactory seriesBuilderFactory) { - _evaluationTimestamp = evaluationTimestamp; - _transformOperator = transformOperator; - _seriesBuilderFactory = seriesBuilderFactory; - } - - @Override - protected TimeSeriesResultsBlock getNextBlock() { - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Override - public List getChildOperators() { - return ImmutableList.of(_transformOperator); - } - - @Nullable - @Override - public String toExplainString() { - return EXPLAIN_NAME; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java new file mode 100644 index 000000000000..bb62d1967c3d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.function.TransformFunctionType; +import org.apache.pinot.common.request.Literal; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.tsdb.spi.TimeBuckets; + + +/** + * Usage: + *
+ *   args: time column/expression, time-unit, first time bucket value, bucket size in seconds, offset in seconds
+ *   timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)
+ * 
+ */ +public class TimeSeriesBucketTransformFunction extends BaseTransformFunction { + public static final String FUNCTION_NAME = TransformFunctionType.TIME_SERIES_BUCKET.getName(); + private TimeUnit _timeUnit; + private long _reference = -1; + private long _divisor = -1; + private long _offset = 0; + + @Override + public void init(List arguments, Map columnContextMap) { + super.init(arguments, columnContextMap); + _timeUnit = TimeUnit.valueOf(((LiteralTransformFunction) arguments.get(1)).getStringLiteral().toUpperCase( + Locale.ENGLISH)); + final long startSeconds = ((LiteralTransformFunction) arguments.get(2)).getLongLiteral(); + final long bucketSizeSeconds = ((LiteralTransformFunction) arguments.get(3)).getLongLiteral(); + _offset = _timeUnit.convert(Duration.ofSeconds( + ((LiteralTransformFunction) arguments.get(4)).getLongLiteral())); + _reference = _timeUnit.convert(Duration.ofSeconds(startSeconds - bucketSizeSeconds)); + _divisor = _timeUnit.convert(Duration.ofSeconds(bucketSizeSeconds)); + } + + @Override + public int[] transformToIntValuesSV(ValueBlock valueBlock) { + int length = valueBlock.getNumDocs(); + initIntValuesSV(length); + long[] inputValues = _arguments.get(0).transformToLongValuesSV(valueBlock); + for (int docIndex = 0; docIndex < length; docIndex++) { + _intValuesSV[docIndex] = (int) (((inputValues[docIndex] + _offset) - _reference - 1) / _divisor); + } + return _intValuesSV; + } + + @Override + public long[] transformToLongValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException("Only support int output for: " + FUNCTION_NAME + ". Try casting to int."); + } + + @Override + public double[] transformToDoubleValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException("Only support int output for: " + FUNCTION_NAME + ". Try casting to int."); + } + + @Override + public String getName() { + return FUNCTION_NAME; + } + + @Override + public TransformResultMetadata getResultMetadata() { + return new TransformResultMetadata(FieldSpec.DataType.INT, true, false); + } + + public static ExpressionContext create(String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets, + long offsetSeconds) { + final String functionName = TransformFunctionType.TIME_SERIES_BUCKET.getName(); + final List arguments = new ArrayList<>(4); + arguments.add(RequestContextUtils.getExpression(timeColumn)); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(timeUnit.toString()))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0]))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds()))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(offsetSeconds))); + return ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, functionName, arguments)); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java index 3709d08f2b99..85764bc16a77 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java @@ -251,6 +251,9 @@ private static Map> createRegistry() // Item functions typeToImplementation.put(TransformFunctionType.ITEM, ItemTransformFunction.class); + // Time Series functions + typeToImplementation.put(TransformFunctionType.TIME_SERIES_BUCKET, TimeSeriesBucketTransformFunction.class); + Map> registry = new HashMap<>(HashUtil.getHashMapCapacity(typeToImplementation.size())); for (Map.Entry> entry : typeToImplementation.entrySet()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java index 26a92082259f..3cd14b95dff7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java @@ -32,8 +32,6 @@ import org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator; import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator; import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator; -import org.apache.pinot.core.operator.combine.TimeSeriesCombineOperator; -import org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger; import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator; import org.apache.pinot.core.query.executor.ResultsBlockStreamer; import org.apache.pinot.core.query.request.context.QueryContext; @@ -44,7 +42,6 @@ import org.apache.pinot.spi.trace.InvocationRecording; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.Tracing; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; /** @@ -125,11 +122,7 @@ private BaseCombineOperator getCombineOperator() { }, _executorService, _queryContext.getEndTimeMs()); } - if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) { - return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger( - TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()), - _queryContext.getTimeSeriesContext().getAggInfo()), operators, _queryContext, _executorService); - } else if (_streamer != null + if (_streamer != null && QueryContextUtils.isSelectionOnlyQuery(_queryContext) && _queryContext.getLimit() != 0) { // Use streaming operator only for non-empty selection-only query return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java deleted file mode 100644 index dae0479ebbab..000000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.plan; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.request.context.TimeSeriesContext; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseProjectOperator; -import org.apache.pinot.core.operator.blocks.ValueBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.operator.timeseries.TimeSeriesAggregationOperator; -import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.segment.spi.SegmentContext; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; - - -public class TimeSeriesPlanNode implements PlanNode { - private final SegmentContext _segmentContext; - private final QueryContext _queryContext; - private final TimeSeriesContext _timeSeriesContext; - private final TimeSeriesBuilderFactory _seriesBuilderFactory; - - public TimeSeriesPlanNode(SegmentContext segmentContext, QueryContext queryContext) { - _segmentContext = segmentContext; - _queryContext = queryContext; - _timeSeriesContext = Objects.requireNonNull(queryContext.getTimeSeriesContext(), - "Missing time-series context in TimeSeriesPlanNode"); - _seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage()); - } - - @Override - public Operator run() { - FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); - ProjectPlanNode projectPlanNode = new ProjectPlanNode( - _segmentContext, _queryContext, getProjectPlanNodeExpressions(), DocIdSetPlanNode.MAX_DOC_PER_CALL, - filterPlanNode.run()); - BaseProjectOperator projectionOperator = projectPlanNode.run(); - return new TimeSeriesAggregationOperator( - _timeSeriesContext.getTimeColumn(), - _timeSeriesContext.getTimeUnit(), - _timeSeriesContext.getOffsetSeconds(), - _timeSeriesContext.getAggInfo(), - _timeSeriesContext.getValueExpression(), - getGroupByColumns(), - _timeSeriesContext.getTimeBuckets(), - projectionOperator, - _seriesBuilderFactory, - _segmentContext.getIndexSegment().getSegmentMetadata()); - } - - private List getProjectPlanNodeExpressions() { - List result = new ArrayList<>(_queryContext.getSelectExpressions()); - if (CollectionUtils.isNotEmpty(_queryContext.getGroupByExpressions())) { - result.addAll(_queryContext.getGroupByExpressions()); - } - result.add(_queryContext.getTimeSeriesContext().getValueExpression()); - result.add(ExpressionContext.forIdentifier(_queryContext.getTimeSeriesContext().getTimeColumn())); - return result; - } - - private List getGroupByColumns() { - if (_queryContext.getGroupByExpressions() == null) { - return new ArrayList<>(); - } - List groupByColumns = new ArrayList<>(); - for (ExpressionContext expression : _queryContext.getGroupByExpressions()) { - Preconditions.checkState(expression.getType() == ExpressionContext.Type.IDENTIFIER); - groupByColumns.add(expression.getIdentifier()); - } - return groupByColumns; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index ca742456068e..a8f1725e30c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -46,7 +46,6 @@ import org.apache.pinot.core.plan.SelectionPlanNode; import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode; import org.apache.pinot.core.plan.StreamingSelectionPlanNode; -import org.apache.pinot.core.plan.TimeSeriesPlanNode; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.executor.ResultsBlockStreamer; import org.apache.pinot.core.query.prefetch.FetchPlanner; @@ -271,9 +270,7 @@ private void applyQueryOptions(QueryContext queryContext) { @Override public PlanNode makeSegmentPlanNode(SegmentContext segmentContext, QueryContext queryContext) { rewriteQueryContextWithHints(queryContext, segmentContext.getIndexSegment()); - if (QueryContextUtils.isTimeSeriesQuery(queryContext)) { - return new TimeSeriesPlanNode(segmentContext, queryContext); - } else if (QueryContextUtils.isAggregationQuery(queryContext)) { + if (QueryContextUtils.isAggregationQuery(queryContext)) { List groupByExpressions = queryContext.getGroupByExpressions(); if (groupByExpressions != null) { // Group-by query diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 96491acbd47e..e6ddb994d2fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -479,6 +479,8 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio return new DistinctCountULLAggregationFunction(arguments); case DISTINCTCOUNTRAWULL: return new DistinctCountRawULLAggregationFunction(arguments); + case TIMESERIESAGGREGATE: + return new TimeSeriesAggregationFunction(arguments); default: throw new IllegalArgumentException("Unsupported aggregation function type: " + functionType); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java new file mode 100644 index 000000000000..a1f5ce500cb9 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.Literal; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; + + +/** + * Usage: + *
+ *   Example:
+ *     timeSeriesAggregate("m3ql", "MIN", valueExpr, bucketIndexReturningExpr, 1000, 10, 100, "aggParam1=value1")
+ * 
+ */ +public class TimeSeriesAggregationFunction implements AggregationFunction { + private final TimeSeriesBuilderFactory _factory; + private final AggInfo _aggInfo; + private final ExpressionContext _valueExpression; + private final ExpressionContext _timeExpression; + private final TimeBuckets _timeBuckets; + + public TimeSeriesAggregationFunction(List arguments) { + // Initialize everything + Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments for time-series agg"); + String language = arguments.get(0).getLiteral().getStringValue(); + String aggFunctionName = arguments.get(1).getLiteral().getStringValue(); + ExpressionContext valueExpression = arguments.get(2); + ExpressionContext bucketIndexReturningExpr = arguments.get(3); + long firstBucketValue = arguments.get(4).getLiteral().getLongValue(); + long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue(); + int numBuckets = arguments.get(6).getLiteral().getIntValue(); + Map aggParams = AggInfo.loadSerializedParams(arguments.get(7).getLiteral().getStringValue()); + AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */, aggParams); + // Set all values + _factory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language); + _valueExpression = valueExpression; + _timeExpression = bucketIndexReturningExpr; + _timeBuckets = TimeBuckets.ofSeconds(firstBucketValue, Duration.ofSeconds(bucketWindowSeconds), numBuckets); + _aggInfo = aggInfo; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.TIMESERIESAGGREGATE; + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.TIMESERIESAGGREGATE.getName(); + } + + @Override + public List getInputExpressions() { + return List.of(_valueExpression, _timeExpression); + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map blockValSetMap) { + int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); + BaseTimeSeriesBuilder currentSeriesBuilder = aggregationResultHolder.getResult(); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + aggregationResultHolder.setValue(currentSeriesBuilder); + } + for (int docIndex = 0; docIndex < length; docIndex++) { + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + final double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + BaseTimeSeriesBuilder currentSeriesBuilder = groupByResultHolder.getResult(groupId); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + groupByResultHolder.setValueForKey(groupId, currentSeriesBuilder); + } + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + throw new UnsupportedOperationException("MV not supported yet"); + } + + @Override + public BaseTimeSeriesBuilder extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return aggregationResultHolder.getResult(); + } + + @Override + public BaseTimeSeriesBuilder extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return groupByResultHolder.getResult(groupKey); + } + + @Override + public BaseTimeSeriesBuilder merge(BaseTimeSeriesBuilder ir1, BaseTimeSeriesBuilder ir2) { + ir1.mergeAlignedSeriesBuilder(ir2); + return ir1; + } + + @Override + public DataSchema.ColumnDataType getIntermediateResultColumnType() { + return DataSchema.ColumnDataType.OBJECT; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.OBJECT; + } + + @Override + public DoubleArrayList extractFinalResult(BaseTimeSeriesBuilder seriesBuilder) { + Double[] doubleValues = seriesBuilder.build().getDoubleValues(); + return new DoubleArrayList(Arrays.asList(doubleValues)); + } + + @Override + public String toExplainString() { + return "TIME_SERIES"; + } + + public static ExpressionContext create(String language, String valueExpressionStr, ExpressionContext timeExpression, + TimeBuckets timeBuckets, AggInfo aggInfo) { + ExpressionContext valueExpression = RequestContextUtils.getExpression(valueExpressionStr); + List arguments = new ArrayList<>(); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(language))); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(aggInfo.getAggFunction()))); + arguments.add(valueExpression); + arguments.add(timeExpression); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0]))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds()))); + arguments.add(ExpressionContext.forLiteral(Literal.intValue(timeBuckets.getNumBuckets()))); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(AggInfo.serializeParams(aggInfo.getParams())))); + FunctionContext functionContext = new FunctionContext(FunctionContext.Type.AGGREGATION, + AggregationFunctionType.TIMESERIESAGGREGATE.getName(), arguments); + return ExpressionContext.forFunction(functionContext); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index e1e3c37a8dfd..415c89bfcf32 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -35,7 +35,6 @@ import org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -75,7 +74,6 @@ public class QueryContext { private final String _tableName; private final QueryContext _subquery; - private final TimeSeriesContext _timeSeriesContext; private final List _selectExpressions; private final boolean _distinct; private final List _aliasList; @@ -134,14 +132,13 @@ public class QueryContext { private Map> _skipIndexes; private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery, - @Nullable TimeSeriesContext timeSeriesContext, List selectExpressions, boolean distinct, - List aliasList, @Nullable FilterContext filter, @Nullable List groupByExpressions, + List selectExpressions, boolean distinct, List aliasList, + @Nullable FilterContext filter, @Nullable List groupByExpressions, @Nullable FilterContext havingFilter, @Nullable List orderByExpressions, int limit, int offset, Map queryOptions, @Nullable Map expressionOverrideHints, ExplainMode explain) { _tableName = tableName; _subquery = subquery; - _timeSeriesContext = timeSeriesContext; _selectExpressions = selectExpressions; _distinct = distinct; _aliasList = Collections.unmodifiableList(aliasList); @@ -172,11 +169,6 @@ public QueryContext getSubquery() { return _subquery; } - @Nullable - public TimeSeriesContext getTimeSeriesContext() { - return _timeSeriesContext; - } - /** * Returns a list of expressions in the SELECT clause. */ @@ -478,7 +470,6 @@ public boolean isIndexUseAllowed(DataSource dataSource, FieldConfig.IndexType in public static class Builder { private String _tableName; private QueryContext _subquery; - private TimeSeriesContext _timeSeriesContext; private List _selectExpressions; private boolean _distinct; private List _aliasList; @@ -502,11 +493,6 @@ public Builder setSubquery(QueryContext subquery) { return this; } - public Builder setTimeSeriesContext(TimeSeriesContext timeSeriesContext) { - _timeSeriesContext = timeSeriesContext; - return this; - } - public Builder setSelectExpressions(List selectExpressions) { _selectExpressions = selectExpressions; return this; @@ -583,7 +569,7 @@ public QueryContext build() { _queryOptions = Collections.emptyMap(); } QueryContext queryContext = - new QueryContext(_tableName, _subquery, _timeSeriesContext, _selectExpressions, _distinct, _aliasList, + new QueryContext(_tableName, _subquery, _selectExpressions, _distinct, _aliasList, _filter, _groupByExpressions, _havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, _expressionOverrideHints, _explain); queryContext.setNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(_queryOptions)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java index fa7f53e31485..b5beee1a4013 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java @@ -35,7 +35,7 @@ private QueryContextUtils() { * Returns {@code true} if the given query is a selection query, {@code false} otherwise. */ public static boolean isSelectionQuery(QueryContext query) { - return !query.isDistinct() && query.getAggregationFunctions() == null && !isTimeSeriesQuery(query); + return !query.isDistinct() && query.getAggregationFunctions() == null; } /** @@ -51,14 +51,7 @@ public static boolean isSelectionOnlyQuery(QueryContext query) { * Returns {@code true} if the given query is an aggregation query, {@code false} otherwise. */ public static boolean isAggregationQuery(QueryContext query) { - return query.getAggregationFunctions() != null && !isTimeSeriesQuery(query); - } - - /** - * Returns {@code true} if the given query is a time series query, {@code false} otherwise. - */ - public static boolean isTimeSeriesQuery(QueryContext query) { - return query.getTimeSeriesContext() != null; + return query.getAggregationFunctions() != null; } /** diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java deleted file mode 100644 index b6e97c849ff4..000000000000 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.timeseries; - -import java.time.Duration; -import java.util.Collections; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.operator.BaseProjectOperator; -import org.apache.pinot.core.operator.blocks.ValueBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.plan.DocIdSetPlanNode; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.tsdb.spi.AggInfo; -import org.apache.pinot.tsdb.spi.TimeBuckets; -import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.*; -import static org.testng.Assert.*; - - -public class TimeSeriesAggregationOperatorTest { - private static final Random RANDOM = new Random(); - private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; - private static final String GROUP_BY_COLUMN = "city"; - private static final AggInfo AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap()); - private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); - private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10); - private static final int NUM_DOCS_IN_DUMMY_DATA = 1000; - - @Test - public void testTimeSeriesAggregationOperator() { - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory()); - TimeSeriesResultsBlock resultsBlock = timeSeriesAggregationOperator.getNextBlock(); - // Expect 2 series: Chicago and San Francisco - assertNotNull(resultsBlock); - assertEquals(2, resultsBlock.getNumRows()); - } - - @Test - public void testTimeSeriesAggregationOperatorWhenSeriesLimit() { - // Since we test with 2 series, use 1 as the limit. - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory(1, 100_000_000L)); - try { - timeSeriesAggregationOperator.getNextBlock(); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains("Limit: 1. Series in current")); - } - } - - @Test - public void testTimeSeriesAggregationOperatorWhenDataPoints() { - // Since we test with 2 series, use 1 as the limit. - TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( - new SimpleTimeSeriesBuilderFactory(1000, 11)); - try { - timeSeriesAggregationOperator.getNextBlock(); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains("Limit: 11. Data points in current")); - } - } - - @Test - public void testGetTimeValueIndexForSeconds() { - /* - * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] - * storedTimeValues: [9_999, 10_000, 9_999, 9_901, 10_100, 10_899, 10_900] - * expected indexes: [0, 0, 0, 0, 1, 9, 9] - */ - final int[] expectedIndexes = new int[]{0, 0, 0, 0, 1, 9, 9}; - final TimeUnit storedTimeUnit = TimeUnit.SECONDS; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - long[] storedTimeValues = new long[]{9_999L, 10_000L, 9_999L, 9_901L, 10_100L, 10_899L, 10_900L}; - int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertEquals(indexes, expectedIndexes); - } - - @Test - public void testGetTimeValueIndexForMillis() { - /* - * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] - * storedTimeValues: [9_999_000, 10_000_000, 10_500_000, 10_899_999, 10_800_001, 10_900_000] - * expected indexes: [0, 0, 5, 9, 9, 9] - */ - final int[] expectedIndexes = new int[]{0, 0, 5, 9, 9, 9}; - final TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - long[] storedTimeValues = new long[]{9_999_000L, 10_000_000L, 10_500_000L, 10_899_999L, 10_800_001L, 10_900_000L}; - int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertEquals(indexes, expectedIndexes); - } - - @Test - public void testGetTimeValueIndexOutOfBounds() { - final TimeUnit storedTimeUnit = TimeUnit.SECONDS; - final int numTimeBuckets = 10; - final int windowSeconds = 100; - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(windowSeconds), numTimeBuckets); - TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); - testOutOfBoundsTimeValueIndex(new long[]{8_000}, numTimeBuckets, aggregationOperator); - testOutOfBoundsTimeValueIndex(new long[]{timeBuckets.getTimeRangeEndInclusive() + 1}, numTimeBuckets, - aggregationOperator); - } - - private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int numTimeBuckets, - TimeSeriesAggregationOperator aggOperator) { - assertEquals(storedTimeValues.length, 1, "Misconfigured test: pass single stored time value"); - int[] indexes = aggOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); - assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range"); - } - - private TimeSeriesAggregationOperator buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) { - BaseProjectOperator mockProjectOperator = mock(BaseProjectOperator.class); - ValueBlock valueBlock = buildValueBlockForProjectOperator(); - when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock) null); - return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN, - TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.singletonList(GROUP_BY_COLUMN), - TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory, mock(SegmentMetadata.class)); - } - - private static ValueBlock buildValueBlockForProjectOperator() { - ValueBlock valueBlock = mock(ValueBlock.class); - doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs(); - doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN); - doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION); - doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN); - return valueBlock; - } - - private static BlockValSet buildBlockValSetForGroupByColumns() { - BlockValSet blockValSet = mock(BlockValSet.class); - String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco"; - } - doReturn(stringArray).when(blockValSet).getStringValuesSV(); - doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType(); - return blockValSet; - } - - private static BlockValSet buildBlockValSetForValues() { - BlockValSet blockValSet = mock(BlockValSet.class); - long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - valuesArray[index] = index; - } - doReturn(valuesArray).when(blockValSet).getLongValuesSV(); - doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType(); - return blockValSet; - } - - private static BlockValSet buildBlockValSetForTime() { - BlockValSet blockValSet = mock(BlockValSet.class); - long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; - for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { - timeValueArray[index] = 901 + RANDOM.nextInt(1000); - } - doReturn(timeValueArray).when(blockValSet).getLongValuesSV(); - return blockValSet; - } - - private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) { - return new TimeSeriesAggregationOperator( - DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(), - timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class), - mock(SegmentMetadata.class)); - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 0b59468e0d75..fbcf9b02ad1d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -32,19 +32,23 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.request.context.TimeSeriesContext; +import org.apache.pinot.common.request.context.RequestContextUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils; +import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction; +import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -220,17 +224,14 @@ public void testMinQuery() { @Test public void testTimeSeriesSumQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 2); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false, Collections.emptyMap())); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList()); + QueryContext queryContext = getQueryContextForTimeSeries("orderAmount", timeBuckets, 0L, + new AggInfo("SUM", false, Collections.emptyMap()), Collections.emptyList()); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleAggregationResultsBlock(timeBuckets, + (AggregationResultsBlock) instanceResponse.getResultsBlock()); assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]); assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1], 29885544.0); @@ -239,17 +240,14 @@ public void testTimeSeriesSumQuery() { @Test public void testTimeSeriesMaxQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false, Collections.emptyMap())); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); + QueryContext queryContext = getQueryContextForTimeSeries("orderItemCount", timeBuckets, 0L, + new AggInfo("MAX", false, Collections.emptyMap()), List.of("cityName")); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock); + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) instanceResponse.getResultsBlock(); + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets, resultsBlock); assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "New York", the max order item count should be 4 boolean foundNewYork = false; @@ -271,17 +269,14 @@ public void testTimeSeriesMaxQuery() { @Test public void testTimeSeriesMinQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); - ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = - new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false, Collections.emptyMap())); - QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); + QueryContext queryContext = getQueryContextForTimeSeries("orderItemCount", timeBuckets, 0L, + new AggInfo("MIN", false, Collections.emptyMap()), List.of("cityName")); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); - assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock); + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets, + (GroupByResultsBlock) instanceResponse.getResultsBlock()); assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "Chicago", the min order item count should be 0 boolean foundChicago = false; @@ -312,18 +307,21 @@ private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) { return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), System.currentTimeMillis()); } - private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context) { - return getQueryContextForTimeSeries(context, Collections.singletonList( - ExpressionContext.forIdentifier("cityName"))); - } - - private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context, List groupBy) { + private QueryContext getQueryContextForTimeSeries(String valueExpression, TimeBuckets timeBuckets, long offsetSeconds, + AggInfo aggInfo, List groupBy) { + List groupByExpList = groupBy.stream().map(RequestContextUtils::getExpression) + .collect(Collectors.toList()); + ExpressionContext timeExpression = TimeSeriesBucketTransformFunction.create(TIME_SERIES_TIME_COL_NAME, + TimeUnit.SECONDS, timeBuckets, offsetSeconds); + ExpressionContext aggregateExpr = TimeSeriesAggregationFunction.create(TIME_SERIES_LANGUAGE_NAME, valueExpression, + timeExpression, timeBuckets, aggInfo); QueryContext.Builder builder = new QueryContext.Builder(); builder.setTableName(OFFLINE_TABLE_NAME); - builder.setTimeSeriesContext(context); builder.setAliasList(Collections.emptyList()); - builder.setSelectExpressions(Collections.emptyList()); - builder.setGroupByExpressions(groupBy); + builder.setSelectExpressions(List.of(aggregateExpr)); + // We pass in null to group-by exp to get AggregationResultsBlock in response to test both group-by and agg paths. + builder.setGroupByExpressions(groupByExpList.isEmpty() ? null : groupByExpList); + builder.setLimit(Integer.MAX_VALUE); return builder.build(); } } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java index cf38b501b00c..fec5db4e1fc4 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java @@ -81,7 +81,10 @@ private List consumeGeneric(String pipeline) { int indexOfOpenBracket = pipeline.indexOf("{"); int indexOfClosedBracket = pipeline.indexOf("}"); result.add(pipeline.substring(0, indexOfOpenBracket)); - result.add(pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket)); + String arg = pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket); + if (!arg.isEmpty()) { + result.add(arg); + } return result; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java index 8b577105d3fd..5db6920be53c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -23,7 +23,9 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.collections.MapUtils; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.logger.ServerQueryLogger; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -32,14 +34,16 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator { + private final TimeSeriesExecutionContext _context; private final ServerQueryRequest _request; private final QueryExecutor _queryExecutor; private final ExecutorService _executorService; private final ServerQueryLogger _queryLogger; - public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, - ExecutorService executorService) { + public LeafTimeSeriesOperator(TimeSeriesExecutionContext context, ServerQueryRequest serverQueryRequest, + QueryExecutor queryExecutor, ExecutorService executorService) { super(Collections.emptyList()); + _context = context; _request = serverQueryRequest; _queryExecutor = queryExecutor; _executorService = executorService; @@ -50,16 +54,25 @@ public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecut public TimeSeriesBlock getNextBlock() { Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized"); InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService); - assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock; + assert instanceResponseBlock.getResultsBlock() instanceof GroupByResultsBlock; _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries"); if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) { // TODO: Return error in the TimeSeriesBlock instead? - String oneException = instanceResponseBlock.getExceptions().values().iterator().next(); - throw new RuntimeException(oneException); + String message = instanceResponseBlock.getExceptions().values().iterator().next(); + throw new RuntimeException("Error running time-series query: " + message); + } + if (instanceResponseBlock.getResultsBlock() instanceof GroupByResultsBlock) { + return TimeSeriesOperatorUtils.handleGroupByResultsBlock(_context.getInitialTimeBuckets(), + (GroupByResultsBlock) instanceResponseBlock.getResultsBlock()); + } else if (instanceResponseBlock.getResultsBlock() instanceof AggregationResultsBlock) { + return TimeSeriesOperatorUtils.handleAggregationResultsBlock(_context.getInitialTimeBuckets(), + (AggregationResultsBlock) instanceResponseBlock.getResultsBlock()); + } else if (instanceResponseBlock.getResultsBlock() == null) { + throw new IllegalStateException("Found null results block in time-series query"); + } else { + throw new UnsupportedOperationException( + String.format("Unknown results block: %s", instanceResponseBlock.getResultsBlock().getClass().getName())); } - TimeSeriesResultsBlock timeSeriesResultsBlock = - ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()); - return timeSeriesResultsBlock.getTimeSeriesBuilderBlock().build(); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java index b2be6b2f5622..cfcd80a9e7ea 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java @@ -31,13 +31,15 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.request.context.TimeSeriesContext; +import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction; +import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; @@ -84,7 +86,8 @@ private TimeSeriesPhysicalTableScan convertLeafToPhysicalTableScan(LeafTimeSerie TimeSeriesExecutionContext context) { List segments = context.getPlanIdToSegmentsMap().getOrDefault(leafNode.getId(), Collections.emptyList()); ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context); - return new TimeSeriesPhysicalTableScan(leafNode.getId(), serverQueryRequest, _queryExecutor, _executorService); + return new TimeSeriesPhysicalTableScan(context, leafNode.getId(), serverQueryRequest, _queryExecutor, + _executorService); } public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List segments, @@ -93,6 +96,9 @@ public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode l segments, getServerQueryRequestMetadataMap(context), _serverMetrics); } + /** + * Create a transform expression, and link it with an aggregation function. + */ @VisibleForTesting QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExecutionContext context) { FilterContext filterContext = @@ -100,18 +106,18 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExec leafNode.getEffectiveFilter(context.getInitialTimeBuckets()))); List groupByExpressions = leafNode.getGroupByExpressions().stream() .map(RequestContextUtils::getExpression).collect(Collectors.toList()); - ExpressionContext valueExpression = RequestContextUtils.getExpression(leafNode.getValueExpression()); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(), - leafNode.getTimeColumn(), leafNode.getTimeUnit(), context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(), - valueExpression, leafNode.getAggInfo()); + TimeBuckets timeBuckets = context.getInitialTimeBuckets(); + ExpressionContext timeTransform = TimeSeriesBucketTransformFunction.create(leafNode.getTimeColumn(), + leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds()); + ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(), + leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo()); return new QueryContext.Builder() .setTableName(leafNode.getTableName()) .setFilter(filterContext) .setGroupByExpressions(groupByExpressions) - .setSelectExpressions(Collections.emptyList()) + .setSelectExpressions(List.of(aggregation)) .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs()))) .setAliasList(Collections.emptyList()) - .setTimeSeriesContext(timeSeriesContext) .setLimit(Integer.MAX_VALUE) .build(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java index 959f580b10fd..d6787c3b0204 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java @@ -29,16 +29,19 @@ public class TimeSeriesPhysicalTableScan extends BaseTimeSeriesPlanNode { + private final TimeSeriesExecutionContext _context; private final ServerQueryRequest _request; private final QueryExecutor _queryExecutor; private final ExecutorService _executorService; public TimeSeriesPhysicalTableScan( + TimeSeriesExecutionContext context, String id, ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, ExecutorService executorService) { super(id, Collections.emptyList()); + _context = context; _request = serverQueryRequest; _queryExecutor = queryExecutor; _executorService = executorService; @@ -59,7 +62,7 @@ public ExecutorService getExecutorService() { @Override public BaseTimeSeriesPlanNode withInputs(List newInputs) { Preconditions.checkState(newInputs.isEmpty(), "Attempted to add inputs to physical table scan"); - return new TimeSeriesPhysicalTableScan(_id, _request, _queryExecutor, _executorService); + return new TimeSeriesPhysicalTableScan(_context, _id, _request, _queryExecutor, _executorService); } public String getKlass() { @@ -73,6 +76,6 @@ public String getExplainName() { @Override public BaseTimeSeriesOperator run() { - return new LeafTimeSeriesOperator(_request, _queryExecutor, _executorService); + return new LeafTimeSeriesOperator(_context, _request, _queryExecutor, _executorService); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java index b30a82d165ee..f98e4228e09b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java @@ -67,11 +67,6 @@ public void testCompileQueryContext() { new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L, filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName")); QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context); - assertNotNull(queryContext.getTimeSeriesContext()); - assertEquals(queryContext.getTimeSeriesContext().getLanguage(), LANGUAGE); - assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L); - assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); - assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount"); assertEquals(queryContext.getFilter().toString(), "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')"); assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS))); @@ -88,11 +83,6 @@ public void testCompileQueryContext() { assertNotNull(queryContext); assertNotNull(queryContext.getGroupByExpressions()); assertEquals("concat(cityName,stateName,'-')", queryContext.getGroupByExpressions().get(0).toString()); - assertNotNull(queryContext.getTimeSeriesContext()); - assertEquals(queryContext.getTimeSeriesContext().getLanguage(), LANGUAGE); - assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 10L); - assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); - assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(), "times(orderCount,'2')"); assertNotNull(queryContext.getFilter()); assertEquals(queryContext.getFilter().toString(), "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')"); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 09fb71c968c6..9840f684c0a6 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -220,7 +220,8 @@ public enum AggregationFunctionType { SqlTypeName.OTHER), PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR, OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), i -> i == 2), - SqlTypeName.OTHER); + SqlTypeName.OTHER), + TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.VARCHAR); private static final Set NAMES = Arrays.stream(values()) .flatMap(func -> Stream.of(func.name(), func.getName(), func.getName().toLowerCase())) diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java index 33b66bff1f7a..6cfb49ee6081 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; /** @@ -88,4 +90,30 @@ public Map getParams() { public String getParam(String key) { return _params.get(key); } + + public static String serializeParams(Map params) { + StringBuilder builder = new StringBuilder(); + for (var entry : params.entrySet()) { + if (builder.length() > 0) { + builder.append(","); + } + builder.append(entry.getKey()); + builder.append("="); + builder.append(entry.getValue()); + } + return builder.toString(); + } + + public static Map loadSerializedParams(String serialized) { + Map result = new HashMap<>(); + if (StringUtils.isBlank(serialized)) { + return result; + } + String[] parts = serialized.split(","); + for (String keyValue : parts) { + String[] keyValueArray = keyValue.split("="); + result.put(keyValueArray[0], keyValueArray[1]); + } + return result; + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index 9cca55ebcbb6..f4f52f87dd82 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.tsdb.spi.series; +import java.util.Collections; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -31,6 +32,8 @@ * Important: Refer to {@link TimeSeries} for details on Series ID and how to use it in general. */ public abstract class BaseTimeSeriesBuilder { + public static final List UNINITIALISED_TAG_NAMES = Collections.emptyList(); + public static final Object[] UNINITIALISED_TAG_VALUES = new Object[]{}; protected final String _id; @Nullable protected final Long[] _timeValues; @@ -40,8 +43,8 @@ public abstract class BaseTimeSeriesBuilder { protected final Object[] _tagValues; /** - * Note that ID should be hashed to a Long to become the key in the Map<Long, List<TimeSeries>> in - * {@link TimeSeriesBlock}. Refer to {@link TimeSeries} for more details. + * Note: The leaf stage will use {@link #UNINITIALISED_TAG_NAMES} and {@link #UNINITIALISED_TAG_VALUES} during + * the aggregation. This is because tag values are materialized very late. */ public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, List tagNames, Object[] tagValues) { @@ -81,4 +84,9 @@ public void mergeAlignedSeriesBuilder(BaseTimeSeriesBuilder builder) { } public abstract TimeSeries build(); + + /** + * Used by the leaf stage, because the leaf stage materializes tag values very late. + */ + public abstract TimeSeries buildWithTagOverrides(List tagNames, Object[] tagValues); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java index 589bbd1bafd7..28c21b0b1c7a 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java @@ -54,4 +54,9 @@ public void addValue(long timeValue, Double value) { public TimeSeries build() { return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues); } + + @Override + public TimeSeries buildWithTagOverrides(List tagNames, Object[] tagValues) { + return new TimeSeries(_id, null, _timeBuckets, _values, tagNames, tagValues); + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java index 6247114d6145..327c5625ac07 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java @@ -54,4 +54,9 @@ public void addValue(long timeValue, Double value) { public TimeSeries build() { return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues); } + + @Override + public TimeSeries buildWithTagOverrides(List tagNames, Object[] tagValues) { + return new TimeSeries(_id, null, _timeBuckets, _values, tagNames, tagValues); + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java index 8123dde1bf46..7f958acc406b 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java @@ -54,4 +54,9 @@ public void addValue(long timeValue, Double value) { public TimeSeries build() { return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues); } + + @Override + public TimeSeries buildWithTagOverrides(List tagNames, Object[] tagValues) { + return new TimeSeries(_id, null, _timeBuckets, _values, tagNames, tagValues); + } }