diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java
index 7f407c3..fba262e 100644
--- a/src/main/java/org/swisspush/redisques/RedisQues.java
+++ b/src/main/java/org/swisspush/redisques/RedisQues.java
@@ -343,7 +343,7 @@ public void start(Promise<Void> promise) {
             redisProvider = new DefaultRedisProvider(vertx, configurationProvider);
         }
 
-        this.upperBoundParallel = new UpperBoundParallel(vertx);
+        this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
 
         redisProvider.redis().onComplete(event -> {
             if(event.succeeded()) {
diff --git a/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java
index afa36a9..ea6d45f 100644
--- a/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java
+++ b/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java
@@ -47,4 +47,6 @@ public static RedisQuesExceptionFactory newWastefulExceptionFactory() {
         return new WastefulRedisQuesExceptionFactory();
     }
 
+    ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause);
+
 }
diff --git a/src/main/java/org/swisspush/redisques/exception/ResourceExhaustionException.java b/src/main/java/org/swisspush/redisques/exception/ResourceExhaustionException.java
new file mode 100644
index 0000000..758b738
--- /dev/null
+++ b/src/main/java/org/swisspush/redisques/exception/ResourceExhaustionException.java
@@ -0,0 +1,16 @@
+package org.swisspush.redisques.exception;
+
+/**
+ * Thrown when something cannot be done right now because some
+ * resource is exhausted.
+ *
+ * For example, not enough memory, connection pools or waiting queues
+ * are full, etc.
+ */
+public class ResourceExhaustionException extends Exception {
+
+    public ResourceExhaustionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java
index e93fdbd..73ced1f 100644
--- a/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java
+++ b/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java
@@ -37,4 +37,12 @@ public ReplyException newReplyException(ReplyFailure failureType, int failureCod
         return new NoStackReplyException(failureType, failureCode, msg);
     }
 
+    @Override
+    public ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause) {
+        if (cause instanceof ResourceExhaustionException) return (ResourceExhaustionException) cause;
+        return new ResourceExhaustionException(msg, cause) {
+            @Override public Throwable fillInStackTrace() { return this; }
+        };
+    }
+
 }
diff --git a/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java
index 8890fd4..cb7329f 100644
--- a/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java
+++ b/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java
@@ -30,4 +30,9 @@ public ReplyException newReplyException(ReplyFailure failureType, int failureCod
         return new ReplyException(failureType, failureCode, msg);
    }
 
+    @Override
+    public ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause) {
+        return new ResourceExhaustionException(msg, cause);
+    }
+
 }
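
Note: the two factory implementations above differ only in how expensive the new exception is. The thrifty variant reuses an incoming ResourceExhaustionException and suppresses the stack trace via fillInStackTrace(), while the wasteful variant always builds a regular exception. Below is a minimal usage sketch; the example class, the runLimited() method and the quota size are hypothetical, only RedisQuesExceptionFactory.newWastefulExceptionFactory(), newResourceExhaustionException(String, Throwable) and ResourceExhaustionException itself are taken from the change above.

    package org.swisspush.redisques.exception;

    import java.util.concurrent.Semaphore;

    // Hypothetical example class, not part of this change.
    public class ResourceExhaustionExample {

        private final RedisQuesExceptionFactory exceptionFactory =
                RedisQuesExceptionFactory.newWastefulExceptionFactory();

        // Hypothetical quota limiting concurrent redis requests.
        private final Semaphore redisRequestQuota = new Semaphore(128);

        void runLimited(Runnable task) throws ResourceExhaustionException {
            // Refuse immediately instead of queueing without bound when the quota is used up.
            if (!redisRequestQuota.tryAcquire()) {
                throw exceptionFactory.newResourceExhaustionException(
                        "Redis request quota exhausted, try again later.", null);
            }
            try {
                task.run();
            } finally {
                redisRequestQuota.release();
            }
        }
    }
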
diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java
index e595e14..90788aa 100644
--- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java
+++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java
@@ -62,7 +62,7 @@ public GetQueuesItemsCountHandler(
         this.filterPattern = filterPattern;
         this.queuesPrefix = queuesPrefix;
         this.redisProvider = redisProvider;
-        this.upperBoundParallel = new UpperBoundParallel(vertx);
+        this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
         this.exceptionFactory = exceptionFactory;
         this.redisRequestQuota = redisRequestQuota;
     }
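
Note: the handler change above is the same pattern applied at every construction site touched by this diff (RedisQues, GetQueuesItemsCountHandler, QueueStatisticsCollector): the exception factory becomes a constructor argument of UpperBoundParallel, while the Semaphore quota is still handed to it per call. A minimal wiring sketch, with a hypothetical component class and quota size:

    package org.swisspush.redisques.performance;

    import io.vertx.core.Vertx;
    import org.swisspush.redisques.exception.RedisQuesExceptionFactory;

    import java.util.concurrent.Semaphore;

    // Hypothetical component, not part of this change.
    public class UpperBoundParallelWiring {

        private final UpperBoundParallel upperBoundParallel;
        private final Semaphore redisRequestQuota = new Semaphore(128);

        public UpperBoundParallelWiring(Vertx vertx, RedisQuesExceptionFactory exceptionFactory) {
            // Same wiring as in the call sites above: the factory is now required
            // by the UpperBoundParallel constructor.
            this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
            // The shared quota is handed to each request(limit, ctx, mentor) call
            // as its 'limit' argument; it is not a constructor argument.
        }
    }
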
diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
index 1a933b2..eb19da8 100644
--- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
+++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
@@ -2,8 +2,10 @@
 
 import io.vertx.core.Vertx;
 import org.slf4j.Logger;
+import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
 
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
@@ -44,12 +46,13 @@ public class UpperBoundParallel {
 
     private static final Logger log = getLogger(UpperBoundParallel.class);
-    private static final long RETRY_DELAY_IF_LIMIT_REACHED_MS = 8;
 
     private final Vertx vertx;
+    private final RedisQuesExceptionFactory exceptionFactory;
 
-    public UpperBoundParallel(Vertx vertx) {
+    public UpperBoundParallel(Vertx vertx, RedisQuesExceptionFactory exceptionFactory) {
         assert vertx != null;
         this.vertx = vertx;
+        this.exceptionFactory = exceptionFactory;
     }
 
     public <Ctx> void request(Semaphore limit, Ctx ctx, Mentor<Ctx> mentor) {
@@ -73,7 +76,7 @@ private <Ctx> void resume(Request<Ctx> req) {
            // Enqueue as much we can.
            while (true) {
                if (req.isFatalError) {
-                    log.debug("return from 'resume()' because isFatalError");
+                    log.trace("return from 'resume()' because isFatalError");
                    return;
                }
                if (!req.hasMore) {
@@ -81,7 +84,7 @@ private <Ctx> void resume(Request<Ctx> req) {
                    req.isDoneCalled = true;
                    // give up lock because we don't know how much time mentor will use.
                    req.lock.unlock();
-                    log.debug("call 'mentor.onDone()'");
+                    log.trace("call 'mentor.onDone()'");
                    try {
                        req.mentor.onDone(req.ctx);
                    } finally {
@@ -92,10 +95,15 @@ private <Ctx> void resume(Request<Ctx> req) {
                    }
                    return;
                }
-                if (!req.limit.tryAcquire()) {
+                if (req.numTokensAvailForOurself > 0) {
+                    // We still have a token reserved for ourselves. Use it first before
+                    // acquiring a new one. See the explanation in 'onOneDone()'.
+                    req.numTokensAvailForOurself -= 1;
+                }else if (!req.limit.tryAcquire()) {
                    log.debug("redis request limit reached. Need to pause now.");
                    break; // Go to end of loop to schedule a run later.
                }
+                req.hasStarted = true;
                req.numInProgress += 1;
                boolean hasMore = true;
                try {
@@ -104,7 +112,14 @@ private <Ctx> void resume(Request<Ctx> req) {
                    // waiting for our lock).
                    req.lock.unlock();
                    log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress);
-                    hasMore = req.mentor.runOneMore(req::onOneDone_, req.ctx);
+                    hasMore = req.mentor.runOneMore(new BiConsumer<>() {
+                        // This boolean is just paranoia, in case the mentor tries to call back too often.
+                        final AtomicBoolean isCalled = new AtomicBoolean();
+                        @Override public void accept(Throwable ex, Void ret) {
+                            if (!isCalled.compareAndSet(false, true)) return;
+                            onOneDone(req, ex);
+                        }
+                    }, req.ctx);
                } catch (RuntimeException ex) {
                    onOneDone(req, ex);
                } finally {
@@ -116,8 +131,19 @@ private <Ctx> void resume(Request<Ctx> req) {
            }
            assert req.numInProgress >= 0 : req.numInProgress;
            if (req.numInProgress == 0) {
-                // Looks as we could not even fire a single event. Need to try later.
-                vertx.setTimer(RETRY_DELAY_IF_LIMIT_REACHED_MS, nonsense -> resume(req));
+                if (!req.hasStarted) {
+                    // We couldn't even trigger a single task. No resources are available
+                    // to handle any more requests. The caller has to try again later.
+                    req.isFatalError = true;
+                    Exception ex = exceptionFactory.newResourceExhaustionException(
+                        "No more resources to handle yet another request now.", null);
+                    req.mentor.onError(ex, req.ctx);
+                    return;
+                }else{
+                    log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}",
+                        req.numInProgress, req.hasStarted);
+                    vertx.setTimer(4000, nonsense -> resume(req));
+                }
            }
        } finally {
            req.worker = null;
@@ -128,8 +154,21 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
        req.lock.lock();
        try {
-            req.limit.release();
+            // Do NOT release that token yet. Instead, mark the token as "ready-to-be-used".
+            // This signals to 'resume()' that we do not need to 'acquire' another token from
+            // 'limit' and can instead re-use the one we acquired earlier.
+            // Reasoning:
+            // Originally we just 'released' the token back to the pool and then acquired
+            // another one later in 'resume()'. But this is problematic, as in that case we
+            // give yet more incoming requests a chance to also start their processing,
+            // which in the end runs us into resource exhaustion: we start more and more
+            // requests and have no tokens free to complete the already running ones.
+            // So by keeping the token we already reserved for ourselves, we can apply
+            // backpressure to new incoming requests. This allows us to complete the
+            // already running requests.
            req.numInProgress -= 1;
+            req.numTokensAvailForOurself += 1;
+            // ^^-- Token transfer only consists of those two statements.
            log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress);
            assert req.numInProgress >= 0 : req.numInProgress + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
            boolean isFatalError = true;
@@ -140,10 +179,15 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
                if (log.isDebugEnabled()) {
                    log.debug("mentor.onError({}: {})", ex.getClass().getName(), ex.getMessage());
                }
-                isFatalError = req.mentor.onError(ex, req.ctx);
+                isFatalError = !req.mentor.onError(ex, req.ctx);
            } finally {
                req.lock.lock(); // Need our lock back.
                req.isFatalError = isFatalError;
+                // Need to release our token now, as we won't do it later anymore.
+                if (isFatalError) {
+                    req.numTokensAvailForOurself -= 1;
+                    req.limit.release();
+                }
            }
        } finally {
            req.lock.unlock();
@@ -151,14 +195,16 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
        }
    }
 
-    private final class Request<Ctx> {
+    private static final class Request<Ctx> {
        private final Ctx ctx;
        private final Mentor<Ctx> mentor;
        private final Lock lock = new ReentrantLock();
        private final Semaphore limit;
        private Thread worker = null;
        private int numInProgress = 0;
+        private int numTokensAvailForOurself = 0;
        private boolean hasMore = true;
+        private boolean hasStarted = false; // true as soon as we could start at least once.
        private boolean isFatalError = false;
        private boolean isDoneCalled = false;
 
@@ -167,10 +213,6 @@ private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore limit) {
            this.mentor = mentor;
            this.limit = limit;
        }
-
-        public void onOneDone_(Throwable ex, Void result) {
-            onOneDone(this, ex);
-        }
 
    }
diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
index f41430d..1deebd8 100644
--- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
+++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
@@ -87,7 +87,7 @@ public QueueStatisticsCollector(
        this.vertx = vertx;
        this.exceptionFactory = exceptionFactory;
        this.redisRequestQuota = redisRequestQuota;
-        this.upperBoundParallel = new UpperBoundParallel(vertx);
+        this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
 
        speedStatisticsScheduler(speedIntervalSec);
    }
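
Note: the practical effect of the UpperBoundParallel change, seen from a caller: where the old code silently re-armed a retry timer when it could not acquire even one token, the new code reports a ResourceExhaustionException through the mentor's onError() and stops. The sketch below assumes Mentor is the nested callback interface of UpperBoundParallel whose methods runOneMore(), onError() and onDone() appear in the diff; the example class, the queue iteration and processAsync() are illustrative only.

    package org.swisspush.redisques.performance;

    import io.vertx.core.Vertx;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
    import org.swisspush.redisques.exception.ResourceExhaustionException;

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    // Hypothetical example, not part of this change.
    public class QueueScanExample {

        private static final Logger log = LoggerFactory.getLogger(QueueScanExample.class);

        private final UpperBoundParallel upperBoundParallel;
        private final Semaphore redisRequestQuota = new Semaphore(64);

        public QueueScanExample(Vertx vertx, RedisQuesExceptionFactory exceptionFactory) {
            this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
        }

        void scan(List<String> queueNames) {
            if (queueNames.isEmpty()) return;
            Iterator<String> iter = queueNames.iterator();
            upperBoundParallel.request(redisRequestQuota, iter, new UpperBoundParallel.Mentor<Iterator<String>>() {

                @Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Iterator<String> ctx) {
                    // Kick off one unit of work; report completion exactly once through 'onDone'.
                    processAsync(ctx.next(), ex -> onDone.accept(ex, null));
                    return ctx.hasNext(); // 'true' means there is more work to enqueue.
                }

                @Override public boolean onError(Throwable ex, Iterator<String> ctx) {
                    if (ex instanceof ResourceExhaustionException) {
                        // New with this change: not even one task could be started.
                        log.warn("scan rejected, no resources left, try again later", ex);
                        return false; // treat as fatal, stop this scan
                    }
                    return true; // tolerate individual failures, keep going
                }

                @Override public void onDone(Iterator<String> ctx) {
                    log.info("scan complete");
                }
            });
        }

        private void processAsync(String queue, Consumer<Throwable> whenDone) {
            // Placeholder for real asynchronous work (e.g. a redis call); succeeds immediately here.
            whenDone.accept(null);
        }
    }
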