Skip to content

Commit

Permalink
refactor code by adding ServerInfoCache
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Nov 26, 2024
1 parent d5d5973 commit 7ef229c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
Expand All @@ -48,12 +47,13 @@
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ServerInfoCache;
import org.apache.pinot.controller.util.ServerInfoCache.ServerInfo;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
Expand Down Expand Up @@ -82,6 +82,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh

private long _lastDisabledTableLogTimestamp = 0;

private ServerInfoCache _serverInfoCache;

/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact with Helix
Expand All @@ -94,6 +96,7 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);

_serverInfoCache = new ServerInfoCache(pinotHelixResourceManager);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = tableSizeReader;
}
Expand All @@ -111,19 +114,6 @@ protected Context preprocess(Properties periodicTaskProperties) {
context._logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
}

// Read ZK once to build a set of queryable server instances
for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) {
ZNRecord record = instanceConfig.getRecord();
boolean helixEnabled = record.getBooleanField(
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false);
boolean queriesDisabled = record.getBooleanField(Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(Helix.IS_SHUTDOWN_IN_PROGRESS, false);
if (helixEnabled && !queriesDisabled && !shutdownInProgress) {
context._queryableServers.add(instanceConfig.getInstanceName());
}
}

return context;
}

Expand Down Expand Up @@ -355,8 +345,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String serverInstanceId = entry.getKey();
String state = entry.getValue();
if (context._queryableServers.contains(serverInstanceId)
&& (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) {
if (isServerQueryable(serverInstanceId)
&& (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) {
numEVReplicasUp++;
}
if (state.equals(SegmentStateModel.ERROR)) {
Expand Down Expand Up @@ -444,6 +434,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

private boolean isServerQueryable(String serverInstanceId) {
ServerInfo serverInfo = _serverInfoCache.getServerInfo(serverInstanceId);
return serverInfo != null
&& serverInfo.isHelixEnabled()
&& !serverInfo.isQueriesDisabled()
&& !serverInfo.isShutdownInProgress();
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down Expand Up @@ -491,6 +489,5 @@ public static final class Context {
private final Set<String> _processedTables = new HashSet<>();
private final Set<String> _disabledTables = new HashSet<>();
private final Set<String> _pausedTables = new HashSet<>();
private final Set<String> _queryableServers = new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,6 @@ public List<InstanceConfig> getAllControllerInstanceConfigs() {
.filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList());
}

public List<InstanceConfig> getAllServerInstanceConfigs() {
return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
.filter(instance -> InstanceTypeUtils.isServer(instance.getId())).collect(Collectors.toList());
}

public List<InstanceConfig> getAllMinionInstanceConfigs() {
return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
.filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;


/**
* This is a helper class maintaining a cache of server information. It is used to avoid repeated calls to Helix to
* get server information. This class is not thread safe.
*/
public class ServerInfoCache {
private PinotHelixResourceManager _pinotHelixResourceManager;
private Map<String, ServerInfo> _serverInfoMap;

public ServerInfoCache(PinotHelixResourceManager pinotHelixResourceManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_serverInfoMap = new HashMap<>();
}

public ServerInfo getServerInfo(String instanceId) {
return _serverInfoMap.computeIfAbsent(instanceId, this::getServerInfoOndemand);
}

private ServerInfo getServerInfoOndemand(String instanceId) {
InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) {
return null;
}
List<String> tags = instanceConfig.getTags();
ZNRecord record = instanceConfig.getRecord();
boolean helixEnabled = record.getBooleanField(
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false);
boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);
return new ServerInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
}

public static class ServerInfo {
private String _instanceName;
private List<String> _tags;
private List<String> _tables;
private boolean _helixEnabled;
private boolean _queriesDisabled;
private boolean _shutdownInProgress;
private ServerInfo(String instanceName,
List<String> tags,
List<String> tables,
boolean helixEnabled,
boolean queriesDisabled,
boolean shutdownInProgress) {
_instanceName = instanceName;
_tags = tags;
_tables = tables;
_helixEnabled = helixEnabled;
_queriesDisabled = queriesDisabled;
_shutdownInProgress = shutdownInProgress;
}

public String getInstanceName() {
return _instanceName;
}

public void setInstanceName(String instanceName) {
_instanceName = instanceName;
}

public List<String> getTags() {
return _tags;
}

public void setTags(List<String> tags) {
_tags = tags;
}

public List<String> getTables() {
return _tables;
}

public void setTables(List<String> tables) {
_tables = tables;
}

public boolean isHelixEnabled() {
return _helixEnabled;
}

public void setHelixEnabled(boolean helixEnabled) {
_helixEnabled = helixEnabled;
}

public boolean isQueriesDisabled() {
return _queriesDisabled;
}

public void setQueriesDisabled(boolean queriesDisabled) {
_queriesDisabled = queriesDisabled;
}

public boolean isShutdownInProgress() {
return _shutdownInProgress;
}

public void setShutdownInProgress(boolean shutdownInProgress) {
_shutdownInProgress = shutdownInProgress;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,7 @@ public void offlineBasicTest() {
externalView.setState("myTable_4", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3")
)
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -224,12 +218,7 @@ public void realtimeBasicTest() {
externalView.setState(seg3, "pinot3", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"))
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -295,14 +284,7 @@ public void realtimeMutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"),
newQuerableInstanceConfig("pinot4")
)
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -368,14 +350,14 @@ public void realtimeServerNotQueryableTest() {
externalView.setState(seg3, "Server_pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQueryDisabledInstanceConfig("Server_pinot1"),
newShutdownInProgressInstanceConfig("Server_pinot2"),
newQuerableInstanceConfig("Server_pinot3"),
newQuerableInstanceConfig("Server_pinot4")
)
);
when(resourceManager.getHelixInstanceConfig("Server_pinot1")).
thenReturn(newQueryDisabledInstanceConfig("Server_pinot1"));
when(resourceManager.getHelixInstanceConfig("Server_pinot2")).
thenReturn(newShutdownInProgressInstanceConfig("Server_pinot2"));
when(resourceManager.getHelixInstanceConfig("Server_pinot3")).
thenReturn(newQuerableInstanceConfig("Server_pinot3"));
when(resourceManager.getHelixInstanceConfig("Server_pinot4")).
thenReturn(newQuerableInstanceConfig("Server_pinot4"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -461,14 +443,7 @@ public void realtimeImmutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"),
newQuerableInstanceConfig("pinot4")
)
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -535,13 +510,7 @@ public void missingEVPartitionTest() {
externalView.setState("myTable_1", "pinot2", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3")
)
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -632,12 +601,7 @@ public void missingEVPartitionPushTest() {
externalView.setState("myTable_2", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2")
)
);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -780,8 +744,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() {
}

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(newQuerableInstanceConfig("pinot1")));
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down

0 comments on commit 7ef229c

Please sign in to comment.