From 5dd6f8ac565d3a5d45d0f85a974e1aebf6c7c437 Mon Sep 17 00:00:00 2001 From: Liang Mingqiang Date: Fri, 10 Jan 2025 15:17:31 -0800 Subject: [PATCH] Enhance SegmentStatusChecker to honor no-queryable servers and instance assignment config (#14536) --- .../helix/SegmentStatusChecker.java | 78 +++--- .../util/ServerQueryInfoFetcher.java | 95 +++++++ .../helix/SegmentStatusCheckerTest.java | 246 +++++++++++++++++- 3 files changed, 380 insertions(+), 39 deletions(-) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 1a5f542dd798..bb78f4257670 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -48,6 +48,8 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.controller.util.ServerQueryInfoFetcher; +import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -92,7 +94,6 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); - _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds(); _tableSizeReader = tableSizeReader; } @@ -210,6 +211,8 @@ private void updateTableSizeMetrics(String tableNameWithType) private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager); + IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); if (idealState == null) { @@ -270,10 +273,12 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); - // Maximum number of replicas in ideal state - int maxISReplicas = Integer.MIN_VALUE; - // Minimum number of replicas in external view - int minEVReplicas = Integer.MAX_VALUE; + // Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state + int maxISReplicasUp = Integer.MIN_VALUE; + // Minimum number of replicas that is up (ONLINE/CONSUMING) in external view + int minEVReplicasUp = Integer.MAX_VALUE; + // Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view + int minEVReplicasUpPercent = 100; // Total compressed segment size in deep store long tableCompressedSize = 0; // Segments without ZK metadata @@ -287,18 +292,19 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon List segmentsInvalidStartTime = new ArrayList<>(); List segmentsInvalidEndTime = new ArrayList<>(); for (String segment : segments) { - int numISReplicas = 0; + // Number of replicas in ideal state that is in ONLINE/CONSUMING state + int numISReplicasUp = 0; for (Map.Entry entry : idealState.getInstanceStateMap(segment).entrySet()) { String state = entry.getValue(); if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numISReplicas++; + numISReplicasUp++; } } - // Skip segments not ONLINE/CONSUMING in ideal state - if (numISReplicas == 0) { + // Skip segments with no ONLINE/CONSUMING in ideal state + if (numISReplicasUp == 0) { continue; } - maxISReplicas = Math.max(maxISReplicas, numISReplicas); + maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp); SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment); // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted. @@ -331,46 +337,49 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - int numEVReplicas = 0; + int numEVReplicasUp = 0; if (externalView != null) { Map stateMap = externalView.getStateMap(segment); if (stateMap != null) { for (Map.Entry entry : stateMap.entrySet()) { - String state = entry.getValue(); - if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numEVReplicas++; + String serverInstanceId = entry.getKey(); + String segmentState = entry.getValue(); + if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING)) + && isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) { + numEVReplicasUp++; } - if (state.equals(SegmentStateModel.ERROR)) { + if (segmentState.equals(SegmentStateModel.ERROR)) { errorSegments.add(Pair.of(segment, entry.getKey())); } } } } - if (numEVReplicas == 0) { + if (numEVReplicasUp == 0) { offlineSegments.add(segment); - } else if (numEVReplicas < numISReplicas) { + } else if (numEVReplicasUp < numISReplicasUp) { partialOnlineSegments.add(segment); } else { - // Do not allow nReplicasEV to be larger than nReplicasIS - numEVReplicas = numISReplicas; + // Do not allow numEVReplicasUp to be larger than numISReplicasUp + numEVReplicasUp = numISReplicasUp; } - minEVReplicas = Math.min(minEVReplicas, numEVReplicas); + + minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp); + // Total number of replicas in ideal state (including ERROR/OFFLINE states) + int numISReplicasTotal = Math.max(idealState.getInstanceStateMap(segment).entrySet().size(), 1); + minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal); } - if (maxISReplicas == Integer.MIN_VALUE) { + if (maxISReplicasUp == Integer.MIN_VALUE) { try { - maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1); + maxISReplicasUp = Math.max(Integer.parseInt(idealState.getReplicas()), 1); } catch (NumberFormatException e) { - maxISReplicas = 1; + maxISReplicasUp = 1; } } - // Do not allow minEVReplicas to be larger than maxISReplicas - minEVReplicas = Math.min(minEVReplicas, maxISReplicas); - if (minEVReplicas < maxISReplicas) { - LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", - tableNameWithType, minEVReplicas, maxISReplicas); - } + // Do not allow minEVReplicasUp to be larger than maxISReplicasUp + minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp); + int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); if (numSegmentsWithoutZKMetadata > 0) { LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata, @@ -403,9 +412,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, - minEVReplicas * 100L / maxISReplicas); + minEVReplicasUpPercent); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, numErrorSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, @@ -428,6 +437,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private boolean isServerQueryable(ServerQueryInfo serverInfo) { + return serverInfo != null + && serverInfo.isHelixEnabled() + && !serverInfo.isQueriesDisabled() + && !serverInfo.isShutdownInProgress(); + } + private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java new file mode 100644 index 000000000000..2ac53ae508e3 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; + + +/** + * This is a helper class that fetch server information from Helix/ZK. It caches the server information to avoid + * repeated ZK access. This class is NOT thread-safe. + */ +public class ServerQueryInfoFetcher { + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final Map _cache; + + public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _cache = new HashMap<>(); + } + + @Nullable + public ServerQueryInfo getServerQueryInfo(String instanceId) { + return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand); + } + + @Nullable + private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) { + InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); + if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { + return null; + } + List tags = instanceConfig.getTags(); + ZNRecord record = instanceConfig.getRecord(); + boolean helixEnabled = instanceConfig.getInstanceEnabled(); + boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); + boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); + + return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); + } + + public static class ServerQueryInfo { + private final String _instanceName; + private final List _tags; + private final List _tables; + private final boolean _helixEnabled; + private final boolean _queriesDisabled; + private final boolean _shutdownInProgress; + + private ServerQueryInfo(String instanceName, List tags, List tables, boolean helixEnabled, + boolean queriesDisabled, boolean shutdownInProgress) { + _instanceName = instanceName; + _tags = tags; + _tables = tables; + _helixEnabled = helixEnabled; + _queriesDisabled = queriesDisabled; + _shutdownInProgress = shutdownInProgress; + } + + public boolean isHelixEnabled() { + return _helixEnabled; + } + + public boolean isQueriesDisabled() { + return _queriesDisabled; + } + + public boolean isShutdownInProgress() { + return _shutdownInProgress; + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 5f2ae7ea32f4..f41084f1a6ab 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -28,6 +28,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.lineage.LineageEntry; @@ -56,14 +57,9 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; @SuppressWarnings("unchecked") @@ -111,6 +107,7 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -196,9 +193,11 @@ public void realtimeBasicTest() { idealState.setPartitionState(seg1, "pinot1", "ONLINE"); idealState.setPartitionState(seg1, "pinot2", "ONLINE"); idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); idealState.setPartitionState(seg2, "pinot2", "ONLINE"); idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); idealState.setPartitionState(seg3, "pinot3", "OFFLINE"); @@ -209,14 +208,17 @@ public void realtimeBasicTest() { externalView.setState(seg1, "pinot1", "ONLINE"); externalView.setState(seg1, "pinot2", "ONLINE"); externalView.setState(seg1, "pinot3", "ONLINE"); + externalView.setState(seg2, "pinot1", "CONSUMING"); externalView.setState(seg2, "pinot2", "ONLINE"); externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot1", "CONSUMING"); externalView.setState(seg3, "pinot2", "CONSUMING"); externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -239,6 +241,231 @@ public void realtimeBasicTest() { ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } + @Test + public void realtimeMutableSegmentHasLessReplicaTest() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "ONLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 3, 75, 0, 100, 0, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + + @Test + public void realtimeServerNotQueryableTest() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "Server_pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "Server_pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "Server_pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "Server_pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "Server_pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "Server_pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "Server_pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "Server_pinot1", "ONLINE"); + externalView.setState(seg1, "Server_pinot2", "ONLINE"); + externalView.setState(seg1, "Server_pinot3", "ONLINE"); + + externalView.setState(seg2, "Server_pinot1", "CONSUMING"); + externalView.setState(seg2, "Server_pinot2", "ONLINE"); + externalView.setState(seg2, "Server_pinot3", "CONSUMING"); + externalView.setState(seg2, "Server_pinot4", "CONSUMING"); + + externalView.setState(seg3, "Server_pinot1", "CONSUMING"); + externalView.setState(seg3, "Server_pinot2", "CONSUMING"); + externalView.setState(seg3, "Server_pinot3", "CONSUMING"); + externalView.setState(seg3, "Server_pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig("Server_pinot1")). + thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); + when(resourceManager.getHelixInstanceConfig("Server_pinot2")). + thenReturn(newShutdownInProgressInstanceConfig("Server_pinot2")); + when(resourceManager.getHelixInstanceConfig("Server_pinot3")). + thenReturn(newQuerableInstanceConfig("Server_pinot3")); + when(resourceManager.getHelixInstanceConfig("Server_pinot4")). + thenReturn(newQuerableInstanceConfig("Server_pinot4")); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 1, 25, 0, 100, 3, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + + private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); + znRecord.setBooleanField(CommonConstants.Helix.QUERIES_DISABLED, true); + return new InstanceConfig(znRecord); + } + + private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); + znRecord.setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, true); + return new InstanceConfig(znRecord); + } + + private InstanceConfig newQuerableInstanceConfig(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); + return new InstanceConfig(znRecord); + } + + @Test + public void realtimeImmutableSegmentHasLessReplicaTest() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "OFFLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 1, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + private Map getStreamConfigMap() { return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", @@ -283,6 +510,7 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -373,6 +601,7 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -515,6 +744,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);