Skip to content

Commit

Permalink
apache#2963 Fix NPE during task rebalancing
Browse files Browse the repository at this point in the history
Reformatted touched files according to "helix-format"
  • Loading branch information
jacob-netguardians committed Dec 11, 2024
1 parent e2426c0 commit e8d1432
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,8 @@ public static GenericHelixController getLeaderController(String clusterName) {
if (clusterName != null) {
ImmutableSet<GenericHelixController> controllers = _helixControllerFactory.get(clusterName);
if (controllers != null) {
return controllers.stream()
.filter(controller -> controller._helixManager != null)
.filter(controller -> controller._helixManager.isLeader())
.findAny().orElse(null);
return controllers.stream().filter(controller -> controller._helixManager != null)
.filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String getStageName() {
return className;
}

public static <T> Future<T> asyncExecute(ExecutorService service, Callable<T> task) {
public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) {
if (service != null) {
return service.submit(ExecutorTaskUtil.wrap(task));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return;
}

_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
} catch (HelixRebalanceException e) {
if (_asyncPartialRebalanceEnabled) {
_rebalanceFailureCount.increment(1L);
}
LOG.error("Failed to calculate best possible assignment!", e);
return false;
}
return true;
}));
_asyncPartialRebalanceResult =
_bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
} catch (HelixRebalanceException e) {
if (_asyncPartialRebalanceEnabled) {
_rebalanceFailureCount.increment(1L);
}
LOG.error("Failed to calculate best possible assignment!", e);
return false;
}
return true;
}));
if (!_asyncPartialRebalanceEnabled) {
try {
if (!_asyncPartialRebalanceResult.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
logger.error("Failed to process callback. CallbackEventExecutor is already shut down.");
}
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
_futureCallBackProcessEvent = _threadPoolExecutor.submit(
ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} else {
_callBackEventQueue.put(eventType, event);
}
Expand All @@ -102,8 +102,8 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
if (_callBackEventQueue.size() != 0) {
try {
NotificationContext event = _callBackEventQueue.take();
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
_futureCallBackProcessEvent = _threadPoolExecutor.submit(
ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} catch (InterruptedException e) {
logger
.error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public static <T> Callable<T> wrap(Callable<T> callable) {
try {
return callable.call();
} catch (Throwable t) {
LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
LOG.error("Callable run on thread {} raised an exception and exited",
Thread.currentThread().getName(), t);
throw t;
}
};
Expand All @@ -50,15 +51,17 @@ public static <T> Callable<T> wrap(Callable<T> callable) {
* Wrap a runnable so that any raised exception is logged
* (can be interesting in case the callable is used as a completely asynchronous task
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* calling any of the {@link java.util.concurrent.Future#get()} or
* {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* methods.
*/
public static Runnable wrap(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Throwable t) {
LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
LOG.error("Runnable run on thread {} raised an exception and exited",
Thread.currentThread().getName(), t);
throw t;
}
};
Expand Down

0 comments on commit e8d1432

Please sign in to comment.