Skip to content

Commit

Permalink
[SDCISA-16293, SDCISA-16512] Try fix stuff.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jul 2, 2024
1 parent 1841b5d commit 5114ed9
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,8 @@ private Future<Void> checkQueues() {
}
}
});
} else {
onDone.accept(null, null);
}
return ctx.iter.hasNext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,28 @@ private <Ctx> void resume(Request<Ctx> req) {
if (!req.lock.tryLock()) {
log.trace("Some other thread already working here");
return;
} else try {
}
try {
Thread ourself = currentThread();
if (req.worker == null) {
log.trace("worker := ourself");
req.worker = ourself;
} else if (req.worker != ourself) {
log.trace("Another thread is already working here");
req.lock.unlock();
return;
}
} catch (Throwable ex) {
req.lock.unlock();
throw ex;
}
try {
// Enqueue as much we can.
while (true) {
assert req.worker == currentThread();
if (req.isFatalError) {
log.trace("return from 'resume()' because isFatalError");
assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
return;
}
if (!req.hasMore) {
Expand All @@ -90,6 +99,9 @@ private <Ctx> void resume(Request<Ctx> req) {
} finally {
req.lock.lock(); // MUST get back our lock RIGHT NOW.
}
log.debug("Release remaining {} tokens", req.numTokensAvailForOurself);
req.limit.release(req.numTokensAvailForOurself);
req.numTokensAvailForOurself = 0;
} else {
log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress);
}
Expand All @@ -105,13 +117,17 @@ private <Ctx> void resume(Request<Ctx> req) {
}
req.hasStarted = true;
req.numInProgress += 1;
assert req.hasMore : "assert(hasMore)";
boolean hasMore = true;
try {
// We MUST give up our lock while calling mentor. We cannot know how long
// mentor is going to block (which would then cascade to all threads
// waiting for our lock).
assert req.worker == currentThread();
assert req.hasMore;
req.lock.unlock();
log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress);
assert req.worker == currentThread();
hasMore = req.mentor.runOneMore(new BiConsumer<>() {
// this boolean is just for paranoia, in case mentor tries to call back too often.
final AtomicBoolean isCalled = new AtomicBoolean();
Expand All @@ -121,11 +137,14 @@ private <Ctx> void resume(Request<Ctx> req) {
}
}, req.ctx);
} catch (RuntimeException ex) {
assert req.worker == currentThread();
onOneDone(req, ex);
} finally {
// We MUST get back our lock right NOW. No way to just 'try'.
log.trace("mentor.runOneMore() -> hasMore={}", hasMore);
req.lock.lock();
assert req.worker == currentThread();
assert req.hasMore;
req.hasMore = hasMore;
}
}
Expand All @@ -138,6 +157,7 @@ private <Ctx> void resume(Request<Ctx> req) {
Exception ex = exceptionFactory.newResourceExhaustionException(
"No more resources to handle yet another request now.", null);
req.mentor.onError(ex, req.ctx);
assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
return;
} else {
log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}",
Expand All @@ -149,6 +169,7 @@ private <Ctx> void resume(Request<Ctx> req) {
req.worker = null;
req.lock.unlock();
}
assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
}

private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package org.swisspush.redisques.performance;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.swisspush.redisques.exception.ResourceExhaustionException;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newWastefulExceptionFactory;

@RunWith(VertxUnitRunner.class)
public class UpperBoundParallelTest {

private UpperBoundParallel target;

private Vertx vertx;
private Semaphore limit;

@Before
public void before() {
vertx = Vertx.vertx();
target = new UpperBoundParallel(vertx, newWastefulExceptionFactory());
limit = new Semaphore(1);
}

@Test
public void smokeTest(TestContext testContext) {
Async async = testContext.async();
final int availTokens = limit.availablePermits();
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
Iterator<String> iter = List.of("input-one", "input-two", "input-three").iterator();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
if(iter.hasNext()){
String elem = iter.next();
vertx.runOnContext((Void v) -> { // <- Just imagine some async operation here
onDone.accept(null, null);
});
}else{
onDone.accept(null, null);
}
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.fail(ex);
return false;
}
@Override public void onDone(Void ctx) {
testContext.assertTrue(!iter.hasNext());
vertx.setTimer(1, nonsense -> {
testContext.assertEquals(availTokens, limit.availablePermits());
async.complete();
});
}
});
}

@Test
public void worksForZeroElements(TestContext testContext) {
Async async = testContext.async();
int availTokens = limit.availablePermits();
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
Iterator<String> iter = List.<String>of().iterator();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
if(iter.hasNext()){
String elem = iter.next();
vertx.runOnContext((Void v) -> { // <- Just imagine some async operation here
onDone.accept(null, null);
});
}else{
onDone.accept(null, null);
}
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.fail(ex);
return false;
}
@Override public void onDone(Void ctx) {
testContext.assertTrue(!iter.hasNext());
vertx.setTimer(1, nonsense -> {
testContext.assertEquals(availTokens, limit.availablePermits());
async.complete();
});
}
});
}

@Test
public void worksIfHandlerThrows(TestContext testContext) {
Async async = testContext.async();
RuntimeException myFancyTestException = new RuntimeException(){};
int availTokens = limit.availablePermits();
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
Iterator<String> iter = List.<String>of("the-lonely-elem").iterator();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
if(iter.hasNext()){
iter.next();
throw myFancyTestException;
}
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.assertEquals(myFancyTestException, ex);
vertx.setTimer(1, nonsense -> {
testContext.assertEquals(availTokens, limit.availablePermits());
async.complete();
});
return false;
}
@Override public void onDone(Void ctx) {
testContext.fail();
}
});
}

@Test
public void worksIfHandlerReportsError(TestContext testContext) {
Async async = testContext.async();
Throwable myFancyTestException = new Throwable(){};
int availTokens = limit.availablePermits();
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
Iterator<String> iter = List.<String>of("the-lonely-elem").iterator();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
if(iter.hasNext()){
iter.next();
onDone.accept(myFancyTestException, null);
}
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.assertEquals(myFancyTestException, ex);
vertx.setTimer(1, nonsense -> {
testContext.assertEquals(availTokens, limit.availablePermits());
async.complete();
});
return false;
}
@Override public void onDone(Void ctx) {
testContext.fail();
}
});
}

@Test
public void mustNotContinueIfDoneNotReported(TestContext testContext) {
Async async = testContext.async();
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
// onDone() call missing by intent.
return false;
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.fail();
return false;
}
@Override public void onDone(Void ctx) {
testContext.fail();
}
});
vertx.setTimer(500, nonsense -> async.complete());
}

@Test
public void reportsErrorIfNoTokensLeft(TestContext testContext) {
Async async = testContext.async();
limit.drainPermits(); // <- Whops, no tokens left for code under test.
target.request(limit, null, new UpperBoundParallel.Mentor<Void>() {
Iterator<String> iter = List.<String>of("the-lonely-elem").iterator();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
testContext.fail();
return false;
}
@Override public boolean onError(Throwable ex, Void ctx) {
testContext.assertTrue(ex instanceof ResourceExhaustionException);
testContext.assertNotNull(ex.getMessage());
vertx.setTimer(1, nonsense -> {
testContext.assertEquals(0, limit.availablePermits());
async.complete();
});
return false;
}
@Override public void onDone(Void ctx) {
testContext.fail();
}
});
}

}

0 comments on commit 5114ed9

Please sign in to comment.