From d819d8a2042f273d3e06e4627224d5642bb233ab Mon Sep 17 00:00:00 2001
From: Alberto Bastos
Date: Tue, 21 Jan 2025 04:15:27 -0800
Subject: [PATCH] Improve reported dataTypes on V1 when no rows returned
 (#14836)

V1 uses STRING as the default data type for all columns when servers
return no data. This change attempts to improve that through several
strategies, the preferred one being to reuse the data types determined
by the schema validation implemented in V2. As a fallback, and only
suitable for non-transformed fields, the base table schema is used.
Otherwise, the default STRING type is still returned.
---
 .../BaseSingleStageBrokerRequestHandler.java  |   4 +
 .../apache/pinot/common/utils/DataSchema.java |   4 +
 .../tests/EmptyResponseIntegrationTest.java   | 276 ++++++++++++++++++
 .../tests/SSBQueryIntegrationTest.java        |   1 +
 .../apache/pinot/query/QueryEnvironment.java  |  23 +-
 .../pinot/query/parser/utils/ParserUtils.java | 102 +++++++
 6 files changed, 403 insertions(+), 7 deletions(-)
 create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 72b69a24fadb..65a2d0a79ef7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -837,6 +837,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
         1);
     }
 
+    // Servers return STRING as the default dataType for all columns in (some) scenarios where no rows are returned.
+    // This is an attempt to return more faithful type information based on other sources.
+    ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
+
     // Set total query processing time
     long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
     brokerResponse.setTimeUsedMs(totalTimeMs);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 7751d4dc5688..2fb2aef48a2e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -419,6 +419,10 @@ public boolean isWholeNumberArray() {
     return INTEGRAL_ARRAY_TYPES.contains(this);
   }
 
+  public boolean isUnknown() {
+    return UNKNOWN.equals(this);
+  }
+
   public boolean isCompatible(ColumnDataType anotherColumnDataType) {
     // All numbers are compatible with each other
     return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || (
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
new file mode 100644
index 000000000000..c90b24308a3b
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test that checks data types for queries with no rows returned.
+ */
+public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final int NUM_BROKERS = 1;
+  private static final int NUM_SERVERS = 1;
+
+  private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
+      new ArrayList<>(getNumBrokers() + getNumServers());
+
+  protected int getNumBrokers() {
+    return NUM_BROKERS;
+  }
+
+  protected int getNumServers() {
+    return NUM_SERVERS;
+  }
+
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    return Collections.singletonList(
+        new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+            CompressionCodec.MV_ENTRY_DICT, null));
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    // Set hyperloglog log2m value to 12.
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    _helixManager.getConfigAccessor()
+        .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(12));
+    startBrokers();
+    startServers();
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name
+    // and validate correctness with parallel push protection enabled.
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    // Create a copy of _tarDir to create multiple segments with the same name.
+    File tarDir2 = new File(_tempDir, "tarDir2");
+    FileUtils.copyDirectory(_tarDir, tarDir2);
+
+    List<File> tarDirs = new ArrayList<>();
+    tarDirs.add(_tarDir);
+    tarDirs.add(tarDir2);
+    try {
+      uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
+    } catch (Exception e) {
+      // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one
+      // of three exceptions:
+      // - 409 conflict if the second call enters ProcessExistingSegment
+      // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment
+      // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently
+      // In such cases we upload all the segments again to ensure that the data is set up correctly.
+      assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage()
+          .contains("Failed to create ZK metadata for segment") || e.getMessage()
+          .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage());
+      uploadSegments(getTableName(), _tarDir);
+    }
+
+    // Set up service status callbacks
+    // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find
+    // the resources to monitor
+    registerCallbackHandlers();
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @BeforeMethod
+  public void resetMultiStage() {
+    setUseMultiStageQueryEngine(false);
+  }
+
+  protected void startBrokers()
+      throws Exception {
+    startBrokers(getNumBrokers());
+  }
+
+  protected void startServers()
+      throws Exception {
+    startServers(getNumServers());
+  }
+
+  private void registerCallbackHandlers() {
+    List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
+    instances.removeIf(
+        instanceId -> !InstanceTypeUtils.isBroker(instanceId) && !InstanceTypeUtils.isServer(instanceId));
+    List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName());
+    resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
+        && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
+    for (String instance : instances) {
+      List<String> resourcesToMonitor = new ArrayList<>();
+      for (String resourceName : resourcesInCluster) {
+        IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
+        for (String partitionName : idealState.getPartitionSet()) {
+          if (idealState.getInstanceSet(partitionName).contains(instance)) {
+            resourcesToMonitor.add(resourceName);
+            break;
+          }
+        }
+      }
+      _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
+          new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
+              instance, resourcesToMonitor, 100.0),
+          new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
+              instance, resourcesToMonitor, 100.0))));
+    }
+  }
+
+  @Test
+  public void testInstancesStarted() {
+    assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
+    for (ServiceStatus.ServiceStatusCallback serviceStatusCallback : _serviceStatusCallbacks) {
+      assertEquals(serviceStatusCallback.getServiceStatus(), ServiceStatus.Status.GOOD);
+    }
+  }
+
+  @Test
+  public void testCompiledByV2StarField() throws Exception {
+    String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
+        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
+        "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY",
+        "INT", "INT_ARRAY", "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY",
+        "INT_ARRAY", "INT_ARRAY", "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT",
+        "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT",
+        "INT", "STRING", "INT", "INT", "INT", "INT");
+  }
+
+  @Test
+  public void testCompiledByV2SelectionFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "INT", "STRING");
+  }
+
+  @Test
+  public void testCompiledByV2TransformedFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable WHERE AirlineID = 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "INT", "INT");
+  }
+
+  @Test
+  public void testCompiledByV2AggregatedFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable WHERE AirlineID = 0 GROUP BY AirlineID LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "DOUBLE");
+  }
+
+  @Test
+  public void testSchemaFallbackStarField() throws Exception {
+    String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 AND add(AirTime, AirTime, ArrTime) > 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
+        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
+        "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT",
+        "INT", "STRING", "INT", "INT", "FLOAT", "INT", "STRING", "LONG", "INT", "INT", "INT", "INT", "STRING", "INT",
+        "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT",
+        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "INT");
+  }
+
+  @Test
+  public void testSchemaFallbackSelectionFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable"
+        + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "INT", "STRING");
+  }
+
+  @Test
+  public void testSchemaFallbackTransformedFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable"
+        + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "INT", "STRING");
+  }
+
+  @Test
+  public void testSchemaFallbackAggregatedFields() throws Exception {
+    String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable"
+        + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 GROUP BY AirlineID LIMIT 1";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoRowsReturned(response);
+    assertDataTypes(response, "LONG", "DOUBLE");
+  }
+
+  private void assertNoRowsReturned(JsonNode response) {
+    assertNotNull(response.get("resultTable"));
+    assertNotNull(response.get("resultTable").get("rows"));
+    assertEquals(response.get("resultTable").get("rows").size(), 0);
+  }
+
+  private void assertDataTypes(JsonNode response, String... types) throws Exception {
+    assertNotNull(response.get("resultTable"));
+    assertNotNull(response.get("resultTable").get("dataSchema"));
+    assertNotNull(response.get("resultTable").get("dataSchema").get("columnDataTypes"));
+    String expected = new ObjectMapper().writeValueAsString(types);
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").toString(), expected);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
index 5b78267d0136..a157a4e6196d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
@@ -135,6 +135,7 @@ protected void testQueriesValidateAgainstH2(String query)
         }
       }
     }
+    Assert.assertFalse(h2ResultSet.next(), "Pinot result set is smaller than H2 result set after: " + numRows + " rows!");
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 63422f37e521..386171e5e8bd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -279,18 +279,27 @@ public List<String> getTableNamesForQuery(String sqlQuery) {
    * Returns whether the query can be successfully compiled in this query environment
    */
   public boolean canCompileQuery(String query) {
-    SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
-    try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
+    return getRelRootIfCanCompile(query) != null;
+  }
+
+  /**
+   * Returns the RelRoot node if the query can be compiled, null otherwise.
+   */
+  @Nullable
+  public RelRoot getRelRootIfCanCompile(String query) {
+    try {
+      SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+      PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions);
       SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
       if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
         sqlNode = ((SqlExplain) sqlNode).getExplicandum();
       }
-      compileQuery(sqlNode, plannerContext);
+      RelRoot node = compileQuery(sqlNode, plannerContext);
       LOGGER.debug("Successfully compiled query using the multi-stage query engine: `{}`", query);
-      return true;
-    } catch (Exception e) {
-      LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e);
-      return false;
+      return node;
+    } catch (Throwable t) {
+      LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, t);
+      return null;
     }
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index cbee4e711709..27f2842ed738 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -18,8 +18,17 @@
  */
 package org.apache.pinot.query.parser.utils;
 
+import java.util.List;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,4 +51,97 @@ public static boolean canCompileWithMultiStageEngine(String query, String databa
     LOGGER.debug("Multi-stage query compilation time = {}ms", System.currentTimeMillis() - compileStartTime);
     return canCompile;
   }
+
+  /**
+   * Tries to fill an empty or improperly filled response schema when no rows have been returned.
+   *
+   * Priority is:
+   * - Types in the schema produced by V2 validation for the given query.
+   * - Types in the schema provided by V1 for the given table (only applicable to selection fields).
+   * - Types in the response provided by the V1 server (no action).
+   */
+  public static void fillEmptyResponseSchema(
+      BrokerResponse response, TableCache tableCache, Schema schema, String database, String query
+  ) {
+    if (response == null || response.getNumRowsResultSet() > 0) {
+      return;
+    }
+
+    QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
+    RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
+    DataSchema.ColumnDataType resolved;
+
+    // V1 schema info for the response can be inaccurate or incomplete in several forms:
+    // 1) No schema provided at all (when no segments have even been pruned).
+    // 2) Schema provided but all columns set to the default type (STRING) (when no segments have been matched).
+    // The V2 schema will be available only if the query compiles.
+
+    boolean hasV1Schema = response.getResultTable() != null;
+    boolean hasV2Schema = node != null && node.validatedRowType != null;
+
+    if (hasV1Schema && hasV2Schema) {
+      // Match V1 column types with V2 column types using column names.
+      // If there is no match, fall back to the V1 table schema based on the column name.
+      // If there is no match either, leave the type as-is.
+      DataSchema responseSchema = response.getResultTable().getDataSchema();
+      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
+      for (int i = 0; i < responseSchema.size(); i++) {
+        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
+        if (resolved == null || resolved.isUnknown()) {
+          FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
+          try {
+            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
+          } catch (Exception e) {
+            try {
+              resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
+            } catch (Exception e2) {
+              resolved = DataSchema.ColumnDataType.UNKNOWN;
+            }
+          }
+        }
+        if (resolved == null || resolved.isUnknown()) {
+          resolved = responseSchema.getColumnDataType(i);
+        }
+        responseSchema.getColumnDataTypes()[i] = resolved;
+      }
+    } else if (hasV1Schema) {
+      // Match V1 column types with V1 table schema columns using column names.
+      // If there is no match, leave the type as-is.
+      DataSchema responseSchema = response.getResultTable().getDataSchema();
+      for (int i = 0; i < responseSchema.size(); i++) {
+        FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
+        try {
+          // try single-value first
+          resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
+        } catch (Exception e) {
+          try {
+            // fallback to multi-value
+            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
+          } catch (Exception e2) {
+            resolved = DataSchema.ColumnDataType.UNKNOWN;
+          }
+        }
+        if (resolved == null || resolved.isUnknown()) {
+          resolved = responseSchema.getColumnDataType(i);
+        }
+        responseSchema.getColumnDataTypes()[i] = resolved;
+      }
+    } else if (hasV2Schema) {
+      // Trust the V2 column types blindly.
+      // If a type cannot be resolved, leave it as UNKNOWN.
+      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
+      String[] columnNames = new String[fields.size()];
+      DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[fields.size()];
+      for (int i = 0; i < fields.size(); i++) {
+        columnNames[i] = fields.get(i).getName();
+        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
+        if (resolved == null) {
+          resolved = DataSchema.ColumnDataType.UNKNOWN;
+        }
+        columnDataTypes[i] = resolved;
+      }
+      response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of()));
+    }
+    // else { /* nothing else we can do */ }
+  }
 }
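
Note for reviewers: fillEmptyResponseSchema(...) leans on the RelRoot returned by the
new QueryEnvironment.getRelRootIfCanCompile(...). A minimal usage sketch of that
pairing, using only APIs that appear in this patch (the wiring of a configured
QueryEnvironment and a query string is assumed):

    // Derive column names and types from the V2-validated row type when the query
    // compiles; getRelRootIfCanCompile(...) returns null when it does not.
    RelRoot root = queryEnvironment.getRelRootIfCanCompile(query);
    if (root != null && root.validatedRowType != null) {
      for (RelDataTypeField field : root.validatedRowType.getFieldList()) {
        // convertToColumnDataType maps the Calcite type to a Pinot ColumnDataType;
        // a null or UNKNOWN result triggers the table-schema fallback above.
        DataSchema.ColumnDataType type = RelToPlanNodeConverter.convertToColumnDataType(field.getType());
        System.out.println(field.getName() + " -> " + type);
      }
    }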
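The resolution priority itself (V2 validated type, then base table schema, then the
V1 server default) can be illustrated with a self-contained sketch. The maps and
values below are hypothetical placeholders, not Pinot APIs:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TypePrioritySketch {
      public static void main(String[] args) {
        // Hypothetical type sources for a query that returned no rows.
        Map<String, String> v2Types = new LinkedHashMap<>();     // from V2 validation; empty if the query does not compile
        Map<String, String> tableTypes = new LinkedHashMap<>();  // from the base table schema
        Map<String, String> serverTypes = new LinkedHashMap<>(); // V1 default: everything STRING

        tableTypes.put("AirlineID", "LONG");
        tableTypes.put("ArrTime", "INT");
        serverTypes.put("AirlineID", "STRING");
        serverTypes.put("ArrTime", "STRING");
        serverTypes.put("ArrTime+1", "STRING"); // transformed field: no table-schema entry

        for (String column : serverTypes.keySet()) {
          // Priority: V2 validated type > table schema type > V1 server default.
          String resolved = v2Types.getOrDefault(column,
              tableTypes.getOrDefault(column, serverTypes.get(column)));
          System.out.println(column + " -> " + resolved);
        }
      }
    }

This mirrors the testSchemaFallback* expectations above: when V2 cannot compile the
query, AirlineID and ArrTime recover LONG and INT from the table schema, while the
transformed ArrTime+1 stays STRING.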