Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Dec 4, 2024
1 parent 57e8bdc commit 24660fb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ServerInfoFetcher;
import org.apache.pinot.controller.util.ServerInfoFetcher.ServerInfo;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -82,8 +82,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh

private long _lastDisabledTableLogTimestamp = 0;

private ServerInfoFetcher _serverInfoFetcher;

/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact with Helix
Expand All @@ -95,8 +93,6 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);

_serverInfoFetcher = new ServerInfoFetcher(pinotHelixResourceManager);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = tableSizeReader;
}
Expand Down Expand Up @@ -214,6 +210,8 @@ private void updateTableSizeMetrics(String tableNameWithType)
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);

ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager);

IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);

if (idealState == null) {
Expand Down Expand Up @@ -345,7 +343,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String serverInstanceId = entry.getKey();
String state = entry.getValue();
if (isServerQueryable(serverInstanceId)
if (isServerQueryable(serverQueryInfoFetcher, serverInstanceId)
&& (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) {
numEVReplicasUp++;
}
Expand Down Expand Up @@ -437,8 +435,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

private boolean isServerQueryable(String serverInstanceId) {
ServerInfo serverInfo = _serverInfoFetcher.getServerInfo(serverInstanceId);
private boolean isServerQueryable(ServerQueryInfoFetcher serverQueryStateFetcher, String serverInstanceId) {
ServerQueryInfo serverInfo = serverQueryStateFetcher.getServerQueryInfo(serverInstanceId);
return serverInfo != null
&& serverInfo.isHelixEnabled()
&& !serverInfo.isQueriesDisabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@
* This is a helper class that fetch server information from Helix/ZK. It caches the server information to avoid
* repeated ZK access. This class is NOT thread-safe.
*/
public class ServerInfoFetcher {
public class ServerQueryInfoFetcher {
private PinotHelixResourceManager _pinotHelixResourceManager;
private Map<String, ServerInfo> _serverInfoCache;
private Map<String, ServerQueryInfo> _cache;

public ServerInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) {
public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_serverInfoCache = new HashMap<>();
_cache = new HashMap<>();
}

public ServerInfo getServerInfo(String instanceId) {
return _serverInfoCache.computeIfAbsent(instanceId, this::getServerInfoOndemand);
public ServerQueryInfo getServerQueryInfo(String instanceId) {
return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand);
}

private ServerInfo getServerInfoOndemand(String instanceId) {
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) {
return null;
Expand All @@ -56,17 +56,17 @@ private ServerInfo getServerInfoOndemand(String instanceId) {
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false);
boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);
return new ServerInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
}

public static class ServerInfo {
public static class ServerQueryInfo {
private String _instanceName;
private List<String> _tags;
private List<String> _tables;
private boolean _helixEnabled;
private boolean _queriesDisabled;
private boolean _shutdownInProgress;
private ServerInfo(String instanceName,
private ServerQueryInfo(String instanceName,
List<String> tags,
List<String> tables,
boolean helixEnabled,
Expand Down

0 comments on commit 24660fb

Please sign in to comment.