Skip to content

Commit

Permalink
Add ServerRouteInfo to replace pairs maintaining the same info. (#14921)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrajat authored Jan 28, 2025
1 parent da5e0e7 commit 1edffab
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
Expand All @@ -56,6 +55,7 @@
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
Expand Down Expand Up @@ -157,11 +157,11 @@ public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
@ApiResponse(code = 404, message = "Routing not found"),
@ApiResponse(code = 500, message = "Internal server error")
})
public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTableWithOptionalSegments(
public Map<String, Map<ServerInstance, ServerRouteInfo>> getRoutingTableWithOptionalSegments(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> result = new TreeMap<>();
Map<String, Map<ServerInstance, ServerRouteInfo>> result = new TreeMap<>();
getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
routingTable.getServerInstanceToSegmentsMap()));
if (!result.isEmpty()) {
Expand Down Expand Up @@ -192,9 +192,9 @@ private void getRoutingTable(String tableName, BiConsumer<String, RoutingTable>
}

private static Map<ServerInstance, List<String>> removeOptionalSegments(
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap) {
Map<ServerInstance, ServerRouteInfo> serverInstanceToSegmentsMap) {
Map<ServerInstance, List<String>> ret = new HashMap<>();
serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft()));
serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegments()));
return ret;
}

Expand Down Expand Up @@ -231,7 +231,7 @@ public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@ApiResponse(code = 404, message = "Routing not found"),
@ApiResponse(code = 500, message = "Internal server error")
})
public Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments(
public Map<ServerInstance, ServerRouteInfo> getRoutingTableForQueryWithOptionalSegments(
@ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
@Context HttpHeaders httpHeaders) {
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.GapfillUtils;
Expand Down Expand Up @@ -617,8 +618,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// Calculate routing table for the query
// TODO: Modify RoutingManager interface to directly take PinotQuery
long routingStartTimeNs = System.nanoTime();
Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null;
Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = null;
Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable = null;
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;
boolean offlineTableDisabled = false;
Expand All @@ -633,7 +634,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
Map<ServerInstance, ServerRouteInfo> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
offlineRoutingTable = serverInstanceToSegmentsMap;
Expand All @@ -654,7 +655,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
Map<ServerInstance, ServerRouteInfo> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
realtimeRoutingTable = serverInstanceToSegmentsMap;
Expand Down Expand Up @@ -1874,9 +1875,9 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, ServerRouteInfo> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception;

Expand Down Expand Up @@ -1906,8 +1907,8 @@ private static class QueryServers {
final String _query;
final Set<ServerInstance> _servers = new HashSet<>();

QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) {
QueryServers(String query, @Nullable Map<ServerInstance, ServerRouteInfo> offlineRoutingTable,
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable) {
_query = query;
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
Expand All @@ -38,6 +37,7 @@
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.query.reduce.StreamingReduceService;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -76,9 +76,9 @@ public void shutDown() {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, ServerRouteInfo> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception {
// TODO: Support failure detection
Expand Down Expand Up @@ -106,12 +106,12 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
* Query pinot server for data table.
*/
private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest,
Map<ServerInstance, Pair<List<String>, List<String>>> routingTable,
Map<ServerInstance, ServerRouteInfo> routingTable,
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) {
for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> routingEntry : routingTable.entrySet()) {
for (Map.Entry<ServerInstance, ServerRouteInfo> routingEntry : routingTable.entrySet()) {
ServerInstance serverInstance = routingEntry.getKey();
// TODO: support optional segments for GrpcQueryServer.
List<String> segments = routingEntry.getValue().getLeft();
List<String> segments = routingEntry.getValue().getSegments();
String serverHost = serverInstance.getHostname();
int port = serverInstance.getGrpcPort();
// TODO: enable throttling on per host bases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.failuredetector.FailureDetector;
import org.apache.pinot.broker.failuredetector.FailureDetectorFactory;
Expand All @@ -43,6 +42,7 @@
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
Expand Down Expand Up @@ -99,9 +99,9 @@ public void shutDown() {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, ServerRouteInfo> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
Expand Down Expand Up @@ -635,15 +636,15 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());
}

private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap(String tableNameWithType,
private Map<ServerInstance, ServerRouteInfo> getServerInstanceToSegmentsMap(String tableNameWithType,
InstanceSelector.SelectionResult selectionResult) {
Map<ServerInstance, Pair<List<String>, List<String>>> merged = new HashMap<>();
Map<ServerInstance, ServerRouteInfo> merged = new HashMap<>();
for (Map.Entry<String, String> entry : selectionResult.getSegmentToInstanceMap().entrySet()) {
ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
if (serverInstance != null) {
Pair<List<String>, List<String>> pair =
merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>()));
pair.getLeft().add(entry.getKey());
ServerRouteInfo serverRouteInfoInfo =
merged.computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), new ArrayList<>()));
serverRouteInfoInfo.getSegments().add(entry.getKey());
} else {
// Should not happen in normal case unless encountered unexpected exception when updating routing entries
_brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
Expand All @@ -652,12 +653,12 @@ private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceT
for (Map.Entry<String, String> entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) {
ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
if (serverInstance != null) {
Pair<List<String>, List<String>> pair = merged.get(serverInstance);
ServerRouteInfo serverRouteInfo = merged.get(serverInstance);
// Skip servers that don't have non-optional segments, so that servers always get some non-optional segments
// to process, to be backward compatible.
// TODO: allow servers only with optional segments
if (pair != null) {
pair.getRight().add(entry.getKey());
if (serverRouteInfo != null) {
serverRouteInfo.getOptionalSegments().add(entry.getKey());
}
}
// TODO: Report missing server metrics when we allow servers only with optional segments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testResourceAndTagAssignment()
RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0);
assertNotNull(routingTable);
assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(),
assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getSegments().size(),
NUM_OFFLINE_SEGMENTS);
assertTrue(routingTable.getUnavailableSegments().isEmpty());

Expand All @@ -182,7 +182,7 @@ public void testResourceAndTagAssignment()

TestUtils.waitForCondition(aVoid ->
routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next()
.getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L,
.getSegments().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L,
"Failed to add the new segment " + "into the routing table");

// Add a new table with different broker tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
Expand All @@ -37,6 +36,7 @@
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
Expand Down Expand Up @@ -167,8 +167,8 @@ public void testCancelQuery() {
when(routingManager.routingExists(tableName)).thenReturn(true);
when(routingManager.getQueryTimeoutMs(tableName)).thenReturn(10000L);
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(
Map.of(new ServerInstance(new InstanceConfig("server01_9000")), Pair.of(List.of("segment01"), List.of())));
when(rt.getServerInstanceToSegmentsMap()).thenReturn(Map.of(new ServerInstance(new InstanceConfig("server01_9000")),
new ServerRouteInfo(List.of("segment01"), List.of())));
when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
Expand All @@ -194,9 +194,9 @@ public void shutDown() {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, ServerRouteInfo> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception {
testRequestId[0] = requestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
*/
package org.apache.pinot.connector.spark.common.reader

import org.apache.commons.lang3.tuple.Pair
import org.apache.helix.model.InstanceConfig
import org.apache.pinot.common.datatable.DataTable
import org.apache.pinot.common.metrics.BrokerMetrics
import org.apache.pinot.common.request.BrokerRequest
import org.apache.pinot.connector.spark.common.partition.PinotSplit
import org.apache.pinot.connector.spark.common.{Logging, PinotDataSourceReadOptions, PinotException}
import org.apache.pinot.core.routing.ServerRouteInfo
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager
import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
import org.apache.pinot.spi.config.table.TableType
import org.apache.pinot.spi.env.PinotConfiguration
import org.apache.pinot.spi.metrics.PinotMetricUtils
import org.apache.pinot.sql.parsers.CalciteSqlCompiler

import java.util.{Collections, List => JList, Map => JMap}
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

/**
Expand Down Expand Up @@ -93,23 +93,23 @@ private[reader] class PinotServerDataFetcher(
dataTables.filter(_.getNumberOfRows > 0)
}

private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = {
private def createRoutingTableForRequest(): JMap[ServerInstance, ServerRouteInfo] = {
val nullZkId: String = null
val instanceConfig = new InstanceConfig(nullZkId)
instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
// TODO: support netty-sec
val serverInstance = new ServerInstance(instanceConfig)
Map(
serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava)
serverInstance -> new ServerRouteInfo(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava)
).asJava
}

private def submitRequestToPinotServer(
offlineBrokerRequest: BrokerRequest,
offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]],
offlineRoutingTable: JMap[ServerInstance, ServerRouteInfo],
realtimeBrokerRequest: BrokerRequest,
realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = {
realtimeRoutingTable: JMap[ServerInstance, ServerRouteInfo]): AsyncQueryResponse = {
logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}")
queryRouter.submitQuery(
partitionId,
Expand Down
Loading

0 comments on commit 1edffab

Please sign in to comment.