diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
index 5f54a4a..a8794c2 100644
--- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
+++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java
@@ -12,9 +12,19 @@ import static org.slf4j.LoggerFactory.getLogger;
 /**
+ * We still can utilize parallelism without assuming an infinite amount
+ * of resources. A KISS approach to do this is to apply an upper bound
+ * to what we do in parallel. And that's what this class tries to assist
+ * with. It wants to be that tool that allows parallelism but maintains
+ * upper bounds. For stone-age programmers: It's nothing else than a
+ * semaphore really. But decorated with a vertx gift bow to make it fit
+ * better in the new framework world ;)
+ *
+ * Long story:
+ *
  * Level 1: KISS says do everything sequentially. Stop here! You're
  * done! "Optimization is the root of all evil" you know? Happy you. So
- * just do NOT use this class if this is the case for your toy use case.
+ * just do NOT use this class if this is the case for your use case.
  *
  * Level 2: There are cases where we do not have the time to do
  * everything sequentially. The next step is to go parallel and/or
@@ -24,21 +34,12 @@
  * available (eg sockets, memory, CPU time, ...), so we can happily fill
  * queues to an infinite number of entries without running in trouble
  * ever. In case your resources are infinite, happy you you're done.
- * YOU HAVE NO NEED TO USE THIS CLASS!
- *
+ *
  * Level 3: Welcome in my world. Where reality starts to hit you sooner or
  * later and you'll realize no matter how much of those "infinite cloud
  * resources" and fancy frameworks magic you throw at your problem, it won't
  * solve the issue. Performance-is-not-an-issue? Please, just go back to
  * "Level 1" and be happy there.
- *
- * This class is for "Level 3" users only. We still can utilize parallelity
- * without assuming an infinite amount of resources. A KISS approach to do
- * this, is to apply an upper bound to what we do in parallel. And thats
- * what this class tries to assist with. It wants to be that tool that
- * allows parallelity but maintains upper bounds. For stone-age
- * programmers: It's nothing else than a semaphore really. But decorated
- * with a vertx gift bow to make it fit better in the new framework world ;)
  */
 public class UpperBoundParallel {
@@ -56,7 +57,7 @@ public void request(Semaphore limit, Ctx ctx, Mentor mentor) {
         resume(req);
     }
 
-    private void resume(Request req){
+    private void resume(Request req) {
         if (!req.lock.tryLock()) {
             log.trace("Some other thread already working here");
             return;
@@ -65,7 +66,7 @@ private void resume(Request req){
             if (req.worker == null) {
                 log.trace("worker := ourself");
                 req.worker = ourself;
-            } else if( req.worker != ourself) {
+            } else if (req.worker != ourself) {
                 log.trace("Another thread is already working here");
                 return;
             }
@@ -87,7 +88,7 @@ private void resume(Request req){
                 } finally {
                     req.lock.lock(); // MUST get back our lock RIGHT NOW.
                 }
-            }else{
+            } else {
                 log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress);
             }
             return;
@@ -107,7 +108,7 @@ private void resume(Request req){
                 hasMore = req.mentor.runOneMore(req::onOneDone_, req.ctx);
             } catch (RuntimeException ex) {
                 onOneDone(req, ex);
-            }finally {
+            } finally {
                 // We MUST get back our lock right NOW. No way to just 'try'.
                 log.trace("mentor.runOneMore() -> hasMore={}", hasMore);
                 req.lock.lock();
@@ -127,11 +128,11 @@ private void resume(Request req){
 
     private void onOneDone(Request req, Throwable ex) {
         req.lock.lock();
-        try{
+        try {
             req.limit.release();
             req.numInProgress -= 1;
             log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress);
-            assert req.numInProgress >= 0 : req.numInProgress +" >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
+            assert req.numInProgress >= 0 : req.numInProgress + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
             boolean isFatalError = true;
             if (ex != null) try {
                 // Unlock, to prevent thread stalls as we don't know for how long mentor
@@ -145,7 +146,7 @@ private void onOneDone(Request req, Throwable ex) {
                 req.lock.lock(); // Need our lock back.
                 req.isFatalError = isFatalError;
             }
-        }finally {
+        } finally {
             req.lock.unlock();
             vertx.runOnContext(nonsense -> resume(req));
         }
@@ -162,7 +163,7 @@ private final class Request {
         private boolean isFatalError = false;
         private boolean isDoneCalled = false;
 
-        private Request(Ctx ctx, Mentor mentor, Semaphore limit){
+        private Request(Ctx ctx, Mentor mentor, Semaphore limit) {
            this.ctx = ctx;
            this.mentor = mentor;
            this.limit = limit;
@@ -177,14 +178,21 @@ public void onOneDone_(Throwable ex, Void result) {
     public static interface Mentor {
 
         /**
+         * Gets called as many times as possible until the specified limit is
+         * reached. More calls are triggered as soon as some tasks call 'onDone'
+         * to signal they've completed.
+         *
+         * @param onDone
+         *        MUST be called exactly ONCE as soon as the requested task has
+         *        completed its execution.
          * @return true if more elements have to be processed. False if
-         *      iteration source has reached its end.
+         *         iteration source has reached its end.
          */
         boolean runOneMore(BiConsumer onDone, Ctx ctx);
 
         /**
         * @return true if iteration should continue with other elements. False
-         *      if no more elements should be processed.
+         *         if no more elements should be processed.
         */
         boolean onError(Throwable ex, Ctx ctx);