diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index c6df67438894..cb658f015572 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -106,7 +106,6 @@ import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.DataSizeUtils; -import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.FilterKind; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index a3b46ad2701e..42d0f73da58f 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -28,7 +29,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -91,9 +91,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { protected static final List DEFAULT_NO_DICTIONARY_COLUMNS = 
Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime"); protected static final String DEFAULT_SORTED_COLUMN = "Carrier"; - protected static final List DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter"); - private static final List DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin"); - private static final List DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin"); + /* The three default index-column lists below are now built with Guava's Lists.newArrayList (a growable ArrayList) instead of Arrays.asList and Collections.singletonList. NOTE(review): presumably so that code receiving these lists can add or remove entries - confirm; the fields are static, so any such mutation would be visible to every test that reads them afterwards. */ protected static final List DEFAULT_INVERTED_INDEX_COLUMNS + = Lists.newArrayList("FlightNum", "Origin", "Quarter"); + private static final List DEFAULT_BLOOM_FILTER_COLUMNS = Lists.newArrayList("FlightNum", "Origin"); + private static final List DEFAULT_RANGE_INDEX_COLUMNS = Lists.newArrayList("Origin"); protected static final int DEFAULT_NUM_REPLICAS = 1; protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false; diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 1338e9f529d3..05e534a20389 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -509,7 +509,7 @@ protected JsonNode getDebugInfo(final String uri) /** * Queries the broker's sql query endpoint (/query/sql) */ - protected JsonNode postQuery(String query) + public JsonNode postQuery(String query) /* widened from protected to public: ExplainIntegrationTestTrait declares postQuery(String), and interface methods are public, so test classes that extend this class and implement the trait need a public implementation to inherit */ throws Exception { return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine()), null, getExtraQueryProperties()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java new file mode 100644 index 000000000000..dde36f0204d0 --- /dev/null +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import org.intellij.lang.annotations.Language; +import org.testng.Assert; + + +/** Mixin of default helper methods for integration tests: each helper sends an EXPLAIN query through {@link #postQuery(String)} and asserts on the plan text read from resultTable.rows[0][1] of the JSON response. */ public interface ExplainIntegrationTestTrait { + + /** Executes the given SQL and returns the response as JSON; supplied by the implementing test class. The helpers in this interface read resultTable.rows[0][1] from the returned node. */ JsonNode postQuery(@Language("sql") String query) + throws Exception; + + /** Prefixes the query with "explain plan without implementation for " and asserts that the returned plan text equals {@code expected}. RuntimeExceptions propagate unchanged; any other exception is wrapped in a RuntimeException. NOTE(review): Jackson's JsonNode.get returns null for a missing field, so a response without resultTable (presumably what a failed query returns - confirm) would surface as a NullPointerException rather than a readable assertion failure. */ default void explainLogical(@Language("sql") String query, String expected) { + try { + JsonNode jsonNode = postQuery("explain plan without implementation for " + query); + JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); + + Assert.assertEquals(plan.asText(), expected); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Prefixes the query with "explain plan for " and asserts that the plan text at resultTable.rows[0][1] equals {@code expected} exactly (no normalization of the plan text is applied). RuntimeExceptions propagate unchanged; any other exception is wrapped in a RuntimeException. */ default void explain(@Language("sql") String query, String expected) { + try { + JsonNode jsonNode = postQuery("explain plan for " + query); + JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); + + Assert.assertEquals(plan.asText(), expected); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw 
new RuntimeException(e); + } + } + + /** Prefixes the query with "set explainPlanVerbose=true; explain plan for ", then rewrites the bracketed values of numDocs, segment and totalDocs in the returned plan text to [any] before asserting equality with {@code expected}; expected plans therefore spell those three attributes as numDocs=[any], segment=[any] and totalDocs=[any]. RuntimeExceptions propagate unchanged; any other exception is wrapped in a RuntimeException. */ default void explainVerbose(@Language("sql") String query, String expected) { + try { + JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query); + JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); + + String actual = plan.asText() + .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]") + .replaceAll("segment=\\[[^\\]]*]", "segment=[any]") + .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]"); + + + Assert.assertEquals(actual, expected); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java index 8303a583d382..4840257130e0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.integration.tests; -import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.util.List; import org.apache.pinot.spi.config.table.TableConfig; @@ -26,16 +25,15 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; -import org.intellij.lang.annotations.Language; import org.testcontainers.shaded.org.apache.commons.io.FileUtils; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest { +public class 
MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest + implements ExplainIntegrationTestTrait { /* explain, explainVerbose and explainLogical now come from ExplainIntegrationTestTrait; the private copies of these helpers are removed further down in this diff. NOTE(review): the removed explainLogical sent "set explainAskingServers=false; explain plan for " whereas the trait's explainLogical sends "explain plan without implementation for " - confirm the two prefixes are meant to yield the same logical plan for simpleQueryLogical. */ @BeforeClass public void setUp() @@ -161,7 +159,7 @@ public void simpleQueryVerbose() { + " Project(columns=[[]])\n" + " DocIdSet(maxDocs=[10000])\n" + " FilterMatchEntireSegment(numDocs=[any])\n"); - //@formatter:on + //@formatter:on } @Test @@ -171,7 +169,7 @@ public void simpleQueryLogical() { "Execution Plan\n" + "LogicalProject(EXPR$0=[1])\n" + " LogicalTableScan(table=[[default, mytable]])\n"); - //@formatter:on + //@formatter:on } @AfterClass @@ -186,49 +184,4 @@ public void tearDown() FileUtils.deleteDirectory(_tempDir); } - - private void explainVerbose(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - String actual = plan.asText() - .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]") - .replaceAll("segment=\\[[^\\]]*]", "segment=[any]") - .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]"); - - - Assert.assertEquals(actual, expected); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void explain(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - Assert.assertEquals(plan.asText(), expected); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void explainLogical(@Language("sql") String query, String expected) { - try { - JsonNode jsonNode = postQuery("set explainAskingServers=false; explain plan for " + query); - JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); - - Assert.assertEquals(plan.asText(), expected); - } catch (RuntimeException e) { - throw e; - } 
catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexTest.java new file mode 100644 index 000000000000..52d61d957f71 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TimestampConfig; +import org.apache.pinot.spi.config.table.TimestampIndexGranularity; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** Integration test for timestamp-index substitution in multi-stage engine explain plans. createOfflineTableConfig adds a FieldConfig for column ArrTime whose TimestampConfig lists the single granularity SECOND, and throws IllegalStateException if the base config already has a field config named ArrTime. Each test enables the multi-stage engine and uses the explain helper of ExplainIntegrationTestTrait to assert the exact plan text: datetrunc('SECOND',ArrTime) is expected to appear as the column $ArrTime$SECOND (in a projection, a filter, an aggregation filter and one side of a join), while datetrunc('DAY',ArrTime) is expected to stay a Transform because DAY is not configured. The explain helper compares the plan verbatim, so the expected plans hard-code totalDocs and numDocs as 115545 and maxDocs as 120000. NOTE(review): no @AfterClass teardown is declared in this class although setUp starts ZooKeeper, a controller, a broker and two servers - confirm whether something else stops the cluster. NOTE(review): createOfflineTableConfig calls super.createOfflineTableConfig() and overrideBrokerConf looks like a base-class hook, yet neither carries @Override. NOTE(review): 'JOin' in timestampIndexSubstitutedInJOinMSE looks like a typo for 'Join'. */ public class TimestampIndexTest extends BaseClusterIntegrationTest implements ExplainIntegrationTestTrait { + /** Empties the temp, segment and tar directories, starts ZooKeeper, a controller, a broker and two servers, registers the schema and the offline table config, builds segments from the unpacked Avro files, uploads them, and waits for the documents to load (argument 600_000L - presumably a timeout in milliseconds, confirm). */ @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + /** Sets the broker property KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN to "true". NOTE(review): presumably this is what makes the expected plans contain the segment-level operators below LeafStageCombineOperator - confirm. */ protected void overrideBrokerConf(PinotConfiguration brokerConf) { + String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN; + brokerConf.setProperty(property, "true"); + } + + /** Expects the projection datetrunc('SECOND',ArrTime) to appear in the plan as Project(columns=[[$ArrTime$SECOND]]). */ @Test + public void 
timestampIndexSubstitutedInProjectionsMSE() { + setUseMultiStageQueryEngine(true); + explain("SELECT datetrunc('SECOND',ArrTime) FROM mytable", + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Project(columns=[[$ArrTime$SECOND]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + /** Expects the WHERE predicate on datetrunc('SECOND',ArrTime) to be planned as a FilterRangeIndex on $ArrTime$SECOND. */ @Test + public void timestampIndexSubstitutedInFiltersMSE() { + setUseMultiStageQueryEngine(true); + explain("SELECT 1 FROM mytable where datetrunc('SECOND',ArrTime) > 1", + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], " + + "indexLookUp=[range_index], operator=[RANGE])\n"); + } + + /** Expects the datetrunc('SECOND',ArrTime) comparison inside the CASE argument of SUM to be planned as an AggregateFiltered whose filter is a FilterRangeIndex on $ArrTime$SECOND. */ @Test + public void timestampIndexSubstitutedInAggregatesMSE() { + setUseMultiStageQueryEngine(true); + explain("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else 0 end) FROM mytable", + "Execution Plan\n" + + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n" + + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($1)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineAggregate\n" + + " AggregateFiltered(aggregations=[[sum('2'), count(*)]])\n" + + " Transform(expressions=[['2']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], 
indexLookUp=[range_index], " + + "operator=[RANGE])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + @Test + public void timestampIndexSubstitutedInJOinMSE() { + setUseMultiStageQueryEngine(true); + explain("SELECT 1 " + + "FROM mytable as a1 " + + "join mytable as a2 " + + "on datetrunc('SECOND',a1.ArrTime) = datetrunc('DAY',a2.ArrTime)", + "Execution Plan\n" + + "LogicalProject(EXPR$0=[1])\n" + + " LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Project(columns=[[$ArrTime$SECOND]])\n" // substituted because we have SECOND granularity + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + + " Transform(expressions=[[datetrunc('DAY',ArrTime)]])\n" // we don't set the DAY granularity + + " Project(columns=[[ArrTime]])\n" + + " DocIdSet(maxDocs=[120000])\n" + + " FilterMatchEntireSegment(numDocs=[115545])\n"); + } + + + protected TableConfig createOfflineTableConfig() { + String colName = "ArrTime"; + + TableConfig tableConfig = super.createOfflineTableConfig(); + List fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + fieldConfigList = new ArrayList<>(); + tableConfig.setFieldConfigList(fieldConfigList); + } else { + fieldConfigList.stream() + .filter(fieldConfig -> fieldConfig.getName().equals(colName)) + .findFirst() + .ifPresent( + fieldConfig -> { + throw new IllegalStateException("Time column already exists in the field config list"); + 
} + ); + } + /* Field config for colName whose timestamp config lists a single granularity, SECOND; it is appended to the list obtained (or created) above. NOTE(review): when the list came from getFieldConfigList() the add below assumes that list is modifiable - confirm. */ FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName) + .withTimestampConfig( + new TimestampConfig(List.of(TimestampIndexGranularity.SECOND)) + ) + .build(); + fieldConfigList.add(newTimeFieldConfig); + return tableConfig; + } +}