Skip to content

Commit

Permalink
Return improved dataschema for empty results when all segments are pr…
Browse files Browse the repository at this point in the history
…uned by broker (#13831)

* Return datatypes when broker prunes all segments

* Integration test failure
  • Loading branch information
vvivekiyer authored Jan 24, 2025
1 parent 295f28a commit a29ecd7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity);
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query,
database);
}

if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
Expand Down Expand Up @@ -710,7 +711,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
return new BrokerResponseNative(exceptions);
} else {
// When all segments have been pruned, we can just return an empty response.
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity);
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query,
database);
}
}
long routingEndTimeNs = System.nanoTime();
Expand Down Expand Up @@ -895,14 +897,15 @@ private static String getRoutingPolicy(TableConfig tableConfig) {
}

private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, RequestContext requestContext,
String tableName, @Nullable RequesterIdentity requesterIdentity) {
String tableName, @Nullable RequesterIdentity requesterIdentity, Schema schema, String query, String database) {
if (pinotQuery.isExplain()) {
// EXPLAIN PLAN results to show that query is evaluated exclusively by Broker.
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
}

// Send empty response since we don't need to evaluate either offline or realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -260,6 +261,56 @@ public void testSchemaFallbackAggregatedFields() throws Exception {
assertDataTypes(response, "LONG", "DOUBLE");
}

@Test
public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(
new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null));
updateTableConfig(tableConfig);

TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");

String query =
"Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231";

// Parse the Json response. Assert if DestAirportID has columnDatatype INT and Carrier has columnDatatype STRING.
JsonNode queryResponse = postQuery(query);
assertNoRowsReturned(queryResponse);
assertDataTypes(queryResponse, "INT", "STRING");

query = "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231";
queryResponse = postQuery(query);
assertNoRowsReturned(queryResponse);
assertDataTypes(queryResponse, "INT", "STRING");

// Reset and remove the Time Segment Pruner
tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");
}


private void assertNoRowsReturned(JsonNode response) {
assertNotNull(response.get("resultTable"));
assertNotNull(response.get("resultTable").get("rows"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,6 @@ private int getTotalDocs(String tableName)
String query = "SELECT COUNT(*) FROM " + tableName;
JsonNode response = postQuery(query);
JsonNode resTbl = response.get("resultTable");
return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt();
return (resTbl.get("rows").size() == 0) ? 0 : resTbl.get("rows").get(0).get(0).asInt();
}
}

0 comments on commit a29ecd7

Please sign in to comment.