Skip to content

Commit

Permalink
[SDCISA-16293] Apply only fix in UpperBoundParallel with minimal deps.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jun 27, 2024
1 parent fe125e2 commit db83bf1
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void start(Promise<Void> promise) {
redisProvider = new DefaultRedisProvider(vertx, configurationProvider);
}

this.upperBoundParallel = new UpperBoundParallel(vertx);
this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);

redisProvider.redis().onComplete(event -> {
if(event.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ public static RedisQuesExceptionFactory newWastefulExceptionFactory() {
return new WastefulRedisQuesExceptionFactory();
}

ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.swisspush.redisques.exception;

/**
 * <p>Signals that an operation cannot proceed right now because some
 * resource is used up.</p>
 *
 * <p>Examples: out of memory, exhausted connection pools, full waiting
 * queues, etc.</p>
 */
public class ResourceExhaustionException extends Exception {

    /**
     * @param message description of which resource ran out and why.
     * @param cause   underlying trigger, or {@code null} if none.
     */
    public ResourceExhaustionException(String message, Throwable cause) {
        super(message, cause);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public ReplyException newReplyException(ReplyFailure failureType, int failureCod
return new NoStackReplyException(failureType, failureCode, msg);
}

@Override
public ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause) {
    // If the cause already is a ResourceExhaustionException, hand it back
    // unchanged instead of wrapping it one more time.
    if (cause instanceof ResourceExhaustionException) {
        return (ResourceExhaustionException) cause;
    }
    // "No stack" flavor: override fillInStackTrace() so the (expensive)
    // stack capture is skipped entirely.
    return new ResourceExhaustionException(msg, cause) {
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    };
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public ReplyException newReplyException(ReplyFailure failureType, int failureCod
return new ReplyException(failureType, failureCode, msg);
}

@Override
public ResourceExhaustionException newResourceExhaustionException(String msg, Throwable cause) {
// "Wasteful" variant: builds a regular exception that captures the full
// stack trace (contrast with the no-stack factory, which suppresses it).
return new ResourceExhaustionException(msg, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public GetQueuesItemsCountHandler(
this.filterPattern = filterPattern;
this.queuesPrefix = queuesPrefix;
this.redisProvider = redisProvider;
this.upperBoundParallel = new UpperBoundParallel(vertx);
this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
this.exceptionFactory = exceptionFactory;
this.redisRequestQuota = redisRequestQuota;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -44,12 +46,13 @@
public class UpperBoundParallel {

private static final Logger log = getLogger(UpperBoundParallel.class);
private static final long RETRY_DELAY_IF_LIMIT_REACHED_MS = 8;
private final Vertx vertx;
private final RedisQuesExceptionFactory exceptionFactory;

public UpperBoundParallel(Vertx vertx) {
public UpperBoundParallel(Vertx vertx, RedisQuesExceptionFactory exceptionFactory) {
assert vertx != null;
this.vertx = vertx;
this.exceptionFactory = exceptionFactory;
}

public <Ctx> void request(Semaphore limit, Ctx ctx, Mentor<Ctx> mentor) {
Expand All @@ -73,15 +76,15 @@ private <Ctx> void resume(Request<Ctx> req) {
// Enqueue as much we can.
while (true) {
if (req.isFatalError) {
log.debug("return from 'resume()' because isFatalError");
log.trace("return from 'resume()' because isFatalError");
return;
}
if (!req.hasMore) {
if (req.numInProgress == 0 && !req.isDoneCalled) {
req.isDoneCalled = true;
// give up lock because we don't know how much time mentor will use.
req.lock.unlock();
log.debug("call 'mentor.onDone()'");
log.trace("call 'mentor.onDone()'");
try {
req.mentor.onDone(req.ctx);
} finally {
Expand All @@ -92,10 +95,15 @@ private <Ctx> void resume(Request<Ctx> req) {
}
return;
}
if (!req.limit.tryAcquire()) {
if (req.numTokensAvailForOurself > 0) {
// We still have a token reserved for ourself. Use those first before acquiring
// new ones. Explanation see comment in 'onOneDone()'.
req.numTokensAvailForOurself -= 1;
}else if (!req.limit.tryAcquire()) {
log.debug("redis request limit reached. Need to pause now.");
break; // Go to end of loop to schedule a run later.
}
req.hasStarted = true;
req.numInProgress += 1;
boolean hasMore = true;
try {
Expand All @@ -104,7 +112,14 @@ private <Ctx> void resume(Request<Ctx> req) {
// waiting for our lock).
req.lock.unlock();
log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress);
hasMore = req.mentor.runOneMore(req::onOneDone_, req.ctx);
hasMore = req.mentor.runOneMore(new BiConsumer<>() {
// This boolean is pure paranoia, in case the mentor calls back more than once.
final AtomicBoolean isCalled = new AtomicBoolean();
@Override public void accept(Throwable ex, Void ret) {
if (!isCalled.compareAndSet(false, true)) return;
onOneDone(req, ex);
}
}, req.ctx);
} catch (RuntimeException ex) {
onOneDone(req, ex);
} finally {
Expand All @@ -116,8 +131,19 @@ private <Ctx> void resume(Request<Ctx> req) {
}
assert req.numInProgress >= 0 : req.numInProgress;
if (req.numInProgress == 0) {
// Looks as we could not even fire a single event. Need to try later.
vertx.setTimer(RETRY_DELAY_IF_LIMIT_REACHED_MS, nonsense -> resume(req));
if (!req.hasStarted) {
// We couldn't even trigger one single task. No resources available to
// handle any more requests. This caller has to try later.
req.isFatalError = true;
Exception ex = exceptionFactory.newResourceExhaustionException(
"No more resources to handle yet another request now.", null);
req.mentor.onError(ex, req.ctx);
return;
}else{
log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}",
req.numInProgress, req.hasStarted);
vertx.setTimer(4000, nonsense -> resume(req));
}
}
} finally {
req.worker = null;
Expand All @@ -128,8 +154,21 @@ private <Ctx> void resume(Request<Ctx> req) {
private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
req.lock.lock();
try {
req.limit.release();
// Do NOT release the token yet. Instead, mark it as "ready to be used",
// signalling to 'resume()' that it can re-use the token we acquired
// earlier rather than 'acquire' another one from 'limit'.
// Reasoning:
// Originally we just released the token back to the pool and acquired
// another one later in 'resume()'. That is problematic: it gives yet
// more incoming requests a chance to start their processing, which
// eventually runs us into resource exhaustion, because we keep starting
// new requests while no tokens remain free to complete the ones already
// running. By keeping the token we have already reserved for ourselves,
// we apply backpressure to new incoming requests, which lets the
// already-running requests complete.
req.numInProgress -= 1;
req.numTokensAvailForOurself += 1;
// ^^-- Token transfer only consists of those two statements.
log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress);
assert req.numInProgress >= 0 : req.numInProgress + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
boolean isFatalError = true;
Expand All @@ -140,25 +179,32 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
if (log.isDebugEnabled()) {
log.debug("mentor.onError({}: {})", ex.getClass().getName(), ex.getMessage());
}
isFatalError = req.mentor.onError(ex, req.ctx);
isFatalError = !req.mentor.onError(ex, req.ctx);
} finally {
req.lock.lock(); // Need our lock back.
req.isFatalError = isFatalError;
// Need to release our token now. As we won't do it later anymore.
if (isFatalError) {
req.numTokensAvailForOurself -= 1;
req.limit.release();
}
}
} finally {
req.lock.unlock();
vertx.runOnContext(nonsense -> resume(req));
}
}

private final class Request<Ctx> {
private static final class Request<Ctx> {
private final Ctx ctx;
private final Mentor<Ctx> mentor;
private final Lock lock = new ReentrantLock();
private final Semaphore limit;
private Thread worker = null;
private int numInProgress = 0;
private int numTokensAvailForOurself = 0;
private boolean hasMore = true;
private boolean hasStarted = false; // true, as soon we could start at least once.
private boolean isFatalError = false;
private boolean isDoneCalled = false;

Expand All @@ -167,10 +213,6 @@ private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore limit) {
this.mentor = mentor;
this.limit = limit;
}

public void onOneDone_(Throwable ex, Void result) {
onOneDone(this, ex);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public QueueStatisticsCollector(
this.vertx = vertx;
this.exceptionFactory = exceptionFactory;
this.redisRequestQuota = redisRequestQuota;
this.upperBoundParallel = new UpperBoundParallel(vertx);
this.upperBoundParallel = new UpperBoundParallel(vertx, exceptionFactory);
speedStatisticsScheduler(speedIntervalSec);
}

Expand Down

0 comments on commit db83bf1

Please sign in to comment.