From ff95e231c74807f230167c93f166ac416e833c21 Mon Sep 17 00:00:00 2001 From: Vivek Iyer Vaidyanathan Iyer Date: Wed, 22 Jan 2025 13:54:30 -0800 Subject: [PATCH 1/2] Return datatypes when broker prunes all segments --- .../BaseSingleStageBrokerRequestHandler.java | 9 ++-- .../tests/EmptyResponseIntegrationTest.java | 51 +++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 65a2d0a79ef7..b8c04140dc74 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -601,7 +601,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { - return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity); + return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query, + database); } if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) { @@ -710,7 +711,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(exceptions); } else { // When all segments have been pruned, we can just return an empty response. - return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity); + return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query, + database); } } long routingEndTimeNs = System.nanoTime(); @@ -895,7 +897,7 @@ private static String getRoutingPolicy(TableConfig tableConfig) { } private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, RequestContext requestContext, - String tableName, @Nullable RequesterIdentity requesterIdentity) { + String tableName, @Nullable RequesterIdentity requesterIdentity, Schema schema, String query, String database) { if (pinotQuery.isExplain()) { // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; @@ -903,6 +905,7 @@ private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, R // Send empty response since we don't need to evaluate either offline or realtime request. BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); + ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); _queryLogger.log( new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, null)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java index c90b24308a3b..677b6270edea 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -260,6 +261,56 @@ public void testSchemaFallbackAggregatedFields() throws Exception { assertDataTypes(response, "LONG", "DOUBLE"); } + @Test(priority = 1) + public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception { + TableConfig tableConfig = getOfflineTableConfig(); + tableConfig.setRoutingConfig( + new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null)); + updateTableConfig(tableConfig); + + TestUtils.waitForCondition(aVoid -> { + try { + TableConfig cfg = getOfflineTableConfig(); + if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) { + return false; + } + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 600_000L, "Failed to update table config"); + + String query = + "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231"; + + // Parse the Json response. Assert if DestAirportID has columnDatatype INT and Carrier has columnDatatype STRING. + JsonNode queryResponse = postQuery(query); + assertNoRowsReturned(queryResponse); + assertDataTypes(queryResponse, "INT", "STRING"); + + query = "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231"; + queryResponse = postQuery(query); + assertNoRowsReturned(queryResponse); + assertDataTypes(queryResponse, "INT", "STRING"); + + // Reset and remove the Time Segment Pruner + tableConfig = getOfflineTableConfig(); + tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null)); + updateTableConfig(tableConfig); + TestUtils.waitForCondition(aVoid -> { + try { + TableConfig cfg = getOfflineTableConfig(); + if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) { + return false; + } + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 600_000L, "Failed to update table config"); + } + + private void assertNoRowsReturned(JsonNode response) { assertNotNull(response.get("resultTable")); assertNotNull(response.get("resultTable").get("rows")); From 96d2c89ae7174efbb36c66d048c66c4c76a36ab7 Mon Sep 17 00:00:00 2001 From: Vivek Iyer Vaidyanathan Iyer Date: Thu, 23 Jan 2025 10:11:35 -0800 Subject: [PATCH 2/2] Integration test failure --- .../pinot/integration/tests/EmptyResponseIntegrationTest.java | 2 +- .../tests/SegmentGenerationMinionClusterIntegrationTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java index 677b6270edea..d49d2555abfc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java @@ -261,7 +261,7 @@ public void testSchemaFallbackAggregatedFields() throws Exception { assertDataTypes(response, "LONG", "DOUBLE"); } - @Test(priority = 1) + @Test public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception { TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setRoutingConfig( diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java index d2adb436d66a..bd4673f85361 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java @@ -180,6 +180,6 @@ private int getTotalDocs(String tableName) String query = "SELECT COUNT(*) FROM " + tableName; JsonNode response = postQuery(query); JsonNode resTbl = response.get("resultTable"); - return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt(); + return (resTbl.get("rows").size() == 0) ? 0 : resTbl.get("rows").get(0).get(0).asInt(); } }