From 3645d2b42b1c2e7ef3ead267b0493189f9335b9c Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Mon, 9 Dec 2024 15:56:56 +0100 Subject: [PATCH] #2963 Fix NPE during task rebalancing Reformatted touched files according to "helix-format" --- .../controller/GenericHelixController.java | 222 +++++++------- .../waged/GlobalRebalanceRunner.java | 35 ++- .../waged/PartialRebalanceRunner.java | 72 ++--- .../stages/MessageGenerationPhase.java | 68 ++--- .../manager/zk/CallbackEventExecutor.java | 17 +- .../messaging/handling/HelixTaskExecutor.java | 134 ++++----- .../helix/spectator/RoutingTableProvider.java | 279 +++++++++--------- .../org/apache/helix/task/TaskStateModel.java | 48 +-- .../apache/helix/util/ExecutorTaskUtil.java | 6 +- 9 files changed, 444 insertions(+), 437 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 428bfa7fb5..e84ac45cca 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -129,13 +129,7 @@ * 4. select the messages that can be sent, needs messages and state model constraints
* 5. send messages */ -public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, - MessageListener, CurrentStateChangeListener, - TaskCurrentStateChangeListener, - CustomizedStateRootChangeListener, - CustomizedStateChangeListener, - CustomizedStateConfigChangeListener, ControllerChangeListener, - InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener { +public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, TaskCurrentStateChangeListener, CustomizedStateRootChangeListener, CustomizedStateChangeListener, CustomizedStateConfigChangeListener, ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener { private static final Logger logger = LoggerFactory.getLogger(GenericHelixController.class.getName()); @@ -235,10 +229,8 @@ public static GenericHelixController getLeaderController(String clusterName) { if (clusterName != null) { ImmutableSet controllers = _helixControllerFactory.get(clusterName); if (controllers != null) { - return controllers.stream() - .filter(controller -> controller._helixManager != null) - .filter(controller -> controller._helixManager.isLeader()) - .findAny().orElse(null); + return controllers.stream().filter(controller -> controller._helixManager != null) + .filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null); } } return null; @@ -297,8 +289,7 @@ public GenericHelixController(String clusterName) { public GenericHelixController(String clusterName, Set enabledPipelins) { this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()), createTaskRegistry(Pipeline.Type.TASK.name()), - createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), - clusterName, + createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), clusterName, enabledPipelins); } @@ -342,8 +333,8 @@ public long getNextRebalanceTime() { public void run() { try { if (_shouldRefreshCacheOption.orElse( - _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType - .equals(ClusterEventType.OnDemandRebalance))) { + _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) + || _clusterEventType.equals(ClusterEventType.OnDemandRebalance))) { requestDataProvidersFullRefresh(); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); @@ -372,8 +363,8 @@ private void forceRebalance(HelixManager manager, ClusterEventType eventType) { NotificationContext changeContext = new NotificationContext(manager); changeContext.setType(NotificationContext.Type.CALLBACK); pushToEventQueues(eventType, changeContext, Collections.EMPTY_MAP); - logger.info(String - .format("Controller rebalance pipeline triggered with event type: %s for cluster %s", + logger.info( + String.format("Controller rebalance pipeline triggered with event type: %s for cluster %s", eventType, _clusterName)); } @@ -390,9 +381,9 @@ void startPeriodRebalance(long period, HelixManager manager) { synchronized (_periodicalRebalanceExecutor) { lastScheduledFuture = _periodicRebalanceFutureTask; _timerPeriod = period; - _periodicRebalanceFutureTask = _periodicalRebalanceExecutor - .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance), - _timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS); + _periodicRebalanceFutureTask = _periodicalRebalanceExecutor.scheduleAtFixedRate( + new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance), _timerPeriod, + _timerPeriod, TimeUnit.MILLISECONDS); } if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) { lastScheduledFuture.cancel(false /* mayInterruptIfRunning */); @@ -421,6 +412,7 @@ private void shutdownOnDemandTimer() { _onDemandRebalanceTimer.cancel(); } } + /** * This function is deprecated. Please use RebalanceUtil.scheduleInstantPipeline method instead. * schedule a future rebalance pipeline run, delayed at given time. @@ -428,8 +420,8 @@ private void shutdownOnDemandTimer() { @Deprecated public void scheduleRebalance(long rebalanceTime) { if (_helixManager == null) { - logger.warn( - "Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!"); + logger.warn("Failed to schedule a future pipeline run for cluster " + _clusterName + + " helix manager is null!"); return; } @@ -448,9 +440,8 @@ public void scheduleRebalance(long rebalanceTime) { new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime); _onDemandRebalanceTimer.schedule(newTask, delay); - logger.info( - "Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay " - + delay); + logger.info("Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + + " in delay " + delay); preTask = _nextRebalanceTask.getAndSet(newTask); if (preTask != null) { @@ -475,7 +466,8 @@ public void scheduleOnDemandRebalance(long delay) { */ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) { if (_helixManager == null) { - logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!", + logger.error( + "Failed to schedule a future pipeline run for cluster {}. Helix manager is null!", _clusterName); return; } @@ -495,7 +487,7 @@ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) { shouldRefreshCache); _onDemandRebalanceTimer.schedule(newTask, delay); - logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName()); + logger.info("Scheduled instant pipeline run for cluster {}.", _helixManager.getClusterName()); RebalanceTask preTask = _nextRebalanceTask.getAndSet(newTask); if (preTask != null) { @@ -561,23 +553,19 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { rebalancePipeline); registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); - registry - .register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, - dataPreprocess, rebalancePipeline); - registry - .register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, - liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline, - rebalancePipeline); - registry - .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, + autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, + autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, + customizedViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, + rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); - registry - .register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, - dataPreprocess, externalViewPipeline, rebalancePipeline); - registry - .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline, - dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, + autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, + autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); registry.register(ClusterEventType.ControllerChange, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); // TODO: We now include rebalance pipeline in customized state change for correctness. @@ -632,8 +620,8 @@ private static PipelineRegistry createTaskRegistry(String pipelineName) { rebalancePipeline); registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, rebalancePipeline); - registry - .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, + rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, rebalancePipeline); @@ -664,13 +652,10 @@ private static PipelineRegistry createManagementModeRegistry(String pipelineName managementMode.addStage(new ManagementMessageDispatchStage()); PipelineRegistry registry = new PipelineRegistry(); - Arrays.asList( - ClusterEventType.ControllerChange, - ClusterEventType.LiveInstanceChange, - ClusterEventType.MessageChange, - ClusterEventType.OnDemandRebalance, - ClusterEventType.PeriodicalRebalance - ).forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode)); + Arrays.asList(ClusterEventType.ControllerChange, ClusterEventType.LiveInstanceChange, + ClusterEventType.MessageChange, ClusterEventType.OnDemandRebalance, + ClusterEventType.PeriodicalRebalance) + .forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode)); return registry; } @@ -698,7 +683,8 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR _asyncTasksThreadPool = Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() { - @Override public Thread newThread(Runnable r) { + @Override + public Thread newThread(Runnable r) { return new Thread(r, "HelixController-async_tasks-" + _clusterName); } }); @@ -776,7 +762,7 @@ private void shutdownAsyncFIFOWorkers() { private boolean isEventQueueEmpty(boolean taskQueue) { if (taskQueue) { - return _taskEventQueue == null || _taskEventQueue.isEmpty(); + return _taskEventQueue == null || _taskEventQueue.isEmpty(); } else { return _eventQueue == null || _eventQueue.isEmpty(); } @@ -824,8 +810,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv // If manager session changes, no need to run pipeline for the stale event. if (!eventSessionId.isPresent() || !eventSessionId.get().equals(managerSessionId)) { logger.warn( - "Controller pipeline is not invoked because event session doesn't match cluster " + - "manager session. Event type: {}, id: {}, session: {}, actual manager session: " + "Controller pipeline is not invoked because event session doesn't match cluster " + + "manager session. Event type: {}, id: {}, session: {}, actual manager session: " + "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(), eventSessionId.orElse("NOT_PRESENT"), managerSessionId, manager.getInstanceName(), manager.getClusterName()); @@ -851,8 +837,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType()); isManagementPipeline = true; } else { - logger.warn(String - .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), + logger.warn( + String.format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), event.getEventType(), event.getEventId())); return; } @@ -878,7 +864,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv } else { // TODO: should be in the initialization of controller. if (_resourceControlDataProvider != null) { - checkRebalancingTimer(manager, Collections.emptyList(), dataProvider.getClusterConfig()); + checkRebalancingTimer(manager, Collections.emptyList(), + dataProvider.getClusterConfig()); } if (_isMonitoring) { _clusterStatusMonitor.setEnabled(!_inManagementMode); @@ -889,7 +876,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv } dataProvider.setClusterEventId(event.getEventId()); - event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp); + event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), + _lastPipelineEndTimestamp); event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider); logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. " @@ -912,20 +900,22 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv helixMetaDataAccessRebalanceFail = true; // If pipeline failed due to read/write fails to zookeeper, retry the pipeline. dataProvider.requireFullRefresh(); - logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName); + logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + + _clusterName); // only push a retry event when there is no pending event in the corresponding event queue. if (isEventQueueEmpty(isTaskFrameworkPipeline)) { - _continuousRebalanceFailureCount ++; + _continuousRebalanceFailureCount++; long delay = getRetryDelay(_continuousRebalanceFailureCount); if (delay == 0) { forceRebalance(manager, ClusterEventType.RetryRebalance); } else { - _asyncTasksThreadPool - .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay, - TimeUnit.MILLISECONDS); + _asyncTasksThreadPool.schedule( + new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay, + TimeUnit.MILLISECONDS); } - logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName); + logger.info( + "Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName); } } _clusterStatusMonitor.reportRebalanceFailure(); @@ -956,20 +946,18 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv if (notificationContext != null) { zkCallbackTime = notificationContext.getCreationTime(); if (_isMonitoring) { - _clusterStatusMonitor - .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(), - enqueueTime - zkCallbackTime); + _clusterStatusMonitor.updateClusterEventDuration( + ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - zkCallbackTime); } sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(), enqueueTime - zkCallbackTime)); } if (_isMonitoring) { - _clusterStatusMonitor - .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(), - startTime - enqueueTime); - _clusterStatusMonitor - .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(), - _lastPipelineEndTimestamp - startTime); + _clusterStatusMonitor.updateClusterEventDuration( + ClusterEventMonitor.PhaseName.InQueue.name(), startTime - enqueueTime); + _clusterStatusMonitor.updateClusterEventDuration( + ClusterEventMonitor.PhaseName.TotalProcessed.name(), + _lastPipelineEndTimestamp - startTime); } sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(), startTime - enqueueTime)); @@ -990,13 +978,13 @@ private void updateContinuousRebalancedFailureCount(boolean isTaskFrameworkPipel if (isTaskFrameworkPipeline) { _continuousTaskRebalanceFailureCount = resetToZero ? 0 : _continuousTaskRebalanceFailureCount + 1; - _clusterStatusMonitor - .reportContinuousTaskRebalanceFailureCount(_continuousTaskRebalanceFailureCount); + _clusterStatusMonitor.reportContinuousTaskRebalanceFailureCount( + _continuousTaskRebalanceFailureCount); } else { _continuousResourceRebalanceFailureCount = resetToZero ? 0 : _continuousResourceRebalanceFailureCount + 1; - _clusterStatusMonitor - .reportContinuousResourceRebalanceFailureCount(_continuousResourceRebalanceFailureCount); + _clusterStatusMonitor.reportContinuousResourceRebalanceFailureCount( + _continuousResourceRebalanceFailureCount); } } @@ -1019,8 +1007,8 @@ public void onStateChange(String instanceName, List statesInfo, NotificationContext changeContext) { logger.info("START: GenericClusterController.onStateChange()"); notifyCaches(changeContext, ChangeType.CURRENT_STATE); - pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections - .singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, + Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onStateChange()"); } @@ -1030,8 +1018,8 @@ public void onTaskCurrentStateChange(String instanceName, List sta NotificationContext changeContext) { logger.info("START: GenericClusterController.onTaskCurrentStateChange()"); notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE); - pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections - .singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, + Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onTaskCurrentStateChange()"); } @@ -1094,8 +1082,8 @@ public void onCustomizedStateChange(String instanceName, List s NotificationContext changeContext) { logger.info("START: GenericClusterController.onCustomizedStateChange()"); notifyCaches(changeContext, ChangeType.CUSTOMIZED_STATE); - pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, Collections - .singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, + Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onCustomizedStateChange()"); } @@ -1114,7 +1102,8 @@ public void onMessage(String instanceName, List messages, @Override public void onLiveInstanceChange(List liveInstances, NotificationContext changeContext) { - logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName); + logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + + _clusterName); notifyCaches(changeContext, ChangeType.LIVE_INSTANCE); if (liveInstances == null) { @@ -1208,44 +1197,40 @@ public void onInstanceConfigChange(List instanceConfigs, @Override @PreFetch(enabled = false) - public void onResourceConfigChange( - List resourceConfigs, NotificationContext context) { + public void onResourceConfigChange(List resourceConfigs, + NotificationContext context) { logger.info( "START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); notifyCaches(context, ChangeType.RESOURCE_CONFIG); pushToEventQueues(ClusterEventType.ResourceConfigChange, context, Collections.emptyMap()); - logger - .info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); + logger.info( + "END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); } @Override @PreFetch(enabled = false) - public void onCustomizedStateConfigChange( - CustomizedStateConfig customizedStateConfig, + public void onCustomizedStateConfigChange(CustomizedStateConfig customizedStateConfig, NotificationContext context) { - logger.info( - "START: GenericClusterController.onCustomizedStateConfigChange() for cluster " - + _clusterName); + logger.info("START: GenericClusterController.onCustomizedStateConfigChange() for cluster " + + _clusterName); notifyCaches(context, ChangeType.CUSTOMIZED_STATE_CONFIG); pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context, - Collections. emptyMap()); - logger.info( - "END: GenericClusterController.onCustomizedStateConfigChange() for cluster " - + _clusterName); + Collections.emptyMap()); + logger.info("END: GenericClusterController.onCustomizedStateConfigChange() for cluster " + + _clusterName); } @Override @PreFetch(enabled = false) - public void onClusterConfigChange(ClusterConfig clusterConfig, - NotificationContext context) { + public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) { logger.info( "START: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); notifyCaches(context, ChangeType.CLUSTER_CONFIG); pushToEventQueues(ClusterEventType.ClusterConfigChange, context, Collections.emptyMap()); - logger - .info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); + logger.info( + "END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); } private void notifyCaches(NotificationContext context, ChangeType changeType) { @@ -1426,17 +1411,16 @@ protected void checkLiveInstancesObservation(List liveInstances, } } - for (String instance : curInstances.keySet()) { - if (lastInstances == null || !lastInstances.containsKey(instance)) { - try { - manager.addCustomizedStateRootChangeListener(this, instance); - logger.info(manager.getInstanceName() + " added root path listener for customized " - + "state change for " + instance + ", listener: " + this); - } catch (Exception e) { - logger.error( - "Fail to add root path listener for customized state change for instance: " - + instance, e); - } + for (String instance : curInstances.keySet()) { + if (lastInstances == null || !lastInstances.containsKey(instance)) { + try { + manager.addCustomizedStateRootChangeListener(this, instance); + logger.info(manager.getInstanceName() + " added root path listener for customized " + + "state change for " + instance + ", listener: " + this); + } catch (Exception e) { + logger.error("Fail to add root path listener for customized state change for instance: " + + instance, e); + } } } @@ -1451,8 +1435,8 @@ public void shutdown() throws InterruptedException { stopPeriodRebalance(); _periodicalRebalanceExecutor.shutdown(); - if (!_periodicalRebalanceExecutor - .awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) { + if (!_periodicalRebalanceExecutor.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, + TimeUnit.MILLISECONDS)) { _periodicalRebalanceExecutor.shutdownNow(); } @@ -1550,16 +1534,16 @@ public void run() { while (!isInterrupted()) { try { ClusterEvent newClusterEvent = _eventBlockingQueue.take(); - String threadName = String.format( - "HelixController-pipeline-%s-(%s)", _processorName, newClusterEvent.getEventId()); + String threadName = String.format("HelixController-pipeline-%s-(%s)", _processorName, + newClusterEvent.getEventId()); this.setName(threadName); handleEvent(newClusterEvent, _cache); } catch (InterruptedException e) { logger.warn("ClusterEventProcessor interrupted " + _processorName, e); interrupt(); } catch (ZkInterruptedException e) { - logger - .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e); + logger.warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, + e); interrupt(); } catch (ThreadDeath death) { logger.error("ClusterEventProcessor caught a ThreadDeath " + _processorName, death); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java index 5b7a7baba3..af9b14697f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -46,7 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Global Rebalance does the baseline recalculation when certain changes happen. * The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled. @@ -60,9 +60,9 @@ class GlobalRebalanceRunner implements AutoCloseable { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet - .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE, - HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); + ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.IDEAL_STATE, HelixConstants.ChangeType.CLUSTER_CONFIG, + HelixConstants.ChangeType.INSTANCE_CONFIG); // To calculate the baseline asynchronously private final ExecutorService _baselineCalculateExecutor; @@ -77,10 +77,8 @@ class GlobalRebalanceRunner implements AutoCloseable { private boolean _asyncGlobalRebalanceEnabled; public GlobalRebalanceRunner(AssignmentManager assignmentManager, - AssignmentMetadataStore assignmentMetadataStore, - MetricCollector metricCollector, - LatencyMetric writeLatency, - CountMetric rebalanceFailureCount, + AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector, + LatencyMetric writeLatency, CountMetric rebalanceFailureCount, boolean isAsyncGlobalRebalanceEnabled) { _baselineCalculateExecutor = Executors.newSingleThreadExecutor(); _assignmentManager = assignmentManager; @@ -106,14 +104,17 @@ public GlobalRebalanceRunner(AssignmentManager assignmentManager, * @param algorithm * @throws HelixRebalanceException */ - public void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { + public void globalRebalance(ResourceControllerDataProvider clusterData, + Map resourceMap, final CurrentStateOutput currentStateOutput, + RebalanceAlgorithm algorithm) throws HelixRebalanceException { _changeDetector.updateSnapshots(clusterData); // Get all the changed items' information. Filter for the items that have content changed. - final Map> clusterChanges = _changeDetector.getAllChanges(); + final Map> clusterChanges = + _changeDetector.getAllChanges(); Set allAssignableInstances = clusterData.getAssignableInstances(); - if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { + if (clusterChanges.keySet().stream() + .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled; // Calculate the Baseline assignment for global rebalance. Future result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { @@ -153,8 +154,8 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, Set allAssignableInstances, - RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline, - Map> clusterChanges) + RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, + boolean shouldTriggerMainPipeline, Map> clusterChanges) throws HelixRebalanceException { LOG.info("Start calculating the new baseline."); _baselineCalcCounter.increment(1L); @@ -165,7 +166,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, // 1. Ignore node status (disable/offline). // 2. Use the previous Baseline as the only parameter about the previous assignment. Map currentBaseline = - _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet()); ClusterModel clusterModel; try { clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, @@ -175,7 +177,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - Map newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + Map newBaseline = + WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); boolean isBaselineChanged = _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline); // Write the new baseline to metadata store diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java index 141d398e92..d307fe3f8b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; @@ -44,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Compute the best possible assignment based on the Baseline and the previous Best Possible assignment. * The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a @@ -68,10 +68,8 @@ class PartialRebalanceRunner implements AutoCloseable { private Future _asyncPartialRebalanceResult; public PartialRebalanceRunner(AssignmentManager assignmentManager, - AssignmentMetadataStore assignmentMetadataStore, - MetricCollector metricCollector, - CountMetric rebalanceFailureCount, - boolean isAsyncPartialRebalanceEnabled) { + AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector, + CountMetric rebalanceFailureCount, boolean isAsyncPartialRebalanceEnabled) { _assignmentManager = assignmentManager; _assignmentMetadataStore = assignmentMetadataStore; _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor(); @@ -82,16 +80,16 @@ public PartialRebalanceRunner(AssignmentManager assignmentManager, WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(), CountMetric.class); _partialRebalanceLatency = metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge - .name(), + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), LatencyMetric.class); _baselineDivergenceGauge = metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(), BaselineDivergenceGauge.class); } - public void partialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - Set activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) + public void partialRebalance(ResourceControllerDataProvider clusterData, + Map resourceMap, Set activeNodes, + final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { // If partial rebalance is async and the previous result is not completed yet, // do not start another partial rebalance. @@ -100,19 +98,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map { - try { - doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, - currentStateOutput); - } catch (HelixRebalanceException e) { - if (_asyncPartialRebalanceEnabled) { - _rebalanceFailureCount.increment(1L); - } - LOG.error("Failed to calculate best possible assignment!", e); - return false; - } - return true; - })); + _asyncPartialRebalanceResult = + _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { + try { + doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, + currentStateOutput); + } catch (HelixRebalanceException e) { + if (_asyncPartialRebalanceEnabled) { + _rebalanceFailureCount.increment(1L); + } + LOG.error("Failed to calculate best possible assignment!", e); + return false; + } + return true; + })); if (!_asyncPartialRebalanceEnabled) { try { if (!_asyncPartialRebalanceResult.get()) { @@ -131,9 +130,9 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - Set activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput) - throws HelixRebalanceException { + private void doPartialRebalance(ResourceControllerDataProvider clusterData, + Map resourceMap, Set activeNodes, RebalanceAlgorithm algorithm, + CurrentStateOutput currentStateOutput) throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); _partialRebalanceCounter.increment(1L); _partialRebalanceLatency.startMeasuringLatency(); @@ -142,12 +141,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map< if (_assignmentMetadataStore != null) { newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1; } else { - LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version."); + LOG.debug( + "Assignment Metadata Store is null. Skip getting best possible assignment version."); } // Read the baseline from metadata store Map currentBaseline = - _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet()); // Read the best possible assignment from metadata store Map currentBestPossibleAssignment = @@ -155,14 +156,15 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map< resourceMap.keySet()); ClusterModel clusterModel; try { - clusterModel = ClusterModelProvider - .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes, - currentBaseline, currentBestPossibleAssignment); + clusterModel = + ClusterModelProvider.generateClusterModelForPartialRebalance(clusterData, resourceMap, + activeNodes, currentBaseline, currentBestPossibleAssignment); } catch (Exception ex) { throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.", HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - Map newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + Map newAssignment = + WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); // Asynchronously report baseline divergence metric before persisting to metadata store, // just in case if persisting fails, we still have the metric. @@ -177,12 +179,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map< currentBaseline, newAssignmentCopy); boolean bestPossibleUpdateSuccessful = false; - if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) { + if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged( + newAssignment)) { // This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache. // If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that // the pipeline is triggered. We schedule the pipeline to run below. - bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, - newBestPossibleAssignmentVersion); + bestPossibleUpdateSuccessful = + _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, + newBestPossibleAssignmentVersion); } else { LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 750829edb9..8a1a1f04ba 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -65,8 +65,9 @@ public class MessageGenerationPhase extends AbstractBaseStage { // at MASTER state, we wait for timeout and if the message is still not cleaned up by // participant, controller will cleanup them proactively to unblock further state // transition - public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil - .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000); + public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = + HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, + 60 * 1000); private final static String PENDING_MESSAGE = "pending message"; private final static String STALE_MESSAGE = "stale message"; @@ -186,9 +187,9 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr nextState = stateModelDef.getNextStateForTransition(currentState, desiredState); if (desiredState.equals(HelixDefinedState.DROPPED.name())) { - LogUtil.logDebug(logger, _eventId, String - .format("No current state for partition %s in resource %s, skip the drop message", - partition.getPartitionName(), resourceName)); + LogUtil.logDebug(logger, _eventId, String.format( + "No current state for partition %s in resource %s, skip the drop message", + partition.getPartitionName(), resourceName)); message = generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, @@ -199,8 +200,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr // TODO: separate logic of resource/task message generation if (cache instanceof ResourceControllerDataProvider) { - ((ResourceControllerDataProvider) cache) - .invalidateCachedIdealStateMapping(resourceName); + ((ResourceControllerDataProvider) cache).invalidateCachedIdealStateMapping( + resourceName); } continue; } @@ -214,10 +215,9 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr for (Message staleMessage : staleMessages) { // staleMessage can be simple or batch mode - if ((System.currentTimeMillis() - currentStateOutput - .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY) - && staleMessage.getResourceName().equals(resourceName) && sessionIdMap - .containsKey(instanceName) && ( + if ((System.currentTimeMillis() - currentStateOutput.getEndTime(resourceName, partition, + instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY) && staleMessage.getResourceName() + .equals(resourceName) && sessionIdMap.containsKey(instanceName) && ( staleMessage.getPartitionName().equals(partition.getPartitionName()) || ( staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames() .contains(partition.getPartitionName())))) { @@ -229,8 +229,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) { if (shouldCreateSTCancellation(pendingMessage, desiredState, stateModelDef.getInitialState())) { - message = MessageUtil - .createStateTransitionCancellationMessage(manager.getInstanceName(), + message = + MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(), manager.getSessionId(), resource, partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName), stateModelDef.getId(), pendingMessage.getFromState(), pendingMessage.getToState(), null, @@ -252,10 +252,9 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr stateModelDef, cancellationMessage, isCancellationEnabled); } else { // Create new state transition message - message = MessageUtil - .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(), - resource, partition.getPartitionName(), instanceName, currentState, nextState, - sessionIdMap.get(instanceName), stateModelDef.getId()); + message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(), + manager.getSessionId(), resource, partition.getPartitionName(), instanceName, + currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId()); if (logger.isDebugEnabled()) { LogUtil.logDebug(logger, _eventId, String.format( @@ -279,8 +278,7 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr LogUtil.logError(logger, _eventId, String.format( "An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s", sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition), - instanceStateMap, cache.getAllInstances(), - cache.getLiveInstances().keySet(), + instanceStateMap, cache.getAllInstances(), cache.getLiveInstances().keySet(), message)); continue; // Do not add this message } @@ -304,8 +302,8 @@ private boolean shouldCreateSTCancellation(Message pendingMessage, String desire // 1. pending message toState is desired state // 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE) return !desiredState.equalsIgnoreCase(pendingMessage.getToState()) && !( - HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState - .equals(pendingMessage.getToState())); + HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState.equals( + pendingMessage.getToState())); } private void logAndAddToCleanUp(Map> messagesToCleanUp, @@ -335,15 +333,16 @@ private Message generateCancellationMessageForPendingMessage(final String desire String pendingState = pendingMessage.getToState(); if (nextState.equalsIgnoreCase(pendingState)) { LogUtil.logInfo(logger, _eventId, - "Message already exists for " + instanceName + " to transit " + resource - .getResourceName() + "." + partition.getPartitionName() + " from " + currentState - + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage()); + "Message already exists for " + instanceName + " to transit " + + resource.getResourceName() + "." + partition.getPartitionName() + " from " + + currentState + " to " + nextState + ", isRelay: " + + pendingMessage.isRelayMessage()); } else if (currentState.equalsIgnoreCase(pendingState)) { LogUtil.logDebug(logger, _eventId, - "Message hasn't been removed for " + instanceName + " to transit " + resource - .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState - + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage - .isRelayMessage()); + "Message hasn't been removed for " + instanceName + " to transit " + + resource.getResourceName() + "." + partition.getPartitionName() + " to " + + pendingState + ", desiredState: " + desiredState + ", isRelay: " + + pendingMessage.isRelayMessage()); } else { LogUtil.logDebug(logger, _eventId, "IdealState changed before state transition completes for " + resource.getResourceName() @@ -409,8 +408,9 @@ public Object call() { String instanceName = entry.getKey(); for (Message msg : entry.getValue().values()) { if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName))) { - LogUtil.logInfo(logger, _eventId, String - .format("Deleted message %s from instance %s", msg.getMsgId(), instanceName)); + LogUtil.logInfo(logger, _eventId, + String.format("Deleted message %s from instance %s", msg.getMsgId(), + instanceName)); } } } @@ -442,8 +442,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi String currentState, String nextState, IdealState idealState, Partition partition) { StateTransitionTimeoutConfig stateTransitionTimeoutConfig = clusterConfig.getStateTransitionTimeoutConfig(); - int timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig - .getStateTransitionTimeout(currentState, nextState) : -1; + int timeout = stateTransitionTimeoutConfig != null + ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1; String timeOutStr = null; // Check IdealState whether has timeout set @@ -470,8 +470,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi if (resourceConfig != null) { // If resource config has timeout, replace the cluster timeout. stateTransitionTimeoutConfig = resourceConfig.getStateTransitionTimeoutConfig(); - timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig - .getStateTransitionTimeout(currentState, nextState) : -1; + timeout = stateTransitionTimeoutConfig != null + ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1; } return timeout; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java index 17eb5a3e1e..e56a20f983 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor. * It has a dedupe queue for pending call back event. Pending call back event will @@ -61,8 +60,8 @@ class CallbackProcessor implements Runnable { private final NotificationContext _event; public CallbackProcessor(CallbackHandler handler, NotificationContext event) { - _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer - .toHexString(handler.hashCode()); + _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer.toHexString( + handler.hashCode()); _handler = handler; _event = event; } @@ -89,8 +88,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati logger.error("Failed to process callback. CallbackEventExecutor is already shut down."); } if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) { - _futureCallBackProcessEvent = - _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = _threadPoolExecutor.submit( + ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } else { _callBackEventQueue.put(eventType, event); } @@ -102,11 +101,11 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler if (_callBackEventQueue.size() != 0) { try { NotificationContext event = _callBackEventQueue.take(); - _futureCallBackProcessEvent = - _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = _threadPoolExecutor.submit( + ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } catch (InterruptedException e) { - logger - .error("Error when submitting pending HandleCallBackEvent to manager thread pool", e); + logger.error("Error when submitting pending HandleCallBackEvent to manager thread pool", + e); } } } diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 5c44d78e31..fc734eef32 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -84,7 +84,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class HelixTaskExecutor implements MessageListener, TaskExecutor { /** * Put together all registration information about a message handler factory @@ -94,7 +93,8 @@ class MsgHandlerFactoryRegistryItem { private final int _threadPoolSize; private final int _resetTimeout; - public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, int resetTimeout) { + public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, + int resetTimeout) { if (factory == null) { throw new NullPointerException("Message handler factory is null"); } @@ -224,12 +224,14 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac } @Override - public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) { - registerMessageHandlerFactory(type, factory, threadpoolSize, DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS); + public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, + int threadpoolSize) { + registerMessageHandlerFactory(type, factory, threadpoolSize, + DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS); } - private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize, - int resetTimeoutMs) { + private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, + int threadpoolSize, int resetTimeoutMs) { if (factory instanceof MultiTypeMessageHandlerFactory) { if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) { throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: " @@ -237,15 +239,15 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa } } else { if (!factory.getMessageType().equals(type)) { - throw new HelixException( - "Message factory type mismatch. Type: " + type + ", factory: " + factory - .getMessageType()); + throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: " + + factory.getMessageType()); } } _isShuttingDown = false; - MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs); + MsgHandlerFactoryRegistryItem newItem = + new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs); MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem); if (prevItem == null) { _executorMap.computeIfAbsent(type, msgType -> { @@ -254,8 +256,9 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa _monitor.createExecutorMonitor(type, newPool); return newPool; }); - LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", type, - threadpoolSize, factory, _executorMap.get(type)); + LOG.info( + "Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", + type, threadpoolSize, factory, _executorMap.get(type)); } else { LOG.info( "Skip register message handler factory for type: {}, poolSize: {}, factory: {}, already existing factory: {}", @@ -308,17 +311,19 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage } } - String perStateTransitionTypeKey = - msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE); - if (perStateTransitionTypeKey != null && stateModelFactory != null && !_transitionTypeThreadpoolChecked.contains( - perStateTransitionTypeKey)) { + String perStateTransitionTypeKey = msgInfo.getMessageIdentifier( + Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE); + if (perStateTransitionTypeKey != null && stateModelFactory != null + && !_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) { ExecutorService perStateTransitionTypeExecutor = - stateModelFactory.getExecutorService(resourceName, message.getFromState(), message.getToState()); + stateModelFactory.getExecutorService(resourceName, message.getFromState(), + message.getToState()); _transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey); if (perStateTransitionTypeExecutor != null) { _executorMap.put(perStateTransitionTypeKey, perStateTransitionTypeExecutor); - LOG.info(String.format("Added client specified dedicate threadpool for resource %s from %s to %s", + LOG.info(String.format( + "Added client specified dedicate threadpool for resource %s from %s to %s", msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE), message.getFromState(), message.getToState())); return; @@ -331,9 +336,8 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage // Changes to this configuration on thread pool size will only take effect after the participant get restarted. if (configAccessor != null) { HelixConfigScope scope = - new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(manager.getClusterName()) - .forResource(resourceName) - .build(); + new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster( + manager.getClusterName()).forResource(resourceName).build(); String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS); try { @@ -341,14 +345,17 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage threadpoolSize = Integer.parseInt(threadpoolSizeStr); } } catch (Exception e) { - LOG.error("Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e); + LOG.error( + "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e); } } - final String key = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE); + final String key = + msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE); if (threadpoolSize > 0) { _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize, r -> new Thread(r, "GenericHelixController-message_handle_" + key))); - LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " + threadpoolSize); + LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " + + threadpoolSize); } else { // if threadpool is not configured // check whether client specifies customized threadpool. @@ -380,7 +387,8 @@ ExecutorService findExecutorServiceForMsg(Message message) { } else { Message.MessageInfo msgInfo = new Message.MessageInfo(message); for (int i = Message.MessageInfo.MessageIdentifierBase.values().length - 1; i >= 0; i--) { - String msgIdentifer = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]); + String msgIdentifer = + msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]); if (msgIdentifer != null && _executorMap.containsKey(msgIdentifer)) { LOG.info(String.format("Find customized threadpool for %s", msgIdentifer)); executorService = _executorMap.get(msgIdentifer); @@ -449,8 +457,8 @@ public boolean scheduleTask(MessageTask task) { LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(), message.getPartitionName(), message.getFromState(), message.getToState()); - _statusUpdateUtil - .logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager); + _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", + manager); // this sync guarantees that ExecutorService.submit() task and put taskInfo into map are // sync'ed @@ -459,8 +467,8 @@ public boolean scheduleTask(MessageTask task) { ExecutorService exeSvc = findExecutorServiceForMsg(message); if (exeSvc == null) { - LOG.warn(String - .format("Threadpool is null for type %s of message %s", message.getMsgType(), + LOG.warn( + String.format("Threadpool is null for type %s of message %s", message.getMsgType(), message.getMsgId())); return false; } @@ -468,17 +476,15 @@ public boolean scheduleTask(MessageTask task) { LOG.info("Submit task: " + taskId + " to pool: " + exeSvc); Future future = exeSvc.submit(ExecutorTaskUtil.wrap(task)); - _messageTaskMap - .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()), - taskId); + _messageTaskMap.putIfAbsent( + getMessageTarget(message.getResourceName(), message.getPartitionName()), taskId); TimerTask timerTask = null; if (message.getExecutionTimeout() > 0) { timerTask = new MessageTimeoutTask(this, task); _timer.schedule(timerTask, message.getExecutionTimeout()); - LOG.info( - "Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + task - .getTaskId()); + LOG.info("Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + + task.getTaskId()); } else { LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId()); } @@ -493,9 +499,8 @@ public boolean scheduleTask(MessageTask task) { } } catch (Throwable t) { LOG.error("Error while executing task. " + message, t); - _statusUpdateUtil - .logError(message, HelixTaskExecutor.class, t, "Error while executing task " + t, - manager); + _statusUpdateUtil.logError(message, HelixTaskExecutor.class, t, + "Error while executing task " + t, manager); } return false; } @@ -529,9 +534,8 @@ public boolean cancelTask(MessageTask task) { _taskMap.remove(taskId); return true; } else { - _statusUpdateUtil - .logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId, - notificationContext.getManager()); + _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, + "fail to cancel task: " + taskId, notificationContext.getManager()); } } else { _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, @@ -546,8 +550,8 @@ public boolean cancelTask(MessageTask task) { public void finishTask(MessageTask task) { Message message = task.getMessage(); String taskId = task.getTaskId(); - LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message - .getExecuteStartTimeStamp())); + LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() + - message.getExecuteStartTimeStamp())); synchronized (_lock) { if (_taskMap.containsKey(taskId)) { @@ -619,18 +623,19 @@ private void updateMessageState(Collection msgsToBeUpdated, HelixDataAc } } - private void shutdownAndAwaitTermination(ExecutorService pool, MsgHandlerFactoryRegistryItem handlerItem) { + private void shutdownAndAwaitTermination(ExecutorService pool, + MsgHandlerFactoryRegistryItem handlerItem) { LOG.info("Shutting down pool: " + pool); - int timeout = handlerItem == null? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout(); + int timeout = + handlerItem == null ? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout(); pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { List waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks - LOG.info("Tasks that never commenced execution after {}: {}", timeout, - waitingTasks); + LOG.info("Tasks that never commenced execution after {}: {}", timeout, waitingTasks); // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { LOG.error("Pool did not fully terminate in {} ms. pool: {}", timeout, pool); @@ -722,12 +727,8 @@ synchronized void reset() { shutdownExecutors(); synchronized (_hdlrFtyRegistry) { - _hdlrFtyRegistry.values() - .stream() - .map(MsgHandlerFactoryRegistryItem::factory) - .distinct() - .filter(Objects::nonNull) - .forEach(factory -> { + _hdlrFtyRegistry.values().stream().map(MsgHandlerFactoryRegistryItem::factory).distinct() + .filter(Objects::nonNull).forEach(factory -> { try { factory.reset(); } catch (Exception ex) { @@ -742,7 +743,8 @@ synchronized void reset() { // Log all tasks that fail to terminate for (String taskId : _taskMap.keySet()) { MessageTaskInfo info = _taskMap.get(taskId); - sb.append("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n"); + sb.append( + "Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n"); } LOG.info(sb.toString()); @@ -784,8 +786,8 @@ private void syncSessionToController(HelixManager manager) { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC); if (accessor.getProperty(key) == null) { - LOG.info(String - .format("Participant %s syncs session with controller", manager.getInstanceName())); + LOG.info(String.format("Participant %s syncs session with controller", + manager.getInstanceName())); Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC); msg.setSrcName(manager.getInstanceName()); msg.setTgtSessionId("*"); @@ -995,8 +997,8 @@ public void onMessage(String instanceName, List messages, && !createCurStateNames.contains(resourceName)) { createCurStateNames.add(resourceName); PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName); - if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean - .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) { + if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) + && !Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) { curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName); } createCurStateKeys.add(curStateKey); @@ -1043,8 +1045,8 @@ public void onMessage(String instanceName, List messages, */ try { // Record error state to the message handler. - handler.onError(new HelixException(String - .format("Failed to schedule the task for executing message handler for %s.", + handler.onError(new HelixException( + String.format("Failed to schedule the task for executing message handler for %s.", handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR, MessageHandler.ErrorType.FRAMEWORK); } catch (Exception ex) { @@ -1092,8 +1094,8 @@ private boolean checkAndProcessNoOpMessage(Message message, String instanceName, + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId(); LOG.warn(warningMessage); reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED); - _statusUpdateUtil - .logWarning(message, HelixStateMachineEngine.class, warningMessage, manager); + _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, + manager); // Proactively send a session sync message from participant to controller // upon session mismatch after a new session is established @@ -1362,8 +1364,8 @@ public MessageHandler createMessageHandler(Message message, NotificationContext // we will keep the message and the message will be handled when // the corresponding MessageHandlerFactory is registered if (item == null) { - LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " + message - .getMsgId()); + LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " + + message.getMsgId()); return null; } MessageHandlerFactory handlerFactory = item.factory(); @@ -1460,8 +1462,8 @@ private void sendNopMessage(HelixDataAccessor accessor, String instanceName) { Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString()); nopMsg.setSrcName(instanceName); nopMsg.setTgtName(instanceName); - accessor - .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg); + accessor.setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), + nopMsg); LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId()); } catch (Exception e) { LOG.error("Failed to send NO_OP message to {}.", instanceName, e); diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 646e8f3087..36bb2f1d45 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -68,10 +68,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RoutingTableProvider - implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, - LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener, - CustomizedViewRootChangeListener { +public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener, CustomizedViewRootChangeListener { private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class); private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes private final Map> _routingTableRefMap; @@ -90,9 +87,8 @@ public class RoutingTableProvider private ExecutorService _reportExecutor; private Future _reportingTask = null; - protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY"; - protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT"; - + protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY"; + protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT"; public RoutingTableProvider() { this(null); @@ -109,7 +105,8 @@ public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataTy DEFAULT_PERIODIC_REFRESH_INTERVAL); } - public RoutingTableProvider(HelixManager helixManager, Map> sourceDataTypeMap) { + public RoutingTableProvider(HelixManager helixManager, + Map> sourceDataTypeMap) { this(helixManager, sourceDataTypeMap, true, DEFAULT_PERIODIC_REFRESH_INTERVAL); } @@ -153,7 +150,7 @@ public RoutingTableProvider(HelixManager helixManager, if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) { throw new HelixException("CustomizedView has been used without any aggregation type!"); } - String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); if (_routingTableRefMap.get(key) == null) { _routingTableRefMap.put(key, new AtomicReference<>(new RoutingTable(propertyType))); } @@ -164,7 +161,7 @@ public RoutingTableProvider(HelixManager helixManager, sourceDataTypeMap.get(propertyType), propertyType.name())); } for (String customizedStateType : _sourceDataTypeMap.get(propertyType)) { - String key = generateReferenceKey(propertyType.name(), customizedStateType); + String key = generateReferenceKey(propertyType.name(), customizedStateType); if (_routingTableRefMap.get(key) == null) { _routingTableRefMap.put(key, new AtomicReference<>( new CustomizedViewRoutingTable(propertyType, customizedStateType))); @@ -225,58 +222,60 @@ private void addListeners() { if (_helixManager != null) { for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: - try { - _helixManager.addExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); - throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e); - } - break; - case CUSTOMIZEDVIEW: - // Add CustomizedView root change listener - try { - _helixManager.addCustomizedViewRootChangeListener(this); - } catch (Exception e) { - shutdown(); - throw new HelixException( - "Failed to attach CustomizedView Root Listener to HelixManager!", e); - } - // Add individual listeners for each customizedStateType - List customizedStateTypes = _sourceDataTypeMap.get(propertyType); - for (String customizedStateType : customizedStateTypes) { + case EXTERNALVIEW: try { - _helixManager.addCustomizedViewChangeListener(this, customizedStateType); + _helixManager.addExternalViewChangeListener(this); } catch (Exception e) { shutdown(); - throw new HelixException(String.format( - "Failed to attach CustomizedView Listener to HelixManager for type %s!", - customizedStateType), e); + throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", + e); + } + break; + case CUSTOMIZEDVIEW: + // Add CustomizedView root change listener + try { + _helixManager.addCustomizedViewRootChangeListener(this); + } catch (Exception e) { + shutdown(); + throw new HelixException( + "Failed to attach CustomizedView Root Listener to HelixManager!", e); + } + // Add individual listeners for each customizedStateType + List customizedStateTypes = _sourceDataTypeMap.get(propertyType); + for (String customizedStateType : customizedStateTypes) { + try { + _helixManager.addCustomizedViewChangeListener(this, customizedStateType); + } catch (Exception e) { + shutdown(); + throw new HelixException(String.format( + "Failed to attach CustomizedView Listener to HelixManager for type %s!", + customizedStateType), e); + } + } + break; + case TARGETEXTERNALVIEW: + // Check whether target external has been enabled or not + if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( + _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), + 0)) { + shutdown(); + throw new HelixException("Target External View is not enabled!"); } - } - break; - case TARGETEXTERNALVIEW: - // Check whether target external has been enabled or not - if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( - _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), - 0)) { - shutdown(); - throw new HelixException("Target External View is not enabled!"); - } - try { - _helixManager.addTargetExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); + try { + _helixManager.addTargetExternalViewChangeListener(this); + } catch (Exception e) { + shutdown(); + throw new HelixException( + "Failed to attach TargetExternalView Listener to HelixManager!", e); + } + break; + case CURRENTSTATES: + // CurrentState change listeners will be added later in LiveInstanceChange call. + break; + default: throw new HelixException( - "Failed to attach TargetExternalView Listener to HelixManager!", e); - } - break; - case CURRENTSTATES: - // CurrentState change listeners will be added later in LiveInstanceChange call. - break; - default: - throw new HelixException(String.format("Unsupported source data type: %s", propertyType)); + String.format("Unsupported source data type: %s", propertyType)); } } try { @@ -308,10 +307,10 @@ private void validateSourceDataTypeMap(Map> sourceDat } if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW) && sourceDataTypeMap.get(propertyType).size() != 0) { - logger - .error("Type has been used in addition to the propertyType {} !", propertyType.name()); - throw new HelixException(String - .format("Type %s has been used in addition to the propertyType %s !", + logger.error("Type has been used in addition to the propertyType {} !", + propertyType.name()); + throw new HelixException( + String.format("Type %s has been used in addition to the propertyType %s !", sourceDataTypeMap.get(propertyType), propertyType.name())); } } @@ -338,26 +337,26 @@ public void shutdown() { _helixManager.removeListener(keyBuilder.instanceConfigs(), this); for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: - _helixManager.removeListener(keyBuilder.externalViews(), this); - break; - case CUSTOMIZEDVIEW: - List customizedStateTypes = _sourceDataTypeMap.get(propertyType); - // Remove listener on each individual customizedStateType - for (String customizedStateType : customizedStateTypes) { - _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this); - } - break; - case TARGETEXTERNALVIEW: - _helixManager.removeListener(keyBuilder.targetExternalViews(), this); - break; - case CURRENTSTATES: - NotificationContext context = new NotificationContext(_helixManager); - context.setType(NotificationContext.Type.FINALIZE); - updateCurrentStatesListeners(Collections.emptyList(), context); - break; - default: - break; + case EXTERNALVIEW: + _helixManager.removeListener(keyBuilder.externalViews(), this); + break; + case CUSTOMIZEDVIEW: + List customizedStateTypes = _sourceDataTypeMap.get(propertyType); + // Remove listener on each individual customizedStateType + for (String customizedStateType : customizedStateTypes) { + _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this); + } + break; + case TARGETEXTERNALVIEW: + _helixManager.removeListener(keyBuilder.targetExternalViews(), this); + break; + case CURRENTSTATES: + NotificationContext context = new NotificationContext(_helixManager); + context.setType(NotificationContext.Type.FINALIZE); + updateCurrentStatesListeners(Collections.emptyList(), context); + break; + default: + break; } } } @@ -405,13 +404,12 @@ public Map> getRoutingTableSnapshots() if (!snapshots.containsKey(propertyTypeName)) { snapshots.put(propertyTypeName, new HashMap<>()); } - snapshots.get(propertyTypeName).put(customizedStateType, - new RoutingTableSnapshot(routingTable)); + snapshots.get(propertyTypeName) + .put(customizedStateType, new RoutingTableSnapshot(routingTable)); } return snapshots; } - /** * Add RoutingTableChangeListener with user defined context * @param routingTableChangeListener @@ -439,7 +437,7 @@ public void addRoutingTableChangeListener( final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager); periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH); _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, null); - } + } } /** @@ -476,8 +474,8 @@ public List getInstances(String resourceName, String partitionNa */ public List getInstancesForResource(String resourceName, String partitionName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResource(resourceName, partitionName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource( + resourceName, partitionName, state); } /** @@ -492,8 +490,8 @@ public List getInstancesForResource(String resourceName, String */ public List getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResourceGroup(resourceGroupName, partitionName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, + DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state); } /** @@ -509,8 +507,9 @@ public List getInstancesForResourceGroup(String resourceGroupNam */ public List getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state, List resourceTags) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, + DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state, + resourceTags); } /** @@ -533,8 +532,8 @@ public Set getInstances(String resourceName, String state) { * @return empty list if there is no instance in a given state */ public Set getInstancesForResource(String resourceName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResource(resourceName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource( + resourceName, state); } /** @@ -544,8 +543,8 @@ public Set getInstancesForResource(String resourceName, String s * @return empty list if there is no instance in a given state */ public Set getInstancesForResourceGroup(String resourceGroupName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResourceGroup(resourceGroupName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, + DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state); } /** @@ -557,8 +556,8 @@ public Set getInstancesForResourceGroup(String resourceGroupName */ public Set getInstancesForResourceGroup(String resourceGroupName, String state, List resourceTags) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) - .getInstancesForResourceGroup(resourceGroupName, state, resourceTags); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, + DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state, resourceTags); } /** @@ -603,9 +602,9 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp if (_routingTableRefMap.keySet().size() == 1) { String key = _routingTableRefMap.keySet().iterator().next(); if (!_routingTableRefMap.containsKey(key)) { - throw new HelixException( - String.format("Currently there is no snapshot available for PropertyType %s and stateType %s", - propertyTypeName, stateType)); + throw new HelixException(String.format( + "Currently there is no snapshot available for PropertyType %s and stateType %s", + propertyTypeName, stateType)); } return _routingTableRefMap.get(key).get(); } else { @@ -619,11 +618,11 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp } } - String key = generateReferenceKey(propertyTypeName, stateType); + String key = generateReferenceKey(propertyTypeName, stateType); if (!_routingTableRefMap.containsKey(key)) { - throw new HelixException( - String.format("Currently there is no snapshot available for PropertyType %s and stateType %s", - propertyTypeName, stateType)); + throw new HelixException(String.format( + "Currently there is no snapshot available for PropertyType %s and stateType %s", + propertyTypeName, stateType)); } return _routingTableRefMap.get(key).get(); } @@ -647,7 +646,8 @@ public void onExternalViewChange(List externalViewList, if (externalViewList != null && externalViewList.size() > 0) { // keep this here for back-compatibility, application can call onExternalViewChange directly // with externalview list supplied. - String keyReference = generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE); + String keyReference = + generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE); HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List configList = accessor.getChildValues(keyBuilder.instanceConfigs(), true); @@ -736,8 +736,7 @@ public void onCustomizedViewRootChange(List customizedViewTypes, shutdown(); throw new HelixException( String.format("Failed to attach CustomizedView Listener to HelixManager for type %s!", - customizedStateType), - e); + customizedStateType), e); } } } @@ -804,7 +803,7 @@ private void updateCurrentStatesListeners(List liveInstances, private void reset() { logger.info("Resetting the routing table."); RoutingTable newRoutingTable; - for (String key: _routingTableRefMap.keySet()) { + for (String key : _routingTableRefMap.keySet()) { PropertyType propertyType = _routingTableRefMap.get(key).get().getPropertyType(); if (propertyType == PropertyType.CUSTOMIZEDVIEW) { String stateType = _routingTableRefMap.get(key).get().getStateType(); @@ -832,12 +831,14 @@ protected void refreshCustomizedView(Collection customizedViews, long startTime = System.currentTimeMillis(); PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType(); String customizedStateType = _routingTableRefMap.get(referenceKey).get().getStateType(); - RoutingTable newRoutingTable = new CustomizedViewRoutingTable(customizedViews, instanceConfigs, - liveInstances, propertyType, customizedStateType); + RoutingTable newRoutingTable = + new CustomizedViewRoutingTable(customizedViews, instanceConfigs, liveInstances, + propertyType, customizedStateType); resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey); } - protected void refreshCurrentState(Map>> currentStateMap, + protected void refreshCurrentState( + Map>> currentStateMap, Collection instanceConfigs, Collection liveInstances, String referenceKey) { long startTime = System.currentTimeMillis(); @@ -846,7 +847,8 @@ protected void refreshCurrentState(Map entry : _routingTableChangeListenerMap - .entrySet()) { + for (Map.Entry entry : _routingTableChangeListenerMap.entrySet()) { entry.getKey().onRoutingTableChange( new RoutingTableSnapshot(_routingTableRefMap.get(referenceKey).get()), entry.getValue().getContext()); @@ -912,7 +913,8 @@ protected void handleEvent(ClusterEvent event) { throw new HelixException("HelixManager is null for router update event."); } if (!manager.isConnected()) { - logger.error(String.format("HelixManager is not connected for router update event: %s", event)); + logger.error( + String.format("HelixManager is not connected for router update event: %s", event)); throw new HelixException("HelixManager is not connected for router update event."); } @@ -921,40 +923,43 @@ protected void handleEvent(ClusterEvent event) { _dataCache.refresh(manager.getHelixDataAccessor()); for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); - refreshExternalView(_dataCache.getExternalViews().values(), - _dataCache.getRoutableInstanceConfigMap().values(), - _dataCache.getRoutableLiveInstances().values(), - keyReference); - } + case EXTERNALVIEW: { + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + refreshExternalView(_dataCache.getExternalViews().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), keyReference); + } + break; + case TARGETEXTERNALVIEW: { + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + refreshExternalView(_dataCache.getTargetExternalViews().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), keyReference); + } break; - case TARGETEXTERNALVIEW: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); - refreshExternalView(_dataCache.getTargetExternalViews().values(), - _dataCache.getRoutableInstanceConfigMap().values(), - _dataCache.getRoutableLiveInstances().values(), - keyReference); - } - break; case CUSTOMIZEDVIEW: - for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) { - String keyReference = generateReferenceKey(propertyType.name(), customizedStateType); + for (String customizedStateType : _sourceDataTypeMap.getOrDefault( + PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) { + String keyReference = + generateReferenceKey(propertyType.name(), customizedStateType); refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(), _dataCache.getRoutableInstanceConfigMap().values(), _dataCache.getRoutableLiveInstances().values(), keyReference); } break; case CURRENTSTATES: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);; + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + ; refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getRoutableInstanceConfigMap().values(), _dataCache.getRoutableLiveInstances().values(), keyReference); - recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot()); + recordPropagationLatency(System.currentTimeMillis(), + _dataCache.getCurrentStateSnapshot()); } - break; + break; default: - logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", propertyType); + logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", + propertyType); } _monitorMap.get(propertyType).increaseDataRefreshCounters(startTime); @@ -985,7 +990,8 @@ public Object call() { long endTime = partitionStateEndTimes.get(partition); if (currentTime >= endTime) { for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { - _monitorMap.get(propertyType).recordStatePropagationLatency(currentTime - endTime); + _monitorMap.get(propertyType) + .recordStatePropagationLatency(currentTime - endTime); logger.debug( "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}", key.toString(), partition, endTime, currentTime - endTime); @@ -1005,7 +1011,6 @@ public Object call() { } } - public void queueEvent(NotificationContext context, ClusterEventType eventType, HelixConstants.ChangeType changeType) { ClusterEvent event = new ClusterEvent(_clusterName, eventType); diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index a8df83c064..9f332d899b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -70,8 +70,7 @@ public void shutdown() { reset(); } - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return _taskExecutor.awaitTermination(timeout, unit); } @@ -84,8 +83,9 @@ public void onBecomeRunningFromInit(Message msg, NotificationContext context) { public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } _taskRunner.cancel(); @@ -101,8 +101,9 @@ public String onBecomeStoppedFromRunning(Message msg, NotificationContext contex public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -121,8 +122,9 @@ public String onBecomeCompletedFromRunning(Message msg, NotificationContext cont public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -141,8 +143,9 @@ public String onBecomeTimedOutFromRunning(Message msg, NotificationContext conte public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -161,13 +164,15 @@ public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext cont public String onBecomeTaskAbortedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } _taskRunner.cancel(); TaskResult r = _taskRunner.waitTillDone(); - if (r.getStatus() != TaskResult.Status.FATAL_FAILED && r.getStatus() != TaskResult.Status.CANCELED) { + if (r.getStatus() != TaskResult.Status.FATAL_FAILED + && r.getStatus() != TaskResult.Status.CANCELED) { throw new IllegalStateException(String.format( "Partition %s received a state transition to %s but the result status code is %s.", taskPartition, msg.getToState(), r.getStatus())); @@ -237,8 +242,9 @@ public void onBecomeDroppedFromTaskAborted(Message msg, NotificationContext cont public void onBecomeInitFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } _taskRunner.cancel(); @@ -314,16 +320,18 @@ private void startTask(Message msg, String taskPartition) { callbackContext.setTaskConfig(taskConfig); // Create a task instance with this command - if (command == null || _taskFactoryRegistry == null - || !_taskFactoryRegistry.containsKey(command)) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + if (command == null || _taskFactoryRegistry == null || !_taskFactoryRegistry.containsKey( + command)) { + throw new IllegalStateException( + String.format("Invalid state transition. There is no running task for partition %s.", + taskPartition)); } TaskFactory taskFactory = _taskFactoryRegistry.get(command); Task task = taskFactory.createNewTask(callbackContext); if (task instanceof UserContentStore) { - ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition); + ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), + taskPartition); } // Submit the task for execution diff --git a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java index 1877a0c6a4..e246e05fa2 100644 --- a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java @@ -40,7 +40,8 @@ public static Callable wrap(Callable callable) { try { return callable.call(); } catch (Throwable t) { - LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + LOG.error("Callable run on thread {} raised an exception and exited", + Thread.currentThread().getName(), t); throw t; } }; @@ -58,7 +59,8 @@ public static Runnable wrap(Runnable runnable) { try { runnable.run(); } catch (Throwable t) { - LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); + LOG.error("Runnable run on thread {} raised an exception and exited", + Thread.currentThread().getName(), t); throw t; } };