Skip to content

Commit

Permalink
add numServersQueried and numServersResponded to MSQE response (#14806)
Browse files Browse the repository at this point in the history
  • Loading branch information
albertobastos authored Jan 22, 2025
1 parent 7356304 commit f845a97
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
Expand Down Expand Up @@ -199,8 +201,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();

Set<QueryServerInstance> servers = new HashSet<>();
for (DispatchablePlanFragment planFragment: dispatchableSubPlan.getQueryStageList()) {
servers.addAll(planFragment.getServerInstances());
}

Set<String> tableNames = queryPlanResult.getTableNames();
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_STAGE_QUERIES_GLOBAL, 1);
for (String tableName : tableNames) {
_brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.MULTI_STAGE_QUERIES, 1);
Expand Down Expand Up @@ -277,8 +284,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());
// MSE cannot finish if a single queried server did not respond, so we can use the same count for
// both the queried and responded stats. Minus one prevents the broker to be included in the count
// (it will always be included because of the root of the query plan)
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);

// Attach unavailable segments
int numUnavailableSegments = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,16 @@ public void testConcurrentQueries() {
executorService.shutdownNow();
}

@Test
public void testNumServersQueried() throws Exception {
String query = "select * from mytable limit 10";
JsonNode jsonNode = postQuery(query);
JsonNode numServersQueried = jsonNode.get("numServersQueried");
assertNotNull(numServersQueried);
assertTrue(numServersQueried.isInt());
assertTrue(numServersQueried.asInt() > 0);
}

private void checkQueryResultForDBTest(String column, String tableName)
throws Exception {
checkQueryResultForDBTest(column, tableName, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.routing.QueryServerInstance;
Expand Down Expand Up @@ -112,4 +113,8 @@ public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer
_serverInstanceToWorkerIdMap.clear();
_serverInstanceToWorkerIdMap.putAll(serverInstanceToWorkerIdMap);
}

public Set<QueryServerInstance> getServerInstances() {
return _serverInstanceToWorkerIdMap.keySet();
}
}

0 comments on commit f845a97

Please sign in to comment.