diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index fba262e..784e744 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1245,6 +1245,8 @@ private Future checkQueues() { } } }); + } else { + onDone.accept(null, null); } return ctx.iter.hasNext(); } diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java index d56cb80..f3ec76a 100644 --- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java +++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java @@ -64,19 +64,28 @@ private void resume(Request req) { if (!req.lock.tryLock()) { log.trace("Some other thread already working here"); return; - } else try { + } + try { Thread ourself = currentThread(); if (req.worker == null) { log.trace("worker := ourself"); req.worker = ourself; } else if (req.worker != ourself) { log.trace("Another thread is already working here"); + req.lock.unlock(); return; } + } catch (Throwable ex) { + req.lock.unlock(); + throw ex; + } + try { // Enqueue as much we can. while (true) { + assert req.worker == currentThread(); if (req.isFatalError) { log.trace("return from 'resume()' because isFatalError"); + assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; return; } if (!req.hasMore) { @@ -90,6 +99,9 @@ private void resume(Request req) { } finally { req.lock.lock(); // MUST get back our lock RIGHT NOW. } + log.debug("Release remaining {} tokens", req.numTokensAvailForOurself); + req.limit.release(req.numTokensAvailForOurself); + req.numTokensAvailForOurself = 0; } else { log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress); } @@ -105,13 +117,17 @@ private void resume(Request req) { } req.hasStarted = true; req.numInProgress += 1; + assert req.hasMore : "assert(hasMore)"; boolean hasMore = true; try { // We MUST give up our lock while calling mentor. We cannot know how long // mentor is going to block (which would then cascade to all threads // waiting for our lock). + assert req.worker == currentThread(); + assert req.hasMore; req.lock.unlock(); log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress); + assert req.worker == currentThread(); hasMore = req.mentor.runOneMore(new BiConsumer<>() { // this boolean is just for paranoia, in case mentor tries to call back too often. final AtomicBoolean isCalled = new AtomicBoolean(); @@ -121,11 +137,14 @@ private void resume(Request req) { } }, req.ctx); } catch (RuntimeException ex) { + assert req.worker == currentThread(); onOneDone(req, ex); } finally { // We MUST get back our lock right NOW. No way to just 'try'. log.trace("mentor.runOneMore() -> hasMore={}", hasMore); req.lock.lock(); + assert req.worker == currentThread(); + assert req.hasMore; req.hasMore = hasMore; } } @@ -138,6 +157,7 @@ private void resume(Request req) { Exception ex = exceptionFactory.newResourceExhaustionException( "No more resources to handle yet another request now.", null); req.mentor.onError(ex, req.ctx); + assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; return; } else { log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}", @@ -149,6 +169,7 @@ private void resume(Request req) { req.worker = null; req.lock.unlock(); } + assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; } private void onOneDone(Request req, Throwable ex) { diff --git a/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java b/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java new file mode 100644 index 0000000..5a177e3 --- /dev/null +++ b/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java @@ -0,0 +1,198 @@ +package org.swisspush.redisques.performance; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.swisspush.redisques.exception.ResourceExhaustionException; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; + +import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newWastefulExceptionFactory; + +@RunWith(VertxUnitRunner.class) +public class UpperBoundParallelTest { + + private UpperBoundParallel target; + + private Vertx vertx; + private Semaphore limit; + + @Before + public void before() { + vertx = Vertx.vertx(); + target = new UpperBoundParallel(vertx, newWastefulExceptionFactory()); + limit = new Semaphore(1); + } + + @Test + public void smokeTest(TestContext testContext) { + Async async = testContext.async(); + final int availTokens = limit.availablePermits(); + target.request(limit, null, new UpperBoundParallel.Mentor() { + Iterator iter = List.of("input-one", "input-two", "input-three").iterator(); + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if(iter.hasNext()){ + String elem = iter.next(); + vertx.runOnContext((Void v) -> { // <- Just imagine some async operation here + onDone.accept(null, null); + }); + }else{ + onDone.accept(null, null); + } + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.fail(ex); + return false; + } + @Override public void onDone(Void ctx) { + testContext.assertTrue(!iter.hasNext()); + vertx.setTimer(1, nonsense -> { + testContext.assertEquals(availTokens, limit.availablePermits()); + async.complete(); + }); + } + }); + } + + @Test + public void worksForZeroElements(TestContext testContext) { + Async async = testContext.async(); + int availTokens = limit.availablePermits(); + target.request(limit, null, new UpperBoundParallel.Mentor() { + Iterator iter = List.of().iterator(); + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if(iter.hasNext()){ + String elem = iter.next(); + vertx.runOnContext((Void v) -> { // <- Just imagine some async operation here + onDone.accept(null, null); + }); + }else{ + onDone.accept(null, null); + } + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.fail(ex); + return false; + } + @Override public void onDone(Void ctx) { + testContext.assertTrue(!iter.hasNext()); + vertx.setTimer(1, nonsense -> { + testContext.assertEquals(availTokens, limit.availablePermits()); + async.complete(); + }); + } + }); + } + + @Test + public void worksIfHandlerThrows(TestContext testContext) { + Async async = testContext.async(); + RuntimeException myFancyTestException = new RuntimeException(){}; + int availTokens = limit.availablePermits(); + target.request(limit, null, new UpperBoundParallel.Mentor() { + Iterator iter = List.of("the-lonely-elem").iterator(); + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if(iter.hasNext()){ + iter.next(); + throw myFancyTestException; + } + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.assertEquals(myFancyTestException, ex); + vertx.setTimer(1, nonsense -> { + testContext.assertEquals(availTokens, limit.availablePermits()); + async.complete(); + }); + return false; + } + @Override public void onDone(Void ctx) { + testContext.fail(); + } + }); + } + + @Test + public void worksIfHandlerReportsError(TestContext testContext) { + Async async = testContext.async(); + Throwable myFancyTestException = new Throwable(){}; + int availTokens = limit.availablePermits(); + target.request(limit, null, new UpperBoundParallel.Mentor() { + Iterator iter = List.of("the-lonely-elem").iterator(); + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if(iter.hasNext()){ + iter.next(); + onDone.accept(myFancyTestException, null); + } + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.assertEquals(myFancyTestException, ex); + vertx.setTimer(1, nonsense -> { + testContext.assertEquals(availTokens, limit.availablePermits()); + async.complete(); + }); + return false; + } + @Override public void onDone(Void ctx) { + testContext.fail(); + } + }); + } + + @Test + public void mustNotContinueIfDoneNotReported(TestContext testContext) { + Async async = testContext.async(); + target.request(limit, null, new UpperBoundParallel.Mentor() { + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + // onDone() call missing by intent. + return false; + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.fail(); + return false; + } + @Override public void onDone(Void ctx) { + testContext.fail(); + } + }); + vertx.setTimer(500, nonsense -> async.complete()); + } + + @Test + public void reportsErrorIfNoTokensLeft(TestContext testContext) { + Async async = testContext.async(); + limit.drainPermits(); // <- Whops, no tokens left for code under test. + target.request(limit, null, new UpperBoundParallel.Mentor() { + Iterator iter = List.of("the-lonely-elem").iterator(); + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + testContext.fail(); + return false; + } + @Override public boolean onError(Throwable ex, Void ctx) { + testContext.assertTrue(ex instanceof ResourceExhaustionException); + testContext.assertNotNull(ex.getMessage()); + vertx.setTimer(1, nonsense -> { + testContext.assertEquals(0, limit.availablePermits()); + async.complete(); + }); + return false; + } + @Override public void onDone(Void ctx) { + testContext.fail(); + } + }); + } + +}