diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index e84ac45cca..428bfa7fb5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -129,7 +129,13 @@
* 4. select the messages that can be sent, needs messages and state model constraints
* 5. send messages
*/
-public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, TaskCurrentStateChangeListener, CustomizedStateRootChangeListener, CustomizedStateChangeListener, CustomizedStateConfigChangeListener, ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener {
+public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener,
+ MessageListener, CurrentStateChangeListener,
+ TaskCurrentStateChangeListener,
+ CustomizedStateRootChangeListener,
+ CustomizedStateChangeListener,
+ CustomizedStateConfigChangeListener, ControllerChangeListener,
+ InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener {
private static final Logger logger =
LoggerFactory.getLogger(GenericHelixController.class.getName());
@@ -229,8 +235,10 @@ public static GenericHelixController getLeaderController(String clusterName) {
if (clusterName != null) {
ImmutableSet controllers = _helixControllerFactory.get(clusterName);
if (controllers != null) {
- return controllers.stream().filter(controller -> controller._helixManager != null)
- .filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null);
+ return controllers.stream()
+ .filter(controller -> controller._helixManager != null)
+ .filter(controller -> controller._helixManager.isLeader())
+ .findAny().orElse(null);
}
}
return null;
@@ -289,7 +297,8 @@ public GenericHelixController(String clusterName) {
public GenericHelixController(String clusterName, Set enabledPipelins) {
this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
createTaskRegistry(Pipeline.Type.TASK.name()),
- createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), clusterName,
+ createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+ clusterName,
enabledPipelins);
}
@@ -333,8 +342,8 @@ public long getNextRebalanceTime() {
public void run() {
try {
if (_shouldRefreshCacheOption.orElse(
- _clusterEventType.equals(ClusterEventType.PeriodicalRebalance)
- || _clusterEventType.equals(ClusterEventType.OnDemandRebalance))) {
+ _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
+ .equals(ClusterEventType.OnDemandRebalance))) {
requestDataProvidersFullRefresh();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -363,8 +372,8 @@ private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
NotificationContext changeContext = new NotificationContext(manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
pushToEventQueues(eventType, changeContext, Collections.EMPTY_MAP);
- logger.info(
- String.format("Controller rebalance pipeline triggered with event type: %s for cluster %s",
+ logger.info(String
+ .format("Controller rebalance pipeline triggered with event type: %s for cluster %s",
eventType, _clusterName));
}
@@ -381,9 +390,9 @@ void startPeriodRebalance(long period, HelixManager manager) {
synchronized (_periodicalRebalanceExecutor) {
lastScheduledFuture = _periodicRebalanceFutureTask;
_timerPeriod = period;
- _periodicRebalanceFutureTask = _periodicalRebalanceExecutor.scheduleAtFixedRate(
- new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance), _timerPeriod,
- _timerPeriod, TimeUnit.MILLISECONDS);
+ _periodicRebalanceFutureTask = _periodicalRebalanceExecutor
+ .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
+ _timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS);
}
if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) {
lastScheduledFuture.cancel(false /* mayInterruptIfRunning */);
@@ -412,7 +421,6 @@ private void shutdownOnDemandTimer() {
_onDemandRebalanceTimer.cancel();
}
}
-
/**
* This function is deprecated. Please use RebalanceUtil.scheduleInstantPipeline method instead.
* schedule a future rebalance pipeline run, delayed at given time.
@@ -420,8 +428,8 @@ private void shutdownOnDemandTimer() {
@Deprecated
public void scheduleRebalance(long rebalanceTime) {
if (_helixManager == null) {
- logger.warn("Failed to schedule a future pipeline run for cluster " + _clusterName
- + " helix manager is null!");
+ logger.warn(
+ "Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!");
return;
}
@@ -440,8 +448,9 @@ public void scheduleRebalance(long rebalanceTime) {
new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
_onDemandRebalanceTimer.schedule(newTask, delay);
- logger.info("Scheduled a future pipeline run for cluster " + _helixManager.getClusterName()
- + " in delay " + delay);
+ logger.info(
+ "Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay "
+ + delay);
preTask = _nextRebalanceTask.getAndSet(newTask);
if (preTask != null) {
@@ -466,8 +475,7 @@ public void scheduleOnDemandRebalance(long delay) {
*/
public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
if (_helixManager == null) {
- logger.error(
- "Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
+ logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
_clusterName);
return;
}
@@ -487,7 +495,7 @@ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
shouldRefreshCache);
_onDemandRebalanceTimer.schedule(newTask, delay);
- logger.info("Scheduled instant pipeline run for cluster {}.", _helixManager.getClusterName());
+    logger.info("Scheduled instant pipeline run for cluster {}.", _helixManager.getClusterName());
RebalanceTask preTask = _nextRebalanceTask.getAndSet(newTask);
if (preTask != null) {
@@ -553,19 +561,23 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
rebalancePipeline);
registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
rebalancePipeline);
- registry.register(ClusterEventType.ClusterConfigChange, dataRefresh,
- autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline);
- registry.register(ClusterEventType.LiveInstanceChange, dataRefresh,
- autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline,
- customizedViewPipeline, rebalancePipeline);
- registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess,
- rebalancePipeline);
+ registry
+ .register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline,
+ dataPreprocess, rebalancePipeline);
+ registry
+ .register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline,
+ liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline,
+ rebalancePipeline);
+ registry
+ .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline,
rebalancePipeline);
- registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh,
- autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
- registry.register(ClusterEventType.OnDemandRebalance, dataRefresh,
- autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
+ registry
+ .register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline,
+ dataPreprocess, externalViewPipeline, rebalancePipeline);
+ registry
+ .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline,
+ dataPreprocess, externalViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.ControllerChange, dataRefresh, autoExitMaintenancePipeline,
dataPreprocess, externalViewPipeline, rebalancePipeline);
// TODO: We now include rebalance pipeline in customized state change for correctness.
@@ -620,8 +632,8 @@ private static PipelineRegistry createTaskRegistry(String pipelineName) {
rebalancePipeline);
registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline,
dataPreprocess, rebalancePipeline);
- registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess,
- rebalancePipeline);
+ registry
+ .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess,
rebalancePipeline);
@@ -652,10 +664,13 @@ private static PipelineRegistry createManagementModeRegistry(String pipelineName
managementMode.addStage(new ManagementMessageDispatchStage());
PipelineRegistry registry = new PipelineRegistry();
- Arrays.asList(ClusterEventType.ControllerChange, ClusterEventType.LiveInstanceChange,
- ClusterEventType.MessageChange, ClusterEventType.OnDemandRebalance,
- ClusterEventType.PeriodicalRebalance)
- .forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode));
+ Arrays.asList(
+ ClusterEventType.ControllerChange,
+ ClusterEventType.LiveInstanceChange,
+ ClusterEventType.MessageChange,
+ ClusterEventType.OnDemandRebalance,
+ ClusterEventType.PeriodicalRebalance
+ ).forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode));
return registry;
}
@@ -683,8 +698,7 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR
_asyncTasksThreadPool =
Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
+ @Override public Thread newThread(Runnable r) {
return new Thread(r, "HelixController-async_tasks-" + _clusterName);
}
});
@@ -762,7 +776,7 @@ private void shutdownAsyncFIFOWorkers() {
private boolean isEventQueueEmpty(boolean taskQueue) {
if (taskQueue) {
- return _taskEventQueue == null || _taskEventQueue.isEmpty();
+      return _taskEventQueue == null || _taskEventQueue.isEmpty();
} else {
return _eventQueue == null || _eventQueue.isEmpty();
}
@@ -810,8 +824,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
// If manager session changes, no need to run pipeline for the stale event.
if (!eventSessionId.isPresent() || !eventSessionId.get().equals(managerSessionId)) {
logger.warn(
- "Controller pipeline is not invoked because event session doesn't match cluster "
- + "manager session. Event type: {}, id: {}, session: {}, actual manager session: "
+ "Controller pipeline is not invoked because event session doesn't match cluster " +
+ "manager session. Event type: {}, id: {}, session: {}, actual manager session: "
+ "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(),
eventSessionId.orElse("NOT_PRESENT"), managerSessionId, manager.getInstanceName(),
manager.getClusterName());
@@ -837,8 +851,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
isManagementPipeline = true;
} else {
- logger.warn(
- String.format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
+ logger.warn(String
+ .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
event.getEventType(), event.getEventId()));
return;
}
@@ -864,8 +878,7 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
} else {
// TODO: should be in the initialization of controller.
if (_resourceControlDataProvider != null) {
- checkRebalancingTimer(manager, Collections.emptyList(),
- dataProvider.getClusterConfig());
+ checkRebalancingTimer(manager, Collections.emptyList(), dataProvider.getClusterConfig());
}
if (_isMonitoring) {
_clusterStatusMonitor.setEnabled(!_inManagementMode);
@@ -876,8 +889,7 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
}
dataProvider.setClusterEventId(event.getEventId());
- event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
- _lastPipelineEndTimestamp);
+ event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. "
@@ -900,22 +912,20 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
helixMetaDataAccessRebalanceFail = true;
// If pipeline failed due to read/write fails to zookeeper, retry the pipeline.
dataProvider.requireFullRefresh();
- logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: "
- + _clusterName);
+ logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);
// only push a retry event when there is no pending event in the corresponding event queue.
if (isEventQueueEmpty(isTaskFrameworkPipeline)) {
- _continuousRebalanceFailureCount++;
+          _continuousRebalanceFailureCount++;
long delay = getRetryDelay(_continuousRebalanceFailureCount);
if (delay == 0) {
forceRebalance(manager, ClusterEventType.RetryRebalance);
} else {
- _asyncTasksThreadPool.schedule(
- new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
- TimeUnit.MILLISECONDS);
+ _asyncTasksThreadPool
+ .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
+ TimeUnit.MILLISECONDS);
}
- logger.info(
- "Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
+ logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
}
}
_clusterStatusMonitor.reportRebalanceFailure();
@@ -946,18 +956,20 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
if (notificationContext != null) {
zkCallbackTime = notificationContext.getCreationTime();
if (_isMonitoring) {
- _clusterStatusMonitor.updateClusterEventDuration(
- ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - zkCallbackTime);
+ _clusterStatusMonitor
+ .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
+ enqueueTime - zkCallbackTime);
}
sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(),
enqueueTime - zkCallbackTime));
}
if (_isMonitoring) {
- _clusterStatusMonitor.updateClusterEventDuration(
- ClusterEventMonitor.PhaseName.InQueue.name(), startTime - enqueueTime);
- _clusterStatusMonitor.updateClusterEventDuration(
- ClusterEventMonitor.PhaseName.TotalProcessed.name(),
- _lastPipelineEndTimestamp - startTime);
+ _clusterStatusMonitor
+ .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
+ startTime - enqueueTime);
+ _clusterStatusMonitor
+ .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
+ _lastPipelineEndTimestamp - startTime);
}
sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(),
startTime - enqueueTime));
@@ -978,13 +990,13 @@ private void updateContinuousRebalancedFailureCount(boolean isTaskFrameworkPipel
if (isTaskFrameworkPipeline) {
_continuousTaskRebalanceFailureCount =
resetToZero ? 0 : _continuousTaskRebalanceFailureCount + 1;
- _clusterStatusMonitor.reportContinuousTaskRebalanceFailureCount(
- _continuousTaskRebalanceFailureCount);
+ _clusterStatusMonitor
+ .reportContinuousTaskRebalanceFailureCount(_continuousTaskRebalanceFailureCount);
} else {
_continuousResourceRebalanceFailureCount =
resetToZero ? 0 : _continuousResourceRebalanceFailureCount + 1;
- _clusterStatusMonitor.reportContinuousResourceRebalanceFailureCount(
- _continuousResourceRebalanceFailureCount);
+ _clusterStatusMonitor
+ .reportContinuousResourceRebalanceFailureCount(_continuousResourceRebalanceFailureCount);
}
}
@@ -1007,8 +1019,8 @@ public void onStateChange(String instanceName, List statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
notifyCaches(changeContext, ChangeType.CURRENT_STATE);
- pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext,
- Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
+ pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections
+ .singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onStateChange()");
}
@@ -1018,8 +1030,8 @@ public void onTaskCurrentStateChange(String instanceName, List sta
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onTaskCurrentStateChange()");
notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE);
- pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext,
- Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
+ pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections
+ .singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onTaskCurrentStateChange()");
}
@@ -1082,8 +1094,8 @@ public void onCustomizedStateChange(String instanceName, List s
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onCustomizedStateChange()");
notifyCaches(changeContext, ChangeType.CUSTOMIZED_STATE);
- pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext,
- Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
+ pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, Collections
+ .singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onCustomizedStateChange()");
}
@@ -1102,8 +1114,7 @@ public void onMessage(String instanceName, List messages,
@Override
public void onLiveInstanceChange(List liveInstances,
NotificationContext changeContext) {
- logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster "
- + _clusterName);
+ logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.LIVE_INSTANCE);
if (liveInstances == null) {
@@ -1197,40 +1208,44 @@ public void onInstanceConfigChange(List instanceConfigs,
@Override
@PreFetch(enabled = false)
- public void onResourceConfigChange(List resourceConfigs,
- NotificationContext context) {
+ public void onResourceConfigChange(
+ List resourceConfigs, NotificationContext context) {
logger.info(
"START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.RESOURCE_CONFIG);
pushToEventQueues(ClusterEventType.ResourceConfigChange, context,
Collections.emptyMap());
- logger.info(
- "END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
+ logger
+ .info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
}
@Override
@PreFetch(enabled = false)
- public void onCustomizedStateConfigChange(CustomizedStateConfig customizedStateConfig,
+ public void onCustomizedStateConfigChange(
+ CustomizedStateConfig customizedStateConfig,
NotificationContext context) {
- logger.info("START: GenericClusterController.onCustomizedStateConfigChange() for cluster "
- + _clusterName);
+ logger.info(
+ "START: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+ + _clusterName);
notifyCaches(context, ChangeType.CUSTOMIZED_STATE_CONFIG);
pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context,
- Collections.emptyMap());
- logger.info("END: GenericClusterController.onCustomizedStateConfigChange() for cluster "
- + _clusterName);
+        Collections.emptyMap());
+ logger.info(
+ "END: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+ + _clusterName);
}
@Override
@PreFetch(enabled = false)
- public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
+ public void onClusterConfigChange(ClusterConfig clusterConfig,
+ NotificationContext context) {
logger.info(
"START: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.CLUSTER_CONFIG);
pushToEventQueues(ClusterEventType.ClusterConfigChange, context,
Collections.emptyMap());
- logger.info(
- "END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
+ logger
+ .info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
}
private void notifyCaches(NotificationContext context, ChangeType changeType) {
@@ -1411,16 +1426,17 @@ protected void checkLiveInstancesObservation(List liveInstances,
}
}
- for (String instance : curInstances.keySet()) {
- if (lastInstances == null || !lastInstances.containsKey(instance)) {
- try {
- manager.addCustomizedStateRootChangeListener(this, instance);
- logger.info(manager.getInstanceName() + " added root path listener for customized "
- + "state change for " + instance + ", listener: " + this);
- } catch (Exception e) {
- logger.error("Fail to add root path listener for customized state change for instance: "
- + instance, e);
- }
+ for (String instance : curInstances.keySet()) {
+ if (lastInstances == null || !lastInstances.containsKey(instance)) {
+ try {
+ manager.addCustomizedStateRootChangeListener(this, instance);
+ logger.info(manager.getInstanceName() + " added root path listener for customized "
+ + "state change for " + instance + ", listener: " + this);
+ } catch (Exception e) {
+ logger.error(
+ "Fail to add root path listener for customized state change for instance: "
+ + instance, e);
+ }
}
}
@@ -1435,8 +1451,8 @@ public void shutdown() throws InterruptedException {
stopPeriodRebalance();
_periodicalRebalanceExecutor.shutdown();
- if (!_periodicalRebalanceExecutor.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT,
- TimeUnit.MILLISECONDS)) {
+ if (!_periodicalRebalanceExecutor
+ .awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) {
_periodicalRebalanceExecutor.shutdownNow();
}
@@ -1534,16 +1550,16 @@ public void run() {
while (!isInterrupted()) {
try {
ClusterEvent newClusterEvent = _eventBlockingQueue.take();
- String threadName = String.format("HelixController-pipeline-%s-(%s)", _processorName,
- newClusterEvent.getEventId());
+ String threadName = String.format(
+ "HelixController-pipeline-%s-(%s)", _processorName, newClusterEvent.getEventId());
this.setName(threadName);
handleEvent(newClusterEvent, _cache);
} catch (InterruptedException e) {
logger.warn("ClusterEventProcessor interrupted " + _processorName, e);
interrupt();
} catch (ZkInterruptedException e) {
- logger.warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName,
- e);
+ logger
+ .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e);
interrupt();
} catch (ThreadDeath death) {
logger.error("ClusterEventProcessor caught a ThreadDeath " + _processorName, death);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
index af9b14697f..5b7a7baba3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -47,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Global Rebalance does the baseline recalculation when certain changes happen.
* The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled.
@@ -60,9 +60,9 @@ class GlobalRebalanceRunner implements AutoCloseable {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
- HelixConstants.ChangeType.IDEAL_STATE, HelixConstants.ChangeType.CLUSTER_CONFIG,
- HelixConstants.ChangeType.INSTANCE_CONFIG);
+ ImmutableSet
+ .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
+ HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
@@ -77,8 +77,10 @@ class GlobalRebalanceRunner implements AutoCloseable {
private boolean _asyncGlobalRebalanceEnabled;
public GlobalRebalanceRunner(AssignmentManager assignmentManager,
- AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
- LatencyMetric writeLatency, CountMetric rebalanceFailureCount,
+ AssignmentMetadataStore assignmentMetadataStore,
+ MetricCollector metricCollector,
+ LatencyMetric writeLatency,
+ CountMetric rebalanceFailureCount,
boolean isAsyncGlobalRebalanceEnabled) {
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
_assignmentManager = assignmentManager;
@@ -104,17 +106,14 @@ public GlobalRebalanceRunner(AssignmentManager assignmentManager,
* @param algorithm
* @throws HelixRebalanceException
*/
- public void globalRebalance(ResourceControllerDataProvider clusterData,
- Map resourceMap, final CurrentStateOutput currentStateOutput,
- RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+ public void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap,
+ final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have content changed.
- final Map> clusterChanges =
- _changeDetector.getAllChanges();
+ final Map> clusterChanges = _changeDetector.getAllChanges();
Set allAssignableInstances = clusterData.getAssignableInstances();
- if (clusterChanges.keySet().stream()
- .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
+ if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
// Calculate the Baseline assignment for global rebalance.
Future result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
@@ -154,8 +153,8 @@ public void globalRebalance(ResourceControllerDataProvider clusterData,
*/
private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
Map resourceMap, Set allAssignableInstances,
- RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput,
- boolean shouldTriggerMainPipeline, Map> clusterChanges)
+ RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
+ Map> clusterChanges)
throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
@@ -166,8 +165,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
// 1. Ignore node status (disable/offline).
// 2. Use the previous Baseline as the only parameter about the previous assignment.
Map currentBaseline =
- _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet());
+ _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
@@ -177,8 +175,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
- Map newBaseline =
- WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+ Map newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
boolean isBaselineChanged =
_assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
// Write the new baseline to metadata store
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
index d307fe3f8b..141d398e92 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
@@ -26,7 +26,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
@@ -45,6 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Compute the best possible assignment based on the Baseline and the previous Best Possible assignment.
* The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a
@@ -68,8 +68,10 @@ class PartialRebalanceRunner implements AutoCloseable {
private Future _asyncPartialRebalanceResult;
public PartialRebalanceRunner(AssignmentManager assignmentManager,
- AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
- CountMetric rebalanceFailureCount, boolean isAsyncPartialRebalanceEnabled) {
+ AssignmentMetadataStore assignmentMetadataStore,
+ MetricCollector metricCollector,
+ CountMetric rebalanceFailureCount,
+ boolean isAsyncPartialRebalanceEnabled) {
_assignmentManager = assignmentManager;
_assignmentMetadataStore = assignmentMetadataStore;
_bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
@@ -80,16 +82,16 @@ public PartialRebalanceRunner(AssignmentManager assignmentManager,
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
CountMetric.class);
_partialRebalanceLatency = metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
+ .name(),
LatencyMetric.class);
_baselineDivergenceGauge = metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
BaselineDivergenceGauge.class);
}
- public void partialRebalance(ResourceControllerDataProvider clusterData,
- Map resourceMap, Set activeNodes,
- final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
+ public void partialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap,
+ Set activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
// If partial rebalance is async and the previous result is not completed yet,
// do not start another partial rebalance.
@@ -98,20 +100,19 @@ public void partialRebalance(ResourceControllerDataProvider clusterData,
return;
}
- _asyncPartialRebalanceResult =
- _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
- try {
- doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
- currentStateOutput);
- } catch (HelixRebalanceException e) {
- if (_asyncPartialRebalanceEnabled) {
- _rebalanceFailureCount.increment(1L);
- }
- LOG.error("Failed to calculate best possible assignment!", e);
- return false;
- }
- return true;
- }));
+ _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
+ try {
+ doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+ currentStateOutput);
+ } catch (HelixRebalanceException e) {
+ if (_asyncPartialRebalanceEnabled) {
+ _rebalanceFailureCount.increment(1L);
+ }
+ LOG.error("Failed to calculate best possible assignment!", e);
+ return false;
+ }
+ return true;
+ }));
if (!_asyncPartialRebalanceEnabled) {
try {
if (!_asyncPartialRebalanceResult.get()) {
@@ -130,9 +131,9 @@ public void partialRebalance(ResourceControllerDataProvider clusterData,
* If the result differ from the persisted result, persist it to memory (only if the version is not stale);
* If persisted, trigger the pipeline so that main thread logic can run again.
*/
- private void doPartialRebalance(ResourceControllerDataProvider clusterData,
- Map resourceMap, Set activeNodes, RebalanceAlgorithm algorithm,
- CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap,
+ Set activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
_partialRebalanceCounter.increment(1L);
_partialRebalanceLatency.startMeasuringLatency();
@@ -141,14 +142,12 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData,
if (_assignmentMetadataStore != null) {
newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
} else {
- LOG.debug(
- "Assignment Metadata Store is null. Skip getting best possible assignment version.");
+ LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
}
// Read the baseline from metadata store
Map currentBaseline =
- _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet());
+ _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
// Read the best possible assignment from metadata store
Map currentBestPossibleAssignment =
@@ -156,15 +155,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData,
resourceMap.keySet());
ClusterModel clusterModel;
try {
- clusterModel =
- ClusterModelProvider.generateClusterModelForPartialRebalance(clusterData, resourceMap,
- activeNodes, currentBaseline, currentBestPossibleAssignment);
+ clusterModel = ClusterModelProvider
+ .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
+ currentBaseline, currentBestPossibleAssignment);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
- Map newAssignment =
- WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+ Map newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
// Asynchronously report baseline divergence metric before persisting to metadata store,
// just in case if persisting fails, we still have the metric.
@@ -179,14 +177,12 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData,
currentBaseline, newAssignmentCopy);
boolean bestPossibleUpdateSuccessful = false;
- if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(
- newAssignment)) {
+ if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
// This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache.
// If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that
// the pipeline is triggered. We schedule the pipeline to run below.
- bestPossibleUpdateSuccessful =
- _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
- newBestPossibleAssignmentVersion);
+ bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+ newBestPossibleAssignmentVersion);
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 8a1a1f04ba..750829edb9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -65,9 +65,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
// at MASTER state, we wait for timeout and if the message is still not cleaned up by
// participant, controller will cleanup them proactively to unblock further state
// transition
- public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY =
- HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY,
- 60 * 1000);
+ public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
+ .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
@@ -187,9 +186,9 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
- LogUtil.logDebug(logger, _eventId, String.format(
- "No current state for partition %s in resource %s, skip the drop message",
- partition.getPartitionName(), resourceName));
+ LogUtil.logDebug(logger, _eventId, String
+ .format("No current state for partition %s in resource %s, skip the drop message",
+ partition.getPartitionName(), resourceName));
message =
generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
@@ -200,8 +199,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// TODO: separate logic of resource/task message generation
if (cache instanceof ResourceControllerDataProvider) {
- ((ResourceControllerDataProvider) cache).invalidateCachedIdealStateMapping(
- resourceName);
+ ((ResourceControllerDataProvider) cache)
+ .invalidateCachedIdealStateMapping(resourceName);
}
continue;
}
@@ -215,9 +214,10 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
for (Message staleMessage : staleMessages) {
// staleMessage can be simple or batch mode
- if ((System.currentTimeMillis() - currentStateOutput.getEndTime(resourceName, partition,
- instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY) && staleMessage.getResourceName()
- .equals(resourceName) && sessionIdMap.containsKey(instanceName) && (
+ if ((System.currentTimeMillis() - currentStateOutput
+ .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY)
+ && staleMessage.getResourceName().equals(resourceName) && sessionIdMap
+ .containsKey(instanceName) && (
staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames()
.contains(partition.getPartitionName())))) {
@@ -229,8 +229,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) {
if (shouldCreateSTCancellation(pendingMessage, desiredState,
stateModelDef.getInitialState())) {
- message =
- MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
+ message = MessageUtil
+ .createStateTransitionCancellationMessage(manager.getInstanceName(),
manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
sessionIdMap.get(instanceName), stateModelDef.getId(),
pendingMessage.getFromState(), pendingMessage.getToState(), null,
@@ -252,9 +252,10 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Create new state transition message
- message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
- manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
- currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId());
+ message = MessageUtil
+ .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
+ resource, partition.getPartitionName(), instanceName, currentState, nextState,
+ sessionIdMap.get(instanceName), stateModelDef.getId());
if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
@@ -278,7 +279,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
LogUtil.logError(logger, _eventId, String.format(
"An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s",
sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition),
- instanceStateMap, cache.getAllInstances(), cache.getLiveInstances().keySet(),
+ instanceStateMap, cache.getAllInstances(),
+ cache.getLiveInstances().keySet(),
message));
continue; // Do not add this message
}
@@ -302,8 +304,8 @@ private boolean shouldCreateSTCancellation(Message pendingMessage, String desire
// 1. pending message toState is desired state
// 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE)
return !desiredState.equalsIgnoreCase(pendingMessage.getToState()) && !(
- HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState.equals(
- pendingMessage.getToState()));
+ HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState
+ .equals(pendingMessage.getToState()));
}
private void logAndAddToCleanUp(Map> messagesToCleanUp,
@@ -333,16 +335,15 @@ private Message generateCancellationMessageForPendingMessage(final String desire
String pendingState = pendingMessage.getToState();
if (nextState.equalsIgnoreCase(pendingState)) {
LogUtil.logInfo(logger, _eventId,
- "Message already exists for " + instanceName + " to transit "
- + resource.getResourceName() + "." + partition.getPartitionName() + " from "
- + currentState + " to " + nextState + ", isRelay: "
- + pendingMessage.isRelayMessage());
+ "Message already exists for " + instanceName + " to transit " + resource
+ .getResourceName() + "." + partition.getPartitionName() + " from " + currentState
+ + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
} else if (currentState.equalsIgnoreCase(pendingState)) {
LogUtil.logDebug(logger, _eventId,
- "Message hasn't been removed for " + instanceName + " to transit "
- + resource.getResourceName() + "." + partition.getPartitionName() + " to "
- + pendingState + ", desiredState: " + desiredState + ", isRelay: "
- + pendingMessage.isRelayMessage());
+ "Message hasn't been removed for " + instanceName + " to transit " + resource
+ .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState
+ + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage
+ .isRelayMessage());
} else {
LogUtil.logDebug(logger, _eventId,
"IdealState changed before state transition completes for " + resource.getResourceName()
@@ -408,9 +409,8 @@ public Object call() {
String instanceName = entry.getKey();
for (Message msg : entry.getValue().values()) {
if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName))) {
- LogUtil.logInfo(logger, _eventId,
- String.format("Deleted message %s from instance %s", msg.getMsgId(),
- instanceName));
+ LogUtil.logInfo(logger, _eventId, String
+ .format("Deleted message %s from instance %s", msg.getMsgId(), instanceName));
}
}
}
@@ -442,8 +442,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi
String currentState, String nextState, IdealState idealState, Partition partition) {
StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
clusterConfig.getStateTransitionTimeoutConfig();
- int timeout = stateTransitionTimeoutConfig != null
- ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1;
+ int timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig
+ .getStateTransitionTimeout(currentState, nextState) : -1;
String timeOutStr = null;
// Check IdealState whether has timeout set
@@ -470,8 +470,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi
if (resourceConfig != null) {
// If resource config has timeout, replace the cluster timeout.
stateTransitionTimeoutConfig = resourceConfig.getStateTransitionTimeoutConfig();
- timeout = stateTransitionTimeoutConfig != null
- ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1;
+ timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig
+ .getStateTransitionTimeout(currentState, nextState) : -1;
}
return timeout;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
index e56a20f983..17eb5a3e1e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
@@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
* It has a dedupe queue for pending call back event. Pending call back event will
@@ -60,8 +61,8 @@ class CallbackProcessor implements Runnable {
private final NotificationContext _event;
public CallbackProcessor(CallbackHandler handler, NotificationContext event) {
- _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer.toHexString(
- handler.hashCode());
+ _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer
+ .toHexString(handler.hashCode());
_handler = handler;
_event = event;
}
@@ -88,8 +89,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
logger.error("Failed to process callback. CallbackEventExecutor is already shut down.");
}
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
- _futureCallBackProcessEvent = _threadPoolExecutor.submit(
- ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
+ _futureCallBackProcessEvent =
+ _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} else {
_callBackEventQueue.put(eventType, event);
}
@@ -101,11 +102,11 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
if (_callBackEventQueue.size() != 0) {
try {
NotificationContext event = _callBackEventQueue.take();
- _futureCallBackProcessEvent = _threadPoolExecutor.submit(
- ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
+ _futureCallBackProcessEvent =
+ _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
} catch (InterruptedException e) {
- logger.error("Error when submitting pending HandleCallBackEvent to manager thread pool",
- e);
+ logger
+ .error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index fc734eef32..5c44d78e31 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -84,6 +84,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/**
* Put together all registration information about a message handler factory
@@ -93,8 +94,7 @@ class MsgHandlerFactoryRegistryItem {
private final int _threadPoolSize;
private final int _resetTimeout;
- public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize,
- int resetTimeout) {
+ public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, int resetTimeout) {
if (factory == null) {
throw new NullPointerException("Message handler factory is null");
}
@@ -224,14 +224,12 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
}
@Override
- public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
- int threadpoolSize) {
- registerMessageHandlerFactory(type, factory, threadpoolSize,
- DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+ public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) {
+ registerMessageHandlerFactory(type, factory, threadpoolSize, DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
}
- private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
- int threadpoolSize, int resetTimeoutMs) {
+ private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize,
+ int resetTimeoutMs) {
if (factory instanceof MultiTypeMessageHandlerFactory) {
if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
@@ -239,15 +237,15 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa
}
} else {
if (!factory.getMessageType().equals(type)) {
- throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
- + factory.getMessageType());
+ throw new HelixException(
+ "Message factory type mismatch. Type: " + type + ", factory: " + factory
+ .getMessageType());
}
}
_isShuttingDown = false;
- MsgHandlerFactoryRegistryItem newItem =
- new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs);
+ MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs);
MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
if (prevItem == null) {
_executorMap.computeIfAbsent(type, msgType -> {
@@ -256,9 +254,8 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa
_monitor.createExecutorMonitor(type, newPool);
return newPool;
});
- LOG.info(
- "Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}",
- type, threadpoolSize, factory, _executorMap.get(type));
+ LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", type,
+ threadpoolSize, factory, _executorMap.get(type));
} else {
LOG.info(
"Skip register message handler factory for type: {}, poolSize: {}, factory: {}, already existing factory: {}",
@@ -311,19 +308,17 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage
}
}
- String perStateTransitionTypeKey = msgInfo.getMessageIdentifier(
- Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
- if (perStateTransitionTypeKey != null && stateModelFactory != null
- && !_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) {
+ String perStateTransitionTypeKey =
+ msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
+ if (perStateTransitionTypeKey != null && stateModelFactory != null && !_transitionTypeThreadpoolChecked.contains(
+ perStateTransitionTypeKey)) {
ExecutorService perStateTransitionTypeExecutor =
- stateModelFactory.getExecutorService(resourceName, message.getFromState(),
- message.getToState());
+ stateModelFactory.getExecutorService(resourceName, message.getFromState(), message.getToState());
_transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey);
if (perStateTransitionTypeExecutor != null) {
_executorMap.put(perStateTransitionTypeKey, perStateTransitionTypeExecutor);
- LOG.info(String.format(
- "Added client specified dedicate threadpool for resource %s from %s to %s",
+ LOG.info(String.format("Added client specified dedicate threadpool for resource %s from %s to %s",
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE),
message.getFromState(), message.getToState()));
return;
@@ -336,8 +331,9 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage
// Changes to this configuration on thread pool size will only take effect after the participant get restarted.
if (configAccessor != null) {
HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(
- manager.getClusterName()).forResource(resourceName).build();
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(manager.getClusterName())
+ .forResource(resourceName)
+ .build();
String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
try {
@@ -345,17 +341,14 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage
threadpoolSize = Integer.parseInt(threadpoolSizeStr);
}
} catch (Exception e) {
- LOG.error(
- "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
+ LOG.error("Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
}
}
- final String key =
- msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
+ final String key = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
if (threadpoolSize > 0) {
_executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
r -> new Thread(r, "GenericHelixController-message_handle_" + key)));
- LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: "
- + threadpoolSize);
+ LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " + threadpoolSize);
} else {
// if threadpool is not configured
// check whether client specifies customized threadpool.
@@ -387,8 +380,7 @@ ExecutorService findExecutorServiceForMsg(Message message) {
} else {
Message.MessageInfo msgInfo = new Message.MessageInfo(message);
for (int i = Message.MessageInfo.MessageIdentifierBase.values().length - 1; i >= 0; i--) {
- String msgIdentifer =
- msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]);
+ String msgIdentifer = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]);
if (msgIdentifer != null && _executorMap.containsKey(msgIdentifer)) {
LOG.info(String.format("Find customized threadpool for %s", msgIdentifer));
executorService = _executorMap.get(msgIdentifer);
@@ -457,8 +449,8 @@ public boolean scheduleTask(MessageTask task) {
LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(),
message.getPartitionName(), message.getFromState(), message.getToState());
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled",
- manager);
+ _statusUpdateUtil
+ .logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager);
// this sync guarantees that ExecutorService.submit() task and put taskInfo into map are
// sync'ed
@@ -467,8 +459,8 @@ public boolean scheduleTask(MessageTask task) {
ExecutorService exeSvc = findExecutorServiceForMsg(message);
if (exeSvc == null) {
- LOG.warn(
- String.format("Threadpool is null for type %s of message %s", message.getMsgType(),
+ LOG.warn(String
+ .format("Threadpool is null for type %s of message %s", message.getMsgType(),
message.getMsgId()));
return false;
}
@@ -476,15 +468,17 @@ public boolean scheduleTask(MessageTask task) {
LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
Future future = exeSvc.submit(ExecutorTaskUtil.wrap(task));
- _messageTaskMap.putIfAbsent(
- getMessageTarget(message.getResourceName(), message.getPartitionName()), taskId);
+ _messageTaskMap
+ .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
+ taskId);
TimerTask timerTask = null;
if (message.getExecutionTimeout() > 0) {
timerTask = new MessageTimeoutTask(this, task);
_timer.schedule(timerTask, message.getExecutionTimeout());
- LOG.info("Message starts with timeout " + message.getExecutionTimeout() + " MsgId: "
- + task.getTaskId());
+ LOG.info(
+ "Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + task
+ .getTaskId());
} else {
LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
}
@@ -499,8 +493,9 @@ public boolean scheduleTask(MessageTask task) {
}
} catch (Throwable t) {
LOG.error("Error while executing task. " + message, t);
- _statusUpdateUtil.logError(message, HelixTaskExecutor.class, t,
- "Error while executing task " + t, manager);
+ _statusUpdateUtil
+ .logError(message, HelixTaskExecutor.class, t, "Error while executing task " + t,
+ manager);
}
return false;
}
@@ -534,8 +529,9 @@ public boolean cancelTask(MessageTask task) {
_taskMap.remove(taskId);
return true;
} else {
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
- "fail to cancel task: " + taskId, notificationContext.getManager());
+ _statusUpdateUtil
+ .logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId,
+ notificationContext.getManager());
}
} else {
_statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -550,8 +546,8 @@ public boolean cancelTask(MessageTask task) {
public void finishTask(MessageTask task) {
Message message = task.getMessage();
String taskId = task.getTaskId();
- LOG.info("message finished: " + taskId + ", took " + (new Date().getTime()
- - message.getExecuteStartTimeStamp()));
+ LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message
+ .getExecuteStartTimeStamp()));
synchronized (_lock) {
if (_taskMap.containsKey(taskId)) {
@@ -623,19 +619,18 @@ private void updateMessageState(Collection msgsToBeUpdated, HelixDataAc
}
}
- private void shutdownAndAwaitTermination(ExecutorService pool,
- MsgHandlerFactoryRegistryItem handlerItem) {
+ private void shutdownAndAwaitTermination(ExecutorService pool, MsgHandlerFactoryRegistryItem handlerItem) {
LOG.info("Shutting down pool: " + pool);
- int timeout =
- handlerItem == null ? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout();
+ int timeout = handlerItem == null ? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout();
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
List waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks
- LOG.info("Tasks that never commenced execution after {}: {}", timeout, waitingTasks);
+ LOG.info("Tasks that never commenced execution after {}: {}", timeout,
+ waitingTasks);
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
LOG.error("Pool did not fully terminate in {} ms. pool: {}", timeout, pool);
@@ -727,8 +722,12 @@ synchronized void reset() {
shutdownExecutors();
synchronized (_hdlrFtyRegistry) {
- _hdlrFtyRegistry.values().stream().map(MsgHandlerFactoryRegistryItem::factory).distinct()
- .filter(Objects::nonNull).forEach(factory -> {
+ _hdlrFtyRegistry.values()
+ .stream()
+ .map(MsgHandlerFactoryRegistryItem::factory)
+ .distinct()
+ .filter(Objects::nonNull)
+ .forEach(factory -> {
try {
factory.reset();
} catch (Exception ex) {
@@ -743,8 +742,7 @@ synchronized void reset() {
// Log all tasks that fail to terminate
for (String taskId : _taskMap.keySet()) {
MessageTaskInfo info = _taskMap.get(taskId);
- sb.append(
- "Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
+ sb.append("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
}
LOG.info(sb.toString());
@@ -786,8 +784,8 @@ private void syncSessionToController(HelixManager manager) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
if (accessor.getProperty(key) == null) {
- LOG.info(String.format("Participant %s syncs session with controller",
- manager.getInstanceName()));
+ LOG.info(String
+ .format("Participant %s syncs session with controller", manager.getInstanceName()));
Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC);
msg.setSrcName(manager.getInstanceName());
msg.setTgtSessionId("*");
@@ -997,8 +995,8 @@ public void onMessage(String instanceName, List messages,
&& !createCurStateNames.contains(resourceName)) {
createCurStateNames.add(resourceName);
PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
- if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef())
- && !Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
+ if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
+ .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
}
createCurStateKeys.add(curStateKey);
@@ -1045,8 +1043,8 @@ public void onMessage(String instanceName, List messages,
*/
try {
// Record error state to the message handler.
- handler.onError(new HelixException(
- String.format("Failed to schedule the task for executing message handler for %s.",
+ handler.onError(new HelixException(String
+ .format("Failed to schedule the task for executing message handler for %s.",
handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR,
MessageHandler.ErrorType.FRAMEWORK);
} catch (Exception ex) {
@@ -1094,8 +1092,8 @@ private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+ ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
LOG.warn(warningMessage);
reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
- _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage,
- manager);
+ _statusUpdateUtil
+ .logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);
// Proactively send a session sync message from participant to controller
// upon session mismatch after a new session is established
@@ -1364,8 +1362,8 @@ public MessageHandler createMessageHandler(Message message, NotificationContext
// we will keep the message and the message will be handled when
// the corresponding MessageHandlerFactory is registered
if (item == null) {
- LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: "
- + message.getMsgId());
+ LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " + message
+ .getMsgId());
return null;
}
MessageHandlerFactory handlerFactory = item.factory();
@@ -1462,8 +1460,8 @@ private void sendNopMessage(HelixDataAccessor accessor, String instanceName) {
Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
nopMsg.setSrcName(instanceName);
nopMsg.setTgtName(instanceName);
- accessor.setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()),
- nopMsg);
+ accessor
+ .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
} catch (Exception e) {
LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 36bb2f1d45..646e8f3087 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -68,7 +68,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener, CustomizedViewRootChangeListener {
+public class RoutingTableProvider
+ implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
+ LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener,
+ CustomizedViewRootChangeListener {
private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
private final Map> _routingTableRefMap;
@@ -87,8 +90,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
private ExecutorService _reportExecutor;
private Future _reportingTask = null;
- protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY";
- protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT";
+ protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY";
+ protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT";
+
public RoutingTableProvider() {
this(null);
@@ -105,8 +109,7 @@ public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataTy
DEFAULT_PERIODIC_REFRESH_INTERVAL);
}
- public RoutingTableProvider(HelixManager helixManager,
- Map> sourceDataTypeMap) {
+ public RoutingTableProvider(HelixManager helixManager, Map> sourceDataTypeMap) {
this(helixManager, sourceDataTypeMap, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
}
@@ -150,7 +153,7 @@ public RoutingTableProvider(HelixManager helixManager,
if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
throw new HelixException("CustomizedView has been used without any aggregation type!");
}
- String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
if (_routingTableRefMap.get(key) == null) {
_routingTableRefMap.put(key, new AtomicReference<>(new RoutingTable(propertyType)));
}
@@ -161,7 +164,7 @@ public RoutingTableProvider(HelixManager helixManager,
sourceDataTypeMap.get(propertyType), propertyType.name()));
}
for (String customizedStateType : _sourceDataTypeMap.get(propertyType)) {
- String key = generateReferenceKey(propertyType.name(), customizedStateType);
+ String key = generateReferenceKey(propertyType.name(), customizedStateType);
if (_routingTableRefMap.get(key) == null) {
_routingTableRefMap.put(key, new AtomicReference<>(
new CustomizedViewRoutingTable(propertyType, customizedStateType)));
@@ -222,60 +225,58 @@ private void addListeners() {
if (_helixManager != null) {
for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
switch (propertyType) {
- case EXTERNALVIEW:
- try {
- _helixManager.addExternalViewChangeListener(this);
- } catch (Exception e) {
- shutdown();
- throw new HelixException("Failed to attach ExternalView Listener to HelixManager!",
- e);
- }
- break;
- case CUSTOMIZEDVIEW:
- // Add CustomizedView root change listener
+ case EXTERNALVIEW:
+ try {
+ _helixManager.addExternalViewChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+ }
+ break;
+ case CUSTOMIZEDVIEW:
+ // Add CustomizedView root change listener
+ try {
+ _helixManager.addCustomizedViewRootChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException(
+ "Failed to attach CustomizedView Root Listener to HelixManager!", e);
+ }
+ // Add individual listeners for each customizedStateType
+ List customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+ for (String customizedStateType : customizedStateTypes) {
try {
- _helixManager.addCustomizedViewRootChangeListener(this);
+ _helixManager.addCustomizedViewChangeListener(this, customizedStateType);
} catch (Exception e) {
shutdown();
- throw new HelixException(
- "Failed to attach CustomizedView Root Listener to HelixManager!", e);
- }
- // Add individual listeners for each customizedStateType
- List customizedStateTypes = _sourceDataTypeMap.get(propertyType);
- for (String customizedStateType : customizedStateTypes) {
- try {
- _helixManager.addCustomizedViewChangeListener(this, customizedStateType);
- } catch (Exception e) {
- shutdown();
- throw new HelixException(String.format(
- "Failed to attach CustomizedView Listener to HelixManager for type %s!",
- customizedStateType), e);
- }
- }
- break;
- case TARGETEXTERNALVIEW:
- // Check whether target external has been enabled or not
- if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
- _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
- 0)) {
- shutdown();
- throw new HelixException("Target External View is not enabled!");
+ throw new HelixException(String.format(
+ "Failed to attach CustomizedView Listener to HelixManager for type %s!",
+ customizedStateType), e);
}
+ }
+ break;
+ case TARGETEXTERNALVIEW:
+ // Check whether target external has been enabled or not
+ if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+ _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
+ 0)) {
+ shutdown();
+ throw new HelixException("Target External View is not enabled!");
+ }
- try {
- _helixManager.addTargetExternalViewChangeListener(this);
- } catch (Exception e) {
- shutdown();
- throw new HelixException(
- "Failed to attach TargetExternalView Listener to HelixManager!", e);
- }
- break;
- case CURRENTSTATES:
- // CurrentState change listeners will be added later in LiveInstanceChange call.
- break;
- default:
+ try {
+ _helixManager.addTargetExternalViewChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
throw new HelixException(
- String.format("Unsupported source data type: %s", propertyType));
+ "Failed to attach TargetExternalView Listener to HelixManager!", e);
+ }
+ break;
+ case CURRENTSTATES:
+ // CurrentState change listeners will be added later in LiveInstanceChange call.
+ break;
+ default:
+ throw new HelixException(String.format("Unsupported source data type: %s", propertyType));
}
}
try {
@@ -307,10 +308,10 @@ private void validateSourceDataTypeMap(Map> sourceDat
}
if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
&& sourceDataTypeMap.get(propertyType).size() != 0) {
- logger.error("Type has been used in addition to the propertyType {} !",
- propertyType.name());
- throw new HelixException(
- String.format("Type %s has been used in addition to the propertyType %s !",
+ logger
+ .error("Type has been used in addition to the propertyType {} !", propertyType.name());
+ throw new HelixException(String
+ .format("Type %s has been used in addition to the propertyType %s !",
sourceDataTypeMap.get(propertyType), propertyType.name()));
}
}
@@ -337,26 +338,26 @@ public void shutdown() {
_helixManager.removeListener(keyBuilder.instanceConfigs(), this);
for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
switch (propertyType) {
- case EXTERNALVIEW:
- _helixManager.removeListener(keyBuilder.externalViews(), this);
- break;
- case CUSTOMIZEDVIEW:
- List customizedStateTypes = _sourceDataTypeMap.get(propertyType);
- // Remove listener on each individual customizedStateType
- for (String customizedStateType : customizedStateTypes) {
- _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this);
- }
- break;
- case TARGETEXTERNALVIEW:
- _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
- break;
- case CURRENTSTATES:
- NotificationContext context = new NotificationContext(_helixManager);
- context.setType(NotificationContext.Type.FINALIZE);
- updateCurrentStatesListeners(Collections.emptyList(), context);
- break;
- default:
- break;
+ case EXTERNALVIEW:
+ _helixManager.removeListener(keyBuilder.externalViews(), this);
+ break;
+ case CUSTOMIZEDVIEW:
+ List customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+ // Remove listener on each individual customizedStateType
+ for (String customizedStateType : customizedStateTypes) {
+ _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this);
+ }
+ break;
+ case TARGETEXTERNALVIEW:
+ _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+ break;
+ case CURRENTSTATES:
+ NotificationContext context = new NotificationContext(_helixManager);
+ context.setType(NotificationContext.Type.FINALIZE);
+ updateCurrentStatesListeners(Collections.emptyList(), context);
+ break;
+ default:
+ break;
}
}
}
@@ -404,12 +405,13 @@ public Map> getRoutingTableSnapshots()
if (!snapshots.containsKey(propertyTypeName)) {
snapshots.put(propertyTypeName, new HashMap<>());
}
- snapshots.get(propertyTypeName)
- .put(customizedStateType, new RoutingTableSnapshot(routingTable));
+ snapshots.get(propertyTypeName).put(customizedStateType,
+ new RoutingTableSnapshot(routingTable));
}
return snapshots;
}
+
/**
* Add RoutingTableChangeListener with user defined context
* @param routingTableChangeListener
@@ -437,7 +439,7 @@ public void addRoutingTableChangeListener(
final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
_routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, null);
- }
+ }
}
/**
@@ -474,8 +476,8 @@ public List getInstances(String resourceName, String partitionNa
*/
public List getInstancesForResource(String resourceName, String partitionName,
String state) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource(
- resourceName, partitionName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResource(resourceName, partitionName, state);
}
/**
@@ -490,8 +492,8 @@ public List getInstancesForResource(String resourceName, String
*/
public List getInstancesForResourceGroup(String resourceGroupName,
String partitionName, String state) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE,
- DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, partitionName, state);
}
/**
@@ -507,9 +509,8 @@ public List getInstancesForResourceGroup(String resourceGroupNam
*/
public List getInstancesForResourceGroup(String resourceGroupName,
String partitionName, String state, List resourceTags) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE,
- DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state,
- resourceTags);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
}
/**
@@ -532,8 +533,8 @@ public Set getInstances(String resourceName, String state) {
* @return empty list if there is no instance in a given state
*/
public Set getInstancesForResource(String resourceName, String state) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource(
- resourceName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResource(resourceName, state);
}
/**
@@ -543,8 +544,8 @@ public Set getInstancesForResource(String resourceName, String s
* @return empty list if there is no instance in a given state
*/
public Set getInstancesForResourceGroup(String resourceGroupName, String state) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE,
- DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, state);
}
/**
@@ -556,8 +557,8 @@ public Set getInstancesForResourceGroup(String resourceGroupName
*/
public Set getInstancesForResourceGroup(String resourceGroupName, String state,
List resourceTags) {
- return getRoutingTableRef(DEFAULT_PROPERTY_TYPE,
- DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
}
/**
@@ -602,9 +603,9 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp
if (_routingTableRefMap.keySet().size() == 1) {
String key = _routingTableRefMap.keySet().iterator().next();
if (!_routingTableRefMap.containsKey(key)) {
- throw new HelixException(String.format(
- "Currently there is no snapshot available for PropertyType %s and stateType %s",
- propertyTypeName, stateType));
+ throw new HelixException(
+ String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+ propertyTypeName, stateType));
}
return _routingTableRefMap.get(key).get();
} else {
@@ -618,11 +619,11 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp
}
}
- String key = generateReferenceKey(propertyTypeName, stateType);
+ String key = generateReferenceKey(propertyTypeName, stateType);
if (!_routingTableRefMap.containsKey(key)) {
- throw new HelixException(String.format(
- "Currently there is no snapshot available for PropertyType %s and stateType %s",
- propertyTypeName, stateType));
+ throw new HelixException(
+ String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+ propertyTypeName, stateType));
}
return _routingTableRefMap.get(key).get();
}
@@ -646,8 +647,7 @@ public void onExternalViewChange(List externalViewList,
if (externalViewList != null && externalViewList.size() > 0) {
// keep this here for back-compatibility, application can call onExternalViewChange directly
// with externalview list supplied.
- String keyReference =
- generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE);
+ String keyReference = generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE);
HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List configList = accessor.getChildValues(keyBuilder.instanceConfigs(), true);
@@ -736,7 +736,8 @@ public void onCustomizedViewRootChange(List customizedViewTypes,
shutdown();
throw new HelixException(
String.format("Failed to attach CustomizedView Listener to HelixManager for type %s!",
- customizedStateType), e);
+ customizedStateType),
+ e);
}
}
}
@@ -803,7 +804,7 @@ private void updateCurrentStatesListeners(List liveInstances,
private void reset() {
logger.info("Resetting the routing table.");
RoutingTable newRoutingTable;
- for (String key : _routingTableRefMap.keySet()) {
+ for (String key : _routingTableRefMap.keySet()) {
PropertyType propertyType = _routingTableRefMap.get(key).get().getPropertyType();
if (propertyType == PropertyType.CUSTOMIZEDVIEW) {
String stateType = _routingTableRefMap.get(key).get().getStateType();
@@ -831,14 +832,12 @@ protected void refreshCustomizedView(Collection customizedViews,
long startTime = System.currentTimeMillis();
PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType();
String customizedStateType = _routingTableRefMap.get(referenceKey).get().getStateType();
- RoutingTable newRoutingTable =
- new CustomizedViewRoutingTable(customizedViews, instanceConfigs, liveInstances,
- propertyType, customizedStateType);
+ RoutingTable newRoutingTable = new CustomizedViewRoutingTable(customizedViews, instanceConfigs,
+ liveInstances, propertyType, customizedStateType);
resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
}
- protected void refreshCurrentState(
- Map>> currentStateMap,
+ protected void refreshCurrentState(Map>> currentStateMap,
Collection instanceConfigs, Collection liveInstances,
String referenceKey) {
long startTime = System.currentTimeMillis();
@@ -847,8 +846,7 @@ protected void refreshCurrentState(
resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
}
- private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable,
- String referenceKey) {
+ private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable, String referenceKey) {
_routingTableRefMap.get(referenceKey).set(newRoutingTable);
String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName,
@@ -869,7 +867,8 @@ private void notifyRoutingTableChange(String clusterName, String referenceKey) {
// record time spent
// here. Potentially, we should call this callback in a separate thread if this is a bottleneck.
long startTime = System.currentTimeMillis();
- for (Map.Entry entry : _routingTableChangeListenerMap.entrySet()) {
+ for (Map.Entry entry : _routingTableChangeListenerMap
+ .entrySet()) {
entry.getKey().onRoutingTableChange(
new RoutingTableSnapshot(_routingTableRefMap.get(referenceKey).get()),
entry.getValue().getContext());
@@ -913,8 +912,7 @@ protected void handleEvent(ClusterEvent event) {
throw new HelixException("HelixManager is null for router update event.");
}
if (!manager.isConnected()) {
- logger.error(
- String.format("HelixManager is not connected for router update event: %s", event));
+ logger.error(String.format("HelixManager is not connected for router update event: %s", event));
throw new HelixException("HelixManager is not connected for router update event.");
}
@@ -923,43 +921,40 @@ protected void handleEvent(ClusterEvent event) {
_dataCache.refresh(manager.getHelixDataAccessor());
for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
switch (propertyType) {
- case EXTERNALVIEW: {
- String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
- refreshExternalView(_dataCache.getExternalViews().values(),
- _dataCache.getRoutableInstanceConfigMap().values(),
- _dataCache.getRoutableLiveInstances().values(), keyReference);
- }
- break;
- case TARGETEXTERNALVIEW: {
- String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
- refreshExternalView(_dataCache.getTargetExternalViews().values(),
- _dataCache.getRoutableInstanceConfigMap().values(),
- _dataCache.getRoutableLiveInstances().values(), keyReference);
- }
+ case EXTERNALVIEW: {
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ refreshExternalView(_dataCache.getExternalViews().values(),
+ _dataCache.getRoutableInstanceConfigMap().values(),
+ _dataCache.getRoutableLiveInstances().values(),
+ keyReference);
+ }
break;
+ case TARGETEXTERNALVIEW: {
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ refreshExternalView(_dataCache.getTargetExternalViews().values(),
+ _dataCache.getRoutableInstanceConfigMap().values(),
+ _dataCache.getRoutableLiveInstances().values(),
+ keyReference);
+ }
+ break;
case CUSTOMIZEDVIEW:
- for (String customizedStateType : _sourceDataTypeMap.getOrDefault(
- PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
- String keyReference =
- generateReferenceKey(propertyType.name(), customizedStateType);
+ for (String customizedStateType : _sourceDataTypeMap
+     .getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
+ String keyReference = generateReferenceKey(propertyType.name(), customizedStateType);
refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(),
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(), keyReference);
}
break;
case CURRENTSTATES: {
- String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
- ;
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
refreshCurrentState(_dataCache.getCurrentStatesMap(),
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(), keyReference);
- recordPropagationLatency(System.currentTimeMillis(),
- _dataCache.getCurrentStateSnapshot());
+ recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
}
- break;
+ break;
default:
- logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
- propertyType);
+ logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", propertyType);
}
_monitorMap.get(propertyType).increaseDataRefreshCounters(startTime);
@@ -990,8 +985,7 @@ public Object call() {
long endTime = partitionStateEndTimes.get(partition);
if (currentTime >= endTime) {
for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
- _monitorMap.get(propertyType)
- .recordStatePropagationLatency(currentTime - endTime);
+ _monitorMap.get(propertyType).recordStatePropagationLatency(currentTime - endTime);
logger.debug(
"CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
key.toString(), partition, endTime, currentTime - endTime);
@@ -1011,6 +1005,7 @@ public Object call() {
}
}
+
public void queueEvent(NotificationContext context, ClusterEventType eventType,
HelixConstants.ChangeType changeType) {
ClusterEvent event = new ClusterEvent(_clusterName, eventType);
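The RoutingTableProvider changes above repeatedly go through _routingTableRefMap, which keeps one AtomicReference per (property type, state type) key and swaps a freshly built routing table into it in resetRoutingTableAndNotify, so readers always see either the old or the new table, never a partially built one. A minimal sketch of that keyed atomic-swap pattern, assuming a hypothetical Table class and key format rather than the Helix API, is:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicReference;

    final class KeyedAtomicSwapSketch {
      static final class Table {
        final long version;
        Table(long version) { this.version = version; }
      }

      private final Map<String, AtomicReference<Table>> refs = new ConcurrentHashMap<>();

      // Hypothetical key format standing in for generateReferenceKey(propertyType, stateType).
      static String key(String propertyType, String stateType) {
        return propertyType + "_" + stateType;
      }

      void swap(String key, Table newTable) {
        // Writers publish a complete table in one atomic step.
        refs.computeIfAbsent(key, k -> new AtomicReference<>()).set(newTable);
      }

      Table read(String key) {
        AtomicReference<Table> ref = refs.get(key);
        return ref == null ? null : ref.get();
      }
    }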
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 9f332d899b..a8df83c064 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -70,7 +70,8 @@ public void shutdown() {
reset();
}
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
return _taskExecutor.awaitTermination(timeout, unit);
}
@@ -83,9 +84,8 @@ public void onBecomeRunningFromInit(Message msg, NotificationContext context) {
public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
@@ -101,9 +101,8 @@ public String onBecomeStoppedFromRunning(Message msg, NotificationContext contex
public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
@@ -122,9 +121,8 @@ public String onBecomeCompletedFromRunning(Message msg, NotificationContext cont
public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
@@ -143,9 +141,8 @@ public String onBecomeTimedOutFromRunning(Message msg, NotificationContext conte
public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
@@ -164,15 +161,13 @@ public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext cont
public String onBecomeTaskAbortedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
- if (r.getStatus() != TaskResult.Status.FATAL_FAILED
- && r.getStatus() != TaskResult.Status.CANCELED) {
+ if (r.getStatus() != TaskResult.Status.FATAL_FAILED && r.getStatus() != TaskResult.Status.CANCELED) {
throw new IllegalStateException(String.format(
"Partition %s received a state transition to %s but the result status code is %s.",
taskPartition, msg.getToState(), r.getStatus()));
@@ -242,9 +237,8 @@ public void onBecomeDroppedFromTaskAborted(Message msg, NotificationContext cont
public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
@@ -320,18 +314,16 @@ private void startTask(Message msg, String taskPartition) {
callbackContext.setTaskConfig(taskConfig);
// Create a task instance with this command
- if (command == null || _taskFactoryRegistry == null || !_taskFactoryRegistry.containsKey(
- command)) {
- throw new IllegalStateException(
- String.format("Invalid state transition. There is no running task for partition %s.",
- taskPartition));
+ if (command == null || _taskFactoryRegistry == null
+ || !_taskFactoryRegistry.containsKey(command)) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskFactory taskFactory = _taskFactoryRegistry.get(command);
Task task = taskFactory.createNewTask(callbackContext);
if (task instanceof UserContentStore) {
- ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(),
- taskPartition);
+ ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition);
}
// Submit the task for execution
diff --git a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java
index e246e05fa2..1877a0c6a4 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java
@@ -40,8 +40,7 @@ public static Callable wrap(Callable callable) {
try {
return callable.call();
} catch (Throwable t) {
- LOG.error("Callable run on thread {} raised an exception and exited",
- Thread.currentThread().getName(), t);
+ LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
@@ -59,8 +58,7 @@ public static Runnable wrap(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
- LOG.error("Runnable run on thread {} raised an exception and exited",
- Thread.currentThread().getName(), t);
+ LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
throw t;
}
};
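The ExecutorTaskUtil hunks above only re-wrap the log calls, but the surrounding wrap(...) idiom is what the earlier scheduleTask hunk relies on via exeSvc.submit(ExecutorTaskUtil.wrap(task)): callables and runnables submitted to a pool are wrapped so any throwable is logged on the worker thread before being rethrown, instead of surfacing only through an unread Future. A minimal sketch of that wrap-and-log pattern, assuming plain stderr output rather than the project's SLF4J logger, is:

    import java.util.concurrent.Callable;

    final class WrapAndLogSketch {
      static <T> Callable<T> wrap(Callable<T> callable) {
        return () -> {
          try {
            return callable.call();
          } catch (Throwable t) {
            // Log on the worker thread so the failure is visible even if nobody reads the Future.
            System.err.println("Callable on thread " + Thread.currentThread().getName()
                + " raised an exception and exited: " + t);
            throw t; // rethrow so the Future still reports the failure
          }
        };
      }
    }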