Skip to content

Commit

Permalink
Pre-build a queryable server set to reduce ZK access
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Nov 26, 2024
1 parent d77cc4e commit bf098d3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ protected Context preprocess(Properties periodicTaskProperties) {
context._logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
}

// Read ZK once to build a set of queryable server instances
for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) {
ZNRecord record = instanceConfig.getRecord();
boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled"));
boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress"));
if (!queriesDisabled && !shutdownInProgress) {
context._queryableServers.add(instanceConfig.getInstanceName());
}
}

return context;
}

Expand Down Expand Up @@ -341,7 +352,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String serverInstanceId = entry.getKey();
String state = entry.getValue();
if (isServerQueryable(serverInstanceId, context)
if (context._queryableServers.contains(serverInstanceId)
&& (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) {
numEVReplicasUp++;
}
Expand Down Expand Up @@ -430,28 +441,6 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

/**
 * Tells whether the given instance may be counted as a queryable server, memoizing the answer in
 * {@code context._serverQueryableMap} so each instance is looked up at most once per context.
 *
 * <p>An instance is reported as not queryable when its instance config cannot be found, when its
 * name does not start with {@code "Server_"}, or when either the {@code queriesDisabled} or the
 * {@code shutdownInProgress} simple field of its record parses as {@code true}.
 *
 * @param instanceId id of the instance to check
 * @param context per-run context holding the memoized results
 * @return {@code true} if the instance is a server with neither flag set, {@code false} otherwise
 */
private boolean isServerQueryable(String instanceId, Context context) {
  // Reuse the answer computed earlier for this instance, if any
  if (context._serverQueryableMap.containsKey(instanceId)) {
    return context._serverQueryableMap.get(instanceId);
  }

  InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
  boolean knownServer = instanceConfig != null && instanceConfig.getInstanceName().startsWith("Server_");

  boolean queryable;
  if (knownServer) {
    ZNRecord znRecord = instanceConfig.getRecord();
    // A missing field parses as false, i.e. the flag is treated as not set
    boolean queriesDisabled = Boolean.parseBoolean(znRecord.getSimpleField("queriesDisabled"));
    boolean shutdownInProgress = Boolean.parseBoolean(znRecord.getSimpleField("shutdownInProgress"));
    queryable = !(queriesDisabled || shutdownInProgress);
  } else {
    // Unknown instance, or not a server: treat it as down
    LOGGER.warn("Instance {} not found in Helix cluster, or not a server instance", instanceId);
    LOGGER.warn("will assume instance is down!");
    queryable = false;
  }

  context._serverQueryableMap.put(instanceId, queryable);
  return queryable;
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down Expand Up @@ -499,6 +488,6 @@ public static final class Context {
private final Set<String> _processedTables = new HashSet<>();
private final Set<String> _disabledTables = new HashSet<>();
private final Set<String> _pausedTables = new HashSet<>();
private final Map<String, Boolean> _serverQueryableMap = new HashMap<>();
private final Set<String> _queryableServers = new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ public List<InstanceConfig> getAllControllerInstanceConfigs() {
.filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList());
}

/**
 * Returns the instance configs of all server instances.
 *
 * <p>Fetches every instance config through {@code HelixHelper.getInstanceConfigs} and keeps only
 * those whose id is recognised by {@code InstanceTypeUtils.isServer}.
 *
 * @return list of instance configs belonging to server instances
 */
public List<InstanceConfig> getAllServerInstanceConfigs() {
  return HelixHelper.getInstanceConfigs(_helixZkManager)
      .stream()
      .filter(instanceConfig -> InstanceTypeUtils.isServer(instanceConfig.getId()))
      .collect(Collectors.toList());
}

public List<InstanceConfig> getAllMinionInstanceConfigs() {
return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
.filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,13 @@ public void offlineBasicTest() {
externalView.setState("myTable_4", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -218,7 +224,13 @@ public void realtimeBasicTest() {
externalView.setState(seg3, "pinot3", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -284,7 +296,13 @@ public void realtimeMutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -350,15 +368,13 @@ public void realtimeServerNotQueryableTest() {
externalView.setState(seg3, "Server_pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig("Server_pinot1"))
.thenReturn(newQueryDisabledInstanceConfig("Server_pinot1"));
when(resourceManager.getHelixInstanceConfig("Server_pinot2"))
.thenReturn(newShutdownInProgress("Server_pinot2"));
when(resourceManager.getHelixInstanceConfig("Server_pinot3"))
.thenReturn(new InstanceConfig("Server_pinot3"));
when(resourceManager.getHelixInstanceConfig("Server_pinot4"))
.thenReturn(new InstanceConfig("Server_pinot4"));

when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQueryDisabledInstanceConfig("Server_pinot1"),
newShutdownInProgressInstanceConfig("Server_pinot2"),
new InstanceConfig("Server_pinot3"),
new InstanceConfig("Server_pinot4"))
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -387,7 +403,7 @@ private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) {
return new InstanceConfig(znRecord);
}

private InstanceConfig newShutdownInProgress(String instanceName) {
private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField("shutdownInProgress", "true");
return new InstanceConfig(znRecord);
Expand Down Expand Up @@ -436,7 +452,13 @@ public void realtimeImmutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -503,7 +525,13 @@ public void missingEVPartitionTest() {
externalView.setState("myTable_1", "pinot2", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -594,7 +622,13 @@ public void missingEVPartitionPushTest() {
externalView.setState("myTable_2", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -737,7 +771,13 @@ public void lessThanOnePercentSegmentsUnavailableTest() {
}

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down

0 comments on commit bf098d3

Please sign in to comment.