-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance SegmentStatusChecker to honor no-queryable servers and instance assignment config #14536
Changes from 10 commits
87308b4
1f10cbf
28df5a8
d77cc4e
bf098d3
d5d5973
7ef229c
eec01cc
57e8bdc
bbba882
d49e8ab
9007e0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,8 @@ | |
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; | ||
import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; | ||
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; | ||
import org.apache.pinot.controller.util.ServerQueryInfoFetcher; | ||
import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo; | ||
import org.apache.pinot.controller.util.TableSizeReader; | ||
import org.apache.pinot.spi.config.table.TableConfig; | ||
import org.apache.pinot.spi.config.table.TableType; | ||
|
@@ -91,7 +93,6 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, | |
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), | ||
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, | ||
controllerMetrics); | ||
|
||
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds(); | ||
_tableSizeReader = tableSizeReader; | ||
} | ||
|
@@ -209,6 +210,8 @@ private void updateTableSizeMetrics(String tableNameWithType) | |
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) { | ||
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); | ||
|
||
ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager); | ||
|
||
IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); | ||
|
||
if (idealState == null) { | ||
|
@@ -269,10 +272,12 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon | |
|
||
ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); | ||
|
||
// Maximum number of replicas in ideal state | ||
int maxISReplicas = Integer.MIN_VALUE; | ||
// Minimum number of replicas in external view | ||
int minEVReplicas = Integer.MAX_VALUE; | ||
// Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state | ||
int maxISReplicasUp = Integer.MIN_VALUE; | ||
// Minimum number of replicas that is up (ONLINE/CONSUMING) in external view | ||
int minEVReplicasUp = Integer.MAX_VALUE; | ||
// Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view | ||
int minEVReplicasUpPercent = 100; | ||
// Total compressed segment size in deep store | ||
long tableCompressedSize = 0; | ||
// Segments without ZK metadata | ||
|
@@ -286,18 +291,19 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon | |
List<String> segmentsInvalidStartTime = new ArrayList<>(); | ||
List<String> segmentsInvalidEndTime = new ArrayList<>(); | ||
for (String segment : segments) { | ||
int numISReplicas = 0; | ||
// Number of replicas in ideal state that is in ONLINE/CONSUMING state | ||
int numISReplicasUp = 0; | ||
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) { | ||
String state = entry.getValue(); | ||
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { | ||
numISReplicas++; | ||
numISReplicasUp++; | ||
} | ||
} | ||
// Skip segments not ONLINE/CONSUMING in ideal state | ||
if (numISReplicas == 0) { | ||
// Skip segments with no ONLINE/CONSUMING in ideal state | ||
if (numISReplicasUp == 0) { | ||
continue; | ||
} | ||
maxISReplicas = Math.max(maxISReplicas, numISReplicas); | ||
maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp); | ||
|
||
SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment); | ||
// Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted. | ||
|
@@ -330,46 +336,49 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon | |
} | ||
} | ||
|
||
int numEVReplicas = 0; | ||
int numEVReplicasUp = 0; | ||
if (externalView != null) { | ||
Map<String, String> stateMap = externalView.getStateMap(segment); | ||
if (stateMap != null) { | ||
for (Map.Entry<String, String> entry : stateMap.entrySet()) { | ||
String state = entry.getValue(); | ||
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { | ||
numEVReplicas++; | ||
String serverInstanceId = entry.getKey(); | ||
String segmentState = entry.getValue(); | ||
if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check segment state first because the overhead for that is much smaller There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
&& (segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (totally optional) We can probably make this a set called acceptableStates and do acceptableStates.contains(segmentState) |
||
numEVReplicasUp++; | ||
} | ||
if (state.equals(SegmentStateModel.ERROR)) { | ||
if (segmentState.equals(SegmentStateModel.ERROR)) { | ||
errorSegments.add(Pair.of(segment, entry.getKey())); | ||
} | ||
} | ||
} | ||
} | ||
if (numEVReplicas == 0) { | ||
if (numEVReplicasUp == 0) { | ||
offlineSegments.add(segment); | ||
} else if (numEVReplicas < numISReplicas) { | ||
} else if (numEVReplicasUp < numISReplicasUp) { | ||
partialOnlineSegments.add(segment); | ||
} else { | ||
// Do not allow nReplicasEV to be larger than nReplicasIS | ||
numEVReplicas = numISReplicas; | ||
// Do not allow numEVReplicasUp to be larger than numISReplicasUp | ||
numEVReplicasUp = numISReplicasUp; | ||
} | ||
minEVReplicas = Math.min(minEVReplicas, numEVReplicas); | ||
|
||
minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp); | ||
// Total number of replicas in ideal state (including ERROR/OFFLINE states) | ||
int numISReplicasTotal = Math.max(idealState.getInstanceStateMap(segment).entrySet().size(), 1); | ||
minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal); | ||
} | ||
|
||
if (maxISReplicas == Integer.MIN_VALUE) { | ||
if (maxISReplicasUp == Integer.MIN_VALUE) { | ||
try { | ||
maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1); | ||
maxISReplicasUp = Math.max(Integer.parseInt(idealState.getReplicas()), 1); | ||
} catch (NumberFormatException e) { | ||
maxISReplicas = 1; | ||
maxISReplicasUp = 1; | ||
} | ||
} | ||
// Do not allow minEVReplicas to be larger than maxISReplicas | ||
minEVReplicas = Math.min(minEVReplicas, maxISReplicas); | ||
|
||
if (minEVReplicas < maxISReplicas) { | ||
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", | ||
tableNameWithType, minEVReplicas, maxISReplicas); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sajjad-moradi This log need to be deleted, as
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// Do not allow minEVReplicasUp to be larger than maxISReplicasUp | ||
minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp); | ||
|
||
int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); | ||
if (numSegmentsWithoutZKMetadata > 0) { | ||
LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata, | ||
|
@@ -402,9 +411,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon | |
} | ||
|
||
// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge | ||
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); | ||
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp); | ||
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, | ||
minEVReplicas * 100L / maxISReplicas); | ||
minEVReplicasUpPercent); | ||
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, | ||
numErrorSegments); | ||
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, | ||
|
@@ -426,6 +435,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon | |
} | ||
} | ||
|
||
private boolean isServerQueryable(ServerQueryInfo serverInfo) { | ||
return serverInfo != null | ||
&& serverInfo.isHelixEnabled() | ||
&& !serverInfo.isQueriesDisabled() | ||
&& !serverInfo.isShutdownInProgress(); | ||
} | ||
|
||
private static String logSegments(List<?> segments) { | ||
if (segments.size() <= MAX_SEGMENTS_TO_LOG) { | ||
return segments.toString(); | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,131 @@ | ||||||||
/** | ||||||||
* Licensed to the Apache Software Foundation (ASF) under one | ||||||||
* or more contributor license agreements. See the NOTICE file | ||||||||
* distributed with this work for additional information | ||||||||
* regarding copyright ownership. The ASF licenses this file | ||||||||
* to you under the Apache License, Version 2.0 (the | ||||||||
* "License"); you may not use this file except in compliance | ||||||||
* with the License. You may obtain a copy of the License at | ||||||||
* | ||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||
* | ||||||||
* Unless required by applicable law or agreed to in writing, | ||||||||
* software distributed under the License is distributed on an | ||||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||
* KIND, either express or implied. See the License for the | ||||||||
* specific language governing permissions and limitations | ||||||||
* under the License. | ||||||||
*/ | ||||||||
package org.apache.pinot.controller.util; | ||||||||
|
||||||||
import java.util.HashMap; | ||||||||
import java.util.List; | ||||||||
import java.util.Map; | ||||||||
import org.apache.helix.model.InstanceConfig; | ||||||||
import org.apache.helix.zookeeper.datamodel.ZNRecord; | ||||||||
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; | ||||||||
import org.apache.pinot.spi.utils.CommonConstants; | ||||||||
import org.apache.pinot.spi.utils.InstanceTypeUtils; | ||||||||
|
||||||||
|
||||||||
/** | ||||||||
* This is a helper class that fetch server information from Helix/ZK. It caches the server information to avoid | ||||||||
* repeated ZK access. This class is NOT thread-safe. | ||||||||
*/ | ||||||||
public class ServerQueryInfoFetcher { | ||||||||
private PinotHelixResourceManager _pinotHelixResourceManager; | ||||||||
private Map<String, ServerQueryInfo> _cache; | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (minor) They can be |
||||||||
|
||||||||
public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { | ||||||||
_pinotHelixResourceManager = pinotHelixResourceManager; | ||||||||
_cache = new HashMap<>(); | ||||||||
} | ||||||||
|
||||||||
public ServerQueryInfo getServerQueryInfo(String instanceId) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Annotate the return as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||
return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand); | ||||||||
} | ||||||||
|
||||||||
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||
InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); | ||||||||
if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { | ||||||||
return null; | ||||||||
} | ||||||||
List<String> tags = instanceConfig.getTags(); | ||||||||
ZNRecord record = instanceConfig.getRecord(); | ||||||||
boolean helixEnabled = record.getBooleanField( | ||||||||
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Although in this PR it's OK to combine all the 3 boolean into a single one, it's better to keep them separate. We may will have a PR to emit a metric about "how many or how many percentage of servers for a table that were marked as |
||||||||
boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); | ||||||||
boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); | ||||||||
return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); | ||||||||
} | ||||||||
|
||||||||
public static class ServerQueryInfo { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't need setters for this class, and we can make all fields There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems most of the fields are not used. Are you planning to use them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, @jasperjiaguo will need those fields very soon |
||||||||
private String _instanceName; | ||||||||
private List<String> _tags; | ||||||||
private List<String> _tables; | ||||||||
private boolean _helixEnabled; | ||||||||
private boolean _queriesDisabled; | ||||||||
private boolean _shutdownInProgress; | ||||||||
private ServerQueryInfo(String instanceName, | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (format) Add an empty line, and reformat the constructor per Pinot Style There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||
List<String> tags, | ||||||||
List<String> tables, | ||||||||
boolean helixEnabled, | ||||||||
boolean queriesDisabled, | ||||||||
boolean shutdownInProgress) { | ||||||||
_instanceName = instanceName; | ||||||||
_tags = tags; | ||||||||
_tables = tables; | ||||||||
_helixEnabled = helixEnabled; | ||||||||
_queriesDisabled = queriesDisabled; | ||||||||
_shutdownInProgress = shutdownInProgress; | ||||||||
} | ||||||||
|
||||||||
public String getInstanceName() { | ||||||||
return _instanceName; | ||||||||
} | ||||||||
|
||||||||
public void setInstanceName(String instanceName) { | ||||||||
_instanceName = instanceName; | ||||||||
} | ||||||||
|
||||||||
public List<String> getTags() { | ||||||||
return _tags; | ||||||||
} | ||||||||
|
||||||||
public void setTags(List<String> tags) { | ||||||||
_tags = tags; | ||||||||
} | ||||||||
|
||||||||
public List<String> getTables() { | ||||||||
return _tables; | ||||||||
} | ||||||||
|
||||||||
public void setTables(List<String> tables) { | ||||||||
_tables = tables; | ||||||||
} | ||||||||
|
||||||||
public boolean isHelixEnabled() { | ||||||||
return _helixEnabled; | ||||||||
} | ||||||||
|
||||||||
public void setHelixEnabled(boolean helixEnabled) { | ||||||||
_helixEnabled = helixEnabled; | ||||||||
} | ||||||||
|
||||||||
public boolean isQueriesDisabled() { | ||||||||
return _queriesDisabled; | ||||||||
} | ||||||||
|
||||||||
public void setQueriesDisabled(boolean queriesDisabled) { | ||||||||
_queriesDisabled = queriesDisabled; | ||||||||
} | ||||||||
|
||||||||
public boolean isShutdownInProgress() { | ||||||||
return _shutdownInProgress; | ||||||||
} | ||||||||
|
||||||||
public void setShutdownInProgress(boolean shutdownInProgress) { | ||||||||
_shutdownInProgress = shutdownInProgress; | ||||||||
} | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(totally optional) maybe encapsulate this in the serverQueryInfoFetcher as well