From 9266487d461f72eaa346165119ca854e36146a04 Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Wed, 11 Dec 2024 07:16:59 +0100 Subject: [PATCH] Revert "#2963 Fix NPE during task rebalancing" This reverts commit 3645d2b42b1c2e7ef3ead267b0493189f9335b9c. --- .../controller/GenericHelixController.java | 222 +++++++------- .../waged/GlobalRebalanceRunner.java | 35 +-- .../waged/PartialRebalanceRunner.java | 72 +++-- .../stages/MessageGenerationPhase.java | 68 ++--- .../manager/zk/CallbackEventExecutor.java | 17 +- .../messaging/handling/HelixTaskExecutor.java | 134 +++++---- .../helix/spectator/RoutingTableProvider.java | 279 +++++++++--------- .../org/apache/helix/task/TaskStateModel.java | 48 ++- .../apache/helix/util/ExecutorTaskUtil.java | 6 +- 9 files changed, 437 insertions(+), 444 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index e84ac45cca..428bfa7fb5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -129,7 +129,13 @@ * 4. select the messages that can be sent, needs messages and state model constraints
* 5. send messages */ -public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, TaskCurrentStateChangeListener, CustomizedStateRootChangeListener, CustomizedStateChangeListener, CustomizedStateConfigChangeListener, ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener { +public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, + MessageListener, CurrentStateChangeListener, + TaskCurrentStateChangeListener, + CustomizedStateRootChangeListener, + CustomizedStateChangeListener, + CustomizedStateConfigChangeListener, ControllerChangeListener, + InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener { private static final Logger logger = LoggerFactory.getLogger(GenericHelixController.class.getName()); @@ -229,8 +235,10 @@ public static GenericHelixController getLeaderController(String clusterName) { if (clusterName != null) { ImmutableSet controllers = _helixControllerFactory.get(clusterName); if (controllers != null) { - return controllers.stream().filter(controller -> controller._helixManager != null) - .filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null); + return controllers.stream() + .filter(controller -> controller._helixManager != null) + .filter(controller -> controller._helixManager.isLeader()) + .findAny().orElse(null); } } return null; @@ -289,7 +297,8 @@ public GenericHelixController(String clusterName) { public GenericHelixController(String clusterName, Set enabledPipelins) { this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()), createTaskRegistry(Pipeline.Type.TASK.name()), - createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), clusterName, + createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), + clusterName, enabledPipelins); } @@ -333,8 +342,8 @@ public long getNextRebalanceTime() { public void run() { try { if (_shouldRefreshCacheOption.orElse( - _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) - || _clusterEventType.equals(ClusterEventType.OnDemandRebalance))) { + _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType + .equals(ClusterEventType.OnDemandRebalance))) { requestDataProvidersFullRefresh(); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); @@ -363,8 +372,8 @@ private void forceRebalance(HelixManager manager, ClusterEventType eventType) { NotificationContext changeContext = new NotificationContext(manager); changeContext.setType(NotificationContext.Type.CALLBACK); pushToEventQueues(eventType, changeContext, Collections.EMPTY_MAP); - logger.info( - String.format("Controller rebalance pipeline triggered with event type: %s for cluster %s", + logger.info(String + .format("Controller rebalance pipeline triggered with event type: %s for cluster %s", eventType, _clusterName)); } @@ -381,9 +390,9 @@ void startPeriodRebalance(long period, HelixManager manager) { synchronized (_periodicalRebalanceExecutor) { lastScheduledFuture = _periodicRebalanceFutureTask; _timerPeriod = period; - _periodicRebalanceFutureTask = _periodicalRebalanceExecutor.scheduleAtFixedRate( - new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance), _timerPeriod, - _timerPeriod, TimeUnit.MILLISECONDS); + _periodicRebalanceFutureTask = _periodicalRebalanceExecutor + .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance), + _timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS); } if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) { lastScheduledFuture.cancel(false /* mayInterruptIfRunning */); @@ -412,7 +421,6 @@ private void shutdownOnDemandTimer() { _onDemandRebalanceTimer.cancel(); } } - /** * This function is deprecated. Please use RebalanceUtil.scheduleInstantPipeline method instead. * schedule a future rebalance pipeline run, delayed at given time. @@ -420,8 +428,8 @@ private void shutdownOnDemandTimer() { @Deprecated public void scheduleRebalance(long rebalanceTime) { if (_helixManager == null) { - logger.warn("Failed to schedule a future pipeline run for cluster " + _clusterName - + " helix manager is null!"); + logger.warn( + "Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!"); return; } @@ -440,8 +448,9 @@ public void scheduleRebalance(long rebalanceTime) { new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime); _onDemandRebalanceTimer.schedule(newTask, delay); - logger.info("Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() - + " in delay " + delay); + logger.info( + "Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay " + + delay); preTask = _nextRebalanceTask.getAndSet(newTask); if (preTask != null) { @@ -466,8 +475,7 @@ public void scheduleOnDemandRebalance(long delay) { */ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) { if (_helixManager == null) { - logger.error( - "Failed to schedule a future pipeline run for cluster {}. Helix manager is null!", + logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!", _clusterName); return; } @@ -487,7 +495,7 @@ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) { shouldRefreshCache); _onDemandRebalanceTimer.schedule(newTask, delay); - logger.info("Scheduled instant pipeline run for cluster {}.", _helixManager.getClusterName()); + logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName()); RebalanceTask preTask = _nextRebalanceTask.getAndSet(newTask); if (preTask != null) { @@ -553,19 +561,23 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { rebalancePipeline); registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); - registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, - autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline); - registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, - autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, - customizedViewPipeline, rebalancePipeline); - registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, - rebalancePipeline); + registry + .register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, + dataPreprocess, rebalancePipeline); + registry + .register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, + liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline, + rebalancePipeline); + registry + .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); - registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, - autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); - registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, - autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry + .register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, + dataPreprocess, externalViewPipeline, rebalancePipeline); + registry + .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline, + dataPreprocess, externalViewPipeline, rebalancePipeline); registry.register(ClusterEventType.ControllerChange, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); // TODO: We now include rebalance pipeline in customized state change for correctness. @@ -620,8 +632,8 @@ private static PipelineRegistry createTaskRegistry(String pipelineName) { rebalancePipeline); registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, rebalancePipeline); - registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, - rebalancePipeline); + registry + .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, rebalancePipeline); @@ -652,10 +664,13 @@ private static PipelineRegistry createManagementModeRegistry(String pipelineName managementMode.addStage(new ManagementMessageDispatchStage()); PipelineRegistry registry = new PipelineRegistry(); - Arrays.asList(ClusterEventType.ControllerChange, ClusterEventType.LiveInstanceChange, - ClusterEventType.MessageChange, ClusterEventType.OnDemandRebalance, - ClusterEventType.PeriodicalRebalance) - .forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode)); + Arrays.asList( + ClusterEventType.ControllerChange, + ClusterEventType.LiveInstanceChange, + ClusterEventType.MessageChange, + ClusterEventType.OnDemandRebalance, + ClusterEventType.PeriodicalRebalance + ).forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode)); return registry; } @@ -683,8 +698,7 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR _asyncTasksThreadPool = Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { + @Override public Thread newThread(Runnable r) { return new Thread(r, "HelixController-async_tasks-" + _clusterName); } }); @@ -762,7 +776,7 @@ private void shutdownAsyncFIFOWorkers() { private boolean isEventQueueEmpty(boolean taskQueue) { if (taskQueue) { - return _taskEventQueue == null || _taskEventQueue.isEmpty(); + return _taskEventQueue == null || _taskEventQueue.isEmpty(); } else { return _eventQueue == null || _eventQueue.isEmpty(); } @@ -810,8 +824,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv // If manager session changes, no need to run pipeline for the stale event. if (!eventSessionId.isPresent() || !eventSessionId.get().equals(managerSessionId)) { logger.warn( - "Controller pipeline is not invoked because event session doesn't match cluster " - + "manager session. Event type: {}, id: {}, session: {}, actual manager session: " + "Controller pipeline is not invoked because event session doesn't match cluster " + + "manager session. Event type: {}, id: {}, session: {}, actual manager session: " + "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(), eventSessionId.orElse("NOT_PRESENT"), managerSessionId, manager.getInstanceName(), manager.getClusterName()); @@ -837,8 +851,8 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType()); isManagementPipeline = true; } else { - logger.warn( - String.format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), + logger.warn(String + .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), event.getEventType(), event.getEventId())); return; } @@ -864,8 +878,7 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv } else { // TODO: should be in the initialization of controller. if (_resourceControlDataProvider != null) { - checkRebalancingTimer(manager, Collections.emptyList(), - dataProvider.getClusterConfig()); + checkRebalancingTimer(manager, Collections.emptyList(), dataProvider.getClusterConfig()); } if (_isMonitoring) { _clusterStatusMonitor.setEnabled(!_inManagementMode); @@ -876,8 +889,7 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv } dataProvider.setClusterEventId(event.getEventId()); - event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), - _lastPipelineEndTimestamp); + event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp); event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider); logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. " @@ -900,22 +912,20 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv helixMetaDataAccessRebalanceFail = true; // If pipeline failed due to read/write fails to zookeeper, retry the pipeline. dataProvider.requireFullRefresh(); - logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " - + _clusterName); + logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName); // only push a retry event when there is no pending event in the corresponding event queue. if (isEventQueueEmpty(isTaskFrameworkPipeline)) { - _continuousRebalanceFailureCount++; + _continuousRebalanceFailureCount ++; long delay = getRetryDelay(_continuousRebalanceFailureCount); if (delay == 0) { forceRebalance(manager, ClusterEventType.RetryRebalance); } else { - _asyncTasksThreadPool.schedule( - new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay, - TimeUnit.MILLISECONDS); + _asyncTasksThreadPool + .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay, + TimeUnit.MILLISECONDS); } - logger.info( - "Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName); + logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName); } } _clusterStatusMonitor.reportRebalanceFailure(); @@ -946,18 +956,20 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv if (notificationContext != null) { zkCallbackTime = notificationContext.getCreationTime(); if (_isMonitoring) { - _clusterStatusMonitor.updateClusterEventDuration( - ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - zkCallbackTime); + _clusterStatusMonitor + .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(), + enqueueTime - zkCallbackTime); } sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(), enqueueTime - zkCallbackTime)); } if (_isMonitoring) { - _clusterStatusMonitor.updateClusterEventDuration( - ClusterEventMonitor.PhaseName.InQueue.name(), startTime - enqueueTime); - _clusterStatusMonitor.updateClusterEventDuration( - ClusterEventMonitor.PhaseName.TotalProcessed.name(), - _lastPipelineEndTimestamp - startTime); + _clusterStatusMonitor + .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(), + startTime - enqueueTime); + _clusterStatusMonitor + .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(), + _lastPipelineEndTimestamp - startTime); } sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(), startTime - enqueueTime)); @@ -978,13 +990,13 @@ private void updateContinuousRebalancedFailureCount(boolean isTaskFrameworkPipel if (isTaskFrameworkPipeline) { _continuousTaskRebalanceFailureCount = resetToZero ? 0 : _continuousTaskRebalanceFailureCount + 1; - _clusterStatusMonitor.reportContinuousTaskRebalanceFailureCount( - _continuousTaskRebalanceFailureCount); + _clusterStatusMonitor + .reportContinuousTaskRebalanceFailureCount(_continuousTaskRebalanceFailureCount); } else { _continuousResourceRebalanceFailureCount = resetToZero ? 0 : _continuousResourceRebalanceFailureCount + 1; - _clusterStatusMonitor.reportContinuousResourceRebalanceFailureCount( - _continuousResourceRebalanceFailureCount); + _clusterStatusMonitor + .reportContinuousResourceRebalanceFailureCount(_continuousResourceRebalanceFailureCount); } } @@ -1007,8 +1019,8 @@ public void onStateChange(String instanceName, List statesInfo, NotificationContext changeContext) { logger.info("START: GenericClusterController.onStateChange()"); notifyCaches(changeContext, ChangeType.CURRENT_STATE); - pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, - Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections + .singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onStateChange()"); } @@ -1018,8 +1030,8 @@ public void onTaskCurrentStateChange(String instanceName, List sta NotificationContext changeContext) { logger.info("START: GenericClusterController.onTaskCurrentStateChange()"); notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE); - pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, - Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections + .singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onTaskCurrentStateChange()"); } @@ -1082,8 +1094,8 @@ public void onCustomizedStateChange(String instanceName, List s NotificationContext changeContext) { logger.info("START: GenericClusterController.onCustomizedStateChange()"); notifyCaches(changeContext, ChangeType.CUSTOMIZED_STATE); - pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, - Collections.singletonMap(AttributeName.instanceName.name(), instanceName)); + pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, Collections + .singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onCustomizedStateChange()"); } @@ -1102,8 +1114,7 @@ public void onMessage(String instanceName, List messages, @Override public void onLiveInstanceChange(List liveInstances, NotificationContext changeContext) { - logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " - + _clusterName); + logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName); notifyCaches(changeContext, ChangeType.LIVE_INSTANCE); if (liveInstances == null) { @@ -1197,40 +1208,44 @@ public void onInstanceConfigChange(List instanceConfigs, @Override @PreFetch(enabled = false) - public void onResourceConfigChange(List resourceConfigs, - NotificationContext context) { + public void onResourceConfigChange( + List resourceConfigs, NotificationContext context) { logger.info( "START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); notifyCaches(context, ChangeType.RESOURCE_CONFIG); pushToEventQueues(ClusterEventType.ResourceConfigChange, context, Collections.emptyMap()); - logger.info( - "END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); + logger + .info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); } @Override @PreFetch(enabled = false) - public void onCustomizedStateConfigChange(CustomizedStateConfig customizedStateConfig, + public void onCustomizedStateConfigChange( + CustomizedStateConfig customizedStateConfig, NotificationContext context) { - logger.info("START: GenericClusterController.onCustomizedStateConfigChange() for cluster " - + _clusterName); + logger.info( + "START: GenericClusterController.onCustomizedStateConfigChange() for cluster " + + _clusterName); notifyCaches(context, ChangeType.CUSTOMIZED_STATE_CONFIG); pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context, - Collections.emptyMap()); - logger.info("END: GenericClusterController.onCustomizedStateConfigChange() for cluster " - + _clusterName); + Collections. emptyMap()); + logger.info( + "END: GenericClusterController.onCustomizedStateConfigChange() for cluster " + + _clusterName); } @Override @PreFetch(enabled = false) - public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) { + public void onClusterConfigChange(ClusterConfig clusterConfig, + NotificationContext context) { logger.info( "START: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); notifyCaches(context, ChangeType.CLUSTER_CONFIG); pushToEventQueues(ClusterEventType.ClusterConfigChange, context, Collections.emptyMap()); - logger.info( - "END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); + logger + .info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); } private void notifyCaches(NotificationContext context, ChangeType changeType) { @@ -1411,16 +1426,17 @@ protected void checkLiveInstancesObservation(List liveInstances, } } - for (String instance : curInstances.keySet()) { - if (lastInstances == null || !lastInstances.containsKey(instance)) { - try { - manager.addCustomizedStateRootChangeListener(this, instance); - logger.info(manager.getInstanceName() + " added root path listener for customized " - + "state change for " + instance + ", listener: " + this); - } catch (Exception e) { - logger.error("Fail to add root path listener for customized state change for instance: " - + instance, e); - } + for (String instance : curInstances.keySet()) { + if (lastInstances == null || !lastInstances.containsKey(instance)) { + try { + manager.addCustomizedStateRootChangeListener(this, instance); + logger.info(manager.getInstanceName() + " added root path listener for customized " + + "state change for " + instance + ", listener: " + this); + } catch (Exception e) { + logger.error( + "Fail to add root path listener for customized state change for instance: " + + instance, e); + } } } @@ -1435,8 +1451,8 @@ public void shutdown() throws InterruptedException { stopPeriodRebalance(); _periodicalRebalanceExecutor.shutdown(); - if (!_periodicalRebalanceExecutor.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, - TimeUnit.MILLISECONDS)) { + if (!_periodicalRebalanceExecutor + .awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) { _periodicalRebalanceExecutor.shutdownNow(); } @@ -1534,16 +1550,16 @@ public void run() { while (!isInterrupted()) { try { ClusterEvent newClusterEvent = _eventBlockingQueue.take(); - String threadName = String.format("HelixController-pipeline-%s-(%s)", _processorName, - newClusterEvent.getEventId()); + String threadName = String.format( + "HelixController-pipeline-%s-(%s)", _processorName, newClusterEvent.getEventId()); this.setName(threadName); handleEvent(newClusterEvent, _cache); } catch (InterruptedException e) { logger.warn("ClusterEventProcessor interrupted " + _processorName, e); interrupt(); } catch (ZkInterruptedException e) { - logger.warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, - e); + logger + .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e); interrupt(); } catch (ThreadDeath death) { logger.error("ClusterEventProcessor caught a ThreadDeath " + _processorName, death); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java index af9b14697f..5b7a7baba3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -47,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Global Rebalance does the baseline recalculation when certain changes happen. * The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled. @@ -60,9 +60,9 @@ class GlobalRebalanceRunner implements AutoCloseable { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.IDEAL_STATE, HelixConstants.ChangeType.CLUSTER_CONFIG, - HelixConstants.ChangeType.INSTANCE_CONFIG); + ImmutableSet + .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE, + HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); // To calculate the baseline asynchronously private final ExecutorService _baselineCalculateExecutor; @@ -77,8 +77,10 @@ class GlobalRebalanceRunner implements AutoCloseable { private boolean _asyncGlobalRebalanceEnabled; public GlobalRebalanceRunner(AssignmentManager assignmentManager, - AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector, - LatencyMetric writeLatency, CountMetric rebalanceFailureCount, + AssignmentMetadataStore assignmentMetadataStore, + MetricCollector metricCollector, + LatencyMetric writeLatency, + CountMetric rebalanceFailureCount, boolean isAsyncGlobalRebalanceEnabled) { _baselineCalculateExecutor = Executors.newSingleThreadExecutor(); _assignmentManager = assignmentManager; @@ -104,17 +106,14 @@ public GlobalRebalanceRunner(AssignmentManager assignmentManager, * @param algorithm * @throws HelixRebalanceException */ - public void globalRebalance(ResourceControllerDataProvider clusterData, - Map resourceMap, final CurrentStateOutput currentStateOutput, - RebalanceAlgorithm algorithm) throws HelixRebalanceException { + public void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { _changeDetector.updateSnapshots(clusterData); // Get all the changed items' information. Filter for the items that have content changed. - final Map> clusterChanges = - _changeDetector.getAllChanges(); + final Map> clusterChanges = _changeDetector.getAllChanges(); Set allAssignableInstances = clusterData.getAssignableInstances(); - if (clusterChanges.keySet().stream() - .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { + if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled; // Calculate the Baseline assignment for global rebalance. Future result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { @@ -154,8 +153,8 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, */ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, Set allAssignableInstances, - RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, - boolean shouldTriggerMainPipeline, Map> clusterChanges) + RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline, + Map> clusterChanges) throws HelixRebalanceException { LOG.info("Start calculating the new baseline."); _baselineCalcCounter.increment(1L); @@ -166,8 +165,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, // 1. Ignore node status (disable/offline). // 2. Use the previous Baseline as the only parameter about the previous assignment. Map currentBaseline = - _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, - resourceMap.keySet()); + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); ClusterModel clusterModel; try { clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, @@ -177,8 +175,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - Map newBaseline = - WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + Map newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); boolean isBaselineChanged = _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline); // Write the new baseline to metadata store diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java index d307fe3f8b..141d398e92 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; @@ -45,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Compute the best possible assignment based on the Baseline and the previous Best Possible assignment. * The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a @@ -68,8 +68,10 @@ class PartialRebalanceRunner implements AutoCloseable { private Future _asyncPartialRebalanceResult; public PartialRebalanceRunner(AssignmentManager assignmentManager, - AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector, - CountMetric rebalanceFailureCount, boolean isAsyncPartialRebalanceEnabled) { + AssignmentMetadataStore assignmentMetadataStore, + MetricCollector metricCollector, + CountMetric rebalanceFailureCount, + boolean isAsyncPartialRebalanceEnabled) { _assignmentManager = assignmentManager; _assignmentMetadataStore = assignmentMetadataStore; _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor(); @@ -80,16 +82,16 @@ public PartialRebalanceRunner(AssignmentManager assignmentManager, WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(), CountMetric.class); _partialRebalanceLatency = metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge + .name(), LatencyMetric.class); _baselineDivergenceGauge = metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(), BaselineDivergenceGauge.class); } - public void partialRebalance(ResourceControllerDataProvider clusterData, - Map resourceMap, Set activeNodes, - final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) + public void partialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + Set activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { // If partial rebalance is async and the previous result is not completed yet, // do not start another partial rebalance. @@ -98,20 +100,19 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, return; } - _asyncPartialRebalanceResult = - _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { - try { - doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, - currentStateOutput); - } catch (HelixRebalanceException e) { - if (_asyncPartialRebalanceEnabled) { - _rebalanceFailureCount.increment(1L); - } - LOG.error("Failed to calculate best possible assignment!", e); - return false; - } - return true; - })); + _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> { + try { + doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, + currentStateOutput); + } catch (HelixRebalanceException e) { + if (_asyncPartialRebalanceEnabled) { + _rebalanceFailureCount.increment(1L); + } + LOG.error("Failed to calculate best possible assignment!", e); + return false; + } + return true; + })); if (!_asyncPartialRebalanceEnabled) { try { if (!_asyncPartialRebalanceResult.get()) { @@ -130,9 +131,9 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, * If the result differ from the persisted result, persist it to memory (only if the version is not stale); * If persisted, trigger the pipeline so that main thread logic can run again. */ - private void doPartialRebalance(ResourceControllerDataProvider clusterData, - Map resourceMap, Set activeNodes, RebalanceAlgorithm algorithm, - CurrentStateOutput currentStateOutput) throws HelixRebalanceException { + private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + Set activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput) + throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); _partialRebalanceCounter.increment(1L); _partialRebalanceLatency.startMeasuringLatency(); @@ -141,14 +142,12 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, if (_assignmentMetadataStore != null) { newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1; } else { - LOG.debug( - "Assignment Metadata Store is null. Skip getting best possible assignment version."); + LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version."); } // Read the baseline from metadata store Map currentBaseline = - _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, - resourceMap.keySet()); + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); // Read the best possible assignment from metadata store Map currentBestPossibleAssignment = @@ -156,15 +155,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, resourceMap.keySet()); ClusterModel clusterModel; try { - clusterModel = - ClusterModelProvider.generateClusterModelForPartialRebalance(clusterData, resourceMap, - activeNodes, currentBaseline, currentBestPossibleAssignment); + clusterModel = ClusterModelProvider + .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes, + currentBaseline, currentBestPossibleAssignment); } catch (Exception ex) { throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.", HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - Map newAssignment = - WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + Map newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); // Asynchronously report baseline divergence metric before persisting to metadata store, // just in case if persisting fails, we still have the metric. @@ -179,14 +177,12 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, currentBaseline, newAssignmentCopy); boolean bestPossibleUpdateSuccessful = false; - if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged( - newAssignment)) { + if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) { // This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache. // If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that // the pipeline is triggered. We schedule the pipeline to run below. - bestPossibleUpdateSuccessful = - _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, - newBestPossibleAssignmentVersion); + bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, + newBestPossibleAssignmentVersion); } else { LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 8a1a1f04ba..750829edb9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -65,9 +65,8 @@ public class MessageGenerationPhase extends AbstractBaseStage { // at MASTER state, we wait for timeout and if the message is still not cleaned up by // participant, controller will cleanup them proactively to unblock further state // transition - public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = - HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, - 60 * 1000); + public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil + .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000); private final static String PENDING_MESSAGE = "pending message"; private final static String STALE_MESSAGE = "stale message"; @@ -187,9 +186,9 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr nextState = stateModelDef.getNextStateForTransition(currentState, desiredState); if (desiredState.equals(HelixDefinedState.DROPPED.name())) { - LogUtil.logDebug(logger, _eventId, String.format( - "No current state for partition %s in resource %s, skip the drop message", - partition.getPartitionName(), resourceName)); + LogUtil.logDebug(logger, _eventId, String + .format("No current state for partition %s in resource %s, skip the drop message", + partition.getPartitionName(), resourceName)); message = generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, @@ -200,8 +199,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr // TODO: separate logic of resource/task message generation if (cache instanceof ResourceControllerDataProvider) { - ((ResourceControllerDataProvider) cache).invalidateCachedIdealStateMapping( - resourceName); + ((ResourceControllerDataProvider) cache) + .invalidateCachedIdealStateMapping(resourceName); } continue; } @@ -215,9 +214,10 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr for (Message staleMessage : staleMessages) { // staleMessage can be simple or batch mode - if ((System.currentTimeMillis() - currentStateOutput.getEndTime(resourceName, partition, - instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY) && staleMessage.getResourceName() - .equals(resourceName) && sessionIdMap.containsKey(instanceName) && ( + if ((System.currentTimeMillis() - currentStateOutput + .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY) + && staleMessage.getResourceName().equals(resourceName) && sessionIdMap + .containsKey(instanceName) && ( staleMessage.getPartitionName().equals(partition.getPartitionName()) || ( staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames() .contains(partition.getPartitionName())))) { @@ -229,8 +229,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) { if (shouldCreateSTCancellation(pendingMessage, desiredState, stateModelDef.getInitialState())) { - message = - MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(), + message = MessageUtil + .createStateTransitionCancellationMessage(manager.getInstanceName(), manager.getSessionId(), resource, partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName), stateModelDef.getId(), pendingMessage.getFromState(), pendingMessage.getToState(), null, @@ -252,9 +252,10 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr stateModelDef, cancellationMessage, isCancellationEnabled); } else { // Create new state transition message - message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(), - manager.getSessionId(), resource, partition.getPartitionName(), instanceName, - currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId()); + message = MessageUtil + .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(), + resource, partition.getPartitionName(), instanceName, currentState, nextState, + sessionIdMap.get(instanceName), stateModelDef.getId()); if (logger.isDebugEnabled()) { LogUtil.logDebug(logger, _eventId, String.format( @@ -278,7 +279,8 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr LogUtil.logError(logger, _eventId, String.format( "An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s", sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition), - instanceStateMap, cache.getAllInstances(), cache.getLiveInstances().keySet(), + instanceStateMap, cache.getAllInstances(), + cache.getLiveInstances().keySet(), message)); continue; // Do not add this message } @@ -302,8 +304,8 @@ private boolean shouldCreateSTCancellation(Message pendingMessage, String desire // 1. pending message toState is desired state // 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE) return !desiredState.equalsIgnoreCase(pendingMessage.getToState()) && !( - HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState.equals( - pendingMessage.getToState())); + HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState + .equals(pendingMessage.getToState())); } private void logAndAddToCleanUp(Map> messagesToCleanUp, @@ -333,16 +335,15 @@ private Message generateCancellationMessageForPendingMessage(final String desire String pendingState = pendingMessage.getToState(); if (nextState.equalsIgnoreCase(pendingState)) { LogUtil.logInfo(logger, _eventId, - "Message already exists for " + instanceName + " to transit " - + resource.getResourceName() + "." + partition.getPartitionName() + " from " - + currentState + " to " + nextState + ", isRelay: " - + pendingMessage.isRelayMessage()); + "Message already exists for " + instanceName + " to transit " + resource + .getResourceName() + "." + partition.getPartitionName() + " from " + currentState + + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage()); } else if (currentState.equalsIgnoreCase(pendingState)) { LogUtil.logDebug(logger, _eventId, - "Message hasn't been removed for " + instanceName + " to transit " - + resource.getResourceName() + "." + partition.getPartitionName() + " to " - + pendingState + ", desiredState: " + desiredState + ", isRelay: " - + pendingMessage.isRelayMessage()); + "Message hasn't been removed for " + instanceName + " to transit " + resource + .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState + + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage + .isRelayMessage()); } else { LogUtil.logDebug(logger, _eventId, "IdealState changed before state transition completes for " + resource.getResourceName() @@ -408,9 +409,8 @@ public Object call() { String instanceName = entry.getKey(); for (Message msg : entry.getValue().values()) { if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName))) { - LogUtil.logInfo(logger, _eventId, - String.format("Deleted message %s from instance %s", msg.getMsgId(), - instanceName)); + LogUtil.logInfo(logger, _eventId, String + .format("Deleted message %s from instance %s", msg.getMsgId(), instanceName)); } } } @@ -442,8 +442,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi String currentState, String nextState, IdealState idealState, Partition partition) { StateTransitionTimeoutConfig stateTransitionTimeoutConfig = clusterConfig.getStateTransitionTimeoutConfig(); - int timeout = stateTransitionTimeoutConfig != null - ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1; + int timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig + .getStateTransitionTimeout(currentState, nextState) : -1; String timeOutStr = null; // Check IdealState whether has timeout set @@ -470,8 +470,8 @@ private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfi if (resourceConfig != null) { // If resource config has timeout, replace the cluster timeout. stateTransitionTimeoutConfig = resourceConfig.getStateTransitionTimeoutConfig(); - timeout = stateTransitionTimeoutConfig != null - ? stateTransitionTimeoutConfig.getStateTransitionTimeout(currentState, nextState) : -1; + timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig + .getStateTransitionTimeout(currentState, nextState) : -1; } return timeout; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java index e56a20f983..17eb5a3e1e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor. * It has a dedupe queue for pending call back event. Pending call back event will @@ -60,8 +61,8 @@ class CallbackProcessor implements Runnable { private final NotificationContext _event; public CallbackProcessor(CallbackHandler handler, NotificationContext event) { - _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer.toHexString( - handler.hashCode()); + _processorName = _manager.getClusterName() + "-CallbackProcessor@" + Integer + .toHexString(handler.hashCode()); _handler = handler; _event = event; } @@ -88,8 +89,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati logger.error("Failed to process callback. CallbackEventExecutor is already shut down."); } if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) { - _futureCallBackProcessEvent = _threadPoolExecutor.submit( - ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = + _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } else { _callBackEventQueue.put(eventType, event); } @@ -101,11 +102,11 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler if (_callBackEventQueue.size() != 0) { try { NotificationContext event = _callBackEventQueue.take(); - _futureCallBackProcessEvent = _threadPoolExecutor.submit( - ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); + _futureCallBackProcessEvent = + _threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event))); } catch (InterruptedException e) { - logger.error("Error when submitting pending HandleCallBackEvent to manager thread pool", - e); + logger + .error("Error when submitting pending HandleCallBackEvent to manager thread pool", e); } } } diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index fc734eef32..5c44d78e31 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -84,6 +84,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class HelixTaskExecutor implements MessageListener, TaskExecutor { /** * Put together all registration information about a message handler factory @@ -93,8 +94,7 @@ class MsgHandlerFactoryRegistryItem { private final int _threadPoolSize; private final int _resetTimeout; - public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, - int resetTimeout) { + public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, int resetTimeout) { if (factory == null) { throw new NullPointerException("Message handler factory is null"); } @@ -224,14 +224,12 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac } @Override - public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, - int threadpoolSize) { - registerMessageHandlerFactory(type, factory, threadpoolSize, - DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS); + public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) { + registerMessageHandlerFactory(type, factory, threadpoolSize, DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS); } - private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, - int threadpoolSize, int resetTimeoutMs) { + private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize, + int resetTimeoutMs) { if (factory instanceof MultiTypeMessageHandlerFactory) { if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) { throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: " @@ -239,15 +237,15 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa } } else { if (!factory.getMessageType().equals(type)) { - throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: " - + factory.getMessageType()); + throw new HelixException( + "Message factory type mismatch. Type: " + type + ", factory: " + factory + .getMessageType()); } } _isShuttingDown = false; - MsgHandlerFactoryRegistryItem newItem = - new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs); + MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs); MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem); if (prevItem == null) { _executorMap.computeIfAbsent(type, msgType -> { @@ -256,9 +254,8 @@ private void registerMessageHandlerFactory(String type, MessageHandlerFactory fa _monitor.createExecutorMonitor(type, newPool); return newPool; }); - LOG.info( - "Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", - type, threadpoolSize, factory, _executorMap.get(type)); + LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", type, + threadpoolSize, factory, _executorMap.get(type)); } else { LOG.info( "Skip register message handler factory for type: {}, poolSize: {}, factory: {}, already existing factory: {}", @@ -311,19 +308,17 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage } } - String perStateTransitionTypeKey = msgInfo.getMessageIdentifier( - Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE); - if (perStateTransitionTypeKey != null && stateModelFactory != null - && !_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) { + String perStateTransitionTypeKey = + msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE); + if (perStateTransitionTypeKey != null && stateModelFactory != null && !_transitionTypeThreadpoolChecked.contains( + perStateTransitionTypeKey)) { ExecutorService perStateTransitionTypeExecutor = - stateModelFactory.getExecutorService(resourceName, message.getFromState(), - message.getToState()); + stateModelFactory.getExecutorService(resourceName, message.getFromState(), message.getToState()); _transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey); if (perStateTransitionTypeExecutor != null) { _executorMap.put(perStateTransitionTypeKey, perStateTransitionTypeExecutor); - LOG.info(String.format( - "Added client specified dedicate threadpool for resource %s from %s to %s", + LOG.info(String.format("Added client specified dedicate threadpool for resource %s from %s to %s", msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE), message.getFromState(), message.getToState())); return; @@ -336,8 +331,9 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage // Changes to this configuration on thread pool size will only take effect after the participant get restarted. if (configAccessor != null) { HelixConfigScope scope = - new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster( - manager.getClusterName()).forResource(resourceName).build(); + new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(manager.getClusterName()) + .forResource(resourceName) + .build(); String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS); try { @@ -345,17 +341,14 @@ void updateStateTransitionMessageThreadPool(Message message, HelixManager manage threadpoolSize = Integer.parseInt(threadpoolSizeStr); } } catch (Exception e) { - LOG.error( - "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e); + LOG.error("Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e); } } - final String key = - msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE); + final String key = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE); if (threadpoolSize > 0) { _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize, r -> new Thread(r, "GenericHelixController-message_handle_" + key))); - LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " - + threadpoolSize); + LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " + threadpoolSize); } else { // if threadpool is not configured // check whether client specifies customized threadpool. @@ -387,8 +380,7 @@ ExecutorService findExecutorServiceForMsg(Message message) { } else { Message.MessageInfo msgInfo = new Message.MessageInfo(message); for (int i = Message.MessageInfo.MessageIdentifierBase.values().length - 1; i >= 0; i--) { - String msgIdentifer = - msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]); + String msgIdentifer = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]); if (msgIdentifer != null && _executorMap.containsKey(msgIdentifer)) { LOG.info(String.format("Find customized threadpool for %s", msgIdentifer)); executorService = _executorMap.get(msgIdentifer); @@ -457,8 +449,8 @@ public boolean scheduleTask(MessageTask task) { LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(), message.getPartitionName(), message.getFromState(), message.getToState()); - _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", - manager); + _statusUpdateUtil + .logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager); // this sync guarantees that ExecutorService.submit() task and put taskInfo into map are // sync'ed @@ -467,8 +459,8 @@ public boolean scheduleTask(MessageTask task) { ExecutorService exeSvc = findExecutorServiceForMsg(message); if (exeSvc == null) { - LOG.warn( - String.format("Threadpool is null for type %s of message %s", message.getMsgType(), + LOG.warn(String + .format("Threadpool is null for type %s of message %s", message.getMsgType(), message.getMsgId())); return false; } @@ -476,15 +468,17 @@ public boolean scheduleTask(MessageTask task) { LOG.info("Submit task: " + taskId + " to pool: " + exeSvc); Future future = exeSvc.submit(ExecutorTaskUtil.wrap(task)); - _messageTaskMap.putIfAbsent( - getMessageTarget(message.getResourceName(), message.getPartitionName()), taskId); + _messageTaskMap + .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()), + taskId); TimerTask timerTask = null; if (message.getExecutionTimeout() > 0) { timerTask = new MessageTimeoutTask(this, task); _timer.schedule(timerTask, message.getExecutionTimeout()); - LOG.info("Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " - + task.getTaskId()); + LOG.info( + "Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + task + .getTaskId()); } else { LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId()); } @@ -499,8 +493,9 @@ public boolean scheduleTask(MessageTask task) { } } catch (Throwable t) { LOG.error("Error while executing task. " + message, t); - _statusUpdateUtil.logError(message, HelixTaskExecutor.class, t, - "Error while executing task " + t, manager); + _statusUpdateUtil + .logError(message, HelixTaskExecutor.class, t, "Error while executing task " + t, + manager); } return false; } @@ -534,8 +529,9 @@ public boolean cancelTask(MessageTask task) { _taskMap.remove(taskId); return true; } else { - _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, - "fail to cancel task: " + taskId, notificationContext.getManager()); + _statusUpdateUtil + .logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId, + notificationContext.getManager()); } } else { _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, @@ -550,8 +546,8 @@ public boolean cancelTask(MessageTask task) { public void finishTask(MessageTask task) { Message message = task.getMessage(); String taskId = task.getTaskId(); - LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - - message.getExecuteStartTimeStamp())); + LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message + .getExecuteStartTimeStamp())); synchronized (_lock) { if (_taskMap.containsKey(taskId)) { @@ -623,19 +619,18 @@ private void updateMessageState(Collection msgsToBeUpdated, HelixDataAc } } - private void shutdownAndAwaitTermination(ExecutorService pool, - MsgHandlerFactoryRegistryItem handlerItem) { + private void shutdownAndAwaitTermination(ExecutorService pool, MsgHandlerFactoryRegistryItem handlerItem) { LOG.info("Shutting down pool: " + pool); - int timeout = - handlerItem == null ? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout(); + int timeout = handlerItem == null? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout(); pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { List waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks - LOG.info("Tasks that never commenced execution after {}: {}", timeout, waitingTasks); + LOG.info("Tasks that never commenced execution after {}: {}", timeout, + waitingTasks); // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { LOG.error("Pool did not fully terminate in {} ms. pool: {}", timeout, pool); @@ -727,8 +722,12 @@ synchronized void reset() { shutdownExecutors(); synchronized (_hdlrFtyRegistry) { - _hdlrFtyRegistry.values().stream().map(MsgHandlerFactoryRegistryItem::factory).distinct() - .filter(Objects::nonNull).forEach(factory -> { + _hdlrFtyRegistry.values() + .stream() + .map(MsgHandlerFactoryRegistryItem::factory) + .distinct() + .filter(Objects::nonNull) + .forEach(factory -> { try { factory.reset(); } catch (Exception ex) { @@ -743,8 +742,7 @@ synchronized void reset() { // Log all tasks that fail to terminate for (String taskId : _taskMap.keySet()) { MessageTaskInfo info = _taskMap.get(taskId); - sb.append( - "Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n"); + sb.append("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n"); } LOG.info(sb.toString()); @@ -786,8 +784,8 @@ private void syncSessionToController(HelixManager manager) { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC); if (accessor.getProperty(key) == null) { - LOG.info(String.format("Participant %s syncs session with controller", - manager.getInstanceName())); + LOG.info(String + .format("Participant %s syncs session with controller", manager.getInstanceName())); Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC); msg.setSrcName(manager.getInstanceName()); msg.setTgtSessionId("*"); @@ -997,8 +995,8 @@ public void onMessage(String instanceName, List messages, && !createCurStateNames.contains(resourceName)) { createCurStateNames.add(resourceName); PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName); - if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) - && !Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) { + if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean + .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) { curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName); } createCurStateKeys.add(curStateKey); @@ -1045,8 +1043,8 @@ public void onMessage(String instanceName, List messages, */ try { // Record error state to the message handler. - handler.onError(new HelixException( - String.format("Failed to schedule the task for executing message handler for %s.", + handler.onError(new HelixException(String + .format("Failed to schedule the task for executing message handler for %s.", handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR, MessageHandler.ErrorType.FRAMEWORK); } catch (Exception ex) { @@ -1094,8 +1092,8 @@ private boolean checkAndProcessNoOpMessage(Message message, String instanceName, + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId(); LOG.warn(warningMessage); reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED); - _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, - manager); + _statusUpdateUtil + .logWarning(message, HelixStateMachineEngine.class, warningMessage, manager); // Proactively send a session sync message from participant to controller // upon session mismatch after a new session is established @@ -1364,8 +1362,8 @@ public MessageHandler createMessageHandler(Message message, NotificationContext // we will keep the message and the message will be handled when // the corresponding MessageHandlerFactory is registered if (item == null) { - LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " - + message.getMsgId()); + LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " + message + .getMsgId()); return null; } MessageHandlerFactory handlerFactory = item.factory(); @@ -1462,8 +1460,8 @@ private void sendNopMessage(HelixDataAccessor accessor, String instanceName) { Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString()); nopMsg.setSrcName(instanceName); nopMsg.setTgtName(instanceName); - accessor.setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), - nopMsg); + accessor + .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg); LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId()); } catch (Exception e) { LOG.error("Failed to send NO_OP message to {}.", instanceName, e); diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 36bb2f1d45..646e8f3087 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -68,7 +68,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener, CustomizedViewRootChangeListener { +public class RoutingTableProvider + implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, + LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener, + CustomizedViewRootChangeListener { private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class); private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes private final Map> _routingTableRefMap; @@ -87,8 +90,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc private ExecutorService _reportExecutor; private Future _reportingTask = null; - protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY"; - protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT"; + protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY"; + protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT"; + public RoutingTableProvider() { this(null); @@ -105,8 +109,7 @@ public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataTy DEFAULT_PERIODIC_REFRESH_INTERVAL); } - public RoutingTableProvider(HelixManager helixManager, - Map> sourceDataTypeMap) { + public RoutingTableProvider(HelixManager helixManager, Map> sourceDataTypeMap) { this(helixManager, sourceDataTypeMap, true, DEFAULT_PERIODIC_REFRESH_INTERVAL); } @@ -150,7 +153,7 @@ public RoutingTableProvider(HelixManager helixManager, if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) { throw new HelixException("CustomizedView has been used without any aggregation type!"); } - String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); if (_routingTableRefMap.get(key) == null) { _routingTableRefMap.put(key, new AtomicReference<>(new RoutingTable(propertyType))); } @@ -161,7 +164,7 @@ public RoutingTableProvider(HelixManager helixManager, sourceDataTypeMap.get(propertyType), propertyType.name())); } for (String customizedStateType : _sourceDataTypeMap.get(propertyType)) { - String key = generateReferenceKey(propertyType.name(), customizedStateType); + String key = generateReferenceKey(propertyType.name(), customizedStateType); if (_routingTableRefMap.get(key) == null) { _routingTableRefMap.put(key, new AtomicReference<>( new CustomizedViewRoutingTable(propertyType, customizedStateType))); @@ -222,60 +225,58 @@ private void addListeners() { if (_helixManager != null) { for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: - try { - _helixManager.addExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); - throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", - e); - } - break; - case CUSTOMIZEDVIEW: - // Add CustomizedView root change listener + case EXTERNALVIEW: + try { + _helixManager.addExternalViewChangeListener(this); + } catch (Exception e) { + shutdown(); + throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e); + } + break; + case CUSTOMIZEDVIEW: + // Add CustomizedView root change listener + try { + _helixManager.addCustomizedViewRootChangeListener(this); + } catch (Exception e) { + shutdown(); + throw new HelixException( + "Failed to attach CustomizedView Root Listener to HelixManager!", e); + } + // Add individual listeners for each customizedStateType + List customizedStateTypes = _sourceDataTypeMap.get(propertyType); + for (String customizedStateType : customizedStateTypes) { try { - _helixManager.addCustomizedViewRootChangeListener(this); + _helixManager.addCustomizedViewChangeListener(this, customizedStateType); } catch (Exception e) { shutdown(); - throw new HelixException( - "Failed to attach CustomizedView Root Listener to HelixManager!", e); - } - // Add individual listeners for each customizedStateType - List customizedStateTypes = _sourceDataTypeMap.get(propertyType); - for (String customizedStateType : customizedStateTypes) { - try { - _helixManager.addCustomizedViewChangeListener(this, customizedStateType); - } catch (Exception e) { - shutdown(); - throw new HelixException(String.format( - "Failed to attach CustomizedView Listener to HelixManager for type %s!", - customizedStateType), e); - } - } - break; - case TARGETEXTERNALVIEW: - // Check whether target external has been enabled or not - if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( - _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), - 0)) { - shutdown(); - throw new HelixException("Target External View is not enabled!"); + throw new HelixException(String.format( + "Failed to attach CustomizedView Listener to HelixManager for type %s!", + customizedStateType), e); } + } + break; + case TARGETEXTERNALVIEW: + // Check whether target external has been enabled or not + if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( + _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), + 0)) { + shutdown(); + throw new HelixException("Target External View is not enabled!"); + } - try { - _helixManager.addTargetExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); - throw new HelixException( - "Failed to attach TargetExternalView Listener to HelixManager!", e); - } - break; - case CURRENTSTATES: - // CurrentState change listeners will be added later in LiveInstanceChange call. - break; - default: + try { + _helixManager.addTargetExternalViewChangeListener(this); + } catch (Exception e) { + shutdown(); throw new HelixException( - String.format("Unsupported source data type: %s", propertyType)); + "Failed to attach TargetExternalView Listener to HelixManager!", e); + } + break; + case CURRENTSTATES: + // CurrentState change listeners will be added later in LiveInstanceChange call. + break; + default: + throw new HelixException(String.format("Unsupported source data type: %s", propertyType)); } } try { @@ -307,10 +308,10 @@ private void validateSourceDataTypeMap(Map> sourceDat } if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW) && sourceDataTypeMap.get(propertyType).size() != 0) { - logger.error("Type has been used in addition to the propertyType {} !", - propertyType.name()); - throw new HelixException( - String.format("Type %s has been used in addition to the propertyType %s !", + logger + .error("Type has been used in addition to the propertyType {} !", propertyType.name()); + throw new HelixException(String + .format("Type %s has been used in addition to the propertyType %s !", sourceDataTypeMap.get(propertyType), propertyType.name())); } } @@ -337,26 +338,26 @@ public void shutdown() { _helixManager.removeListener(keyBuilder.instanceConfigs(), this); for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: - _helixManager.removeListener(keyBuilder.externalViews(), this); - break; - case CUSTOMIZEDVIEW: - List customizedStateTypes = _sourceDataTypeMap.get(propertyType); - // Remove listener on each individual customizedStateType - for (String customizedStateType : customizedStateTypes) { - _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this); - } - break; - case TARGETEXTERNALVIEW: - _helixManager.removeListener(keyBuilder.targetExternalViews(), this); - break; - case CURRENTSTATES: - NotificationContext context = new NotificationContext(_helixManager); - context.setType(NotificationContext.Type.FINALIZE); - updateCurrentStatesListeners(Collections.emptyList(), context); - break; - default: - break; + case EXTERNALVIEW: + _helixManager.removeListener(keyBuilder.externalViews(), this); + break; + case CUSTOMIZEDVIEW: + List customizedStateTypes = _sourceDataTypeMap.get(propertyType); + // Remove listener on each individual customizedStateType + for (String customizedStateType : customizedStateTypes) { + _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this); + } + break; + case TARGETEXTERNALVIEW: + _helixManager.removeListener(keyBuilder.targetExternalViews(), this); + break; + case CURRENTSTATES: + NotificationContext context = new NotificationContext(_helixManager); + context.setType(NotificationContext.Type.FINALIZE); + updateCurrentStatesListeners(Collections.emptyList(), context); + break; + default: + break; } } } @@ -404,12 +405,13 @@ public Map> getRoutingTableSnapshots() if (!snapshots.containsKey(propertyTypeName)) { snapshots.put(propertyTypeName, new HashMap<>()); } - snapshots.get(propertyTypeName) - .put(customizedStateType, new RoutingTableSnapshot(routingTable)); + snapshots.get(propertyTypeName).put(customizedStateType, + new RoutingTableSnapshot(routingTable)); } return snapshots; } + /** * Add RoutingTableChangeListener with user defined context * @param routingTableChangeListener @@ -437,7 +439,7 @@ public void addRoutingTableChangeListener( final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager); periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH); _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, null); - } + } } /** @@ -474,8 +476,8 @@ public List getInstances(String resourceName, String partitionNa */ public List getInstancesForResource(String resourceName, String partitionName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource( - resourceName, partitionName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResource(resourceName, partitionName, state); } /** @@ -490,8 +492,8 @@ public List getInstancesForResource(String resourceName, String */ public List getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, - DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResourceGroup(resourceGroupName, partitionName, state); } /** @@ -507,9 +509,8 @@ public List getInstancesForResourceGroup(String resourceGroupNam */ public List getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state, List resourceTags) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, - DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, partitionName, state, - resourceTags); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags); } /** @@ -532,8 +533,8 @@ public Set getInstances(String resourceName, String state) { * @return empty list if there is no instance in a given state */ public Set getInstancesForResource(String resourceName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getInstancesForResource( - resourceName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResource(resourceName, state); } /** @@ -543,8 +544,8 @@ public Set getInstancesForResource(String resourceName, String s * @return empty list if there is no instance in a given state */ public Set getInstancesForResourceGroup(String resourceGroupName, String state) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, - DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResourceGroup(resourceGroupName, state); } /** @@ -556,8 +557,8 @@ public Set getInstancesForResourceGroup(String resourceGroupName */ public Set getInstancesForResourceGroup(String resourceGroupName, String state, List resourceTags) { - return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, - DEFAULT_STATE_TYPE).getInstancesForResourceGroup(resourceGroupName, state, resourceTags); + return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE) + .getInstancesForResourceGroup(resourceGroupName, state, resourceTags); } /** @@ -602,9 +603,9 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp if (_routingTableRefMap.keySet().size() == 1) { String key = _routingTableRefMap.keySet().iterator().next(); if (!_routingTableRefMap.containsKey(key)) { - throw new HelixException(String.format( - "Currently there is no snapshot available for PropertyType %s and stateType %s", - propertyTypeName, stateType)); + throw new HelixException( + String.format("Currently there is no snapshot available for PropertyType %s and stateType %s", + propertyTypeName, stateType)); } return _routingTableRefMap.get(key).get(); } else { @@ -618,11 +619,11 @@ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateTyp } } - String key = generateReferenceKey(propertyTypeName, stateType); + String key = generateReferenceKey(propertyTypeName, stateType); if (!_routingTableRefMap.containsKey(key)) { - throw new HelixException(String.format( - "Currently there is no snapshot available for PropertyType %s and stateType %s", - propertyTypeName, stateType)); + throw new HelixException( + String.format("Currently there is no snapshot available for PropertyType %s and stateType %s", + propertyTypeName, stateType)); } return _routingTableRefMap.get(key).get(); } @@ -646,8 +647,7 @@ public void onExternalViewChange(List externalViewList, if (externalViewList != null && externalViewList.size() > 0) { // keep this here for back-compatibility, application can call onExternalViewChange directly // with externalview list supplied. - String keyReference = - generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE); + String keyReference = generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE); HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List configList = accessor.getChildValues(keyBuilder.instanceConfigs(), true); @@ -736,7 +736,8 @@ public void onCustomizedViewRootChange(List customizedViewTypes, shutdown(); throw new HelixException( String.format("Failed to attach CustomizedView Listener to HelixManager for type %s!", - customizedStateType), e); + customizedStateType), + e); } } } @@ -803,7 +804,7 @@ private void updateCurrentStatesListeners(List liveInstances, private void reset() { logger.info("Resetting the routing table."); RoutingTable newRoutingTable; - for (String key : _routingTableRefMap.keySet()) { + for (String key: _routingTableRefMap.keySet()) { PropertyType propertyType = _routingTableRefMap.get(key).get().getPropertyType(); if (propertyType == PropertyType.CUSTOMIZEDVIEW) { String stateType = _routingTableRefMap.get(key).get().getStateType(); @@ -831,14 +832,12 @@ protected void refreshCustomizedView(Collection customizedViews, long startTime = System.currentTimeMillis(); PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType(); String customizedStateType = _routingTableRefMap.get(referenceKey).get().getStateType(); - RoutingTable newRoutingTable = - new CustomizedViewRoutingTable(customizedViews, instanceConfigs, liveInstances, - propertyType, customizedStateType); + RoutingTable newRoutingTable = new CustomizedViewRoutingTable(customizedViews, instanceConfigs, + liveInstances, propertyType, customizedStateType); resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey); } - protected void refreshCurrentState( - Map>> currentStateMap, + protected void refreshCurrentState(Map>> currentStateMap, Collection instanceConfigs, Collection liveInstances, String referenceKey) { long startTime = System.currentTimeMillis(); @@ -847,8 +846,7 @@ protected void refreshCurrentState( resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey); } - private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable, - String referenceKey) { + private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable, String referenceKey) { _routingTableRefMap.get(referenceKey).set(newRoutingTable); String clusterName = _helixManager != null ? _helixManager.getClusterName() : null; logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName, @@ -869,7 +867,8 @@ private void notifyRoutingTableChange(String clusterName, String referenceKey) { // record time spent // here. Potentially, we should call this callback in a separate thread if this is a bottleneck. long startTime = System.currentTimeMillis(); - for (Map.Entry entry : _routingTableChangeListenerMap.entrySet()) { + for (Map.Entry entry : _routingTableChangeListenerMap + .entrySet()) { entry.getKey().onRoutingTableChange( new RoutingTableSnapshot(_routingTableRefMap.get(referenceKey).get()), entry.getValue().getContext()); @@ -913,8 +912,7 @@ protected void handleEvent(ClusterEvent event) { throw new HelixException("HelixManager is null for router update event."); } if (!manager.isConnected()) { - logger.error( - String.format("HelixManager is not connected for router update event: %s", event)); + logger.error(String.format("HelixManager is not connected for router update event: %s", event)); throw new HelixException("HelixManager is not connected for router update event."); } @@ -923,43 +921,40 @@ protected void handleEvent(ClusterEvent event) { _dataCache.refresh(manager.getHelixDataAccessor()); for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { switch (propertyType) { - case EXTERNALVIEW: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); - refreshExternalView(_dataCache.getExternalViews().values(), - _dataCache.getRoutableInstanceConfigMap().values(), - _dataCache.getRoutableLiveInstances().values(), keyReference); - } - break; - case TARGETEXTERNALVIEW: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); - refreshExternalView(_dataCache.getTargetExternalViews().values(), - _dataCache.getRoutableInstanceConfigMap().values(), - _dataCache.getRoutableLiveInstances().values(), keyReference); - } + case EXTERNALVIEW: { + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + refreshExternalView(_dataCache.getExternalViews().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), + keyReference); + } break; + case TARGETEXTERNALVIEW: { + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); + refreshExternalView(_dataCache.getTargetExternalViews().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), + keyReference); + } + break; case CUSTOMIZEDVIEW: - for (String customizedStateType : _sourceDataTypeMap.getOrDefault( - PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) { - String keyReference = - generateReferenceKey(propertyType.name(), customizedStateType); + for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) { + String keyReference = generateReferenceKey(propertyType.name(), customizedStateType); refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(), _dataCache.getRoutableInstanceConfigMap().values(), _dataCache.getRoutableLiveInstances().values(), keyReference); } break; case CURRENTSTATES: { - String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); - ; + String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);; refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getRoutableInstanceConfigMap().values(), _dataCache.getRoutableLiveInstances().values(), keyReference); - recordPropagationLatency(System.currentTimeMillis(), - _dataCache.getCurrentStateSnapshot()); + recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot()); } - break; + break; default: - logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", - propertyType); + logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", propertyType); } _monitorMap.get(propertyType).increaseDataRefreshCounters(startTime); @@ -990,8 +985,7 @@ public Object call() { long endTime = partitionStateEndTimes.get(partition); if (currentTime >= endTime) { for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { - _monitorMap.get(propertyType) - .recordStatePropagationLatency(currentTime - endTime); + _monitorMap.get(propertyType).recordStatePropagationLatency(currentTime - endTime); logger.debug( "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}", key.toString(), partition, endTime, currentTime - endTime); @@ -1011,6 +1005,7 @@ public Object call() { } } + public void queueEvent(NotificationContext context, ClusterEventType eventType, HelixConstants.ChangeType changeType) { ClusterEvent event = new ClusterEvent(_clusterName, eventType); diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index 9f332d899b..a8df83c064 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -70,7 +70,8 @@ public void shutdown() { reset(); } - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { return _taskExecutor.awaitTermination(timeout, unit); } @@ -83,9 +84,8 @@ public void onBecomeRunningFromInit(Message msg, NotificationContext context) { public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } _taskRunner.cancel(); @@ -101,9 +101,8 @@ public String onBecomeStoppedFromRunning(Message msg, NotificationContext contex public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -122,9 +121,8 @@ public String onBecomeCompletedFromRunning(Message msg, NotificationContext cont public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -143,9 +141,8 @@ public String onBecomeTimedOutFromRunning(Message msg, NotificationContext conte public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } TaskResult r = _taskRunner.waitTillDone(); @@ -164,15 +161,13 @@ public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext cont public String onBecomeTaskAbortedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } _taskRunner.cancel(); TaskResult r = _taskRunner.waitTillDone(); - if (r.getStatus() != TaskResult.Status.FATAL_FAILED - && r.getStatus() != TaskResult.Status.CANCELED) { + if (r.getStatus() != TaskResult.Status.FATAL_FAILED && r.getStatus() != TaskResult.Status.CANCELED) { throw new IllegalStateException(String.format( "Partition %s received a state transition to %s but the result status code is %s.", taskPartition, msg.getToState(), r.getStatus())); @@ -242,9 +237,8 @@ public void onBecomeDroppedFromTaskAborted(Message msg, NotificationContext cont public void onBecomeInitFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } _taskRunner.cancel(); @@ -320,18 +314,16 @@ private void startTask(Message msg, String taskPartition) { callbackContext.setTaskConfig(taskConfig); // Create a task instance with this command - if (command == null || _taskFactoryRegistry == null || !_taskFactoryRegistry.containsKey( - command)) { - throw new IllegalStateException( - String.format("Invalid state transition. There is no running task for partition %s.", - taskPartition)); + if (command == null || _taskFactoryRegistry == null + || !_taskFactoryRegistry.containsKey(command)) { + throw new IllegalStateException(String.format( + "Invalid state transition. There is no running task for partition %s.", taskPartition)); } TaskFactory taskFactory = _taskFactoryRegistry.get(command); Task task = taskFactory.createNewTask(callbackContext); if (task instanceof UserContentStore) { - ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), - taskPartition); + ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition); } // Submit the task for execution diff --git a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java index e246e05fa2..1877a0c6a4 100644 --- a/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java @@ -40,8 +40,7 @@ public static Callable wrap(Callable callable) { try { return callable.call(); } catch (Throwable t) { - LOG.error("Callable run on thread {} raised an exception and exited", - Thread.currentThread().getName(), t); + LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); throw t; } }; @@ -59,8 +58,7 @@ public static Runnable wrap(Runnable runnable) { try { runnable.run(); } catch (Throwable t) { - LOG.error("Runnable run on thread {} raised an exception and exited", - Thread.currentThread().getName(), t); + LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t); throw t; } };