From 5950b1d78fa9d78479177fbd8ebb3642ebeb647e Mon Sep 17 00:00:00 2001 From: ankitsultana Date: Sun, 19 Jan 2025 05:26:50 +0000 Subject: [PATCH] self-review --- .../timeseries/TimeSeriesOperatorUtils.java | 4 +- .../TimeSeriesBucketTransformFunction.java | 4 +- .../TimeSeriesAggregationFunction.java | 118 ++++++++++++++---- .../core/function/FunctionRegistryTest.java | 3 +- .../query/executor/QueryExecutorTest.java | 6 +- .../timeseries/LeafTimeSeriesOperator.java | 4 +- .../org/apache/pinot/tsdb/spi/AggInfo.java | 22 ++-- .../spi/series/BaseTimeSeriesBuilder.java | 4 +- 8 files changed, 119 insertions(+), 46 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java index 2e014e728e6b..236d4f858ff0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java @@ -39,7 +39,7 @@ public class TimeSeriesOperatorUtils { private TimeSeriesOperatorUtils() { } - public static TimeSeriesBlock handleGroupByResultsBlock(TimeBuckets timeBuckets, + public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets, GroupByResultsBlock groupByResultsBlock) { if (groupByResultsBlock.getNumRows() == 0) { return new TimeSeriesBlock(timeBuckets, new HashMap<>()); @@ -68,7 +68,7 @@ public static TimeSeriesBlock handleGroupByResultsBlock(TimeBuckets timeBuckets, return new TimeSeriesBlock(timeBuckets, timeSeriesMap); } - public static TimeSeriesBlock handleAggregationResultsBlock(TimeBuckets timeBuckets, + public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets, AggregationResultsBlock aggregationResultsBlock) { if (aggregationResultsBlock.getResults() == null) { return new TimeSeriesBlock(timeBuckets, new HashMap<>()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java index bb62d1967c3d..ab70ca9f9402 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TimeSeriesBucketTransformFunction.java @@ -37,9 +37,9 @@ /** - * Usage: + * Used by the time-series engine to convert a time expression to an index in the {@link TimeBuckets}. *
- *   args: time column/expression, time-unit, first time bucket value, bucket size in seconds, offset in seconds
+ *   args: (timeExpression, timeUnit, first_time_bucket_value, bucket_window_seconds, offset)
  *   timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)
  * 
*/ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java index a1f5ce500cb9..c5a9cb60194a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java @@ -44,11 +44,8 @@ /** - * Usage: - *
- *   Example:
- *     timeSeriesAggregate("m3ql", "MIN", valueExpr, bucketIndexReturningExpr, 1000, 10, 100, "aggParam1=value1")
- * 
+ * Aggregation function used by the Time Series Engine. + * TODO: This can't be used with SQL because the Object Serde is not implemented. */ public class TimeSeriesAggregationFunction implements AggregationFunction { private final TimeSeriesBuilderFactory _factory; @@ -57,6 +54,12 @@ public class TimeSeriesAggregationFunction implements AggregationFunction + * timeSeriesAggregate("m3ql", "MIN", valueExpr, timeBucketExpr, firstBucketValue, bucketLenSeconds, numBuckets, "aggParam1=value1") + * + */ public TimeSeriesAggregationFunction(List arguments) { // Initialize everything Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments for time-series agg"); @@ -67,7 +70,7 @@ public TimeSeriesAggregationFunction(List arguments) { long firstBucketValue = arguments.get(4).getLiteral().getLongValue(); long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue(); int numBuckets = arguments.get(6).getLiteral().getIntValue(); - Map aggParams = AggInfo.loadSerializedParams(arguments.get(7).getLiteral().getStringValue()); + Map aggParams = AggInfo.deserializeParams(arguments.get(7).getLiteral().getStringValue()); AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */, aggParams); // Set all values _factory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language); @@ -106,15 +109,19 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map blockValSetMap) { int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); - double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); - BaseTimeSeriesBuilder currentSeriesBuilder = aggregationResultHolder.getResult(); - if (currentSeriesBuilder == null) { - currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, - BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); - aggregationResultHolder.setValue(currentSeriesBuilder); - } - for (int docIndex = 0; docIndex < length; docIndex++) { - currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression); + switch (valueBlockValSet.getValueType()) { + case DOUBLE: + case LONG: + case INT: + aggregateNumericValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet); + break; + case STRING: + aggregateStringValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet); + break; + default: + throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate", + valueBlockValSet.getValueType())); } } @@ -122,16 +129,19 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map blockValSetMap) { final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); - final double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV(); - for (int docIndex = 0; docIndex < length; docIndex++) { - int groupId = groupKeyArray[docIndex]; - BaseTimeSeriesBuilder currentSeriesBuilder = groupByResultHolder.getResult(groupId); - if (currentSeriesBuilder == null) { - currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, - BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); - groupByResultHolder.setValueForKey(groupId, currentSeriesBuilder); - } - currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression); + switch (valueBlockValSet.getValueType()) { + case DOUBLE: + case LONG: + case INT: + aggregateGroupByNumericValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet); + break; + case STRING: + aggregateGroupByStringValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet); + break; + default: + throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate", + valueBlockValSet.getValueType())); } } @@ -178,6 +188,64 @@ public String toExplainString() { return "TIME_SERIES"; } + private void aggregateNumericValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder, + BlockValSet blockValSet) { + double[] values = blockValSet.getDoubleValuesSV(); + BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + resultHolder.setValue(currentSeriesBuilder); + } + for (int docIndex = 0; docIndex < length; docIndex++) { + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + + private void aggregateStringValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder, + BlockValSet blockValSet) { + String[] values = blockValSet.getStringValuesSV(); + BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + resultHolder.setValue(currentSeriesBuilder); + } + for (int docIndex = 0; docIndex < length; docIndex++) { + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + + private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, int[] timeIndexes, + GroupByResultHolder resultHolder, BlockValSet blockValSet) { + final double[] values = blockValSet.getDoubleValuesSV(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + resultHolder.setValueForKey(groupId, currentSeriesBuilder); + } + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + + private void aggregateGroupByStringValues(int length, int[] groupKeyArray, int[] timeIndexes, + GroupByResultHolder resultHolder, BlockValSet blockValSet) { + final String[] values = blockValSet.getStringValuesSV(); + for (int docIndex = 0; docIndex < length; docIndex++) { + int groupId = groupKeyArray[docIndex]; + BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId); + if (currentSeriesBuilder == null) { + currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets, + BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); + resultHolder.setValueForKey(groupId, currentSeriesBuilder); + } + currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + } + } + public static ExpressionContext create(String language, String valueExpressionStr, ExpressionContext timeExpression, TimeBuckets timeBuckets, AggInfo aggInfo) { ExpressionContext valueExpression = RequestContextUtils.getExpression(valueExpressionStr); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/function/FunctionRegistryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/function/FunctionRegistryTest.java index 4dd7e5dd21cd..14b9c97263c9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/function/FunctionRegistryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/function/FunctionRegistryTest.java @@ -47,7 +47,8 @@ public class FunctionRegistryTest { TransformFunctionType.ARRAY_AVERAGE, TransformFunctionType.ARRAY_MIN, TransformFunctionType.ARRAY_MAX, TransformFunctionType.ARRAY_SUM, TransformFunctionType.VALUE_IN, TransformFunctionType.IN_ID_SET, TransformFunctionType.GROOVY, TransformFunctionType.CLP_DECODE, TransformFunctionType.CLP_ENCODED_VARS_MATCH, - TransformFunctionType.ST_POLYGON, TransformFunctionType.ST_AREA, TransformFunctionType.ITEM); + TransformFunctionType.ST_POLYGON, TransformFunctionType.ST_AREA, TransformFunctionType.ITEM, + TransformFunctionType.TIME_SERIES_BUCKET); private static final EnumSet IGNORED_FILTER_KINDS = EnumSet.of( // Special filter functions without implementation FilterKind.TEXT_MATCH, FilterKind.TEXT_CONTAINS, FilterKind.JSON_MATCH, FilterKind.VECTOR_SIMILARITY, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index fbcf9b02ad1d..b0a5b3ac2e3b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -230,7 +230,7 @@ public void testTimeSeriesSumQuery() { new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); - TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleAggregationResultsBlock(timeBuckets, + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, (AggregationResultsBlock) instanceResponse.getResultsBlock()); assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]); @@ -247,7 +247,7 @@ public void testTimeSeriesMaxQuery() { InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock); GroupByResultsBlock resultsBlock = (GroupByResultsBlock) instanceResponse.getResultsBlock(); - TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets, resultsBlock); + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock); assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "New York", the max order item count should be 4 boolean foundNewYork = false; @@ -275,7 +275,7 @@ public void testTimeSeriesMinQuery() { new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock); - TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets, + TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, (GroupByResultsBlock) instanceResponse.getResultsBlock()); assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "Chicago", the min order item count should be 0 diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java index 5db6920be53c..bb700cfc394f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -62,10 +62,10 @@ public TimeSeriesBlock getNextBlock() { throw new RuntimeException("Error running time-series query: " + message); } if (instanceResponseBlock.getResultsBlock() instanceof GroupByResultsBlock) { - return TimeSeriesOperatorUtils.handleGroupByResultsBlock(_context.getInitialTimeBuckets(), + return TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(), (GroupByResultsBlock) instanceResponseBlock.getResultsBlock()); } else if (instanceResponseBlock.getResultsBlock() instanceof AggregationResultsBlock) { - return TimeSeriesOperatorUtils.handleAggregationResultsBlock(_context.getInitialTimeBuckets(), + return TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(), (AggregationResultsBlock) instanceResponseBlock.getResultsBlock()); } else if (instanceResponseBlock.getResultsBlock() == null) { throw new IllegalStateException("Found null results block in time-series query"); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java index 6cfb49ee6081..1f45bf520623 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java @@ -31,18 +31,22 @@ * AggInfo is used to represent the aggregation function and its parameters. * Aggregation functions are stored as a string, since time-series languages * are allowed to implement their own aggregation functions. - * - * The class now includes a map of parameters, which can store various + *
+ * The class also includes a map of parameters, which can store various * configuration options for the aggregation function. This allows for - * more flexibility in defining and customizing aggregations. - * + * more flexibility in defining and customizing aggregations. The parameters + * must be able to serialize and deserialize via the {@link #serializeParams(Map)} + * and {@link #deserializeParams(String)}. + *
* Common parameters might include: * - window: Defines the time window for aggregation - * + *
* Example usage: - * Map params = new HashMap<>(); - * params.put("window", "5m"); - * AggInfo aggInfo = new AggInfo("rate", true, params); + *
+ *   Map params = new HashMap<>();
+ *   params.put("window", "5m");
+ *   AggInfo aggInfo = new AggInfo("rate", true, params);
+ * 
*/ public class AggInfo { private final String _aggFunction; @@ -104,7 +108,7 @@ public static String serializeParams(Map params) { return builder.toString(); } - public static Map loadSerializedParams(String serialized) { + public static Map deserializeParams(String serialized) { Map result = new HashMap<>(); if (StringUtils.isBlank(serialized)) { return result; diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index f4f52f87dd82..84239e121713 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -44,7 +44,7 @@ public abstract class BaseTimeSeriesBuilder { /** * Note: The leaf stage will use {@link #UNINITIALISED_TAG_NAMES} and {@link #UNINITIALISED_TAG_VALUES} during - * the aggregation. This is because tag values are materialized very late. + * the aggregation. This is because tag values are materialized after the Combine Operator. */ public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, List tagNames, Object[] tagValues) { @@ -58,7 +58,7 @@ public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable T public abstract void addValueAtIndex(int timeBucketIndex, Double value); public void addValueAtIndex(int timeBucketIndex, String value) { - throw new IllegalStateException("This aggregation function does not support string input"); + throw new UnsupportedOperationException("This aggregation function does not support string input"); } public abstract void addValue(long timeValue, Double value);