From 35787823d11412b91fb5572c348ff59218c72ddd Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Mon, 24 Jun 2024 20:13:48 +0200 Subject: [PATCH] [SDCISA-16293] Self-review. Cleanup. --- .../org/swisspush/redisques/RedisQues.java | 29 +++++++------------ .../redisques/performance/BurstSquasher.java | 2 +- .../performance/UpperBoundParallel.java | 6 ++-- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index ea0c672..3482bbb 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -11,7 +11,6 @@ import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; -import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Command; @@ -1259,29 +1258,21 @@ private Future checkQueues() { return ctx.iter.hasNext(); } @Override public boolean onError(Throwable ex, Void ctx_) { - for (Throwable waitQueFullEx = ex; waitQueFullEx != null; waitQueFullEx = waitQueFullEx.getCause()) { - if (!(waitQueFullEx instanceof NoStackTraceThrowable)) continue; - if (!"Redis waiting queue is full".equals(waitQueFullEx.getMessage())) continue; - // Trying to continue (aka return true) makes no sense as our redis - // client is out of resources and continuing just will make load - // situation worse. So we abort in the case of this special exception. - makeGcLifeEasier(); - p.fail(exceptionFactory.newException(ex)); - return false; - } - log.warn("TODO error handling", exceptionFactory.newException(ex)); - // Still not sure if it is a good idea to ignore errors and try to respond - // with half-baked results. But will keep it this way because old code [1] - // did without any comment that would have explained why. + // Old code [1] for whatever reason did just continue after an error. + // Nevertheless, I'll now change it to abort (aka return false). Due to + // the absence of comments I have no idea what those half-baked results + // where used for. See also [2] about this decision. // [1]: https://github.com/swisspost/vertx-redisques/blob/v3.0.33/src/main/java/org/swisspush/redisques/RedisQues.java - return true; // true, aka keep going with other queues. + // [2]: https://github.com/swisspost/vertx-redisques/issues/192 + reduceGcGraphTraversalEffort(); + p.fail(exceptionFactory.newException(ex)); + return false; } @Override public void onDone(Void ctx_) { - makeGcLifeEasier(); + reduceGcGraphTraversalEffort(); p.complete(); // Mark this composition step as completed. } - private void makeGcLifeEasier() { - // No longer used, so reduce GC graph traversal effort. + private void reduceGcGraphTraversalEffort() { ctx.redisAPI = null; ctx.counter = null; ctx.iter = null; diff --git a/src/main/java/org/swisspush/redisques/performance/BurstSquasher.java b/src/main/java/org/swisspush/redisques/performance/BurstSquasher.java index 3520798..ee50e5c 100644 --- a/src/main/java/org/swisspush/redisques/performance/BurstSquasher.java +++ b/src/main/java/org/swisspush/redisques/performance/BurstSquasher.java @@ -77,7 +77,7 @@ public void logSomewhen(Ctx ctx) { mostRecentCtx = ctx; count += 1; durationSincePrevPublishMs = now - prevPublishEpchMs; - if (durationSincePrevPublishMs > minDelayMs) { + if (durationSincePrevPublishMs >= minDelayMs) { isPublish = true; prevPublishEpchMs = now; countLocalCpy = count; diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java index 5d7428e..eb19da8 100644 --- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java +++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java @@ -184,8 +184,10 @@ private void onOneDone(Request req, Throwable ex) { req.lock.lock(); // Need our lock back. req.isFatalError = isFatalError; // Need to release our token now. As we won't do it later anymore. - req.numTokensAvailForOurself -= 1; - req.limit.release(); + if (isFatalError) { + req.numTokensAvailForOurself -= 1; + req.limit.release(); + } } } finally { req.lock.unlock();