From 5697b332c2f0335a503bb86f41c452f94b12877b Mon Sep 17 00:00:00 2001 From: Ming Liang Date: Sat, 23 Nov 2024 15:46:08 -0800 Subject: [PATCH] Handle mutable and immutable segments replicas metric separately. --- .../helix/SegmentStatusChecker.java | 113 ++++++++---- .../helix/SegmentStatusCheckerTest.java | 162 +++++++++++++++++- 2 files changed, 237 insertions(+), 38 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index c9a48022c0be..41793f17c026 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -269,10 +269,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); - // Maximum number of replicas in ideal state - int maxISReplicas = Integer.MIN_VALUE; + // Number of replicas in ideal state + int maxISImmutableReplicas = Integer.MIN_VALUE; + int maxISMutableReplicas = Integer.MIN_VALUE; + // Minimum number of replicas in external view - int minEVReplicas = Integer.MAX_VALUE; + int minEVImmutableReplicas = Integer.MAX_VALUE; + int minEVMutableReplicas = Integer.MAX_VALUE; + // Total compressed segment size in deep store long tableCompressedSize = 0; // Segments without ZK metadata @@ -286,18 +290,21 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon List segmentsInvalidStartTime = new ArrayList<>(); List segmentsInvalidEndTime = new ArrayList<>(); for (String segment : segments) { - int numISReplicas = 0; + int numISOnlineReplicas = 0; + int numISConsumingReplicas = 0; for (Map.Entry entry : idealState.getInstanceStateMap(segment).entrySet()) { String state = entry.getValue(); - if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numISReplicas++; + if (state.equals(SegmentStateModel.ONLINE)) { + numISOnlineReplicas++; + } + if (state.equals(SegmentStateModel.CONSUMING)) { + numISConsumingReplicas++; } } - // Skip segments not ONLINE/CONSUMING in ideal state - if (numISReplicas == 0) { + // Skip segments with no ONLINE/CONSUMING in ideal state + if (numISOnlineReplicas == 0 && numISConsumingReplicas == 0) { continue; } - maxISReplicas = Math.max(maxISReplicas, numISReplicas); SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment); // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted. @@ -330,14 +337,18 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - int numEVReplicas = 0; + int numEVOnlineReplicas = 0; + int numEVConsumingReplicas = 0; if (externalView != null) { Map stateMap = externalView.getStateMap(segment); if (stateMap != null) { for (Map.Entry entry : stateMap.entrySet()) { String state = entry.getValue(); - if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numEVReplicas++; + if (state.equals(SegmentStateModel.ONLINE)) { + numEVOnlineReplicas++; + } + if (state.equals(SegmentStateModel.CONSUMING)) { + numEVConsumingReplicas++; } if (state.equals(SegmentStateModel.ERROR)) { errorSegments.add(Pair.of(segment, entry.getKey())); @@ -345,31 +356,39 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } } - if (numEVReplicas == 0) { + if (numEVOnlineReplicas == 0 && numEVConsumingReplicas == 0) { offlineSegments.add(segment); - } else if (numEVReplicas < numISReplicas) { + } else if (numEVOnlineReplicas + numEVConsumingReplicas < numISOnlineReplicas + numISConsumingReplicas) { partialOnlineSegments.add(segment); - } else { - // Do not allow nReplicasEV to be larger than nReplicasIS - numEVReplicas = numISReplicas; } - minEVReplicas = Math.min(minEVReplicas, numEVReplicas); - } - if (maxISReplicas == Integer.MIN_VALUE) { - try { - maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1); - } catch (NumberFormatException e) { - maxISReplicas = 1; + if (numEVConsumingReplicas == 0) { // it's a immutable segment + minEVImmutableReplicas = Math.min(minEVImmutableReplicas, numEVOnlineReplicas); + } else { // it's a mutable segment + minEVMutableReplicas = Math.min(minEVMutableReplicas, numEVOnlineReplicas + numEVConsumingReplicas); } } + + if (maxISImmutableReplicas == Integer.MIN_VALUE) { + maxISImmutableReplicas = getMaxISImmutableReplicas(idealState); + } + if (maxISMutableReplicas == Integer.MIN_VALUE) { + maxISMutableReplicas = getMaxISMutableReplicas(idealState, tableConfig); + } + // Do not allow minEVReplicas to be larger than maxISReplicas - minEVReplicas = Math.min(minEVReplicas, maxISReplicas); + minEVImmutableReplicas = Math.min(minEVImmutableReplicas, maxISImmutableReplicas); + minEVMutableReplicas = Math.min(minEVMutableReplicas, maxISMutableReplicas); - if (minEVReplicas < maxISReplicas) { - LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", - tableNameWithType, minEVReplicas, maxISReplicas); + if (minEVImmutableReplicas < maxISImmutableReplicas) { + LOGGER.warn("Table {} has at least one immutable(offline/completed) segment running with only {} replicas, " + + "below replication threshold :{}", tableNameWithType, minEVImmutableReplicas, maxISImmutableReplicas); + } + if (minEVMutableReplicas < maxISMutableReplicas) { + LOGGER.warn("Table {} has at least one mutable(consuming) segment running with only {} replicas, " + + "below replication threshold :{}", tableNameWithType, minEVMutableReplicas, maxISMutableReplicas); } + int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); if (numSegmentsWithoutZKMetadata > 0) { LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata, @@ -402,9 +421,15 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, + Math.min(minEVImmutableReplicas, minEVMutableReplicas) + ); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, - minEVReplicas * 100L / maxISReplicas); + Math.min(minEVImmutableReplicas * 100L / maxISImmutableReplicas, + minEVMutableReplicas * 100L / maxISMutableReplicas) + ); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, numErrorSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, @@ -426,6 +451,34 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private static int getMaxISImmutableReplicas(IdealState idealState) { + try { + return Math.max(Integer.parseInt(idealState.getReplicas()), 1); + } catch (NumberFormatException e) { + return 1; + } + } + + private static int getMaxISMutableReplicas(IdealState idealState, TableConfig config) { + int numConsumingReplicaGroups = 1; + int numOnlineReplicaGroups = 1; + int numISReplicas = 1; + try { + numISReplicas = Integer.parseInt(idealState.getReplicas()); + } catch (NumberFormatException e) { + // ignore exceptions if the ideal state replica config is not present + } + try { + numConsumingReplicaGroups = config.getInstanceAssignmentConfigMap() + .get(SegmentStateModel.CONSUMING).getReplicaGroupPartitionConfig().getNumReplicaGroups(); + numOnlineReplicaGroups = config.getInstanceAssignmentConfigMap() + .get(SegmentStateModel.ONLINE).getReplicaGroupPartitionConfig().getNumReplicaGroups(); + } catch (NullPointerException e) { + // ignore exceptions if the instance assignment replica group config is not present + } + return Math.max(numISReplicas, Math.max(numConsumingReplicaGroups, numOnlineReplicaGroups)); + } + private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 5f2ae7ea32f4..4b2f18f7285e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -47,6 +47,9 @@ import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; @@ -56,14 +59,9 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; @SuppressWarnings("unchecked") @@ -196,9 +194,11 @@ public void realtimeBasicTest() { idealState.setPartitionState(seg1, "pinot1", "ONLINE"); idealState.setPartitionState(seg1, "pinot2", "ONLINE"); idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); idealState.setPartitionState(seg2, "pinot2", "ONLINE"); idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); idealState.setPartitionState(seg3, "pinot3", "OFFLINE"); @@ -209,9 +209,11 @@ public void realtimeBasicTest() { externalView.setState(seg1, "pinot1", "ONLINE"); externalView.setState(seg1, "pinot2", "ONLINE"); externalView.setState(seg1, "pinot3", "ONLINE"); + externalView.setState(seg2, "pinot1", "CONSUMING"); externalView.setState(seg2, "pinot2", "ONLINE"); externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot1", "CONSUMING"); externalView.setState(seg3, "pinot2", "CONSUMING"); externalView.setState(seg3, "pinot3", "OFFLINE"); @@ -239,6 +241,150 @@ public void realtimeBasicTest() { ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } + @Test + public void realtimeMutableSegmnetHasLessReplicaTest() { + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .setInstanceAssignmentConfigMap( + Map.of("CONSUMING", instanceAssignmentConfig) + ) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "ONLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 3, 75, 0, 100, 0, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + + @Test + public void realtimeImmutableSegmnetHasLessReplicaTest() { + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .setInstanceAssignmentConfigMap( + Map.of("CONSUMING", instanceAssignmentConfig) + ) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "OFFLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 1, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + private Map getStreamConfigMap() { return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",