diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index b031e47617dc..72b69a24fadb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -106,7 +106,6 @@ import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.DataSizeUtils; -import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.FilterKind; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; @@ -938,24 +937,7 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre return; } Function function = expression.getFunctionCall(); - switch (function.getOperator()) { - case "datetrunc": - String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase(); - Expression timeExpression = function.getOperands().get(1); - if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase( - function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity( - granularString) && timeExpression.getIdentifier() != null) { - String timeColumn = timeExpression.getIdentifier().getName(); - String timeColumnWithGranularity = TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString); - if (timestampIndexColumns.contains(timeColumnWithGranularity)) { - pinotQuery.putToExpressionOverrideHints(expression, - RequestUtils.getIdentifierExpression(timeColumnWithGranularity)); - } - } - break; - default: - break; - } + 
RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery, timestampIndexColumns::contains);
     function.getOperands()
         .forEach(operand -> setTimestampIndexExpressionOverrideHints(operand, timestampIndexColumns, pinotQuery));
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index b8c013427d1c..2d1e38d84a64 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@
 import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
@@ -53,6 +55,7 @@
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -631,4 +634,71 @@ public static Map getOptionsFromJson(JsonNode request, String op
   public static Map<String, String> getOptionsFromString(String optionStr) {
     return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
   }
+
+  /**
+   * Adds a timestamp-index override hint for {@code expression} without filtering on the derived column name.
+   *
+   * <p>Delegates to {@link #applyTimestampIndexOverrideHints(Expression, PinotQuery, Predicate)} with a predicate
+   * that accepts every name.
+   */
+  public static void applyTimestampIndexOverrideHints(Expression expression, PinotQuery query) {
+    applyTimestampIndexOverrideHints(expression, query, timeColumnWithGranularity -> true);
+  }
+
+  /**
+   * Registers an expression override hint that replaces a {@code datetrunc} call on a plain column with the column
+   * name derived by {@link TimestampIndexUtils#getColumnWithGranularity}.
+   *
+   * <p>A hint is added only when {@code expression} is a {@code datetrunc} call with two operands, or with three
+   * operands whose third one is the string literal {@code MILLISECONDS} (case-insensitive); the first operand is a
+   * string literal accepted by {@link TimestampIndexUtils#isValidGranularity}; the second operand is an identifier;
+   * and the predicate accepts the derived name. Operands are not visited recursively.
+   *
+   * @param expression expression to inspect
+   * @param query query that receives the override hint
+   * @param timeColumnWithGranularityPredicate filter applied to the derived column name before the hint is added
+   */
+  public static void applyTimestampIndexOverrideHints(Expression expression, PinotQuery query,
+      Predicate<String> timeColumnWithGranularityPredicate) {
+    if (!expression.isSetFunctionCall()) {
+      return;
+    }
+    Function function = expression.getFunctionCall();
+    if (!function.getOperator().equalsIgnoreCase(TransformFunctionType.DATE_TRUNC.getName())) {
+      return;
+    }
+    int numOperands = function.getOperandsSize();
+    if (numOperands != 2 && numOperands != 3) {
+      return;
+    }
+    Expression granularityExpression = function.getOperands().get(0);
+    Expression timeExpression = function.getOperands().get(1);
+    // Skip the hint instead of throwing when the operands do not have the expected shape.
+    if (!isStringLiteralOperand(granularityExpression) || !timeExpression.isSetIdentifier()) {
+      return;
+    }
+    if (numOperands == 3) {
+      Expression outputUnitExpression = function.getOperands().get(2);
+      if (!isStringLiteralOperand(outputUnitExpression)
+          || !"MILLISECONDS".equalsIgnoreCase(outputUnitExpression.getLiteral().getStringValue())) {
+        return;
+      }
+    }
+    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+    String granularString =
+        granularityExpression.getLiteral().getStringValue().toUpperCase(java.util.Locale.ROOT);
+    if (!TimestampIndexUtils.isValidGranularity(granularString)) {
+      return;
+    }
+    String timeColumn = timeExpression.getIdentifier().getName();
+    String timeColumnWithGranularity = TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
+    if (timeColumnWithGranularityPredicate.test(timeColumnWithGranularity)) {
+      query.putToExpressionOverrideHints(expression, getIdentifierExpression(timeColumnWithGranularity));
+    }
+  }
+
+  /** Returns whether {@code expression} is a literal holding a string value. */
+  private static boolean isStringLiteralOperand(Expression expression) {
+    return expression.isSetLiteral() && expression.getLiteral().isSetStringValue();
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index cadce4bcf6d0..ca742456068e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
+import
org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
@@ -46,6 +47,7 @@
 import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode;
 import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
 import org.apache.pinot.core.plan.TimeSeriesPlanNode;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
 import org.apache.pinot.core.query.prefetch.FetchPlanner;
 import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
@@ -321,6 +323,7 @@ public Plan makeStreamingInstancePlan(List segmentContexts, Quer
   public PlanNode makeStreamingSegmentPlanNode(SegmentContext segmentContext, QueryContext queryContext) {
     if (QueryContextUtils.isSelectionOnlyQuery(queryContext) && queryContext.getLimit() != 0) {
       // Use streaming operator only for non-empty selection-only query
+      rewriteQueryContextWithHints(queryContext, segmentContext.getIndexSegment());
       return new StreamingSelectionPlanNode(segmentContext, queryContext);
     } else {
       return makeSegmentPlanNode(segmentContext, queryContext);
@@ -344,6 +347,21 @@ public static void rewriteQueryContextWithHints(QueryContext queryContext, Index
     selectExpressions.replaceAll(
         expression -> overrideWithExpressionHints(expression, indexSegment, expressionOverrideHints));
 
+    // Apply the override hints to the left-hand side of the filter attached to each filtered aggregation.
+    List<Pair<AggregationFunction, FilterContext>> filtAggrFuns = queryContext.getFilteredAggregationFunctions();
+    if (filtAggrFuns != null) {
+      for (Pair<AggregationFunction, FilterContext> filteredAggregationFunction : filtAggrFuns) {
+        FilterContext right = filteredAggregationFunction.getRight();
+        if (right != null) {
+          // NOTE(review): getPredicate() presumably returns null for composite (AND/OR/NOT) filters - confirm.
+          // Such filters are skipped here rather than failing; their nested predicates are not rewritten.
+          Predicate predicate = right.getPredicate();
+          if (predicate != null) {
+            predicate.setLhs(overrideWithExpressionHints(predicate.getLhs(), indexSegment, expressionOverrideHints));
+          }
+        }
+      }
+    }
+
     List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
     if
(CollectionUtils.isNotEmpty(groupByExpressions)) { groupByExpressions.replaceAll( diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index a3b46ad2701e..a6cbad653efb 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -186,22 +186,22 @@ protected String getSortedColumn() { @Nullable protected List getInvertedIndexColumns() { - return DEFAULT_INVERTED_INDEX_COLUMNS; + return new ArrayList<>(DEFAULT_INVERTED_INDEX_COLUMNS); } @Nullable protected List getNoDictionaryColumns() { - return DEFAULT_NO_DICTIONARY_COLUMNS; + return new ArrayList<>(DEFAULT_NO_DICTIONARY_COLUMNS); } @Nullable protected List getRangeIndexColumns() { - return DEFAULT_RANGE_INDEX_COLUMNS; + return new ArrayList<>(DEFAULT_RANGE_INDEX_COLUMNS); } @Nullable protected List getBloomFilterColumns() { - return DEFAULT_BLOOM_FILTER_COLUMNS; + return new ArrayList<>(DEFAULT_BLOOM_FILTER_COLUMNS); } @Nullable diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 1338e9f529d3..05e534a20389 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -509,7 +509,7 @@ protected JsonNode getDebugInfo(final String uri) /** * Queries the broker's sql query endpoint (/query/sql) */ - protected JsonNode postQuery(String query) + public JsonNode postQuery(String query) throws Exception { return postQuery(query, 
getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine()), null, getExtraQueryProperties());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
new file mode 100644
index 000000000000..cbe0ffd09fbe
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.intellij.lang.annotations.Language;
+import org.testng.Assert;
+
+
+/**
+ * Assertion helpers for integration tests that compare explain-plan output with an expected plan.
+ *
+ * <p>Implementors only provide {@link #postQuery(String)}. Checked exceptions raised while querying are rethrown
+ * wrapped in a {@link RuntimeException}.
+ */
+public interface ExplainIntegrationTestTrait {
+
+  /** Sends {@code query} to the cluster under test and returns the JSON response. */
+  JsonNode postQuery(@Language("sql") String query)
+      throws Exception;
+
+  /** Asserts the plan returned for {@code explain plan without implementation for} followed by {@code query}. */
+  default void explainLogical(@Language("sql") String query, String expected) {
+    Assert.assertEquals(fetchPlanText("explain plan without implementation for " + query), expected);
+  }
+
+  /**
+   * Asserts, row by row, the plan returned with {@code useMultistageEngine=false}.
+   *
+   * @param verbose whether to also set {@code explainPlanVerbose=true}
+   * @param query query to explain
+   * @param expected one entry per plan row: a {@link String} must equal the row, a {@link Pattern} must match it
+   */
+  default void explainSse(boolean verbose, @Language("sql") String query, Object... expected) {
+    try {
+      @Language("sql")
+      String actualQuery = "SET useMultistageEngine=false; explain plan for " + query;
+      if (verbose) {
+        actualQuery = "SET explainPlanVerbose=true; " + actualQuery;
+      }
+      JsonNode plan = postQuery(actualQuery).get("resultTable").get("rows");
+      List<?> rows = JsonUtils.jsonNodeToObject(plan, List.class);
+      List<String> planAsStrList = rows.stream().map(Object::toString).collect(Collectors.toList());
+
+      if (planAsStrList.size() != expected.length) {
+        Assert.fail("Actual: " + planAsStrList + ", Expected: " + Arrays.toString(expected)
+            + ". Size mismatch. Actual: " + planAsStrList.size() + ", Expected: " + expected.length);
+      }
+      for (int i = 0; i < planAsStrList.size(); i++) {
+        String planAsStr = planAsStrList.get(i);
+        Object expectedObj = expected[i];
+        if (expectedObj instanceof Pattern) {
+          Assert.assertTrue(((Pattern) expectedObj).matcher(planAsStr).matches(),
+              "Pattern doesn't match. Actual: " + planAsStr + ", Expected: " + expectedObj
+                  + ", Actual complete plan: " + planAsStrList);
+        } else if (expectedObj instanceof String) {
+          Assert.assertEquals(planAsStr, expectedObj, "Actual: " + planAsStr + ", Expected: " + expectedObj
+              + ", Actual complete plan: " + planAsStrList);
+        } else {
+          // A null entry also ends up here, so describe its type without dereferencing it.
+          String expectedType = expectedObj == null ? "null" : expectedObj.getClass().toString();
+          Assert.fail("Expected object should be either Pattern or String in position " + i + ". Actual: "
+              + expectedObj + " of type " + expectedType);
+        }
+      }
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Same as {@link #explainSse(boolean, String, Object...)} with {@code verbose} set to {@code false}. */
+  default void explainSse(@Language("sql") String query, Object... expected) {
+    explainSse(false, query, expected);
+  }
+
+  /** Asserts the plan returned for {@code explain plan for} followed by {@code query}. */
+  default void explain(@Language("sql") String query, String expected) {
+    Assert.assertEquals(fetchPlanText("explain plan for " + query), expected);
+  }
+
+  /**
+   * Asserts the plan returned with {@code explainPlanVerbose=true}, after replacing the bracketed values of
+   * {@code numDocs}, {@code segment} and {@code totalDocs} with {@code [any]}.
+   */
+  default void explainVerbose(@Language("sql") String query, String expected) {
+    String actual = fetchPlanText("set explainPlanVerbose=true; explain plan for " + query)
+        .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
+        .replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
+        .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");
+    Assert.assertEquals(actual, expected);
+  }
+
+  /**
+   * Runs {@code explainQuery} and returns the text of the second column of the first result row.
+   */
+  private String fetchPlanText(String explainQuery) {
+    try {
+      JsonNode jsonNode = postQuery(explainQuery);
+      return jsonNode.get("resultTable").get("rows").get(0).get(1).asText();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
index 8303a583d382..52c568780143 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.integration.tests; -import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.util.List; import org.apache.pinot.spi.config.table.TableConfig; @@ -26,16 +25,15 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; -import org.intellij.lang.annotations.Language; import org.testcontainers.shaded.org.apache.commons.io.FileUtils; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest { +public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest + implements ExplainIntegrationTestTrait { @BeforeClass public void setUp() @@ -78,7 +76,6 @@ public void resetMultiStage() { @Test public void simpleQuery() { explain("SELECT 1 FROM mytable", - //@formatter:off "Execution Plan\n" + "PinotLogicalExchange(distribution=[broadcast])\n" + " LeafStageCombineOperator(table=[mytable])\n" @@ -89,13 +86,11 @@ public void simpleQuery() { + " Project(columns=[[]])\n" + " DocIdSet(maxDocs=[120000])\n" + " FilterMatchEntireSegment(numDocs=[115545])\n"); - //@formatter:on } @Test public void simpleQueryVerbose() { explainVerbose("SELECT 1 FROM mytable", - //@formatter:off "Execution Plan\n" + "PinotLogicalExchange(distribution=[broadcast])\n" + " LeafStageCombineOperator(table=[mytable])\n" @@ -161,17 +156,14 @@ public void simpleQueryVerbose() { + " Project(columns=[[]])\n" + " DocIdSet(maxDocs=[10000])\n" + " 
FilterMatchEntireSegment(numDocs=[any])\n"); - //@formatter:on } @Test public void simpleQueryLogical() { explainLogical("SELECT 1 FROM mytable", - //@formatter:off "Execution Plan\n" + "LogicalProject(EXPR$0=[1])\n" + " LogicalTableScan(table=[[default, mytable]])\n"); - //@formatter:on } @AfterClass @@ -186,49 +178,4 @@ public void tearDown() FileUtils.deleteDirectory(_tempDir); } - - private void explainVerbose(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - String actual = plan.asText() - .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]") - .replaceAll("segment=\\[[^\\]]*]", "segment=[any]") - .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]"); - - - Assert.assertEquals(actual, expected); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void explain(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - Assert.assertEquals(plan.asText(), expected); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void explainLogical(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("set explainAskingServers=false; explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - Assert.assertEquals(plan.asText(), expected); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java new file mode 100644 index 000000000000..072b21f3bced --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TimestampConfig; +import org.apache.pinot.spi.config.table.TimestampIndexGranularity; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TimestampIndexMseTest extends BaseClusterIntegrationTest implements ExplainIntegrationTestTrait { + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN; + brokerConf.setProperty(property, "true"); + } + + @Test + public void 
timestampIndexSubstitutedInProjections() { + setUseMultiStageQueryEngine(true); + explain("SELECT datetrunc('SECOND',ArrTime) FROM mytable", + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Project(columns=[[$ArrTime$SECOND]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + @Test + public void timestampIndexSubstitutedInFilters() { + setUseMultiStageQueryEngine(true); + explain("SELECT 1 FROM mytable where datetrunc('SECOND',ArrTime) > 1", + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], indexLookUp=[range_index], " + + "operator=[RANGE])\n"); + } + + @Test + public void timestampIndexSubstitutedInAggregateFilter() { + setUseMultiStageQueryEngine(true); + explain("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else 0 end) FROM mytable", + "Execution Plan\n" + + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n" + + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($1)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineAggregate\n" + + " AggregateFiltered(aggregations=[[sum('2'), count(*)]])\n" + + " Transform(expressions=[['2']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], 
indexLookUp=[range_index], " + + "operator=[RANGE])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + @Test + public void timestampIndexSubstitutedInGroupBy() { + setUseMultiStageQueryEngine(true); + explain("SELECT count(*) FROM mytable group by datetrunc('SECOND',ArrTime)", + "Execution Plan\n" + + "LogicalProject(EXPR$0=[$1])\n" + + " PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[$ArrTime$SECOND]], aggregations=[[count(*)]])\n" + + " Project(columns=[[$ArrTime$SECOND]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + @Test + public void timestampIndexSubstitutedInJoinMSE() { + setUseMultiStageQueryEngine(true); + explain("SELECT 1 " + + "FROM mytable as a1 " + + "join mytable as a2 " + + "on datetrunc('SECOND',a1.ArrTime) = datetrunc('DAY',a2.ArrTime)", + "Execution Plan\n" + + "LogicalProject(EXPR$0=[1])\n" + + " LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Project(columns=[[$ArrTime$SECOND]])\n" // substituted because we have SECOND granularity + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Transform(expressions=[[datetrunc('DAY',ArrTime)]])\n" // we don't set the DAY granularity + + " 
Project(columns=[[ArrTime]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + + protected TableConfig createOfflineTableConfig() { + String colName = "ArrTime"; + + TableConfig tableConfig = super.createOfflineTableConfig(); + List fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + fieldConfigList = new ArrayList<>(); + tableConfig.setFieldConfigList(fieldConfigList); + } else { + fieldConfigList.stream() + .filter(fieldConfig -> fieldConfig.getName().equals(colName)) + .findFirst() + .ifPresent( + fieldConfig -> { + throw new IllegalStateException("Time column already exists in the field config list"); + } + ); + } + FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName) + .withTimestampConfig( + new TimestampConfig(List.of(TimestampIndexGranularity.SECOND)) + ) + .build(); + fieldConfigList.add(newTimeFieldConfig); + return tableConfig; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java new file mode 100644 index 000000000000..062077869374 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.custom; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TimestampConfig; +import org.apache.pinot.spi.config.table.TimestampIndexGranularity; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TimestampIndexSseTest extends BaseClusterIntegrationTest implements ExplainIntegrationTestTrait { + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + 
uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @Test + public void timestampIndexSubstitutedInProjections() { + setUseMultiStageQueryEngine(false); + explainSse("SELECT datetrunc('SECOND',ArrTime) FROM mytable", + "[BROKER_REDUCE(limit:10), 1, 0]", + "[COMBINE_SELECT, 2, 1]", + "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]", + "[SELECT(selectList:$ArrTime$SECOND), 3, 2]", + "[PROJECT($ArrTime$SECOND), 4, 3]", + "[DOC_ID_SET, 5, 4]", + Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6, 5]")); + } + + @Test + public void timestampIndexSubstitutedInFilters() { + setUseMultiStageQueryEngine(false); + explainSse("SELECT ArrTime FROM mytable where datetrunc('SECOND',ArrTime) > 1", + "[BROKER_REDUCE(limit:10), 1, 0]", + "[COMBINE_SELECT, 2, 1]", + "[PLAN_START(numSegmentsForThisPlan:12), -1, -1]", + "[SELECT(selectList:ArrTime), 3, 2]", + "[PROJECT(ArrTime), 4, 3]", + "[DOC_ID_SET, 5, 4]", + "[FILTER_RANGE_INDEX(indexLookUp:range_index,operator:RANGE,predicate:$ArrTime$SECOND > '1'), 6, 5]"); + } + + @Test + public void timestampIndexSubstitutedInAggregateFilter() { + setUseMultiStageQueryEngine(false); + explainSse("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else 0 end) FROM mytable", + "[BROKER_REDUCE(limit:10), 1, 0]", + "[COMBINE_AGGREGATE, 2, 1]", + "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]", + "[AGGREGATE(aggregations:sum(case(greater_than($ArrTime$SECOND,'1'),'2','0'))), 3, 2]", + "[TRANSFORM(case(greater_than($ArrTime$SECOND,'1'),'2','0')), 4, 3]", + "[PROJECT($ArrTime$SECOND), 5, 4]", + "[DOC_ID_SET, 6, 5]", + Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 7, 6]")); + } + + @Test + public void timestampIndexSubstitutedInGroupBy() { + setUseMultiStageQueryEngine(false); + explainSse("SELECT count(*) FROM mytable group by datetrunc('SECOND',ArrTime)", + "[BROKER_REDUCE(limit:10), 1, 0]", + "[COMBINE_GROUP_BY, 2, 1]", + 
"[PLAN_START(numSegmentsForThisPlan:1), -1, -1]", + "[GROUP_BY(groupKeys:$ArrTime$SECOND, aggregations:count(*)), 3, 2]", + "[PROJECT($ArrTime$SECOND), 4, 3]", + "[DOC_ID_SET, 5, 4]", + Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6, 5]")); + } + + protected TableConfig createOfflineTableConfig() { + String colName = "ArrTime"; + + TableConfig tableConfig = super.createOfflineTableConfig(); + List fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + fieldConfigList = new ArrayList<>(); + tableConfig.setFieldConfigList(fieldConfigList); + } else { + fieldConfigList.stream() + .filter(fieldConfig -> fieldConfig.getName().equals(colName)) + .findFirst() + .ifPresent( + fieldConfig -> { + throw new IllegalStateException("Time column already exists in the field config list"); + } + ); + } + FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName) + .withTimestampConfig( + new TimestampConfig(List.of(TimestampIndexGranularity.SECOND)) + ) + .build(); + fieldConfigList.add(newTimeFieldConfig); + return tableConfig; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java index 1ac11809aa26..8db378471923 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java @@ -27,6 +27,7 @@ import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.request.DataSource; import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.Function; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.request.RequestUtils; @@ -76,9 +77,12 @@ public 
Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) if (!groupByList.isEmpty()) { pinotQuery.setGroupByList(groupByList); } - pinotQuery.setSelectList( - CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(), node.getFilterArgs(), - pinotQuery)); + List selectList = CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(), + node.getFilterArgs(), pinotQuery); + for (Expression expression : selectList) { + applyTimestampIndex(expression, pinotQuery); + } + pinotQuery.setSelectList(selectList); if (node.getAggType() == AggregateNode.AggType.DIRECT) { pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true"); } else if (node.isLeafReturnFinalResult()) { @@ -127,7 +131,9 @@ public Void visitFilter(FilterNode node, ServerPlanRequestContext context) { if (visit(node.getInputs().get(0), context)) { PinotQuery pinotQuery = context.getPinotQuery(); if (pinotQuery.getFilterExpression() == null) { - pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery)); + Expression expression = CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery); + applyTimestampIndex(expression, pinotQuery); + pinotQuery.setFilterExpression(expression); } else { // if filter is already applied then it cannot have another one on leaf. 
context.setLeafStageBoundaryNode(node.getInputs().get(0)); @@ -191,7 +197,11 @@ public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext cont public Void visitProject(ProjectNode node, ServerPlanRequestContext context) { if (visit(node.getInputs().get(0), context)) { PinotQuery pinotQuery = context.getPinotQuery(); - pinotQuery.setSelectList(CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery)); + List selectList = CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery); + for (Expression expression : selectList) { + applyTimestampIndex(expression, pinotQuery); + } + pinotQuery.setSelectList(selectList); } return null; } @@ -249,4 +259,14 @@ private boolean visit(PlanNode node, ServerPlanRequestContext context) { node.visit(this, context); return context.getLeafStageBoundaryNode() == null; } + + private void applyTimestampIndex(Expression expression, PinotQuery pinotQuery) { + RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery); + Function functionCall = expression.getFunctionCall(); + if (expression.isSetFunctionCall()) { + for (Expression operand : functionCall.getOperands()) { + applyTimestampIndex(operand, pinotQuery); + } + } + } }