Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2963 Fix NPE when rebalancing #2977

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ public static GenericHelixController getLeaderController(String clusterName) {
if (clusterName != null) {
ImmutableSet<GenericHelixController> controllers = _helixControllerFactory.get(clusterName);
if (controllers != null) {
return controllers.stream().filter(controller -> controller._helixManager.isLeader())
return controllers.stream()
.filter(controller -> controller._helixManager != null)
.filter(controller -> controller._helixManager.isLeader())
.findAny().orElse(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure using Helix format???

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, not sure I understand what you mean with "helix format". Is the formatting of the lines somehow faulty? From what I see the ".filter(...)" lines are indented the same way as the ".findAny()" line was before, so I don't get what is wrong, sorry.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I now understood what you meant. I applied the format to all the files I touched. I don't necessarily find it more readable, but then I guess this is also a question of taste. Hopefully this is ok now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure you load the Helix intellij / eclipse auto code formatter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. reverted global reformatting and only applied it on the lines I had changed initially.

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.util.ExecutorTaskUtil;

public class AbstractBaseStage implements Stage {
protected String _eventId;
Expand Down Expand Up @@ -64,9 +65,9 @@ public String getStageName() {
return className;
}

public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) {
public static <T> Future<T> asyncExecute(ExecutorService service, Callable<T> task) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to change this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following the boy-scout rule, every time I see something which could be easily improved, I try to do so.

Here for type-safety, it is useful for the caller of this method to know/remember that the "get" method on the future they just got would supply an object of type T instead of an object of any forgotten type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's minimize the this kind of change. The reason is that Helix release not only support java 11 but backward compatible with 8.

Any this kind of "improvement" for working code may break backward build.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "improvement" would have been compatible with Java 1.5, for all I know, but OK, your choice. Rolled back.

if (service != null) {
return service.submit(task);
return service.submit(ExecutorTaskUtil.wrap(task));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,7 +116,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
Future<Boolean> result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
// If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
// be triggered again after baseline is finished.
Expand All @@ -132,7 +131,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
return false;
}
return true;
});
}));
if (waitForGlobalRebalance) {
try {
if (!result.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return;
}

_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
Expand All @@ -111,7 +112,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return false;
}
return true;
});
}));
if (!_asyncPartialRebalanceEnabled) {
try {
if (!_asyncPartialRebalanceResult.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.MessageUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -401,7 +402,7 @@ private void addGeneratedMessageToMap(final Message message,
private void schedulePendingMessageCleanUp(
final Map<String, Map<String, Message>> pendingMessagesToPurge, ExecutorService workerPool,
final HelixDataAccessor accessor) {
workerPool.submit(new Callable<Object>() {
workerPool.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
@Override
public Object call() {
for (Map.Entry<String, Map<String, Message>> entry : pendingMessagesToPurge.entrySet()) {
Expand All @@ -415,7 +416,7 @@ public Object call() {
}
return null;
}
});
}));
}

private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String, String> sessionIdMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.common.DedupEventBlockingQueue;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
}
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} else {
_callBackEventQueue.put(eventType, event);
}
Expand All @@ -102,7 +103,7 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
try {
NotificationContext event = _callBackEventQueue.take();
_futureCallBackProcessEvent =
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} catch (InterruptedException e) {
logger
.error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.ExecutorTaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -465,7 +466,7 @@ public boolean scheduleTask(MessageTask task) {
}

LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
Future<HelixTaskResult> future = exeSvc.submit(task);
Future<HelixTaskResult> future = exeSvc.submit(ExecutorTaskUtil.wrap(task));

_messageTaskMap
.putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
import org.apache.helix.util.ExecutorTaskUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -971,7 +972,7 @@ private void recordPropagationLatency(final long currentTime,
// restrict running report task count to be 1.
// Any parallel tasks will be skipped. So the reporting metric data is sampled.
if (_reportingTask == null || _reportingTask.isDone()) {
_reportingTask = _reportExecutor.submit(new Callable<Object>() {
_reportingTask = _reportExecutor.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
@Override
public Object call() {
// getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to
Expand Down Expand Up @@ -1000,7 +1001,7 @@ public Object call() {
}
return null;
}
});
}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.util.ExecutorTaskUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -329,7 +330,7 @@ private void startTask(Message msg, String taskPartition) {
_taskRunner =
new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
msg.getTgtSessionId(), this);
_taskExecutor.submit(_taskRunner);
_taskExecutor.submit(ExecutorTaskUtil.wrap(_taskRunner));
_taskRunner.waitTillStarted();

// Set up a timer to cancel the task when its time out expires.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.apache.helix.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorTaskUtil {

private static final Logger LOG = LoggerFactory.getLogger(ExecutorTaskUtil.class);

/**
* Wrap a callable so that any raised exception is logged
* (can be interesting in case the callable is used as a completely asynchronous task
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* methods.
*/
public static <T> Callable<T> wrap(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (Throwable t) {
LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
}

/**
* Wrap a runnable so that any raised exception is logged
* (can be interesting in case the callable is used as a completely asynchronous task
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
* methods.
*/
public static Runnable wrap(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Throwable t) {
LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
}
}
Loading