diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 72b69a24fadb..65a2d0a79ef7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -837,6 +837,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO 1); } + // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned + // this is an attempt to return more faithful information based on other sources + ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); + // Set total query processing time long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); brokerResponse.setTimeUsedMs(totalTimeMs); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 7751d4dc5688..2fb2aef48a2e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -419,6 +419,10 @@ public boolean isWholeNumberArray() { return INTEGRAL_ARRAY_TYPES.contains(this); } + public boolean isUnknown() { + return UNKNOWN.equals(this); + } + public boolean isCompatible(ColumnDataType anotherColumnDataType) { // All numbers are compatible with each other return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || ( diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java new file mode 100644 index 000000000000..c90b24308a3b --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Integration test that checks data types for queries with no rows returned. + */ +public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet { + private static final int NUM_BROKERS = 1; + private static final int NUM_SERVERS = 1; + + private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks = + new ArrayList<>(getNumBrokers() + getNumServers()); + + protected int getNumBrokers() { + return NUM_BROKERS; + } + + protected int getNumServers() { + return NUM_SERVERS; + } + + @Override + protected List<FieldConfig> getFieldConfigs() { + return Collections.singletonList( + new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + CompressionCodec.MV_ENTRY_DICT, null)); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + // Set hyperloglog log2m value to 12. + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()) + .build(); + _helixManager.getConfigAccessor() + .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(12)); + startBrokers(); + startServers(); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name + // and validate correctness with parallel push protection enabled. + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + // Create a copy of _tarDir to create multiple segments with the same name.
+ File tarDir2 = new File(_tempDir, "tarDir2"); + FileUtils.copyDirectory(_tarDir, tarDir2); + + List<File> tarDirs = new ArrayList<>(); + tarDirs.add(_tarDir); + tarDirs.add(tarDir2); + try { + uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); + } catch (Exception e) { + // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one + // of the three exceptions: + // - 409 conflict when the second call enters ProcessExistingSegment + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment + // - Failed to copy segment tar file to final location due to the same segment being pushed twice concurrently + // In such cases we upload all the segments again to ensure that the data is set up correctly. + assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() + .contains("Failed to create ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); + uploadSegments(getTableName(), _tarDir); + } + + // Set up service status callbacks + // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the + // resources to monitor + registerCallbackHandlers(); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @BeforeMethod + public void resetMultiStage() { + setUseMultiStageQueryEngine(false); + } + + protected void startBrokers() + throws Exception { + startBrokers(getNumBrokers()); + } + + protected void startServers() + throws Exception { + startServers(getNumServers()); + } + + private void registerCallbackHandlers() { + List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName()); + instances.removeIf( + instanceId -> !InstanceTypeUtils.isBroker(instanceId) && !InstanceTypeUtils.isServer(instanceId)); + List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName()); + resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource) + && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource))); + for (String instance : instances) { + List<String> resourcesToMonitor = new ArrayList<>(); + for (String resourceName : resourcesInCluster) { + IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName); + for (String partitionName : idealState.getPartitionSet()) { + if (idealState.getInstanceSet(partitionName).contains(instance)) { + resourcesToMonitor.add(resourceName); + break; + } + } + } + _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of( + new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0), + new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0)))); + } + } + + @Test + public void testInstancesStarted() { + assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers()); + for (ServiceStatus.ServiceStatusCallback serviceStatusCallback : _serviceStatusCallbacks) { + assertEquals(serviceStatusCallback.getServiceStatus(), ServiceStatus.Status.GOOD); + } + } + + @Test + public void testCompiledByV2StarField() throws Exception { + String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "INT", "INT",
"LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", + "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", + "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY", + "INT", "INT_ARRAY", "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY", + "INT_ARRAY", "INT_ARRAY", "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT", + "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT", + "INT", "STRING", "INT", "INT", "INT", "INT"); + } + + @Test + public void testCompiledByV2SelectionFields() throws Exception { + String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "INT", "STRING"); + } + + @Test + public void testCompiledByV2TrasformedFields() throws Exception { + String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable WHERE AirlineID = 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "INT", "INT"); + } + + @Test + public void testCompiledByV2AggregatedFields() throws Exception { + String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable WHERE AirlineID = 0 GROUP BY AirlineID LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "DOUBLE"); + } + + @Test + public void testSchemaFallbackStarField() throws Exception { + String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 AND add(AirTime, AirTime, ArrTime) > 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", + "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", + "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", + "INT", "STRING", "INT", "INT", "FLOAT", "INT", "STRING", "LONG", "INT", "INT", "INT", "INT", "STRING", "INT", + "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", + "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "INT"); + } + + @Test + public void testSchemaFallbackSelectionFields() throws Exception { + String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable" + + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "INT", "STRING"); + } + + @Test + public void testSchemaFallbackTransformedFields() throws Exception { + String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable" + + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1"; + JsonNode response = postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "INT", "STRING"); + } + + @Test + public void testSchemaFallbackAggregatedFields() throws Exception { + String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable" + + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 GROUP BY AirlineID LIMIT 1"; + JsonNode response = 
postQuery(sqlQuery); + assertNoRowsReturned(response); + assertDataTypes(response, "LONG", "DOUBLE"); + } + + private void assertNoRowsReturned(JsonNode response) { + assertNotNull(response.get("resultTable")); + assertNotNull(response.get("resultTable").get("rows")); + assertEquals(response.get("resultTable").get("rows").size(), 0); + } + + private void assertDataTypes(JsonNode response, String... types) throws Exception { + assertNotNull(response.get("resultTable")); + assertNotNull(response.get("resultTable").get("dataSchema")); + assertNotNull(response.get("resultTable").get("dataSchema").get("columnDataTypes")); + String expected = new ObjectMapper().writeValueAsString(types); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").toString(), expected); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java index 5b78267d0136..a157a4e6196d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java @@ -135,6 +135,7 @@ protected void testQueriesValidateAgainstH2(String query) } } } + Assert.assertFalse(h2ResultSet.next(), "Pinot result set is smaller than H2 result set after: " + numRows + " rows!"); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 63422f37e521..386171e5e8bd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -279,18 +279,27 @@ public List getTableNamesForQuery(String sqlQuery) { * Returns whether the query can be successfully compiled in this query environment */ public boolean canCompileQuery(String query) { - SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); - try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { + return getRelRootIfCanCompile(query) != null; + } + + /** + * Returns the RelRoot node if the query can be compiled, null otherwise. 
+ */ + @Nullable + public RelRoot getRelRootIfCanCompile(String query) { + try { + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions); SqlNode sqlNode = sqlNodeAndOptions.getSqlNode(); if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { sqlNode = ((SqlExplain) sqlNode).getExplicandum(); } - compileQuery(sqlNode, plannerContext); + RelRoot node = compileQuery(sqlNode, plannerContext); LOGGER.debug("Successfully compiled query using the multi-stage query engine: `{}`", query); - return true; - } catch (Exception e) { - LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e); - return false; + return node; + } catch (Throwable t) { + LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, t); + return null; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java index cbee4e711709..27f2842ed738 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java @@ -18,8 +18,17 @@ */ package org.apache.pinot.query.parser.utils; +import java.util.List; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.QueryEnvironment; +import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,4 +51,97 @@ public static boolean canCompileWithMultiStageEngine(String query, String databa LOGGER.debug("Multi-stage query compilation time = {}ms", System.currentTimeMillis() - compileStartTime); return canCompile; } + + /** + * Tries to fill an empty or improperly filled response schema when no rows have been returned. + * + * Priority is: + * - Types in schema provided by V2 validation for the given query. + * - Types in schema provided by V1 for the given table (only applicable to selection fields). + * - Types in response provided by V1 server (no action). + */ + public static void fillEmptyResponseSchema( + BrokerResponse response, TableCache tableCache, Schema schema, String database, String query + ) { + if (response == null || response.getNumRowsResultSet() > 0) { + return; + } + + QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null); + RelRoot node = queryEnvironment.getRelRootIfCanCompile(query); + DataSchema.ColumnDataType resolved; + + // V1 schema info for the response can be inaccurate or incomplete in several forms: + // 1) No schema provided at all (when no segments have even been pruned). + // 2) Schema provided but all columns set to default type (STRING) (when no segments have been matched). + // V2 schema will be available only if query compiles.
+ + boolean hasV1Schema = response.getResultTable() != null; + boolean hasV2Schema = node != null && node.validatedRowType != null; + + if (hasV1Schema && hasV2Schema) { + // match v1 column types with v2 column types by position + // if no match, rely on v1 schema based on column name + // if no match either, just leave it as it is + DataSchema responseSchema = response.getResultTable().getDataSchema(); + List<RelDataTypeField> fields = node.validatedRowType.getFieldList(); + for (int i = 0; i < responseSchema.size(); i++) { + resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType()); + if (resolved == null || resolved.isUnknown()) { + FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i)); + try { + resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false); + } catch (Exception e) { + try { + resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true); + } catch (Exception e2) { + resolved = DataSchema.ColumnDataType.UNKNOWN; + } + } + } + if (resolved == null || resolved.isUnknown()) { + resolved = responseSchema.getColumnDataType(i); + } + responseSchema.getColumnDataTypes()[i] = resolved; + } + } else if (hasV1Schema) { + // match v1 column types with v1 schema columns using column names + // if no match, just leave it as it is + DataSchema responseSchema = response.getResultTable().getDataSchema(); + for (int i = 0; i < responseSchema.size(); i++) { + FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i)); + try { + // try single value first + resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true); + } catch (Exception e) { + try { + // fallback to multi value + resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false); + } catch (Exception e2) { + resolved = DataSchema.ColumnDataType.UNKNOWN; + } + } + if (resolved == null || resolved.isUnknown()) { + resolved = responseSchema.getColumnDataType(i); + } + responseSchema.getColumnDataTypes()[i] = resolved; + } + } else if (hasV2Schema) { + // trust v2 column types blindly + // if a type cannot be resolved, leave it as UNKNOWN + List<RelDataTypeField> fields = node.validatedRowType.getFieldList(); + String[] columnNames = new String[fields.size()]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = fields.get(i).getName(); + resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType()); + if (resolved == null) { + resolved = DataSchema.ColumnDataType.UNKNOWN; + } + columnDataTypes[i] = resolved; + } + response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of())); + } + // else { /* nothing else we can do */ } + } }
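Reviewer note (illustrative, not part of the patch): the sketch below condenses the per-column fallback order that the new ParserUtils.fillEmptyResponseSchema applies to empty results: prefer the type from the V2 validated row type, then the V1 table schema, then keep whatever type the server reported. The helper class name is hypothetical, and using FieldSpec#isSingleValueField() is a simplification of the patch's single-value/multi-value retry; the remaining calls are the ones used in the diff above.

import javax.annotation.Nullable;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

final class EmptyResponseTypeResolution {
  private EmptyResponseTypeResolution() {
  }

  // Resolves the data type to report for one column of an empty result set.
  static ColumnDataType resolve(String columnName, @Nullable RelDataTypeField v2Field, Schema tableSchema,
      ColumnDataType serverProvided) {
    // 1. Prefer the type derived from the multi-stage (V2) validated row type, available when the query compiles.
    if (v2Field != null) {
      ColumnDataType fromV2 = RelToPlanNodeConverter.convertToColumnDataType(v2Field.getType());
      if (fromV2 != null && !fromV2.isUnknown()) {
        return fromV2;
      }
    }
    // 2. Fall back to the V1 table schema; this only works for plain selection columns whose names appear in the
    //    schema (transformed or aggregated expressions will not match).
    FieldSpec spec = tableSchema.getFieldSpecFor(columnName);
    if (spec != null) {
      try {
        ColumnDataType fromSchema = ColumnDataType.fromDataType(spec.getDataType(), spec.isSingleValueField());
        if (fromSchema != null && !fromSchema.isUnknown()) {
          return fromSchema;
        }
      } catch (Exception e) {
        // Unsupported data type; fall through to the server-provided type.
      }
    }
    // 3. Keep whatever the server reported (typically STRING for empty responses).
    return serverProvided;
  }
}

This mirrors why the integration tests above expect real column types (for example LONG for AirlineID) when the query compiles with V2, but fall back to STRING for transformed expressions when only the V1 schema is available.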