diff --git a/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java b/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java index 3bcc27b..5b488a2 100644 --- a/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java +++ b/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java @@ -1,6 +1,7 @@ package org.swisspush.redisques.scheduling; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import org.slf4j.Logger; @@ -48,16 +49,15 @@ private void onTrigger(Timer timer) { return; } timer.begEpochMs = currentTimeMillis(); - boolean oldValue = timer.isPending.getAndSet(true); - assert !oldValue : "Why is this already pending?"; timer.task.accept(timer::onTaskDone_); + Promise p = Promise.promise(); + var fut = p.future(); + fut.onSuccess((Void v) -> timer.onTaskDone_()); + fut.onFailure(ex -> log.error("This is expected to be UNREACHABLE", ex)); + timer.task.accept(p::complete); } private void onTaskDone(Timer timer) { - boolean oldVal = timer.isPending.getAndSet(false); - if (!oldVal) { - throw new IllegalStateException("MUST NOT be called multiple times!"); - } timer.endEpochMs = currentTimeMillis(); } @@ -71,7 +71,6 @@ public class Timer { private long id; // When the last run has begun and end. private long begEpochMs, endEpochMs; - private final AtomicBoolean isPending = new AtomicBoolean(); private Timer(Consumer task) { this.task = task;