Skip to content

Commit

Permalink
Treat segments on not-queryable servers as down
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Nov 26, 2024
1 parent 87308b4 commit 1f10cbf
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
Expand Down Expand Up @@ -338,8 +339,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
Map<String, String> stateMap = externalView.getStateMap(segment);
if (stateMap != null) {
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String serverInstanceId = entry.getKey();
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
if (isServerQueryable(serverInstanceId, context) &&
(state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) {
numEVReplicasUp++;
}
if (state.equals(SegmentStateModel.ERROR)) {
Expand All @@ -355,9 +358,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}

minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp);
// Number of replicas in ideal state (including ERROR/OFFLINE states)
int numISReplicas = idealState.getInstanceStateMap(segment).entrySet().size();
minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicas);
// Total number of replicas in ideal state (including ERROR/OFFLINE states)
int numISReplicasTotal = idealState.getInstanceStateMap(segment).entrySet().size();
minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal);
}

if (maxISReplicasUp == Integer.MIN_VALUE) {
Expand Down Expand Up @@ -426,6 +429,28 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

/**
 * Returns whether the given instance is a server that can currently serve queries.
 *
 * <p>The answer is memoized in {@code context._serverQueryableMap}, so the Helix instance config is fetched at most
 * once per instance id for a given {@link Context}.
 *
 * <p>An instance is reported as not queryable when:
 * <ul>
 *   <li>no instance config is returned for it,</li>
 *   <li>its instance name does not start with {@code "Server_"},</li>
 *   <li>its {@code queriesDisabled} simple field parses to {@code true}, or</li>
 *   <li>its {@code shutdownInProgress} simple field parses to {@code true}.</li>
 * </ul>
 * A missing flag field is treated as {@code false}.
 *
 * @param instanceId id of the instance hosting a segment replica
 * @param context per-run context holding the queryable cache
 * @return {@code true} if replicas on this instance should be counted as up
 */
private boolean isServerQueryable(String instanceId, Context context) {
  // Single lookup: the cache never stores null, so a null result means "not computed yet".
  Boolean cached = context._serverQueryableMap.get(instanceId);
  if (cached != null) {
    return cached;
  }

  boolean queryable = false;
  InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
  if (instanceConfig == null || !instanceConfig.getInstanceName().startsWith("Server_")) {
    // Emit one log record so the instance id and the consequence cannot be separated in the log stream.
    LOGGER.warn("Instance {} not found in Helix cluster, or not a server instance; will assume instance is down!",
        instanceId);
  } else {
    ZNRecord record = instanceConfig.getRecord();
    // parseBoolean returns a primitive and yields false for null, i.e. when the field is absent.
    boolean queriesDisabled = Boolean.parseBoolean(record.getSimpleField("queriesDisabled"));
    boolean shutdownInProgress = Boolean.parseBoolean(record.getSimpleField("shutdownInProgress"));
    queryable = !queriesDisabled && !shutdownInProgress;
  }

  context._serverQueryableMap.put(instanceId, queryable);
  return queryable;
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down Expand Up @@ -473,5 +498,6 @@ public static final class Context {
private final Set<String> _processedTables = new HashSet<>();
private final Set<String> _disabledTables = new HashSet<>();
private final Set<String> _pausedTables = new HashSet<>();
private final Map<String, Boolean> _serverQueryableMap = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.lineage.LineageEntry;
Expand All @@ -47,9 +48,6 @@
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
Expand Down Expand Up @@ -109,6 +107,7 @@ public void offlineBasicTest() {
externalView.setState("myTable_4", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -219,6 +218,7 @@ public void realtimeBasicTest() {
externalView.setState(seg3, "pinot3", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand All @@ -242,17 +242,10 @@ public void realtimeBasicTest() {
}

@Test
public void realtimeMutableSegmnetHasLessReplicaTest() {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null,
new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false);

public void realtimeMutableSegmentHasLessReplicaTest() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap())
.setInstanceAssignmentConfigMap(
Map.of("CONSUMING", instanceAssignmentConfig)
)
.build();

String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName();
Expand Down Expand Up @@ -291,6 +284,7 @@ public void realtimeMutableSegmnetHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand All @@ -314,17 +308,96 @@ public void realtimeMutableSegmnetHasLessReplicaTest() {
}

@Test
public void realtimeImmutableSegmnetHasLessReplicaTest() {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null,
new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false);
// Scenario: a realtime table with three segments whose replicas live on Server_pinot1..Server_pinot4, where
// Server_pinot1 has queriesDisabled=true and Server_pinot2 has shutdownInProgress=true in their instance configs.
// The test runs the segment status checker once and verifies the resulting controller metrics.
public void realtimeServerNotQueryableTest() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap())
.build();

String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName();
String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName();
String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName();
// Ideal state: seg1 and seg2 are ONLINE on Server_pinot1..3; seg3 is CONSUMING on Server_pinot1..3 and OFFLINE on
// Server_pinot4.
IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
idealState.setPartitionState(seg1, "Server_pinot1", "ONLINE");
idealState.setPartitionState(seg1, "Server_pinot2", "ONLINE");
idealState.setPartitionState(seg1, "Server_pinot3", "ONLINE");

idealState.setPartitionState(seg2, "Server_pinot1", "ONLINE");
idealState.setPartitionState(seg2, "Server_pinot2", "ONLINE");
idealState.setPartitionState(seg2, "Server_pinot3", "ONLINE");

idealState.setPartitionState(seg3, "Server_pinot1", "CONSUMING");
idealState.setPartitionState(seg3, "Server_pinot2", "CONSUMING");
idealState.setPartitionState(seg3, "Server_pinot3", "CONSUMING");
idealState.setPartitionState(seg3, "Server_pinot4", "OFFLINE");

idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

// External view: seg1 matches the ideal state; seg2 is still CONSUMING on Server_pinot1/3/4 and ONLINE only on
// Server_pinot2; seg3 matches the ideal state.
ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
externalView.setState(seg1, "Server_pinot1", "ONLINE");
externalView.setState(seg1, "Server_pinot2", "ONLINE");
externalView.setState(seg1, "Server_pinot3", "ONLINE");

externalView.setState(seg2, "Server_pinot1", "CONSUMING");
externalView.setState(seg2, "Server_pinot2", "ONLINE");
externalView.setState(seg2, "Server_pinot3", "CONSUMING");
externalView.setState(seg2, "Server_pinot4", "CONSUMING");

externalView.setState(seg3, "Server_pinot1", "CONSUMING");
externalView.setState(seg3, "Server_pinot2", "CONSUMING");
externalView.setState(seg3, "Server_pinot3", "CONSUMING");
externalView.setState(seg3, "Server_pinot4", "OFFLINE");

// Per-server instance configs: Server_pinot1 has queries disabled, Server_pinot2 is shutting down, and
// Server_pinot3/Server_pinot4 use a plain config with neither flag set.
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig("Server_pinot1"))
.thenReturn(newQueryDisabledInstanceConfig("Server_pinot1"));
when(resourceManager.getHelixInstanceConfig("Server_pinot2"))
.thenReturn(newShutdownInProgress("Server_pinot2"));
when(resourceManager.getHelixInstanceConfig("Server_pinot3"))
.thenReturn(new InstanceConfig("Server_pinot3"));
when(resourceManager.getHelixInstanceConfig("Server_pinot4"))
.thenReturn(new InstanceConfig("Server_pinot4"));

when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView);
// seg1 and seg2 share the committed-segment metadata mock; seg3 uses the consuming-segment metadata mock.
SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata();
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata);
SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata);

// Any property-store read returns a record whose END_OFFSET field is "10000".
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
ZNRecord znRecord = new ZNRecord("0");
znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);

runSegmentStatusChecker(resourceManager, 0);
// NOTE(review): the meaning of each positional argument is defined by verifyControllerMetrics, which is not
// visible here — confirm the parameter order against that helper before changing any expected value.
verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 1, 25, 0, 100, 3, 0);
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME,
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
}

/**
 * Builds an instance config for {@code instanceName} whose backing record has the {@code queriesDisabled} simple
 * field set to {@code "true"}.
 *
 * @param instanceName id used for the backing {@link ZNRecord}
 * @return instance config wrapping that record
 */
private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) {
  final ZNRecord backingRecord = new ZNRecord(instanceName);
  backingRecord.setSimpleField("queriesDisabled", "true");
  final InstanceConfig queryDisabledConfig = new InstanceConfig(backingRecord);
  return queryDisabledConfig;
}

/**
 * Builds an instance config for {@code instanceName} whose backing record has the {@code shutdownInProgress} simple
 * field set to {@code "true"}.
 *
 * @param instanceName id used for the backing {@link ZNRecord}
 * @return instance config wrapping that record
 */
private InstanceConfig newShutdownInProgress(String instanceName) {
  final ZNRecord backingRecord = new ZNRecord(instanceName);
  backingRecord.setSimpleField("shutdownInProgress", "true");
  final InstanceConfig shuttingDownConfig = new InstanceConfig(backingRecord);
  return shuttingDownConfig;
}

@Test
public void realtimeImmutableSegmentHasLessReplicaTest() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap())
.setInstanceAssignmentConfigMap(
Map.of("CONSUMING", instanceAssignmentConfig)
)
.build();

String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName();
Expand Down Expand Up @@ -363,6 +436,7 @@ public void realtimeImmutableSegmnetHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -429,6 +503,7 @@ public void missingEVPartitionTest() {
externalView.setState("myTable_1", "pinot2", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -519,6 +594,7 @@ public void missingEVPartitionPushTest() {
externalView.setState("myTable_2", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -661,6 +737,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() {
}

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down

0 comments on commit 1f10cbf

Please sign in to comment.