Skip to content

Commit

Permalink
[SDCISA-16147, swisspost#583] Fix more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jun 4, 2024
1 parent cd2ac7a commit 7ca6a6a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,35 +180,38 @@ public boolean handle(final HttpServerRequest request) {
// TODO refactor away this callback-hell (Counts for the COMPLETE method
// surrounding this line, named 'KafkaHandler.handle()', NOT only
// those lines below).
boolean[] isResponseSent = {false};
kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List<KafkaProducerRecord<String, String>> kafkaProducerRecords) -> {
var fut = maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> {
if(validationEvent.succeeded()) {
if(validationEvent.result().isSuccess()) {
kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).onComplete(event -> {
if(event.succeeded()) {
RequestLoggerFactory.getLogger(KafkaHandler.class, request)
.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
} else {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
}
});
} else {
respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request);
}
var fut = maybeValidate(request, kafkaProducerRecords).compose(validationEvent -> {
if(validationEvent.isSuccess()) {
kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).onComplete(event -> {
if(event.succeeded()) {
RequestLoggerFactory.getLogger(KafkaHandler.class, request)
.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
isResponseSent[0] = true;
respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
} else {
isResponseSent[0] = true;
respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
}
});
} else {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
isResponseSent[0] = true;
respondWith(StatusCode.BAD_REQUEST, validationEvent.getMessage(), request);
}
return Future.succeededFuture();
});
assert fut != null;
return fut;
}).onFailure((Throwable ex) -> {
if (ex instanceof ValidationException) {
if (ex instanceof ValidationException && !isResponseSent[0]) {
respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request);
return;
}
log.error("TODO error handling", exceptionFactory.newException(ex));
respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request);
if (!isResponseSent[0]) {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request);
}
});
});
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class KafkaHandlerTest {
private KafkaHandler handler;
private MockResourceStorage storage;
private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
private Vertx vertxMock;

private final String configResourceUri = "/kafka/topicsConfig";
private final String streamingPath = "/kafka/streaming/";
Expand All @@ -75,8 +76,8 @@ public class KafkaHandlerTest {

@Before
public void setUp() {
Vertx vertx = Vertx.vertx();
Vertx vertxMock = Mockito.mock(Vertx.class);
vertx = Vertx.vertx();
vertxMock = Mockito.mock(Vertx.class);
doAnswer(inv -> {
String bkup = currentThread().getName();
currentThread().setName("blah");
Expand Down Expand Up @@ -461,8 +462,9 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){
public void handlePayloadNotPassingValidation(TestContext context){
Async async = context.async();

handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath);
handler = new KafkaHandler(
vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, null);

when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom")));
Expand Down Expand Up @@ -510,8 +512,9 @@ public void handlePayloadNotPassingValidation(TestContext context){
public void handleErrorWhileValidation(TestContext context){
Async async = context.async();

handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath);
handler = new KafkaHandler(
vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, null);

when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.failedFuture("Boooom"));
Expand Down

0 comments on commit 7ca6a6a

Please sign in to comment.