Skip to content

Commit

Permalink
Enhance data schema generation for empty response
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 25, 2025
1 parent a29ecd7 commit 2c3dea9
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

// In some scenarios where no rows are returned, the server reports STRING as the default data type for all columns.
// This is an attempt to return more faithful type information based on other sources.
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
if (brokerResponse.getNumRowsResultSet() == 0) {
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
}

// Set total query processing time
long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,6 @@ public boolean isWholeNumberArray() {
return INTEGRAL_ARRAY_TYPES.contains(this);
}

/**
 * Returns {@code true} when this value equals the {@code UNKNOWN} constant.
 */
public boolean isUnknown() {
return UNKNOWN.equals(this);
}

public boolean isCompatible(ColumnDataType anotherColumnDataType) {
// All numbers are compatible with each other
return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
Expand All @@ -55,6 +54,15 @@
public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet {
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
// Expected column data types, in result-column order, for an empty-result "SELECT *" query against myTable.
// Shared by testStarField and testSchemaFallbackStarField so both assert the same schema.
private static final String[] SELECT_STAR_TYPES = new String[]{
"INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", "INT", "STRING", "INT",
"STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", "STRING", "INT", "INT",
"INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY", "INT", "INT_ARRAY",
"STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY", "INT_ARRAY", "INT_ARRAY",
"INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING",
"STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT",
"INT", "INT", "INT"
};

private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
new ArrayList<>(getNumBrokers() + getNumServers());
Expand Down Expand Up @@ -134,11 +142,6 @@ public void setUp()
waitForAllDocsLoaded(600_000L);
}

// Runs before each test method (TestNG @BeforeMethod) and calls setUseMultiStageQueryEngine(false).
// NOTE(review): presumably this makes every test start on the single-stage engine - confirm against the helper.
@BeforeMethod
public void resetMultiStage() {
setUseMultiStageQueryEngine(false);
}

protected void startBrokers()
throws Exception {
startBrokers(getNumBrokers());
Expand Down Expand Up @@ -184,67 +187,63 @@ public void testInstancesStarted() {
}

/**
 * Posts a {@code SELECT *} query filtered on {@code AirlineID = 0}, then asserts that the response contains
 * no rows and that the reported column data types match the expected list.
 */
@Test
public void testCompiledByV2StarField() throws Exception {
public void testStarField()
throws Exception {
String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
"INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
"STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY",
"INT", "INT_ARRAY", "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY",
"INT_ARRAY", "INT_ARRAY", "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT",
"INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT",
"INT", "STRING", "INT", "INT", "INT", "INT");
assertDataTypes(response, SELECT_STAR_TYPES);
}

/**
 * Posts a query selecting three plain columns (AirlineID, ArrTime, ArrTimeBlk) filtered on
 * {@code AirlineID = 0}, then asserts that no rows come back and that the reported column data types are
 * LONG, INT and STRING.
 */
@Test
public void testCompiledByV2SelectionFields() throws Exception {
public void testSelectionFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "LONG", "INT", "STRING");
}

/**
 * Posts a query whose select list includes the transform expression {@code ArrTime+1}, filtered on
 * {@code AirlineID = 0}, then asserts that no rows come back and that the reported column data types are
 * LONG, INT and INT.
 */
@Test
public void testCompiledByV2TrasformedFields() throws Exception {
public void testTransformedFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable WHERE AirlineID = 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "LONG", "INT", "INT");
}

/**
 * Posts a group-by query computing {@code avg(ArrTime)} per AirlineID, filtered on {@code AirlineID = 0},
 * then asserts that no rows come back and that the reported column data types are LONG and DOUBLE.
 */
@Test
public void testCompiledByV2AggregatedFields() throws Exception {
public void testAggregatedFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable WHERE AirlineID = 0 GROUP BY AirlineID LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "LONG", "DOUBLE");
}

/**
 * Posts a {@code SELECT *} query whose filter combines {@code AirlineID = 0} with a three-argument
 * {@code add(...)} comparison, then asserts that no rows come back and checks the reported column data types.
 * NOTE(review): the three-argument add(...) is presumably what pushes the query onto the schema-fallback path
 * named in the test - confirm.
 */
@Test
public void testSchemaFallbackStarField() throws Exception {
public void testSchemaFallbackStarField()
throws Exception {
String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 AND add(AirTime, AirTime, ArrTime) > 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
"INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
"STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT",
"INT", "STRING", "INT", "INT", "FLOAT", "INT", "STRING", "LONG", "INT", "INT", "INT", "INT", "STRING", "INT",
"INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT",
"INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "INT");
assertDataTypes(response, SELECT_STAR_TYPES);
}

/**
 * Posts a query selecting three plain columns (AirlineID, ArrTime, ArrTimeBlk) whose filter combines
 * {@code AirlineID = 0} with a three-argument {@code add(...)} comparison, then asserts that no rows come
 * back and that the reported column data types are LONG, INT and STRING.
 * NOTE(review): the three-argument add(...) is presumably what pushes the query onto the schema-fallback path
 * named in the test - confirm.
 */
@Test
public void testSchemaFallbackSelectionFields() throws Exception {
public void testSchemaFallbackSelectionFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable"
+ " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
+ " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
assertNoRowsReturned(response);
assertDataTypes(response, "LONG", "INT", "STRING");
}

@Test
public void testSchemaFallbackTransformedFields() throws Exception {
public void testSchemaFallbackTransformedFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable"
+ " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
JsonNode response = postQuery(sqlQuery);
Expand All @@ -253,7 +252,8 @@ public void testSchemaFallbackTransformedFields() throws Exception {
}

@Test
public void testSchemaFallbackAggregatedFields() throws Exception {
public void testSchemaFallbackAggregatedFields()
throws Exception {
String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable"
+ " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 GROUP BY AirlineID LIMIT 1";
JsonNode response = postQuery(sqlQuery);
Expand All @@ -262,24 +262,13 @@ public void testSchemaFallbackAggregatedFields() throws Exception {
}

@Test
public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
public void testDataSchemaForBrokerPrunedEmptyResults()
throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(
new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null));
updateTableConfig(tableConfig);

TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");

String query =
"Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231";

Expand All @@ -293,31 +282,19 @@ public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
assertNoRowsReturned(queryResponse);
assertDataTypes(queryResponse, "INT", "STRING");

// Reset and remove the Time Segment Pruner
tableConfig = getOfflineTableConfig();
tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null));
// Reset the routing config
tableConfig.setRoutingConfig(null);
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
TableConfig cfg = getOfflineTableConfig();
if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) {
return false;
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to update table config");
}


/**
 * Asserts that the response contains a "resultTable" node with a "rows" node, and that "rows" is empty.
 */
private void assertNoRowsReturned(JsonNode response) {
  JsonNode resultTable = response.get("resultTable");
  assertNotNull(resultTable);
  JsonNode rows = resultTable.get("rows");
  assertNotNull(rows);
  assertEquals(rows.size(), 0);
}

private void assertDataTypes(JsonNode response, String... types) throws Exception {
private void assertDataTypes(JsonNode response, String... types)
throws Exception {
assertNotNull(response.get("resultTable"));
assertNotNull(response.get("resultTable").get("dataSchema"));
assertNotNull(response.get("resultTable").get("dataSchema").get("columnDataTypes"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.apache.pinot.common.function.scalar.StringFunctions.*;
Expand Down Expand Up @@ -267,11 +266,6 @@ private void reloadAllSegments(String testQuery, boolean forceDownload, long num
}, 600_000L, "Failed to reload table with force download");
}

// Runs before each test method (TestNG @BeforeMethod) and calls setUseMultiStageQueryEngine(false).
// NOTE(review): presumably this makes every test start on the single-stage engine - confirm against the helper.
@BeforeMethod
public void resetMultiStage() {
setUseMultiStageQueryEngine(false);
}

protected void startBrokers()
throws Exception {
startBrokers(getNumBrokers());
Expand Down
Loading

0 comments on commit 2c3dea9

Please sign in to comment.