Skip to content

Commit

Permalink
Handle mutable and immutable segments replicas metric separately.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ming Liang authored and mqliang committed Nov 25, 2024
1 parent 18cd776 commit 5697b33
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon

ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);

// Maximum number of replicas in ideal state
int maxISReplicas = Integer.MIN_VALUE;
// Number of replicas in ideal state
int maxISImmutableReplicas = Integer.MIN_VALUE;
int maxISMutableReplicas = Integer.MIN_VALUE;

// Minimum number of replicas in external view
int minEVReplicas = Integer.MAX_VALUE;
int minEVImmutableReplicas = Integer.MAX_VALUE;
int minEVMutableReplicas = Integer.MAX_VALUE;

// Total compressed segment size in deep store
long tableCompressedSize = 0;
// Segments without ZK metadata
Expand All @@ -286,18 +290,21 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
List<String> segmentsInvalidStartTime = new ArrayList<>();
List<String> segmentsInvalidEndTime = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
int numISOnlineReplicas = 0;
int numISConsumingReplicas = 0;
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numISReplicas++;
if (state.equals(SegmentStateModel.ONLINE)) {
numISOnlineReplicas++;
}
if (state.equals(SegmentStateModel.CONSUMING)) {
numISConsumingReplicas++;
}
}
// Skip segments not ONLINE/CONSUMING in ideal state
if (numISReplicas == 0) {
// Skip segments with no ONLINE/CONSUMING in ideal state
if (numISOnlineReplicas == 0 && numISConsumingReplicas == 0) {
continue;
}
maxISReplicas = Math.max(maxISReplicas, numISReplicas);

SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
// Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
Expand Down Expand Up @@ -330,46 +337,58 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

int numEVReplicas = 0;
int numEVOnlineReplicas = 0;
int numEVConsumingReplicas = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
if (stateMap != null) {
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numEVReplicas++;
if (state.equals(SegmentStateModel.ONLINE)) {
numEVOnlineReplicas++;
}
if (state.equals(SegmentStateModel.CONSUMING)) {
numEVConsumingReplicas++;
}
if (state.equals(SegmentStateModel.ERROR)) {
errorSegments.add(Pair.of(segment, entry.getKey()));
}
}
}
}
if (numEVReplicas == 0) {
if (numEVOnlineReplicas == 0 && numEVConsumingReplicas == 0) {
offlineSegments.add(segment);
} else if (numEVReplicas < numISReplicas) {
} else if (numEVOnlineReplicas + numEVConsumingReplicas < numISOnlineReplicas + numISConsumingReplicas) {
partialOnlineSegments.add(segment);
} else {
// Do not allow nReplicasEV to be larger than nReplicasIS
numEVReplicas = numISReplicas;
}
minEVReplicas = Math.min(minEVReplicas, numEVReplicas);
}

if (maxISReplicas == Integer.MIN_VALUE) {
try {
maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
maxISReplicas = 1;
if (numEVConsumingReplicas == 0) { // it's a immutable segment
minEVImmutableReplicas = Math.min(minEVImmutableReplicas, numEVOnlineReplicas);
} else { // it's a mutable segment
minEVMutableReplicas = Math.min(minEVMutableReplicas, numEVOnlineReplicas + numEVConsumingReplicas);
}
}

if (maxISImmutableReplicas == Integer.MIN_VALUE) {
maxISImmutableReplicas = getMaxISImmutableReplicas(idealState);
}
if (maxISMutableReplicas == Integer.MIN_VALUE) {
maxISMutableReplicas = getMaxISMutableReplicas(idealState, tableConfig);
}

// Do not allow minEVReplicas to be larger than maxISReplicas
minEVReplicas = Math.min(minEVReplicas, maxISReplicas);
minEVImmutableReplicas = Math.min(minEVImmutableReplicas, maxISImmutableReplicas);
minEVMutableReplicas = Math.min(minEVMutableReplicas, maxISMutableReplicas);

if (minEVReplicas < maxISReplicas) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, minEVReplicas, maxISReplicas);
if (minEVImmutableReplicas < maxISImmutableReplicas) {
LOGGER.warn("Table {} has at least one immutable(offline/completed) segment running with only {} replicas, "
+ "below replication threshold :{}", tableNameWithType, minEVImmutableReplicas, maxISImmutableReplicas);
}
if (minEVMutableReplicas < maxISMutableReplicas) {
LOGGER.warn("Table {} has at least one mutable(consuming) segment running with only {} replicas, "
+ "below replication threshold :{}", tableNameWithType, minEVMutableReplicas, maxISMutableReplicas);
}

int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
if (numSegmentsWithoutZKMetadata > 0) {
LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
Expand Down Expand Up @@ -402,9 +421,15 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}

// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS,
Math.min(minEVImmutableReplicas, minEVMutableReplicas)
);

_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
minEVReplicas * 100L / maxISReplicas);
Math.min(minEVImmutableReplicas * 100L / maxISImmutableReplicas,
minEVMutableReplicas * 100L / maxISMutableReplicas)
);

_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
numErrorSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
Expand All @@ -426,6 +451,34 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

private static int getMaxISImmutableReplicas(IdealState idealState) {
try {
return Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
return 1;
}
}

private static int getMaxISMutableReplicas(IdealState idealState, TableConfig config) {
int numConsumingReplicaGroups = 1;
int numOnlineReplicaGroups = 1;
int numISReplicas = 1;
try {
numISReplicas = Integer.parseInt(idealState.getReplicas());
} catch (NumberFormatException e) {
// ignore exceptions if the ideal state replica config is not present
}
try {
numConsumingReplicaGroups = config.getInstanceAssignmentConfigMap()
.get(SegmentStateModel.CONSUMING).getReplicaGroupPartitionConfig().getNumReplicaGroups();
numOnlineReplicaGroups = config.getInstanceAssignmentConfigMap()
.get(SegmentStateModel.ONLINE).getReplicaGroupPartitionConfig().getNumReplicaGroups();
} catch (NullPointerException e) {
// ignore exceptions if the instance assignment replica group config is not present
}
return Math.max(numISReplicas, Math.max(numConsumingReplicaGroups, numOnlineReplicaGroups));
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
Expand All @@ -56,14 +59,9 @@
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;


@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -196,9 +194,11 @@ public void realtimeBasicTest() {
idealState.setPartitionState(seg1, "pinot1", "ONLINE");
idealState.setPartitionState(seg1, "pinot2", "ONLINE");
idealState.setPartitionState(seg1, "pinot3", "ONLINE");

idealState.setPartitionState(seg2, "pinot1", "ONLINE");
idealState.setPartitionState(seg2, "pinot2", "ONLINE");
idealState.setPartitionState(seg2, "pinot3", "ONLINE");

idealState.setPartitionState(seg3, "pinot1", "CONSUMING");
idealState.setPartitionState(seg3, "pinot2", "CONSUMING");
idealState.setPartitionState(seg3, "pinot3", "OFFLINE");
Expand All @@ -209,9 +209,11 @@ public void realtimeBasicTest() {
externalView.setState(seg1, "pinot1", "ONLINE");
externalView.setState(seg1, "pinot2", "ONLINE");
externalView.setState(seg1, "pinot3", "ONLINE");

externalView.setState(seg2, "pinot1", "CONSUMING");
externalView.setState(seg2, "pinot2", "ONLINE");
externalView.setState(seg2, "pinot3", "CONSUMING");

externalView.setState(seg3, "pinot1", "CONSUMING");
externalView.setState(seg3, "pinot2", "CONSUMING");
externalView.setState(seg3, "pinot3", "OFFLINE");
Expand Down Expand Up @@ -239,6 +241,150 @@ public void realtimeBasicTest() {
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
}

@Test
public void realtimeMutableSegmnetHasLessReplicaTest() {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null,
new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false);

TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap())
.setInstanceAssignmentConfigMap(
Map.of("CONSUMING", instanceAssignmentConfig)
)
.build();

String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName();
String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName();
String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName();
IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
idealState.setPartitionState(seg1, "pinot1", "ONLINE");
idealState.setPartitionState(seg1, "pinot2", "ONLINE");
idealState.setPartitionState(seg1, "pinot3", "ONLINE");

idealState.setPartitionState(seg2, "pinot1", "ONLINE");
idealState.setPartitionState(seg2, "pinot2", "ONLINE");
idealState.setPartitionState(seg2, "pinot3", "ONLINE");

idealState.setPartitionState(seg3, "pinot1", "CONSUMING");
idealState.setPartitionState(seg3, "pinot2", "CONSUMING");
idealState.setPartitionState(seg3, "pinot3", "CONSUMING");
idealState.setPartitionState(seg3, "pinot4", "OFFLINE");

idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
externalView.setState(seg1, "pinot1", "ONLINE");
externalView.setState(seg1, "pinot2", "ONLINE");
externalView.setState(seg1, "pinot3", "ONLINE");

externalView.setState(seg2, "pinot1", "CONSUMING");
externalView.setState(seg2, "pinot2", "ONLINE");
externalView.setState(seg2, "pinot3", "CONSUMING");
externalView.setState(seg2, "pinot4", "CONSUMING");

externalView.setState(seg3, "pinot1", "CONSUMING");
externalView.setState(seg3, "pinot2", "CONSUMING");
externalView.setState(seg3, "pinot3", "CONSUMING");
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView);
SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata();
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata);
SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata);

ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
ZNRecord znRecord = new ZNRecord("0");
znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);

runSegmentStatusChecker(resourceManager, 0);
verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 3, 75, 0, 100, 0, 0);
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME,
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
}

@Test
public void realtimeImmutableSegmnetHasLessReplicaTest() {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null,
new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false);

TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap())
.setInstanceAssignmentConfigMap(
Map.of("CONSUMING", instanceAssignmentConfig)
)
.build();

String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName();
String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName();
String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName();
IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
idealState.setPartitionState(seg1, "pinot1", "ONLINE");
idealState.setPartitionState(seg1, "pinot2", "ONLINE");
idealState.setPartitionState(seg1, "pinot3", "ONLINE");

idealState.setPartitionState(seg2, "pinot1", "ONLINE");
idealState.setPartitionState(seg2, "pinot2", "ONLINE");
idealState.setPartitionState(seg2, "pinot3", "ONLINE");

idealState.setPartitionState(seg3, "pinot1", "CONSUMING");
idealState.setPartitionState(seg3, "pinot2", "CONSUMING");
idealState.setPartitionState(seg3, "pinot3", "CONSUMING");
idealState.setPartitionState(seg3, "pinot4", "OFFLINE");

idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
externalView.setState(seg1, "pinot1", "ONLINE");
externalView.setState(seg1, "pinot2", "ONLINE");
externalView.setState(seg1, "pinot3", "OFFLINE");

externalView.setState(seg2, "pinot1", "CONSUMING");
externalView.setState(seg2, "pinot2", "ONLINE");
externalView.setState(seg2, "pinot3", "CONSUMING");
externalView.setState(seg2, "pinot4", "CONSUMING");

externalView.setState(seg3, "pinot1", "CONSUMING");
externalView.setState(seg3, "pinot2", "CONSUMING");
externalView.setState(seg3, "pinot3", "CONSUMING");
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView);
SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata();
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata);
SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L);
when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata);

ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
ZNRecord znRecord = new ZNRecord("0");
znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);

runSegmentStatusChecker(resourceManager, 0);
verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 1, 0);
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME,
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
}

private Map<String, String> getStreamConfigMap() {
return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
Expand Down

0 comments on commit 5697b33

Please sign in to comment.