Skip to content

Commit

Permalink
apache#2963 Fix NPE during task rebalancing
Browse files Browse the repository at this point in the history
In case _helixManager in one of the known controllers is _null_, finding
the "leader controller" can lead to a NPE.

When run as an asynchronous task, the NPE may not even be logged, and the
task finishes without any trace, hence the addition of ExecutorTaskUtil
class to wrap callables/runnables in such a manner that such an exception
at least gets logged.
  • Loading branch information
jacob-netguardians committed Nov 19, 2024
1 parent f37f465 commit c76a46e
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ public static GenericHelixController getLeaderController(String clusterName) {
if (clusterName != null) {
ImmutableSet<GenericHelixController> controllers = _helixControllerFactory.get(clusterName);
if (controllers != null) {
return controllers.stream().filter(controller -> controller._helixManager.isLeader())
return controllers.stream()
.filter(controller -> controller._helixManager != null)
.filter(controller -> controller._helixManager.isLeader())
.findAny().orElse(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.util.ExecutorTaskUtil;

public class AbstractBaseStage implements Stage {
protected String _eventId;
Expand Down Expand Up @@ -64,9 +65,9 @@ public String getStageName() {
return className;
}

public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) {
public static <T> Future<T> asyncExecute(ExecutorService service, Callable<T> task) {
if (service != null) {
return service.submit(task);
return service.submit(ExecutorTaskUtil.wrap(task));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,7 +116,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
Future<Boolean> result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
// If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
// be triggered again after baseline is finished.
Expand All @@ -132,7 +131,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
return false;
}
return true;
});
}));
if (waitForGlobalRebalance) {
try {
if (!result.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return;
}

_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
Expand All @@ -111,7 +112,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return false;
}
return true;
});
}));
if (!_asyncPartialRebalanceEnabled) {
try {
if (!_asyncPartialRebalanceResult.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.MessageUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -401,7 +402,7 @@ private void addGeneratedMessageToMap(final Message message,
private void schedulePendingMessageCleanUp(
final Map<String, Map<String, Message>> pendingMessagesToPurge, ExecutorService workerPool,
final HelixDataAccessor accessor) {
workerPool.submit(new Callable<Object>() {
workerPool.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
@Override
public Object call() {
for (Map.Entry<String, Map<String, Message>> entry : pendingMessagesToPurge.entrySet()) {
Expand All @@ -415,7 +416,7 @@ public Object call() {
}
return null;
}
});
}));
}

private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String, String> sessionIdMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.common.DedupEventBlockingQueue;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
}
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} else {
_callBackEventQueue.put(eventType, event);
}
Expand All @@ -102,7 +103,7 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
try {
NotificationContext event = _callBackEventQueue.take();
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} catch (InterruptedException e) {
logger
.error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -465,7 +466,7 @@ public boolean scheduleTask(MessageTask task) {
}

LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
Future<HelixTaskResult> future = exeSvc.submit(task);
Future<HelixTaskResult> future = exeSvc.submit(ExecutorTaskUtil.wrap(task));

_messageTaskMap
.putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
import org.apache.helix.util.ExecutorTaskUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -971,7 +972,7 @@ private void recordPropagationLatency(final long currentTime,
// restrict running report task count to be 1.
// Any parallel tasks will be skipped. So the reporting metric data is sampled.
if (_reportingTask == null || _reportingTask.isDone()) {
_reportingTask = _reportExecutor.submit(new Callable<Object>() {
_reportingTask = _reportExecutor.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
@Override
public Object call() {
// getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to
Expand Down Expand Up @@ -1000,7 +1001,7 @@ public Object call() {
}
return null;
}
});
}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.util.ExecutorTaskUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -329,7 +330,7 @@ private void startTask(Message msg, String taskPartition) {
_taskRunner =
new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
msg.getTgtSessionId(), this);
_taskExecutor.submit(_taskRunner);
_taskExecutor.submit(ExecutorTaskUtil.wrap(_taskRunner));
_taskRunner.waitTillStarted();

// Set up a timer to cancel the task when its time out expires.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.apache.helix.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorTaskUtil {

private static final Logger LOG = LoggerFactory.getLogger(ExecutorTaskUtil.class);

/**
* Wrap a callable so that any raised exception is logged
* (can be interesting in case the callable is used as a completely asynchronous task
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* methods.
*/
public static <T> Callable<T> wrap(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (Throwable t) {
LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
}

/**
* Wrap a runnable so that any raised exception is logged
* (can be interesting in case the callable is used as a completely asynchronous task
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* methods.
*/
public static Runnable wrap(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Throwable t) {
LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
}
}

0 comments on commit c76a46e

Please sign in to comment.