From bf098d3cd535ebdbf6b121eae6e7adb172225e7a Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 19:39:24 -0800 Subject: [PATCH] pre build a queryable server set to reduce ZK acess --- .../helix/SegmentStatusChecker.java | 37 ++++------ .../helix/core/PinotHelixResourceManager.java | 5 ++ .../helix/SegmentStatusCheckerTest.java | 74 ++++++++++++++----- 3 files changed, 75 insertions(+), 41 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 1685a706069d..d03433cb733d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -110,6 +110,17 @@ protected Context preprocess(Properties periodicTaskProperties) { context._logDisabledTables = true; _lastDisabledTableLogTimestamp = now; } + + // Read ZK once to build a set of queryable server instances + for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) { + ZNRecord record = instanceConfig.getRecord(); + boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); + boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); + if (!queriesDisabled && !shutdownInProgress) { + context._queryableServers.add(instanceConfig.getInstanceName()); + } + } + return context; } @@ -341,7 +352,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon for (Map.Entry entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (isServerQueryable(serverInstanceId, context) + if (context._queryableServers.contains(serverInstanceId) && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } @@ -430,28 +441,6 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - private boolean isServerQueryable(String instanceId, Context context) { - if (context._serverQueryableMap.containsKey(instanceId)) { - return context._serverQueryableMap.get(instanceId); - } - InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); - // Instance not found in Helix cluster or not Server we assume it is down - if (instanceConfig == null || !instanceConfig.getInstanceName().startsWith("Server_")) { - LOGGER.warn("Instance {} not found in Helix cluster, or not a server instance", instanceId); - LOGGER.warn("will assume instance is down!"); - context._serverQueryableMap.put(instanceId, false); - return false; - } - - ZNRecord record = instanceConfig.getRecord(); - boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); - boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); - boolean queryable = !queriesDisabled && !shutdownInProgress; - - context._serverQueryableMap.put(instanceId, queryable); - return queryable; - } - private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); @@ -499,6 +488,6 @@ public static final class Context { private final Set _processedTables = new HashSet<>(); private final Set _disabledTables = new HashSet<>(); private final Set _pausedTables = new HashSet<>(); - private final Map _serverQueryableMap = new HashMap<>(); + private final Set _queryableServers = new HashSet<>(); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index e7affa4287d6..f009cbddc9ae 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -515,6 +515,11 @@ public List getAllControllerInstanceConfigs() { .filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList()); } + public List getAllServerInstanceConfigs() { + return HelixHelper.getInstanceConfigs(_helixZkManager).stream() + .filter(instance -> InstanceTypeUtils.isServer(instance.getId())).collect(Collectors.toList()); + } + public List getAllMinionInstanceConfigs() { return HelixHelper.getInstanceConfigs(_helixZkManager).stream() .filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index f40337369c1c..1eacd2828016 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -107,7 +107,13 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -218,7 +224,13 @@ public void realtimeBasicTest() { externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -284,7 +296,13 @@ public void realtimeMutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -350,15 +368,13 @@ public void realtimeServerNotQueryableTest() { externalView.setState(seg3, "Server_pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig("Server_pinot1")) - .thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); - when(resourceManager.getHelixInstanceConfig("Server_pinot2")) - .thenReturn(newShutdownInProgress("Server_pinot2")); - when(resourceManager.getHelixInstanceConfig("Server_pinot3")) - .thenReturn(new InstanceConfig("Server_pinot3")); - when(resourceManager.getHelixInstanceConfig("Server_pinot4")) - .thenReturn(new InstanceConfig("Server_pinot4")); - + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + newQueryDisabledInstanceConfig("Server_pinot1"), + newShutdownInProgressInstanceConfig("Server_pinot2"), + new InstanceConfig("Server_pinot3"), + new InstanceConfig("Server_pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -387,7 +403,7 @@ private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) { return new InstanceConfig(znRecord); } - private InstanceConfig newShutdownInProgress(String instanceName) { + private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) { ZNRecord znRecord = new ZNRecord(instanceName); znRecord.setSimpleField("shutdownInProgress", "true"); return new InstanceConfig(znRecord); @@ -436,7 +452,13 @@ public void realtimeImmutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -503,7 +525,13 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -594,7 +622,13 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -737,7 +771,13 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);