You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
publicinterfaceProcessingTimeService {
/** Returns the current processing time. */// 获取当前Processing timelonggetCurrentProcessingTime();
/** * Registers a task to be executed when (processing) time is {@code timestamp}. * * @param timestamp Time when the task is to be executed (in processing time) * @param target The task to be executed * @return The future that represents the scheduled task. This always returns some future, even * if the timer was shut down */// 注册一个异步timerScheduledFuture<?> registerTimer(longtimestamp, ProcessingTimeCallbacktarget);
/** * 用于执行timer触发的动作 * A callback that can be registered via {@link #registerTimer(long, ProcessingTimeCallback)}. */@PublicEvolvinginterfaceProcessingTimeCallback {
/** * This method is invoked with the time which the callback register for. * * @param time The time this callback was registered for. */voidonProcessingTime(longtime) throwsIOException, InterruptedException, Exception;
}
}
publicinterfaceProcessingTimeServiceextendsorg.apache.flink.api.common.operators.ProcessingTimeService {
/** * Registers a task to be executed repeatedly at a fixed rate. * 注册一个按固定速率执行的timer * <p>This call behaves similar to {@link ScheduledExecutor#scheduleAtFixedRate(Runnable, long, * long, TimeUnit)}. * * @param callback to be executed after the initial delay and then after each period * @param initialDelay initial delay to start executing callback * @param period after the initial delay after which the callback is executed * @return Scheduled future representing the task to be executed repeatedly */ScheduledFuture<?> scheduleAtFixedRate(
ProcessingTimeCallbackcallback, longinitialDelay, longperiod);
/** * Registers a task to be executed repeatedly with a fixed delay. * 注册一个以固定延迟重复执行的timer * <p>This call behaves similar to {@link ScheduledExecutor#scheduleWithFixedDelay(Runnable, * long, long, TimeUnit)}. * * @param callback to be executed after the initial delay and then after each period * @param initialDelay initial delay to start executing callback * @param period after the initial delay after which the callback is executed * @return Scheduled future representing the task to be executed repeatedly */ScheduledFuture<?> scheduleWithFixedDelay(
ProcessingTimeCallbackcallback, longinitialDelay, longperiod);
/** * * This method puts the service into a state where it does not register new timers, but returns * for each call to {@link #registerTimer} or {@link #scheduleAtFixedRate} a "mock" future and * the "mock" future will be never completed. Furthermore, the timers registered before are * prevented from firing, but the timers in running are allowed to finish. * * <p>If no timer is running, the quiesce-completed future is immediately completed and * returned. Otherwise, the future returned will be completed when all running timers have * finished. */CompletableFuture<Void> quiesce();
}
publicclassSystemProcessingTimeServiceimplementsTimerService {
privatestaticfinalLoggerLOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
/** * 定义TimeService状态 */privatestaticfinalintSTATUS_ALIVE = 0;
privatestaticfinalintSTATUS_QUIESCED = 1;
privatestaticfinalintSTATUS_SHUTDOWN = 2;
// ------------------------------------------------------------------------/** The executor service that schedules and calls the triggers of this task. */// 调度线程池,线程数为1privatefinalScheduledThreadPoolExecutortimerService;
// 异常处理器privatefinalExceptionHandlerexceptionHandler;
// timeService状态privatefinalAtomicIntegerstatus;
// 未完成的定时器privatefinalCompletableFuture<Void> quiesceCompletedFuture;
@VisibleForTestingSystemProcessingTimeService(ExceptionHandlerexceptionHandler) {
this(exceptionHandler, null);
}
SystemProcessingTimeService(ExceptionHandlerexceptionHandler, ThreadFactorythreadFactory) {
this.exceptionHandler = checkNotNull(exceptionHandler);
// 默认状态 alivethis.status = newAtomicInteger(STATUS_ALIVE);
this.quiesceCompletedFuture = newCompletableFuture<>();
if (threadFactory == null) {
this.timerService = newScheduledTaskExecutor(1);
} else {
this.timerService = newScheduledTaskExecutor(1, threadFactory);
}
// tasks should be removed if the future is canceled,如果futrue被取消,task应该被移除this.timerService.setRemoveOnCancelPolicy(true);
// make sure shutdown removes all pending tasks 确保shutdown删除所有未完成的任务// timeService关闭后,任务被终止和移除,相当于shutdownNowthis.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// 设置为true,标示关闭后执行,false标示不执行this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
@OverridepubliclonggetCurrentProcessingTime() {
// 系统时间returnSystem.currentTimeMillis();
}
/** * Registers a task to be executed no sooner than time {@code timestamp}, but without strong * guarantees of order. * * @param timestamp Time when the task is to be enabled (in processing time) * @param callback The task to be executed * @return The future that represents the scheduled task. This always returns some future, * even if the timer was shut down */@OverridepublicScheduledFuture<?> registerTimer(longtimestamp, ProcessingTimeCallbackcallback) {
// 计算timestamp和getCurrentProcessingTime的延迟,在timestamp和当前时间的差值上再延迟1ms,为了watermark的判断,防止出现边界清空,小于watermakr的数据都会被丢弃longdelay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());
// we directly try to register the timer and only react to the status on exception// that way we save unnecessary volatile accesses for each timertry {
// 在delay ms后执行wrapOnTimerCallbackreturntimerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionExceptione) {
finalintstatus = this.status.get();
// 停止状态,没有timerif (status == STATUS_QUIESCED) {
returnnewNeverCompleteFuture(delay);
}
elseif (status == STATUS_SHUTDOWN) {
thrownewIllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exceptionthrowe;
}
}
}
@OverridepublicScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallbackcallback, longinitialDelay, longperiod) {
returnscheduleRepeatedly(callback, initialDelay, period, false);
}
@OverridepublicScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallbackcallback, longinitialDelay, longperiod) {
returnscheduleRepeatedly(callback, initialDelay, period, true);
}
privateScheduledFuture<?> scheduleRepeatedly(ProcessingTimeCallbackcallback, longinitialDelay, longperiod, booleanfixedDelay) {
// 下次执行的时间finallongnextTimestamp = getCurrentProcessingTime() + initialDelay;
// 获取调度任务finalRunnabletask = wrapOnTimerCallback(callback, nextTimestamp, period);
// we directly try to register the timer and only react to the status on exception// that way we save unnecessary volatile accesses for each timertry {
returnfixedDelay
? timerService.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.MILLISECONDS)
: timerService.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionExceptione) {
finalintstatus = this.status.get();
if (status == STATUS_QUIESCED) {
returnnewNeverCompleteFuture(initialDelay);
}
elseif (status == STATUS_SHUTDOWN) {
thrownewIllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exceptionthrowe;
}
}
}
/** * @return {@code true} is the status of the service * is {@link #STATUS_ALIVE}, {@code false} otherwise. */@VisibleForTestingbooleanisAlive() {
returnstatus.get() == STATUS_ALIVE;
}
@OverridepublicbooleanisTerminated() {
returnstatus.get() == STATUS_SHUTDOWN;
}
@OverridepublicCompletableFuture<Void> quiesce() {
if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
timerService.shutdown();
}
returnquiesceCompletedFuture;
}
@OverridepublicvoidshutdownService() {
if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
timerService.shutdownNow();
}
}
/** * Shuts down and clean up the timer service provider hard and immediately. This does wait * for all timers to complete or until the time limit is exceeded. Any call to * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. * @param time time to wait for termination. * @param timeUnit time unit of parameter time. * @return {@code true} if this timer service and all pending timers are terminated and * {@code false} if the timeout elapsed before this happened. */@VisibleForTestingbooleanshutdownAndAwaitPending(longtime, TimeUnittimeUnit) throwsInterruptedException {
shutdownService();
returntimerService.awaitTermination(time, timeUnit);
}
@OverridepublicbooleanshutdownServiceUninterruptible(longtimeoutMs) {
finalDeadlinedeadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
booleanshutdownComplete = false;
booleanreceivedInterrupt = false;
do {
try {
// wait for a reasonable time for all pending timer threads to finishshutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedExceptioniex) {
// 强制终端receivedInterrupt = true;
LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
}
} while (deadline.hasTimeLeft() && !shutdownComplete);
if (receivedInterrupt) {
Thread.currentThread().interrupt();
}
returnshutdownComplete;
}
// safety net to destroy the thread pool// 垃圾回收的时候触发,强制关闭timerService@Overrideprotectedvoidfinalize() throwsThrowable {
super.finalize();
timerService.shutdownNow();
}
@VisibleForTestingintgetNumTasksScheduled() {
// 获取调度任务个数BlockingQueue<?> queue = timerService.getQueue();
if (queue == null) {
return0;
} else {
returnqueue.size();
}
}
// ------------------------------------------------------------------------privateclassScheduledTaskExecutorextendsScheduledThreadPoolExecutor {
publicScheduledTaskExecutor(intcorePoolSize) {
super(corePoolSize);
}
publicScheduledTaskExecutor(intcorePoolSize, ThreadFactorythreadFactory) {
super(corePoolSize, threadFactory);
}
@Overrideprotectedvoidterminated() {
super.terminated();
quiesceCompletedFuture.complete(null);
}
}
/** * An exception handler, called when {@link ProcessingTimeCallback} throws an exception. */interfaceExceptionHandler {
voidhandleException(Exceptionex);
}
/** * 将ProcessingTimeCallback包装成Runnable * @param callback * @param timestamp * @return */privateRunnablewrapOnTimerCallback(ProcessingTimeCallbackcallback, longtimestamp) {
returnnewScheduledTask(status, exceptionHandler, callback, timestamp, 0);
}
privateRunnablewrapOnTimerCallback(ProcessingTimeCallbackcallback, longnextTimestamp, longperiod) {
returnnewScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
}
/** * Timer调度Task */privatestaticfinalclassScheduledTaskimplementsRunnable {
// 服务状态privatefinalAtomicIntegerserviceStatus;
// 异常处理器privatefinalExceptionHandlerexceptionHandler;
// Processing回调函数privatefinalProcessingTimeCallbackcallback;
// 下次触发的时间privatelongnextTimestamp;
// 间隔的周期privatefinallongperiod;
ScheduledTask(
AtomicIntegerserviceStatus,
ExceptionHandlerexceptionHandler,
ProcessingTimeCallbackcallback,
longtimestamp,
longperiod) {
this.serviceStatus = serviceStatus;
this.exceptionHandler = exceptionHandler;
this.callback = callback;
this.nextTimestamp = timestamp;
this.period = period;
}
@Overridepublicvoidrun() {
// 判断服务状态if (serviceStatus.get() != STATUS_ALIVE) {
return;
}
try {
// 触发onProcessingTimecallback.onProcessingTime(nextTimestamp);
} catch (Exceptionex) {
exceptionHandler.handleException(ex);
}
// 周期调用,每隔period执行一次nextTimestamp += period;
}
}
}
NeverFireProcessingTimeService
处理时间服务,其计时器永不触发,因此所有计时器都包含在savepoint中,可以获取当前系统的processing time
TimerService
timer服务包含获取processing时间、watermakr、注册timer等。
publicinterfaceTimerService {
// 设置timer只支持keyed stream /** Error string for {@link UnsupportedOperationException} on registering timers. */StringUNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
// 删除timer只支持keyed stream/** Error string for {@link UnsupportedOperationException} on deleting timers. */StringUNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
/** Returns the current processing time. */longcurrentProcessingTime();
/** Returns the current event-time watermark. */longcurrentWatermark();
/** * Registers a timer to be fired when processing time passes the given time. * * <p>Timers can internally be scoped to keys and/or windows. When you set a timer * in a keyed context, such as in an operation on * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context * will also be active when you receive the timer notification. */voidregisterProcessingTimeTimer(longtime);
/** * Registers a timer to be fired when the event time watermark passes the given time. * * <p>Timers can internally be scoped to keys and/or windows. When you set a timer * in a keyed context, such as in an operation on * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context * will also be active when you receive the timer notification. */voidregisterEventTimeTimer(longtime);
/** * Deletes the processing-time timer with the given trigger time. This method has only an effect if such a timer * was previously registered and did not already expire. * * <p>Timers can internally be scoped to keys and/or windows. When you delete a timer, * it is removed from the current keyed context. */voiddeleteProcessingTimeTimer(longtime);
/** * Deletes the event-time timer with the given trigger time. This method has only an effect if such a timer * was previously registered and did not already expire. * * <p>Timers can internally be scoped to keys and/or windows. When you delete a timer, * it is removed from the current keyed context. */voiddeleteEventTimeTimer(longtime);
}