From 6f4df8f8d05bed55bc3b98371f9cd31998a567e6 Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Mon, 18 Nov 2024 14:45:56 +0100 Subject: [PATCH] #2963 Fix NPE during task rebalancing In case _helixManager in one of the known controllers is _null_, finding the "leader controller" can lead to a NPE. When run as an asynchronous task, the NPE may not even be logged, and the task finishes without any trace, hence the addition of ExecutorTaskUtil class to wrap callables/runnables in such a manner that such an exception at least gets logged. --- .../controller/GenericHelixController.java | 4 +- .../pipeline/AbstractBaseStage.java | 5 +- .../waged/GlobalRebalanceRunner.java | 7 +- .../waged/PartialRebalanceRunner.java | 5 +- .../stages/MessageGenerationPhase.java | 5 +- .../manager/zk/CallbackEventExecutor.java | 5 +- .../messaging/handling/HelixTaskExecutor.java | 3 +- .../helix/spectator/RoutingTableProvider.java | 5 +- .../org/apache/helix/task/TaskStateModel.java | 3 +- .../apache/helix/util/ExecutorTaskUtil.java | 66 +++++++++++++++++++ 10 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index c641b4d71f..428bfa7fb5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -235,7 +235,9 @@ public static GenericHelixController getLeaderController(String clusterName) { if (clusterName != null) { ImmutableSet controllers = _helixControllerFactory.get(clusterName); if (controllers != null) { - return controllers.stream().filter(controller -> controller._helixManager.isLeader()) + return controllers.stream() + .filter(controller -> controller._helixManager != null) + .filter(controller -> controller._helixManager.isLeader()) .findAny().orElse(null); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index 8e0e37f8d1..c6d15a9ccb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -27,6 +27,7 @@ import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.util.ExecutorTaskUtil; public class AbstractBaseStage implements Stage { protected String _eventId; @@ -64,9 +65,9 @@ public String getStageName() { return className; } - public static Future asyncExecute(ExecutorService service, Callable task) { + public static Future asyncExecute(ExecutorService service, Callable task) { if (service != null) { - return service.submit(task); + return service.submit(ExecutorTaskUtil.wrap(task)); } return null; } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java index d710425cf1..5b7a7baba3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -35,14 +35,13 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.ClusterTopologyConfig; -import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; +import org.apache.helix.util.ExecutorTaskUtil; import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +116,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map result = _baselineCalculateExecutor.submit(() -> { + Future result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { try { // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should // be triggered again after baseline is finished. @@ -132,7 +131,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map { + _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { try { doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, currentStateOutput); @@ -111,7 +112,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map> pendingMessagesToPurge, ExecutorService workerPool, final HelixDataAccessor accessor) { - workerPool.submit(new Callable() { + workerPool.submit(ExecutorTaskUtil.wrap(ExecutorTaskUtil.wrap(new Callable() { @Override public Object call() { for (Map.Entry> entry : pendingMessagesToPurge.entrySet()) { @@ -415,7 +416,7 @@ public Object call() { } return null; } - }); + }))); } private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map sessionIdMap, diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java index 1bf3721bae..17eb5a3e1e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java @@ -25,6 +25,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; import org.apache.helix.common.DedupEventBlockingQueue; +import org.apache.helix.util.ExecutorTaskUtil; import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +90,7 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati } if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) { _futureCallBackProcessEvent = - _threadPoolExecutor.submit(new CallbackProcessor(handler, event)); + _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } else { _callBackEventQueue.put(eventType, event); } @@ -102,7 +103,7 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler try { NotificationContext event = _callBackEventQueue.take(); _futureCallBackProcessEvent = - _threadPoolExecutor.submit(new CallbackProcessor(handler, event)); + _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } catch (InterruptedException e) { logger .error("Error when submitting pending HandleCallBackEvent to manager thread pool", e); diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index f006bd42bd..5c44d78e31 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -76,6 +76,7 @@ import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.task.TaskConstants; +import org.apache.helix.util.ExecutorTaskUtil; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.StatusUpdateUtil; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -465,7 +466,7 @@ public boolean scheduleTask(MessageTask task) { } LOG.info("Submit task: " + taskId + " to pool: " + exeSvc); - Future future = exeSvc.submit(task); + Future future = exeSvc.submit(ExecutorTaskUtil.wrap(task)); _messageTaskMap .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()), diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index c27f084627..646e8f3087 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -64,6 +64,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor; +import org.apache.helix.util.ExecutorTaskUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -971,7 +972,7 @@ private void recordPropagationLatency(final long currentTime, // restrict running report task count to be 1. // Any parallel tasks will be skipped. So the reporting metric data is sampled. if (_reportingTask == null || _reportingTask.isDone()) { - _reportingTask = _reportExecutor.submit(new Callable() { + _reportingTask = _reportExecutor.submit(ExecutorTaskUtil.wrap(new Callable() { @Override public Object call() { // getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to @@ -1000,7 +1001,7 @@ public Object call() { } return null; } - }); + })); } } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index cc19444fdb..a8df83c064 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -31,6 +31,7 @@ import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelInfo; import org.apache.helix.participant.statemachine.Transition; +import org.apache.helix.util.ExecutorTaskUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +330,7 @@ private void startTask(Message msg, String taskPartition) { _taskRunner = new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager, msg.getTgtSessionId(), this); - _taskExecutor.submit(_taskRunner); + _taskExecutor.submit(ExecutorTaskUtil.wrap(_taskRunner)); _taskRunner.waitTillStarted(); // Set up a timer to cancel the task when its time out expires. diff --git a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java new file mode 100644 index 0000000000..1877a0c6a4 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java @@ -0,0 +1,66 @@ +package org.apache.helix.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExecutorTaskUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorTaskUtil.class); + + /** + * Wrap a callable so that any raised exception is logged + * (can be interesting in case the callable is used as a completely asynchronous task + * fed to an {@link java.util.concurrent.ExecutorService}), for which we are never + * calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)} + * methods. + */ + public static Callable wrap(Callable callable) { + return () -> { + try { + return callable.call(); + } catch (Throwable t) { + LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + throw t; + } + }; + } + + /** + * Wrap a runnable so that any raised exception is logged + * (can be interesting in case the callable is used as a completely asynchronous task + * fed to an {@link java.util.concurrent.ExecutorService}), for which we are never + * calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)} + * methods. + */ + public static Runnable wrap(Runnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Throwable t) { + LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + throw t; + } + }; + } +}