Skip to content

Commit

Permalink
Return datatypes when broker prunes all segments
Browse files Browse the repository at this point in the history
  • Loading branch information
vvivekiyer committed Jan 22, 2025
1 parent e05ef7b commit 04291d3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity);
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query,
database);
}

if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
Expand Down Expand Up @@ -710,7 +711,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
return new BrokerResponseNative(exceptions);
} else {
// When all segments have been pruned, we can just return an empty response.
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity);
return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity, schema, query,
database);
}
}
long routingEndTimeNs = System.nanoTime();
Expand Down Expand Up @@ -895,14 +897,15 @@ private static String getRoutingPolicy(TableConfig tableConfig) {
}

private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, RequestContext requestContext,
String tableName, @Nullable RequesterIdentity requesterIdentity) {
String tableName, @Nullable RequesterIdentity requesterIdentity, Schema schema, String query, String database) {
if (pinotQuery.isExplain()) {
// EXPLAIN PLAN results to show that query is evaluated exclusively by Broker.
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
}

// Send empty response since we don't need to evaluate either offline or realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -32,6 +34,7 @@
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -260,6 +263,56 @@ public void testSchemaFallbackAggregatedFields() throws Exception {
assertDataTypes(response, "LONG", "DOUBLE");
}

@Test(priority = 1)
public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(
new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null));
updateTableConfig(tableConfig);

TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");

String query =
"Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231";

// Parse the Json response. Assert if DestAirportID has columnDatatype INT and Carrier has columnDatatype STRING.
JsonNode queryResponse = postQuery(query);
assertNoRowsReturned(queryResponse);
assertDataTypes(queryResponse, "INT", "STRING");

query = "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231";
queryResponse = postQuery(query);
assertNoRowsReturned(queryResponse);
assertDataTypes(queryResponse, "INT", "STRING");

// Reset and remove the Time Segment Pruner
tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");
}


private void assertNoRowsReturned(JsonNode response) {
assertNotNull(response.get("resultTable"));
assertNotNull(response.get("resultTable").get("rows"));
Expand Down

0 comments on commit 04291d3

Please sign in to comment.