Add tests that verify timestamp indexes can be used in MSE

gortiz committed Dec 20, 2024
1 parent 1338c15 commit 71c958e
Showing 5 changed files with 232 additions and 56 deletions.
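In brief: the new TimestampIndexTest configures a SECOND-granularity timestamp index on the ArrTime column and asserts, via EXPLAIN on the multi-stage query engine (MSE), that the index is substituted into projections, filters, aggregates, and joins. The expected plans below show, for instance, the filter datetrunc('SECOND',ArrTime) > 1 being rewritten to FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], indexLookUp=[range_index], operator=[RANGE]), i.e. served from the materialized $ArrTime$SECOND column and its range index rather than computed per row.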
@@ -19,6 +19,7 @@
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -28,7 +29,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -91,9 +91,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS
= Lists.newArrayList("FlightNum", "Origin", "Quarter");
private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Lists.newArrayList("FlightNum", "Origin");
private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Lists.newArrayList("Origin");
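// Note: mutable Lists.newArrayList now replaces the immutable Arrays.asList /
// Collections.singletonList defaults, presumably so subclasses can adjust the
// default index columns.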
protected static final int DEFAULT_NUM_REPLICAS = 1;
protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false;

@@ -509,7 +509,7 @@ protected JsonNode getDebugInfo(final String uri)
/**
* Queries the broker's sql query endpoint (/query/sql)
*/
protected JsonNode postQuery(String query)
public JsonNode postQuery(String query)
throws Exception {
return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine()), null,
getExtraQueryProperties());
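(Note: postQuery is widened from protected to public so that test classes can satisfy the postQuery method declared by the new ExplainIntegrationTestTrait below; Java interface methods are implicitly public.)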
@@ -0,0 +1,57 @@
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import org.intellij.lang.annotations.Language;
import org.testng.Assert;


public interface ExplainIntegrationTestTrait {

JsonNode postQuery(@Language("sql") String query)
throws Exception;

default void explainLogical(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("explain plan without implementation for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

Assert.assertEquals(plan.asText(), expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

default void explain(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("explain plan for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

Assert.assertEquals(plan.asText(), expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

default void explainVerbose(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

String actual = plan.asText()
.replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
.replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
.replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");


Assert.assertEquals(actual, expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
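For illustration, a minimal usage sketch (hypothetical class and test names; assumes the usual cluster setUp/tearDown and the TestNG @Test import): any BaseClusterIntegrationTest subclass already exposes the now-public postQuery(String), so it satisfies the trait and picks up the explain helpers as default methods:

    public class MyExplainTest extends BaseClusterIntegrationTest
        implements ExplainIntegrationTestTrait {

      @Test
      public void filterPlan() {
        // Compares the broker's EXPLAIN output against the expected plan text.
        explain("SELECT 1 FROM mytable WHERE Origin = 'SFO'",
            "Execution Plan\n...");
      }
    }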
@@ -18,24 +18,22 @@
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.List;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.intellij.lang.annotations.Language;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest {
public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest
implements ExplainIntegrationTestTrait {

@BeforeClass
public void setUp()
@@ -161,7 +159,7 @@ public void simpleQueryVerbose() {
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n");
//@formatter:on
//@formatter:on
}

@Test
@@ -171,7 +169,7 @@ public void simpleQueryLogical() {
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
+ " LogicalTableScan(table=[[default, mytable]])\n");
//@formatter:on
//@formatter:on
}

@AfterClass
@@ -186,49 +184,4 @@ public void tearDown()

FileUtils.deleteDirectory(_tempDir);
}

private void explainVerbose(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

String actual = plan.asText()
.replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
.replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
.replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");


Assert.assertEquals(actual, expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void explain(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("explain plan for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

Assert.assertEquals(plan.asText(), expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void explainLogical(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("set explainAskingServers=false; explain plan for " + query);
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);

Assert.assertEquals(plan.asText(), expected);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@@ -0,0 +1,165 @@
package org.apache.pinot.integration.tests.custom;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TimestampConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TimestampIndexTest extends BaseClusterIntegrationTest implements ExplainIntegrationTestTrait {
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
startController();
startBroker();
startServers(2);

// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);

// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);

// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}

protected void overrideBrokerConf(PinotConfiguration brokerConf) {
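// Include per-segment (leaf stage) plans in MSE EXPLAIN responses; the expected
// plans in the tests below assert on segment-level operators such as
// FilterRangeIndex and DocIdSet.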
String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN;
brokerConf.setProperty(property, "true");
}

@Test
public void timestampIndexSubstitutedInProjectionsMSE() {
setUseMultiStageQueryEngine(true);
explain("SELECT datetrunc('SECOND',ArrTime) FROM mytable",
"Execution Plan\n"
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ " Project(columns=[[$ArrTime$SECOND]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterMatchEntireSegment(numDocs=[115545])\n");
}

@Test
public void timestampIndexSubstitutedInFiltersMSE() {
setUseMultiStageQueryEngine(true);
explain("SELECT 1 FROM mytable where datetrunc('SECOND',ArrTime) > 1",
"Execution Plan\n"
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], "
+ "indexLookUp=[range_index], operator=[RANGE])\n");
}

@Test
public void timestampIndexSubstitutedInAggregatesMSE() {
setUseMultiStageQueryEngine(true);
explain("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else 0 end) FROM mytable",
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n"
+ " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($1)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " CombineAggregate\n"
+ " AggregateFiltered(aggregations=[[sum('2'), count(*)]])\n"
+ " Transform(expressions=[['2']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterRangeIndex(predicate=[$ArrTime$SECOND > '1'], indexLookUp=[range_index], "
+ "operator=[RANGE])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterMatchEntireSegment(numDocs=[115545])\n");
}

@Test
public void timestampIndexSubstitutedInJoinMSE() {
setUseMultiStageQueryEngine(true);
explain("SELECT 1 "
+ "FROM mytable as a1 "
+ "join mytable as a2 "
+ "on datetrunc('SECOND',a1.ArrTime) = datetrunc('DAY',a2.ArrTime)",
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
+ " LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+ " PinotLogicalExchange(distribution=[hash[0]])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ " Project(columns=[[$ArrTime$SECOND]])\n" // substituted because we have SECOND granularity
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterMatchEntireSegment(numDocs=[115545])\n"
+ " PinotLogicalExchange(distribution=[hash[0]])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ " Transform(expressions=[[datetrunc('DAY',ArrTime)]])\n" // we don't set the DAY granularity
+ " Project(columns=[[ArrTime]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterMatchEntireSegment(numDocs=[115545])\n");
}


protected TableConfig createOfflineTableConfig() {
String colName = "ArrTime";

TableConfig tableConfig = super.createOfflineTableConfig();
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList == null) {
fieldConfigList = new ArrayList<>();
tableConfig.setFieldConfigList(fieldConfigList);
} else {
fieldConfigList.stream()
.filter(fieldConfig -> fieldConfig.getName().equals(colName))
.findFirst()
.ifPresent(
fieldConfig -> {
throw new IllegalStateException("Time column already exists in the field config list");
}
);
}
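// A SECOND-granularity timestamp index makes segment creation materialize a
// derived $ArrTime$SECOND column (backed by a range index), which is what the
// EXPLAIN assertions above expect to see substituted into the plans.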
FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName)
.withTimestampConfig(
new TimestampConfig(List.of(TimestampIndexGranularity.SECOND))
)
.build();
fieldConfigList.add(newTimeFieldConfig);
return tableConfig;
}
}
