Skip to content

Commit

Permalink
Enhance SegmentStatusChecker to honor no-queryable servers and instan…
Browse files Browse the repository at this point in the history
…ce assignment config (#14536)
  • Loading branch information
mqliang authored Jan 10, 2025
1 parent 6a8bdb5 commit 5dd6f8a
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -92,7 +94,6 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);

_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = tableSizeReader;
}
Expand Down Expand Up @@ -210,6 +211,8 @@ private void updateTableSizeMetrics(String tableNameWithType)
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);

ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager);

IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);

if (idealState == null) {
Expand Down Expand Up @@ -270,10 +273,12 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon

ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);

// Maximum number of replicas in ideal state
int maxISReplicas = Integer.MIN_VALUE;
// Minimum number of replicas in external view
int minEVReplicas = Integer.MAX_VALUE;
// Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state
int maxISReplicasUp = Integer.MIN_VALUE;
// Minimum number of replicas that is up (ONLINE/CONSUMING) in external view
int minEVReplicasUp = Integer.MAX_VALUE;
// Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view
int minEVReplicasUpPercent = 100;
// Total compressed segment size in deep store
long tableCompressedSize = 0;
// Segments without ZK metadata
Expand All @@ -287,18 +292,19 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
List<String> segmentsInvalidStartTime = new ArrayList<>();
List<String> segmentsInvalidEndTime = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
// Number of replicas in ideal state that is in ONLINE/CONSUMING state
int numISReplicasUp = 0;
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numISReplicas++;
numISReplicasUp++;
}
}
// Skip segments not ONLINE/CONSUMING in ideal state
if (numISReplicas == 0) {
// Skip segments with no ONLINE/CONSUMING in ideal state
if (numISReplicasUp == 0) {
continue;
}
maxISReplicas = Math.max(maxISReplicas, numISReplicas);
maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp);

SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
// Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
Expand Down Expand Up @@ -331,46 +337,49 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

int numEVReplicas = 0;
int numEVReplicasUp = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
if (stateMap != null) {
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numEVReplicas++;
String serverInstanceId = entry.getKey();
String segmentState = entry.getValue();
if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))
&& isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) {
numEVReplicasUp++;
}
if (state.equals(SegmentStateModel.ERROR)) {
if (segmentState.equals(SegmentStateModel.ERROR)) {
errorSegments.add(Pair.of(segment, entry.getKey()));
}
}
}
}
if (numEVReplicas == 0) {
if (numEVReplicasUp == 0) {
offlineSegments.add(segment);
} else if (numEVReplicas < numISReplicas) {
} else if (numEVReplicasUp < numISReplicasUp) {
partialOnlineSegments.add(segment);
} else {
// Do not allow nReplicasEV to be larger than nReplicasIS
numEVReplicas = numISReplicas;
// Do not allow numEVReplicasUp to be larger than numISReplicasUp
numEVReplicasUp = numISReplicasUp;
}
minEVReplicas = Math.min(minEVReplicas, numEVReplicas);

minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp);
// Total number of replicas in ideal state (including ERROR/OFFLINE states)
int numISReplicasTotal = Math.max(idealState.getInstanceStateMap(segment).entrySet().size(), 1);
minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal);
}

if (maxISReplicas == Integer.MIN_VALUE) {
if (maxISReplicasUp == Integer.MIN_VALUE) {
try {
maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
maxISReplicasUp = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
maxISReplicas = 1;
maxISReplicasUp = 1;
}
}
// Do not allow minEVReplicas to be larger than maxISReplicas
minEVReplicas = Math.min(minEVReplicas, maxISReplicas);

if (minEVReplicas < maxISReplicas) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, minEVReplicas, maxISReplicas);
}
// Do not allow minEVReplicasUp to be larger than maxISReplicasUp
minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp);

int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
if (numSegmentsWithoutZKMetadata > 0) {
LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
Expand Down Expand Up @@ -403,9 +412,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}

// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
minEVReplicas * 100L / maxISReplicas);
minEVReplicasUpPercent);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
numErrorSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
Expand All @@ -428,6 +437,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

private boolean isServerQueryable(ServerQueryInfo serverInfo) {
return serverInfo != null
&& serverInfo.isHelixEnabled()
&& !serverInfo.isQueriesDisabled()
&& !serverInfo.isShutdownInProgress();
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;


/**
* This is a helper class that fetch server information from Helix/ZK. It caches the server information to avoid
* repeated ZK access. This class is NOT thread-safe.
*/
public class ServerQueryInfoFetcher {
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final Map<String, ServerQueryInfo> _cache;

public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_cache = new HashMap<>();
}

@Nullable
public ServerQueryInfo getServerQueryInfo(String instanceId) {
return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand);
}

@Nullable
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) {
return null;
}
List<String> tags = instanceConfig.getTags();
ZNRecord record = instanceConfig.getRecord();
boolean helixEnabled = instanceConfig.getInstanceEnabled();
boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);

return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
}

public static class ServerQueryInfo {
private final String _instanceName;
private final List<String> _tags;
private final List<String> _tables;
private final boolean _helixEnabled;
private final boolean _queriesDisabled;
private final boolean _shutdownInProgress;

private ServerQueryInfo(String instanceName, List<String> tags, List<String> tables, boolean helixEnabled,
boolean queriesDisabled, boolean shutdownInProgress) {
_instanceName = instanceName;
_tags = tags;
_tables = tables;
_helixEnabled = helixEnabled;
_queriesDisabled = queriesDisabled;
_shutdownInProgress = shutdownInProgress;
}

public boolean isHelixEnabled() {
return _helixEnabled;
}

public boolean isQueriesDisabled() {
return _queriesDisabled;
}

public boolean isShutdownInProgress() {
return _shutdownInProgress;
}
}
}
Loading

0 comments on commit 5dd6f8a

Please sign in to comment.