Commit 5950b1d

self-review
ankitsultana committed Jan 19, 2025
1 parent a89a328 commit 5950b1d
Showing 8 changed files with 119 additions and 46 deletions.
@@ -39,7 +39,7 @@ public class TimeSeriesOperatorUtils {
private TimeSeriesOperatorUtils() {
}

public static TimeSeriesBlock handleGroupByResultsBlock(TimeBuckets timeBuckets,
public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
GroupByResultsBlock groupByResultsBlock) {
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
@@ -68,7 +68,7 @@ public static TimeSeriesBlock handleGroupByResultsBlock(TimeBuckets timeBuckets,
return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
}

public static TimeSeriesBlock handleAggregationResultsBlock(TimeBuckets timeBuckets,
public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
AggregationResultsBlock aggregationResultsBlock) {
if (aggregationResultsBlock.getResults() == null) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
@@ -37,9 +37,9 @@


/**
* Usage:
* Used by the time-series engine to convert a time expression to an index in the {@link TimeBuckets}.
* <pre>
* args: time column/expression, time-unit, first time bucket value, bucket size in seconds, offset in seconds
* args: (timeExpression, timeUnit, first_time_bucket_value, bucket_window_seconds, offset)
* timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)
* </pre>
*/
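As an editorial aside, the sketch below illustrates the arithmetic those documented arguments suggest. It is not the Pinot implementation: whether the first bucket value marks the start or the end of the first bucket, and exactly how rounding and the offset are applied, are assumptions.

```java
// Hedged sketch only (not the actual transform): one plausible mapping from the documented
// arguments to a bucket index, assuming firstBucketValue is the start of the first bucket
// (in seconds) and the offset is subtracted before bucketing.
import java.util.concurrent.TimeUnit;

public final class BucketIndexSketch {
  static int timeSeriesBucketIndex(long rawTimeValue, TimeUnit timeUnit,
      long firstBucketValueSeconds, long bucketWindowSeconds, long offsetSeconds) {
    long timeSeconds = timeUnit.toSeconds(rawTimeValue) - offsetSeconds;
    return (int) ((timeSeconds - firstBucketValueSeconds) / bucketWindowSeconds);
  }

  public static void main(String[] args) {
    // Mirrors the javadoc example: timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)
    System.out.println(timeSeriesBucketIndex(153_000L, TimeUnit.MILLISECONDS, 123, 10, 0)); // prints 3
  }
}
```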
@@ -44,11 +44,8 @@


/**
* Usage:
* <pre>
* Example:
* timeSeriesAggregate("m3ql", "MIN", valueExpr, bucketIndexReturningExpr, 1000, 10, 100, "aggParam1=value1")
* </pre>
* Aggregation function used by the Time Series Engine.
* TODO: This can't be used with SQL because the Object Serde is not implemented.
*/
public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTimeSeriesBuilder, DoubleArrayList> {
private final TimeSeriesBuilderFactory _factory;
@@ -57,6 +54,12 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
private final ExpressionContext _timeExpression;
private final TimeBuckets _timeBuckets;

/**
* Arguments are as shown below:
* <pre>
* timeSeriesAggregate("m3ql", "MIN", valueExpr, timeBucketExpr, firstBucketValue, bucketLenSeconds, numBuckets, "aggParam1=value1")
* </pre>
*/
public TimeSeriesAggregationFunction(List<ExpressionContext> arguments) {
// Initialize everything
Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments for time-series agg");
@@ -67,7 +70,7 @@ public TimeSeriesAggregationFunction(List<ExpressionContext> arguments) {
long firstBucketValue = arguments.get(4).getLiteral().getLongValue();
long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue();
int numBuckets = arguments.get(6).getLiteral().getIntValue();
Map<String, String> aggParams = AggInfo.loadSerializedParams(arguments.get(7).getLiteral().getStringValue());
Map<String, String> aggParams = AggInfo.deserializeParams(arguments.get(7).getLiteral().getStringValue());
AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */, aggParams);
// Set all values
_factory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language);
@@ -106,32 +109,39 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV();
double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV();
BaseTimeSeriesBuilder currentSeriesBuilder = aggregationResultHolder.getResult();
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
aggregationResultHolder.setValue(currentSeriesBuilder);
}
for (int docIndex = 0; docIndex < length; docIndex++) {
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
switch (valueBlockValSet.getValueType()) {
case DOUBLE:
case LONG:
case INT:
aggregateNumericValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet);
break;
case STRING:
aggregateStringValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet);
break;
default:
throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate",
valueBlockValSet.getValueType()));
}
}

@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV();
final double[] values = blockValSetMap.get(_valueExpression).getDoubleValuesSV();
for (int docIndex = 0; docIndex < length; docIndex++) {
int groupId = groupKeyArray[docIndex];
BaseTimeSeriesBuilder currentSeriesBuilder = groupByResultHolder.getResult(groupId);
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
groupByResultHolder.setValueForKey(groupId, currentSeriesBuilder);
}
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
switch (valueBlockValSet.getValueType()) {
case DOUBLE:
case LONG:
case INT:
aggregateGroupByNumericValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet);
break;
case STRING:
aggregateGroupByStringValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet);
break;
default:
throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate",
valueBlockValSet.getValueType()));
}
}

@@ -178,6 +188,64 @@ public String toExplainString() {
return "TIME_SERIES";
}

private void aggregateNumericValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder,
BlockValSet blockValSet) {
double[] values = blockValSet.getDoubleValuesSV();
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValue(currentSeriesBuilder);
}
for (int docIndex = 0; docIndex < length; docIndex++) {
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
}
}

private void aggregateStringValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder,
BlockValSet blockValSet) {
String[] values = blockValSet.getStringValuesSV();
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValue(currentSeriesBuilder);
}
for (int docIndex = 0; docIndex < length; docIndex++) {
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
}
}

private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, int[] timeIndexes,
GroupByResultHolder resultHolder, BlockValSet blockValSet) {
final double[] values = blockValSet.getDoubleValuesSV();
for (int docIndex = 0; docIndex < length; docIndex++) {
int groupId = groupKeyArray[docIndex];
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId);
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValueForKey(groupId, currentSeriesBuilder);
}
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
}
}

private void aggregateGroupByStringValues(int length, int[] groupKeyArray, int[] timeIndexes,
GroupByResultHolder resultHolder, BlockValSet blockValSet) {
final String[] values = blockValSet.getStringValuesSV();
for (int docIndex = 0; docIndex < length; docIndex++) {
int groupId = groupKeyArray[docIndex];
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId);
if (currentSeriesBuilder == null) {
currentSeriesBuilder = _factory.newTimeSeriesBuilder(_aggInfo, "TO_BE_REMOVED", _timeBuckets,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValueForKey(groupId, currentSeriesBuilder);
}
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
}
}

public static ExpressionContext create(String language, String valueExpressionStr, ExpressionContext timeExpression,
TimeBuckets timeBuckets, AggInfo aggInfo) {
ExpressionContext valueExpression = RequestContextUtils.getExpression(valueExpressionStr);
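As an editorial aside, the sketch below shows how a caller might assemble the pieces visible in this file: an AggInfo, the time-bucket transform expression, and the static create(...) helper. The column name, the literal values, and the TimeBuckets argument are illustrative assumptions, and Pinot imports are omitted.

```java
// Hedged usage sketch (not code from this commit). Uses only signatures visible in this diff:
// new AggInfo(String, boolean, Map), RequestContextUtils.getExpression(String), and
// TimeSeriesAggregationFunction.create(language, valueExpressionStr, timeExpression, timeBuckets, aggInfo).
static ExpressionContext buildTimeSeriesAggExpression(TimeBuckets timeBuckets) {
  Map<String, String> aggParams = new HashMap<>();
  aggParams.put("window", "5m"); // illustrative parameter from the AggInfo javadoc
  AggInfo aggInfo = new AggInfo("MIN", true /* is partial agg */, aggParams);
  ExpressionContext timeExpression = RequestContextUtils.getExpression(
      "timeSeriesBucketIndex(secondsSinceEpoch, 'MILLISECONDS', 123, 10, 0)");
  // "orderItemCount" is a hypothetical value column used only for illustration.
  return TimeSeriesAggregationFunction.create("m3ql", "orderItemCount", timeExpression, timeBuckets, aggInfo);
}
```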
@@ -47,7 +47,8 @@ public class FunctionRegistryTest {
TransformFunctionType.ARRAY_AVERAGE, TransformFunctionType.ARRAY_MIN, TransformFunctionType.ARRAY_MAX,
TransformFunctionType.ARRAY_SUM, TransformFunctionType.VALUE_IN, TransformFunctionType.IN_ID_SET,
TransformFunctionType.GROOVY, TransformFunctionType.CLP_DECODE, TransformFunctionType.CLP_ENCODED_VARS_MATCH,
TransformFunctionType.ST_POLYGON, TransformFunctionType.ST_AREA, TransformFunctionType.ITEM);
TransformFunctionType.ST_POLYGON, TransformFunctionType.ST_AREA, TransformFunctionType.ITEM,
TransformFunctionType.TIME_SERIES_BUCKET);
private static final EnumSet<FilterKind> IGNORED_FILTER_KINDS = EnumSet.of(
// Special filter functions without implementation
FilterKind.TEXT_MATCH, FilterKind.TEXT_CONTAINS, FilterKind.JSON_MATCH, FilterKind.VECTOR_SIMILARITY,
@@ -230,7 +230,7 @@ public void testTimeSeriesSumQuery() {
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleAggregationResultsBlock(timeBuckets,
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
(AggregationResultsBlock) instanceResponse.getResultsBlock());
assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
@@ -247,7 +247,7 @@ public void testTimeSeriesMaxQuery() {
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock);
GroupByResultsBlock resultsBlock = (GroupByResultsBlock) instanceResponse.getResultsBlock();
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets, resultsBlock);
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock);
assertEquals(5, timeSeriesBlock.getSeriesMap().size());
// For any city, say "New York", the max order item count should be 4
boolean foundNewYork = false;
@@ -275,7 +275,7 @@ public void testTimeSeriesMinQuery() {
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof GroupByResultsBlock);
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.handleGroupByResultsBlock(timeBuckets,
TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
(GroupByResultsBlock) instanceResponse.getResultsBlock());
assertEquals(5, timeSeriesBlock.getSeriesMap().size());
// For any city, say "Chicago", the min order item count should be 0
@@ -62,10 +62,10 @@ public TimeSeriesBlock getNextBlock() {
throw new RuntimeException("Error running time-series query: " + message);
}
if (instanceResponseBlock.getResultsBlock() instanceof GroupByResultsBlock) {
return TimeSeriesOperatorUtils.handleGroupByResultsBlock(_context.getInitialTimeBuckets(),
return TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
(GroupByResultsBlock) instanceResponseBlock.getResultsBlock());
} else if (instanceResponseBlock.getResultsBlock() instanceof AggregationResultsBlock) {
return TimeSeriesOperatorUtils.handleAggregationResultsBlock(_context.getInitialTimeBuckets(),
return TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
(AggregationResultsBlock) instanceResponseBlock.getResultsBlock());
} else if (instanceResponseBlock.getResultsBlock() == null) {
throw new IllegalStateException("Found null results block in time-series query");
@@ -31,18 +31,22 @@
* AggInfo is used to represent the aggregation function and its parameters.
* Aggregation functions are stored as a string, since time-series languages
* are allowed to implement their own aggregation functions.
*
* The class now includes a map of parameters, which can store various
* <br />
* The class also includes a map of parameters, which can store various
* configuration options for the aggregation function. This allows for
* more flexibility in defining and customizing aggregations.
*
* more flexibility in defining and customizing aggregations. The parameters
* <b>must be serializable and deserializable</b> via the {@link #serializeParams(Map)}
* and {@link #deserializeParams(String)} methods.
* <br />
* Common parameters might include:
* - window: Defines the time window for aggregation
*
* <br />
* Example usage:
* Map<String, String> params = new HashMap<>();
* params.put("window", "5m");
* AggInfo aggInfo = new AggInfo("rate", true, params);
* <pre>
* Map<String, String> params = new HashMap<>();
* params.put("window", "5m");
* AggInfo aggInfo = new AggInfo("rate", true, params);
* </pre>
*/
public class AggInfo {
private final String _aggFunction;
@@ -104,7 +108,7 @@ public static String serializeParams(Map<String, String> params) {
return builder.toString();
}

public static Map<String, String> loadSerializedParams(String serialized) {
public static Map<String, String> deserializeParams(String serialized) {
Map<String, String> result = new HashMap<>();
if (StringUtils.isBlank(serialized)) {
return result;
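To make the serialization contract concrete, the snippet below round-trips a parameter map through the two static helpers shown in this diff; the "window=5m" value is the illustrative parameter from the AggInfo javadoc.

```java
// Hedged round-trip sketch using only the two static helpers visible in this diff.
// The expectation that deserializeParams(serializeParams(m)) equals m follows from the
// javadoc requirement that parameters be serializable and deserializable via these methods.
static Map<String, String> roundTripAggParams() {
  Map<String, String> params = new HashMap<>();
  params.put("window", "5m"); // illustrative parameter from the AggInfo javadoc
  String serialized = AggInfo.serializeParams(params);
  // The serialized string is what TimeSeriesAggregationFunction receives as its
  // eighth literal argument and passes to AggInfo.deserializeParams(...).
  return AggInfo.deserializeParams(serialized);
}
```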
@@ -44,7 +44,7 @@ public abstract class BaseTimeSeriesBuilder {

/**
* <b>Note:</b> The leaf stage will use {@link #UNINITIALISED_TAG_NAMES} and {@link #UNINITIALISED_TAG_VALUES} during
* the aggregation. This is because tag values are materialized very late.
* the aggregation. This is because tag values are materialized after the Combine Operator.
*/
public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets,
List<String> tagNames, Object[] tagValues) {
@@ -58,7 +58,7 @@ public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable T
public abstract void addValueAtIndex(int timeBucketIndex, Double value);

public void addValueAtIndex(int timeBucketIndex, String value) {
throw new IllegalStateException("This aggregation function does not support string input");
throw new UnsupportedOperationException("This aggregation function does not support string input");
}

public abstract void addValue(long timeValue, Double value);
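A final aside on the exception change above: builders that do not override the String overload now surface an UnsupportedOperationException rather than an IllegalStateException. The helper below is an illustrative sketch, not code from the commit.

```java
// Hedged sketch of the new contract: the default String overload of addValueAtIndex now
// throws UnsupportedOperationException (previously IllegalStateException), so callers that
// probe for string support can catch that type.
static boolean tryAddStringValue(BaseTimeSeriesBuilder builder, int bucketIndex, String value) {
  try {
    builder.addValueAtIndex(bucketIndex, value);
    return true;
  } catch (UnsupportedOperationException e) {
    // The chosen aggregation function does not support string input.
    return false;
  }
}
```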
