diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 428bfa7fb5..b0e72a965c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -235,10 +235,8 @@ public static GenericHelixController getLeaderController(String clusterName) { if (clusterName != null) { ImmutableSet controllers = _helixControllerFactory.get(clusterName); if (controllers != null) { - return controllers.stream() - .filter(controller -> controller._helixManager != null) - .filter(controller -> controller._helixManager.isLeader()) - .findAny().orElse(null); + return controllers.stream().filter(controller -> controller._helixManager != null) + .filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null); } } return null; diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index c6d15a9ccb..08138f9be9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -65,7 +65,7 @@ public String getStageName() { return className; } - public static Future asyncExecute(ExecutorService service, Callable task) { + public static Future asyncExecute(ExecutorService service, Callable task) { if (service != null) { return service.submit(ExecutorTaskUtil.wrap(task)); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java index 141d398e92..5bb0244fbd 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java @@ -100,19 +100,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map { - try { - doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, - currentStateOutput); - } catch (HelixRebalanceException e) { - if (_asyncPartialRebalanceEnabled) { - _rebalanceFailureCount.increment(1L); - } - LOG.error("Failed to calculate best possible assignment!", e); - return false; - } - return true; - })); + _asyncPartialRebalanceResult = + _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { + try { + doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, + currentStateOutput); + } catch (HelixRebalanceException e) { + if (_asyncPartialRebalanceEnabled) { + _rebalanceFailureCount.increment(1L); + } + LOG.error("Failed to calculate best possible assignment!", e); + return false; + } + return true; + })); if (!_asyncPartialRebalanceEnabled) { try { if (!_asyncPartialRebalanceResult.get()) { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java index 17eb5a3e1e..7a5e727544 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java @@ -89,8 +89,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati logger.error("Failed to process callback. CallbackEventExecutor is already shut down."); } if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) { - _futureCallBackProcessEvent = - _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = _threadPoolExecutor.submit( + ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } else { _callBackEventQueue.put(eventType, event); } @@ -102,8 +102,8 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler if (_callBackEventQueue.size() != 0) { try { NotificationContext event = _callBackEventQueue.take(); - _futureCallBackProcessEvent = - _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = _threadPoolExecutor.submit( + ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } catch (InterruptedException e) { logger .error("Error when submitting pending HandleCallBackEvent to manager thread pool", e); diff --git a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java index 1877a0c6a4..da2f82456c 100644 --- a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java @@ -40,7 +40,8 @@ public static Callable wrap(Callable callable) { try { return callable.call(); } catch (Throwable t) { - LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + LOG.error("Callable run on thread {} raised an exception and exited", + Thread.currentThread().getName(), t); throw t; } }; @@ -50,7 +51,8 @@ public static Callable wrap(Callable callable) { * Wrap a runnable so that any raised exception is logged * (can be interesting in case the callable is used as a completely asynchronous task * fed to an {@link java.util.concurrent.ExecutorService}), for which we are never - * calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)} + * calling any of the {@link java.util.concurrent.Future#get()} or + * {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)} * methods. */ public static Runnable wrap(Runnable runnable) { @@ -58,7 +60,8 @@ public static Runnable wrap(Runnable runnable) { try { runnable.run(); } catch (Throwable t) { - LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + LOG.error("Runnable run on thread {} raised an exception and exited", + Thread.currentThread().getName(), t); throw t; } };