diff --git a/gateleen-cache/pom.xml b/gateleen-cache/pom.xml index c45639810..ec1ab879e 100644 --- a/gateleen-cache/pom.xml +++ b/gateleen-cache/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-cache diff --git a/gateleen-core/pom.xml b/gateleen-core/pom.xml index c6124598c..7ca3e99ee 100644 --- a/gateleen-core/pom.xml +++ b/gateleen-core/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-core diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenExceptionFactory.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenExceptionFactory.java index af7db1e55..6e0ab500b 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenExceptionFactory.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenExceptionFactory.java @@ -36,6 +36,14 @@ public interface GateleenExceptionFactory { public Exception newException(String msg, Throwable cause); + /** Convenience overload for {@link #newIllegalStateException(String, Throwable)}. */ + public default IllegalStateException newIllegalStateException(String msg) { return newIllegalStateException(msg, null); } + + /** Convenience overload for {@link #newIllegalStateException(String, Throwable)}. */ + public default IllegalStateException newIllegalStateException(Throwable cause) { return newIllegalStateException(null, cause); } + + public IllegalStateException newIllegalStateException(String msg, Throwable cause); + public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String message); diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenThriftyExceptionFactory.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenThriftyExceptionFactory.java index 1a26ae481..1ce9631ab 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenThriftyExceptionFactory.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenThriftyExceptionFactory.java @@ -16,6 +16,12 @@ public Exception newException(String message, Throwable cause) { return new GateleenNoStacktraceException(message, cause); } + @Override + public IllegalStateException newIllegalStateException(String msg, Throwable cause) { + if (cause instanceof IllegalStateException) return (IllegalStateException) cause; + return new NoStackIllegalStateException(msg); + } + @Override public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String message) { return new GateleenNoStackReplyException(failureType, failureCode, message); diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenWastefulExceptionFactory.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenWastefulExceptionFactory.java index 1fc0889ca..b15e3719a 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenWastefulExceptionFactory.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/GateleenWastefulExceptionFactory.java @@ -15,6 +15,11 @@ public Exception newException(String message, Throwable cause) { return new Exception(message, cause); } + @Override + public IllegalStateException newIllegalStateException(String msg, Throwable cause) { + return new IllegalStateException(msg, cause); + } + @Override public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String message) { return new ReplyException(failureType, failureCode, message); diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/NoStackIllegalStateException.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/NoStackIllegalStateException.java new file mode 100644 index 000000000..e773a9c05 --- /dev/null +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/exception/NoStackIllegalStateException.java @@ -0,0 +1,26 @@ +package org.swisspush.gateleen.core.exception; + +public class NoStackIllegalStateException extends IllegalStateException { + + public NoStackIllegalStateException() { + super(); + } + + public NoStackIllegalStateException(String s) { + super(s); + } + + public NoStackIllegalStateException(String message, Throwable cause) { + super(message, cause); + } + + public NoStackIllegalStateException(Throwable cause) { + super(cause); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + +} diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpClient.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpClient.java index 2d02b7efd..823a4c655 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpClient.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpClient.java @@ -28,6 +28,6 @@ public void setRoutingContexttHandler(Handler wrappedRoutingCont @Override protected HttpClientRequest doRequest(HttpMethod method, String uri) { - return new LocalHttpClientRequest(method, uri, vertx, wrappedRoutingContexttHandler, exceptionFactory, new LocalHttpServerResponse(vertx)); + return new LocalHttpClientRequest(method, uri, vertx, wrappedRoutingContexttHandler, exceptionFactory, new LocalHttpServerResponse(vertx, exceptionFactory)); } } diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpServerResponse.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpServerResponse.java index d6ad0845c..0f1cd1cab 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpServerResponse.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/http/LocalHttpServerResponse.java @@ -6,6 +6,7 @@ import io.vertx.core.http.impl.headers.HeadersMultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.core.util.StatusCode; /** @@ -16,6 +17,7 @@ public class LocalHttpServerResponse extends BufferBridge implements FastFailHttpServerResponse { private static final Logger logger = LoggerFactory.getLogger(LocalHttpServerResponse.class); + private final GateleenExceptionFactory exceptionFactory; private int statusCode; private String statusMessage; private static final String EMPTY = ""; @@ -118,8 +120,9 @@ public HttpClientResponse exceptionHandler(Handler handler) { }; - public LocalHttpServerResponse(Vertx vertx) { + public LocalHttpServerResponse(Vertx vertx, GateleenExceptionFactory exceptionFactory) { super(vertx); + this.exceptionFactory = exceptionFactory; // Attach most simple possible exception handler to base. setExceptionHandler(thr -> logger.error("Processing of response failed.", thr)); } @@ -214,9 +217,10 @@ public Future write(String chunk, String enc) { public Future write(Buffer data) { // emulate Vertx's HttpServerResponseImpl if (!chunked && !headers.contains(HttpHeaders.CONTENT_LENGTH)) { - IllegalStateException ex = new IllegalStateException("You must set the Content-Length header to be the total size of the message " + IllegalStateException ex = exceptionFactory.newIllegalStateException("" + + "You must set the Content-Length header to be the total size of the message " + "body BEFORE sending any data if you are not using HTTP chunked encoding."); - logger.error("non-proper HttpServerResponse occured", ex); + logger.debug("non-proper HttpServerResponse occurred", ex); throw ex; } ensureBound(); diff --git a/gateleen-delegate/pom.xml b/gateleen-delegate/pom.xml index 3374b2244..76f734231 100644 --- a/gateleen-delegate/pom.xml +++ b/gateleen-delegate/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-delegate diff --git a/gateleen-delta/pom.xml b/gateleen-delta/pom.xml index 1fecc47b8..707c2b705 100644 --- a/gateleen-delta/pom.xml +++ b/gateleen-delta/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-delta diff --git a/gateleen-delta/src/main/java/org/swisspush/gateleen/delta/DeltaHandler.java b/gateleen-delta/src/main/java/org/swisspush/gateleen/delta/DeltaHandler.java index fa37b8cb2..3cff33d97 100755 --- a/gateleen-delta/src/main/java/org/swisspush/gateleen/delta/DeltaHandler.java +++ b/gateleen-delta/src/main/java/org/swisspush/gateleen/delta/DeltaHandler.java @@ -6,6 +6,8 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; @@ -17,6 +19,9 @@ import org.swisspush.gateleen.core.util.*; import org.swisspush.gateleen.core.util.ExpansionDeltaUtil.CollectionResourceContainer; import org.swisspush.gateleen.core.util.ExpansionDeltaUtil.SlashHandling; +import org.swisspush.gateleen.logging.LogAppenderRepository; +import org.swisspush.gateleen.logging.LoggingHandler; +import org.swisspush.gateleen.logging.LoggingResourceManager; import org.swisspush.gateleen.routing.Router; import org.swisspush.gateleen.routing.Rule; import org.swisspush.gateleen.routing.RuleFeaturesProvider; @@ -24,6 +29,7 @@ import java.util.*; +import static org.swisspush.gateleen.logging.LoggingHandler.SKIP_LOGGING_HEADER; import static org.swisspush.gateleen.routing.RuleFeatures.Feature.DELTA_ON_BACKEND; /** @@ -54,18 +60,28 @@ public class DeltaHandler implements RuleProvider.RuleChangesObserver { private boolean rejectLimitOffsetRequests; private RuleProvider ruleProvider; + + private final Vertx vertx; + private final LoggingResourceManager loggingResourceManager; + private final LogAppenderRepository logAppenderRepository; private RuleFeaturesProvider ruleFeaturesProvider = new RuleFeaturesProvider(new ArrayList<>()); - public DeltaHandler(RedisProvider redisProvider, HttpClient httpClient, RuleProvider ruleProvider) { - this(redisProvider, httpClient, ruleProvider, false); + public DeltaHandler(Vertx vertx, RedisProvider redisProvider, HttpClient httpClient, RuleProvider ruleProvider, + LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository) { + this(vertx,redisProvider, httpClient, ruleProvider, loggingResourceManager, logAppenderRepository, false); } - public DeltaHandler(RedisProvider redisProvider, HttpClient httpClient, RuleProvider ruleProvider, boolean rejectLimitOffsetRequests) { + public DeltaHandler(Vertx vertx, RedisProvider redisProvider, HttpClient httpClient, RuleProvider ruleProvider, + LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, + boolean rejectLimitOffsetRequests) { + this.vertx = vertx; this.redisProvider = redisProvider; this.httpClient = httpClient; this.rejectLimitOffsetRequests = rejectLimitOffsetRequests; this.ruleProvider = ruleProvider; + this.loggingResourceManager = loggingResourceManager; + this.logAppenderRepository = logAppenderRepository; this.ruleProvider.registerObserver(this); } @@ -284,6 +300,9 @@ private DeltaResourcesContainer getDeltaResourceNames(List subResourceNa private void handleCollectionGET(final HttpServerRequest request, final String updateId, final Logger log) { request.pause(); + final LoggingHandler loggingHandler = new LoggingHandler(loggingResourceManager, logAppenderRepository, request, vertx.eventBus()); + loggingHandler.request(request.headers()); + final HttpMethod method = HttpMethod.GET; final String targetUri = ExpansionDeltaUtil.constructRequestUri(request.path(), request.params(), null, null, SlashHandling.KEEP); log.debug("constructed uri for request: {}", targetUri); @@ -299,6 +318,7 @@ private void handleCollectionGET(final HttpServerRequest request, final String u cReq.headers().setAll(request.headers()); // add a marker header to signalize, that in the next loop of the mainverticle we should pass the deltahandler cReq.headers().set(DELTA_BACKEND_HEADER, ""); + cReq.headers().set(SKIP_LOGGING_HEADER, "true"); cReq.headers().set("Accept", "application/json"); cReq.setChunked(true); request.handler(cReq::write); @@ -307,6 +327,8 @@ private void handleCollectionGET(final HttpServerRequest request, final String u HttpClientResponse cRes = asyncResult1.result(); HttpServerRequestUtil.prepareResponse(request, cRes); + loggingHandler.setResponse(cRes); + if (cRes.headers().contains(DELTA_HEADER)) { cRes.handler(data -> request.response().write(data)); cRes.endHandler(v1 -> request.response().end()); @@ -327,7 +349,7 @@ private void handleCollectionGET(final HttpServerRequest request, final String u log.trace("DeltaHandler: deltaResourceKeys for targetUri ({}): {}", targetUri, deltaResourceKeys); } - if (deltaResourceKeys.size() > 0) { + if (!deltaResourceKeys.isEmpty()) { if (log.isTraceEnabled()) { log.trace("DeltaHandler: targetUri ({}) using mget command.", targetUri); } @@ -345,9 +367,12 @@ private void handleCollectionGET(final HttpServerRequest request, final String u JsonObject result = buildResultJsonObject(deltaResourcesContainer.getResourceNames(), dataContainer.getCollectionName()); + String responseBody = result.toString(); request.response().putHeader(DELTA_HEADER, "" + deltaResourcesContainer.getMaxUpdateId()); - request.response().end(result.toString()); + loggingHandler.appendResponsePayload(Buffer.buffer(responseBody)); + loggingHandler.log(); + request.response().end(responseBody); })).onFailure(event -> { log.error("Redis: handleCollectionGET failed", event); handleError(request, "error reading delta information"); diff --git a/gateleen-delta/src/test/java/org/swisspush/gateleen/delta/DeltaHandlerTest.java b/gateleen-delta/src/test/java/org/swisspush/gateleen/delta/DeltaHandlerTest.java index d9cdddfd4..ef9546296 100644 --- a/gateleen-delta/src/test/java/org/swisspush/gateleen/delta/DeltaHandlerTest.java +++ b/gateleen-delta/src/test/java/org/swisspush/gateleen/delta/DeltaHandlerTest.java @@ -1,9 +1,6 @@ package org.swisspush.gateleen.delta; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; +import io.vertx.core.*; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; @@ -19,6 +16,8 @@ import org.swisspush.gateleen.core.http.DummyHttpServerResponse; import org.swisspush.gateleen.core.redis.RedisProvider; import org.swisspush.gateleen.core.util.StatusCode; +import org.swisspush.gateleen.logging.LogAppenderRepository; +import org.swisspush.gateleen.logging.LoggingResourceManager; import org.swisspush.gateleen.routing.Router; import org.swisspush.gateleen.routing.Rule; import org.swisspush.gateleen.routing.RuleProvider; @@ -36,6 +35,9 @@ public class DeltaHandlerTest { private RedisAPI redisAPI; private RedisProvider redisProvider; private RuleProvider ruleProvider; + private Vertx vertx; + private LoggingResourceManager loggingResourceManager; + private LogAppenderRepository logAppenderRepository; private Router router = mock(Router.class); private HttpServerRequest request; private HttpServerResponse response; @@ -44,6 +46,9 @@ public class DeltaHandlerTest { @Before public void before() { redisAPI = mock(RedisAPI.class); + vertx = mock(Vertx.class); + loggingResourceManager = mock(LoggingResourceManager.class); + logAppenderRepository = mock(LogAppenderRepository.class); redisProvider = mock(RedisProvider.class); when(redisProvider.redis()).thenReturn(Future.succeededFuture(redisAPI)); @@ -69,7 +74,7 @@ public void before() { @Test public void testIsDeltaRequest(TestContext context) { - DeltaHandler deltaHandler = new DeltaHandler(redisProvider, null, ruleProvider); + DeltaHandler deltaHandler = new DeltaHandler(vertx, redisProvider, null, ruleProvider, loggingResourceManager, logAppenderRepository); deltaHandler.rulesChanged(List.of( rule("/gateleen/server/res_1", false), rule("/gateleen/server/res_2", true)) @@ -152,7 +157,7 @@ public void testIsDeltaRequest(TestContext context) { @Test public void testDeltaNoExpiry() { - DeltaHandler deltaHandler = new DeltaHandler(redisProvider, null, ruleProvider); + DeltaHandler deltaHandler = new DeltaHandler(vertx, redisProvider, null, ruleProvider, loggingResourceManager, logAppenderRepository); deltaHandler.handle(request, router); verify(redisAPI, times(1)).set(eq(Arrays.asList("delta:resources:a:b:c", "555")), any()); @@ -163,7 +168,7 @@ public void testDeltaNoExpiry() { public void testDeltaWithExpiry() { requestHeaders.add("x-expire-after", "123"); - DeltaHandler deltaHandler = new DeltaHandler(redisProvider, null, ruleProvider); + DeltaHandler deltaHandler = new DeltaHandler(vertx, redisProvider, null, ruleProvider, loggingResourceManager, logAppenderRepository); deltaHandler.handle(request, router); verify(redisAPI, times(1)).setex(eq("delta:resources:a:b:c"), eq("123"), eq("555"), any()); @@ -182,7 +187,7 @@ public void testFailingRedisProviderAccess(TestContext context) { ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(String.class); when(request.response().end(bodyCaptor.capture())).thenReturn(Future.succeededFuture()); - DeltaHandler deltaHandler = new DeltaHandler(redisProvider, null, ruleProvider); + DeltaHandler deltaHandler = new DeltaHandler(vertx, redisProvider, null, ruleProvider, loggingResourceManager, logAppenderRepository); deltaHandler.handle(request, router); context.assertEquals(StatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), statusCodeCaptor.getValue(), "StatusCode should be 500"); @@ -191,7 +196,7 @@ public void testFailingRedisProviderAccess(TestContext context) { @Test public void testRejectLimitOffsetParameters(TestContext context) { - DeltaHandler deltaHandler = new DeltaHandler(redisProvider, null, ruleProvider, true); + DeltaHandler deltaHandler = new DeltaHandler(vertx, redisProvider, null, ruleProvider, loggingResourceManager, logAppenderRepository, true); final DummyHttpServerResponse response = new DummyHttpServerResponse(); DeltaRequest request = new DeltaRequest(MultiMap.caseInsensitiveMultiMap() .add("delta", "0") diff --git a/gateleen-expansion/pom.xml b/gateleen-expansion/pom.xml index cd4fa1dd5..6fd869f93 100644 --- a/gateleen-expansion/pom.xml +++ b/gateleen-expansion/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-expansion diff --git a/gateleen-hook-js/pom.xml b/gateleen-hook-js/pom.xml index 59455301d..6790dd29b 100644 --- a/gateleen-hook-js/pom.xml +++ b/gateleen-hook-js/pom.xml @@ -4,7 +4,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-hook-js jar diff --git a/gateleen-hook/pom.xml b/gateleen-hook/pom.xml index d0c5e3ca2..957613927 100644 --- a/gateleen-hook/pom.xml +++ b/gateleen-hook/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-hook diff --git a/gateleen-kafka/pom.xml b/gateleen-kafka/pom.xml index c58d33a08..ab53b09fe 100644 --- a/gateleen-kafka/pom.xml +++ b/gateleen-kafka/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-kafka diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index a2345d9f6..5a63358a7 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -2,6 +2,7 @@ import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -12,6 +13,7 @@ import org.slf4j.LoggerFactory; import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.core.http.RequestLoggerFactory; import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil; import org.swisspush.gateleen.core.util.StatusCode; @@ -25,6 +27,8 @@ import java.util.Optional; import java.util.regex.Pattern; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + /** * Handler class for all Kafka related requests. * @@ -42,20 +46,26 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private final Logger log = LoggerFactory.getLogger(KafkaHandler.class); private final String streamingPath; + private final GateleenExceptionFactory exceptionFactory; private final KafkaProducerRepository repository; private final KafkaTopicExtractor topicExtractor; private final KafkaMessageSender kafkaMessageSender; private final Map properties; + private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder; private KafkaMessageValidator kafkaMessageValidator; private boolean initialized = false; + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { this(configurationResourceManager, null, repository, kafkaMessageSender, configResourceUri, streamingPath); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { @@ -63,6 +73,8 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, new HashMap<>()); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { @@ -70,10 +82,29 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, properties); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { + this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, + kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, + properties); + log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); + } + public KafkaHandler( + Vertx vertx, + GateleenExceptionFactory exceptionFactory, + ConfigurationResourceManager configurationResourceManager, + KafkaMessageValidator kafkaMessageValidator, + KafkaProducerRepository repository, + KafkaMessageSender kafkaMessageSender, + String configResourceUri, + String streamingPath, + Map properties + ) { super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); + this.exceptionFactory = exceptionFactory; this.repository = repository; this.kafkaMessageValidator = kafkaMessageValidator; this.kafkaMessageSender = kafkaMessageSender; @@ -81,6 +112,11 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K this.properties = properties; this.topicExtractor = new KafkaTopicExtractor(streamingPath); + this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory); + } + + public static KafkaHandlerBuilder builder() { + return new KafkaHandlerBuilder(); } public Future initialize() { @@ -140,31 +176,43 @@ public boolean handle(final HttpServerRequest request) { } request.bodyHandler(payload -> { - try { - log.debug("incoming kafka message payload: {}", payload); - final List> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload); - maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> { - if(validationEvent.succeeded()) { - if(validationEvent.result().isSuccess()) { - kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).onComplete(event -> { - if(event.succeeded()) { - RequestLoggerFactory.getLogger(KafkaHandler.class, request) - .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic); - respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request); - } else { - respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request); - } - }); - } else { - respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request); - } + log.debug("incoming kafka message payload: {}", payload); + // TODO refactor away this callback-hell (Counts for the COMPLETE method + // surrounding this line, named 'KafkaHandler.handle()', NOT only + // those lines below). + boolean[] isResponseSent = {false}; + kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List> kafkaProducerRecords) -> { + var fut = maybeValidate(request, kafkaProducerRecords).compose(validationEvent -> { + if(validationEvent.isSuccess()) { + kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).onComplete(event -> { + if(event.succeeded()) { + RequestLoggerFactory.getLogger(KafkaHandler.class, request) + .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic); + isResponseSent[0] = true; + respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request); + } else { + isResponseSent[0] = true; + respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request); + } + }); } else { - respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request); + isResponseSent[0] = true; + respondWith(StatusCode.BAD_REQUEST, validationEvent.getMessage(), request); } + return Future.succeededFuture(); }); - } catch (ValidationException ve){ - respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request); - } + assert fut != null; + return fut; + }).onFailure((Throwable ex) -> { + if (ex instanceof ValidationException && !isResponseSent[0]) { + respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request); + return; + } + log.error("TODO error handling", exceptionFactory.newException(ex)); + if (!isResponseSent[0]) { + respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request); + } + }); }); return true; } @@ -189,9 +237,13 @@ public void resourceRemoved(String resourceUri) { private Future maybeValidate(HttpServerRequest request, List> kafkaProducerRecords) { if(kafkaMessageValidator != null) { - return kafkaMessageValidator.validateMessages(request, kafkaProducerRecords); + var fut = kafkaMessageValidator.validateMessages(request, kafkaProducerRecords); + assert fut != null; + return fut; } - return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + var fut = Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + assert fut != null; + return fut; } private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) { diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java new file mode 100644 index 000000000..324e2354c --- /dev/null +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java @@ -0,0 +1,85 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.Vertx; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; + +import java.util.Map; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + +public class KafkaHandlerBuilder { + + private static final Logger log = getLogger(KafkaHandlerBuilder.class); + private Vertx vertx; + private GateleenExceptionFactory exceptionFactory; + private ConfigurationResourceManager configurationResourceManager; + private KafkaMessageValidator kafkaMessageValidator; + private KafkaProducerRepository repository; + private KafkaMessageSender kafkaMessageSender; + private String configResourceUri; + private String streamingPath; + private Map properties; + + /** Use {@link KafkaHandler#builder()} */ + KafkaHandlerBuilder() {/**/} + + public KafkaHandler build() { + if (vertx == null) throw new NullPointerException("vertx missing"); + if (exceptionFactory == null) exceptionFactory = newGateleenThriftyExceptionFactory(); + if (repository == null) throw new NullPointerException("kafkaProducerRepository missing"); + if (kafkaMessageSender == null) throw new NullPointerException("kafkaMessageSender missing"); + if (streamingPath == null) log.warn("no 'streamingPath' given. Are you sure you want none?"); + return new KafkaHandler( + vertx, exceptionFactory, configurationResourceManager, kafkaMessageValidator, repository, + kafkaMessageSender, configResourceUri, streamingPath, properties); + } + + public KafkaHandlerBuilder withVertx(Vertx vertx) { + this.vertx = vertx; + return this; + } + + public KafkaHandlerBuilder withExceptionFactory(GateleenExceptionFactory exceptionFactory) { + this.exceptionFactory = exceptionFactory; + return this; + } + + public KafkaHandlerBuilder withConfigurationResourceManager(ConfigurationResourceManager configurationResourceManager) { + this.configurationResourceManager = configurationResourceManager; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageValidator(KafkaMessageValidator kafkaMessageValidator) { + this.kafkaMessageValidator = kafkaMessageValidator; + return this; + } + + public KafkaHandlerBuilder withRepository(KafkaProducerRepository repository) { + this.repository = repository; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageSender(KafkaMessageSender kafkaMessageSender) { + this.kafkaMessageSender = kafkaMessageSender; + return this; + } + + public KafkaHandlerBuilder withConfigResourceUri(String configResourceUri) { + this.configResourceUri = configResourceUri; + return this; + } + + public KafkaHandlerBuilder withStreamingPath(String streamingPath) { + this.streamingPath = streamingPath; + return this; + } + + public KafkaHandlerBuilder withProperties(Map properties) { + this.properties = properties; + return this; + } + +} diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java index 83b3bf62f..724fc3f66 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java @@ -1,15 +1,23 @@ package org.swisspush.gateleen.kafka; +import io.vertx.core.Future; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.DecodeException; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.validation.ValidationException; import java.util.ArrayList; import java.util.List; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.currentThread; +import static org.slf4j.LoggerFactory.getLogger; + /** * Creates {@link KafkaProducerRecord}s by parsing the request payload. * @@ -17,10 +25,21 @@ */ class KafkaProducerRecordBuilder { + private static final Logger log = getLogger(KafkaProducerRecordBuilder.class); private static final String RECORDS = "records"; private static final String KEY = "key"; private static final String VALUE = "value"; private static final String HEADERS = "headers"; + private final Vertx vertx; + private final GateleenExceptionFactory exceptionFactory; + + KafkaProducerRecordBuilder( + Vertx vertx, + GateleenExceptionFactory exceptionFactory + ) { + this.vertx = vertx; + this.exceptionFactory = exceptionFactory; + } /** * Builds a list of {@link KafkaProducerRecord}s based on the provided payload. @@ -32,6 +51,38 @@ class KafkaProducerRecordBuilder { * @return A list of {@link KafkaProducerRecord}s created from the provided payload * @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.) */ + Future>> buildRecordsAsync(String topic, Buffer payload) { + return Future.succeededFuture().compose((Void v) -> { + JsonObject payloadObj; + try { + payloadObj = new JsonObject(payload); + } catch (DecodeException de) { + return Future.failedFuture(new ValidationException("Error while parsing payload", de)); + } + JsonArray recordsArray; + try { + recordsArray = payloadObj.getJsonArray(RECORDS); + } catch (ClassCastException cce) { + return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects")); + } + if (recordsArray == null) { + return Future.failedFuture(new ValidationException("Missing 'records' array")); + } + return vertx.executeBlocking(() -> { + long beginEpchMs = currentTimeMillis(); + List> kafkaProducerRecords = new ArrayList<>(recordsArray.size()); + for (int i = 0; i < recordsArray.size(); i++) { + kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i))); + } + long durationMs = currentTimeMillis() - beginEpchMs; + log.debug("Serializing JSON did block thread for {}ms", durationMs); + return kafkaProducerRecords; + }); + }); + } + + /** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */ + @Deprecated static List> buildRecords(String topic, Buffer payload) throws ValidationException { List> kafkaProducerRecords = new ArrayList<>(); JsonObject payloadObj; diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java index 8ed8cca73..cda346db4 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java @@ -2,6 +2,7 @@ import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -11,6 +12,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,13 +29,25 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import static org.awaitility.Awaitility.await; +import static java.lang.Thread.currentThread; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS; import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenWastefulExceptionFactory; @@ -53,6 +67,7 @@ public class KafkaHandlerTest { private KafkaHandler handler; private MockResourceStorage storage; private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory(); + private Vertx vertxMock; private final String configResourceUri = "/kafka/topicsConfig"; private final String streamingPath = "/kafka/streaming/"; @@ -60,16 +75,35 @@ public class KafkaHandlerTest { private final String CONFIG_RESOURCE = ResourcesUtils.loadResource("testresource_valid_kafka_topic_configuration", true); private final String CONFIG_WILDCARD_RESOURCE = ResourcesUtils.loadResource("testresource_wildcard_kafka_topic_configuration", true); + @AfterClass + public static void afterClass() throws InterruptedException { + // Test needs some cool-down time. Without it, following tests potentially + // will fail with obscure errors. + Thread.sleep(3000); + } + @Before public void setUp() { vertx = Vertx.vertx(); + vertxMock = Mockito.mock(Vertx.class); + doAnswer(inv -> { + String bkup = currentThread().getName(); + currentThread().setName("blah"); + try { + var result = ((Callable) inv.getArguments()[0]).call(); + return Future.succeededFuture(result); + } finally { + currentThread().setName(bkup); + } + }).when(vertxMock).executeBlocking(any(Callable.class)); repository = Mockito.spy(new KafkaProducerRepository(vertx)); kafkaMessageSender = Mockito.mock(KafkaMessageSender.class); messageValidator = Mockito.mock(KafkaMessageValidator.class); storage = new MockResourceStorage(); configurationResourceManager = new ConfigurationResourceManager(vertx, storage, exceptionFactory); - handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender, - configResourceUri, streamingPath); + handler = new KafkaHandler( + vertxMock, exceptionFactory, configurationResourceManager, null, repository, + kafkaMessageSender, configResourceUri, streamingPath, null); when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.succeededFuture()); } @@ -436,8 +470,9 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){ public void handlePayloadNotPassingValidation(TestContext context){ Async async = context.async(); - handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender, - configResourceUri, streamingPath); + handler = new KafkaHandler( + vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository, + kafkaMessageSender, configResourceUri, streamingPath, null); when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) .thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom"))); @@ -485,8 +520,9 @@ public void handlePayloadNotPassingValidation(TestContext context){ public void handleErrorWhileValidation(TestContext context){ Async async = context.async(); - handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender, - configResourceUri, streamingPath); + handler = new KafkaHandler( + vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository, + kafkaMessageSender, configResourceUri, streamingPath, null); when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) .thenReturn(Future.failedFuture("Boooom")); diff --git a/gateleen-logging/pom.xml b/gateleen-logging/pom.xml index 8a6ba5a5f..446f61684 100644 --- a/gateleen-logging/pom.xml +++ b/gateleen-logging/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-logging diff --git a/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java b/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java index dfe9e3204..094d9322d 100755 --- a/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java +++ b/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java @@ -64,6 +64,8 @@ public class LoggingHandler { private static final String ADDRESS = "address"; private static final String DEFAULT = "default"; + public static final String SKIP_LOGGING_HEADER = "x-skip-request-log"; + private Map loggers = new HashMap<>(); private Logger log; @@ -76,6 +78,12 @@ public LoggingHandler(LoggingResourceManager loggingResourceManager, LogAppender this.log = RequestLoggerFactory.getLogger(LoggingHandler.class, request); ((org.apache.logging.log4j.core.Logger) LogManager.getLogger(DEFAULT_LOGGER)).setAdditive(false); boolean stopValidation = false; + + if(request.headers().get(SKIP_LOGGING_HEADER) != null) { + log.info("request will not be logged because of skip log request header"); + return; + } + for (Map payloadFilter : loggingResource.getPayloadFilters()) { if (active || stopValidation) { break; diff --git a/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestPropertyFilter.java b/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestPropertyFilter.java index c33d7a24b..291762a86 100644 --- a/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestPropertyFilter.java +++ b/gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestPropertyFilter.java @@ -81,6 +81,7 @@ private static void logFilterResult(HttpServerRequest request, String filterProp private static void logFilterResult(HttpServerRequest request, String filterPropertyKey, String filterPropertyValue, FilterResult filterResult, boolean noMatchingProperty){ if(FilterResult.NO_MATCH != filterResult) { Logger log = RequestLoggerFactory.getLogger(RequestPropertyFilter.class, request); + if (!log.isInfoEnabled()) return; StringBuilder sb = new StringBuilder("Request to ").append(request.uri()); if (noMatchingProperty) { sb.append(" with no matching filterProperty"); @@ -88,7 +89,7 @@ private static void logFilterResult(HttpServerRequest request, String filterProp sb.append(" with filterProperty ").append(filterPropertyKey).append("=").append(filterPropertyValue); } sb.append(" has FilterResult ").append(filterResult.name()); - log.info(sb.toString()); + log.info("{}", sb); } } } diff --git a/gateleen-merge/pom.xml b/gateleen-merge/pom.xml index 1ac048c6e..a33161ac6 100644 --- a/gateleen-merge/pom.xml +++ b/gateleen-merge/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-merge diff --git a/gateleen-monitoring/pom.xml b/gateleen-monitoring/pom.xml index d5fbb3a8d..4242c1777 100644 --- a/gateleen-monitoring/pom.xml +++ b/gateleen-monitoring/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-monitoring diff --git a/gateleen-packing/pom.xml b/gateleen-packing/pom.xml index 5298ffb32..00d804238 100644 --- a/gateleen-packing/pom.xml +++ b/gateleen-packing/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-packing diff --git a/gateleen-player/pom.xml b/gateleen-player/pom.xml index de137f487..32e1414f7 100644 --- a/gateleen-player/pom.xml +++ b/gateleen-player/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-player diff --git a/gateleen-playground/pom.xml b/gateleen-playground/pom.xml index e1cf9a818..50e9ac122 100644 --- a/gateleen-playground/pom.xml +++ b/gateleen-playground/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-playground diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java index 612a0b0a7..2559fd4cc 100755 --- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java +++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java @@ -1,6 +1,9 @@ package org.swisspush.gateleen.playground; -import io.vertx.core.*; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; @@ -55,7 +58,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.monitoring.ResetMetricsController; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueueClient; import org.swisspush.gateleen.queue.queuing.QueueProcessor; import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker; @@ -213,7 +215,8 @@ public void start() { RuleProvider ruleProvider = new RuleProvider(vertx, RULES_ROOT, storage, props); - deltaHandler = new DeltaHandler(redisProvider, selfClient, ruleProvider, true); + deltaHandler = new DeltaHandler(vertx, redisProvider, selfClient, ruleProvider, loggingResourceManager, + logAppenderRepository, true); expansionHandler = new ExpansionHandler(ruleProvider, selfClient, props, ROOT); copyResourceHandler = new CopyResourceHandler(selfClient, exceptionFactory, SERVER_ROOT + "/v1/copy"); monitoringHandler = new MonitoringHandler(vertx, storage, PREFIX, SERVER_ROOT + "/monitoring/rpr"); @@ -332,8 +335,6 @@ public void start() { queueCircuitBreakerConfigurationResourceManager, requestHandler, circuitBreakerPort); new QueueProcessor(vertx, selfClient, monitoringHandler, queueCircuitBreaker); - final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", Address.redisquesAddress(), - monitoringHandler); LogController logController = new LogController(); logController.registerLogConfiguratorMBean(JMX_DOMAIN); @@ -365,7 +366,7 @@ public void start() { .delegateHandler(delegateHandler) .customHttpResponseHandler(customHttpResponseHandler) .contentTypeConstraintHandler(contentTypeConstraintHandler) - .build(vertx, redisProvider, Server.class, router, monitoringHandler, queueBrowser); + .build(vertx, redisProvider, Server.class, router, monitoringHandler); Handler routingContextHandlerrNew = runConfig.buildRoutingContextHandler(); selfClient.setRoutingContexttHandler(routingContextHandlerrNew); diff --git a/gateleen-qos/pom.xml b/gateleen-qos/pom.xml index 7bb5aea47..8cca1b4c2 100644 --- a/gateleen-qos/pom.xml +++ b/gateleen-qos/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-qos diff --git a/gateleen-queue/pom.xml b/gateleen-queue/pom.xml index 7cc56c55d..5f42bd0a2 100644 --- a/gateleen-queue/pom.xml +++ b/gateleen-queue/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-queue diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java deleted file mode 100644 index 3bc4ce1af..000000000 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java +++ /dev/null @@ -1,381 +0,0 @@ -package org.swisspush.gateleen.queue.queuing; - -import org.swisspush.gateleen.monitoring.MonitoringHandler; -import org.swisspush.gateleen.core.util.StatusCode; -import org.swisspush.gateleen.core.util.StringUtils; -import io.vertx.core.AsyncResult; -import io.vertx.ext.web.Router; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.DecodeException; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; - -import java.nio.charset.Charset; -import java.util.*; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - -/** - * @author https://github.com/lbovet [Laurent Bovet] - * @deprecated Use http api from vertx-redisques (version greater than v2.2.4) directly. See https://github.com/swisspush/vertx-redisques - */ -public class QueueBrowser implements Handler { - - public static final String APPLICATION_JSON = "application/json"; - public static final String CONTENT_TYPE = "content-type"; - public static final String UTF_8 = "UTF-8"; - public static final String PAYLOAD = "payload"; - private static Logger log = LoggerFactory.getLogger(QueueBrowser.class); - private static final int DEFAULT_QUEUE_NUM = 1000; - private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49; - private static final String SHOW_EMPTY_QUEUES_PARAM = "showEmptyQueues"; - private EventBus eb; - private final String redisquesAddress; - - private Router router; - - public QueueBrowser(Vertx vertx, String prefix, final String redisquesAddress, final MonitoringHandler monitoringHandler) { - this.router = Router.router(vertx); - this.redisquesAddress = redisquesAddress; - eb = vertx.eventBus(); - - // List queuing features - router.get(prefix + "/").handler(ctx -> { - JsonObject result = new JsonObject(); - JsonArray items = new JsonArray(); - items.add("locks/"); - items.add("monitoring"); - items.add("queues/"); - result.put(lastPart(ctx.request().path(), "/"), items); - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(result.encode()); - }); - - // List queues - router.get(prefix + "/queues/").handler(ctx -> monitoringHandler.updateQueuesSizesInformation(DEFAULT_QUEUE_NUM, false, new MonitoringHandler.MonitoringCallback() { - @Override - public void onDone(JsonObject result) { - JsonArray array = result.getJsonArray("queues"); - JsonArray resultArray = new JsonArray(); - for (int i = 0; i < array.size(); i++) { - JsonObject arrayEntry = array.getJsonObject(i); - resultArray.add(arrayEntry.getString("name")); - } - result.put(lastPart(ctx.request().path(), "/"), resultArray); - jsonResponse(ctx.response(), result); - } - - @Override - public void onFail(String errorMessage, int statusCode) { - ctx.response().setStatusCode(statusCode); - ctx.response().setStatusMessage(errorMessage); - ctx.response().end(); - } - })); - - // List queue items - router.getWithRegex(prefix + "/queues/[^/]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path(), "/"); - String limitParam = null; - if (ctx.request() != null && ctx.request().params().contains("limit")) { - limitParam = ctx.request().params().get("limit"); - } - eb.request(redisquesAddress, buildGetQueueItemsOperation(queue, limitParam), (Handler>>) reply -> { - JsonObject replyBody = reply.result().body(); - if (OK.equals(replyBody.getString(STATUS))) { - List list = reply.result().body().getJsonArray(VALUE).getList(); - JsonArray items = new JsonArray(); - for (Object item : list.toArray()) { - items.add((String) item); - } - JsonObject result = new JsonObject().put(queue, items); - jsonResponse(ctx.response(), result); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().end(reply.result().body().getString("message")); - log.warn("Error in routerMatcher.getWithRegEx. Command = '" + (replyBody.getString("command") == null ? "" : replyBody.getString("command")) + "'."); - } - }); - }); - - // Delete all queue items - router.deleteWithRegex(prefix + "/queues/[^/]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildDeleteAllQueueItemsOperation(queue), reply -> ctx.response().end()); - }); - - // Get item - router.getWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path().substring(0, ctx.request().path().length() - 2), "/"); - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - eb.request(redisquesAddress, buildGetQueueItemOperation(queue, index), (Handler>>) reply -> { - JsonObject replyBody = reply.result().body(); - if (OK.equals(replyBody.getString(STATUS))) { - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(decode(reply.result().body().getString(VALUE))); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end("Not Found"); - } - }); - }); - - // Replace item - router.putWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 2); - checkLocked(queue, ctx.request(), aVoid -> { - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - ctx.request().bodyHandler(buffer -> { - String strBuffer = encode(buffer.toString()); - eb.request(redisquesAddress, buildReplaceQueueItemOperation(queue, index, strBuffer), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.NOT_FOUND)); - }); - }); - }); - - // Delete item - router.deleteWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 2); - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - checkLocked(queue, ctx.request(), aVoid -> eb.request(redisquesAddress, buildDeleteQueueItemOperation(queue, index), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.NOT_FOUND))); - }); - - // Add item - router.postWithRegex(prefix + "/queues/([^/]+)/").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 1); - ctx.request().bodyHandler(buffer -> { - String strBuffer = encode(buffer.toString()); - eb.request(redisquesAddress, buildAddQueueItemOperation(queue, strBuffer), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - }); - - // get all locks - router.getWithRegex(prefix + "/locks/").handler(ctx -> eb.request(redisquesAddress, buildGetAllLocksOperation(), - (Handler>>) reply -> { - if (OK.equals(reply.result().body().getString(STATUS))) { - jsonResponse(ctx.response(), reply.result().body().getJsonObject(VALUE)); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end("Not Found"); - } - })); - - // add lock - router.putWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildPutLockOperation(queue, extractUser(ctx.request())), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - - // get single lock - router.getWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildGetLockOperation(queue), (Handler>>) reply -> { - if (OK.equals(reply.result().body().getString(STATUS))) { - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(reply.result().body().getString(VALUE)); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end(NO_SUCH_LOCK); - } - }); - }); - - // delete single lock - router.deleteWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildDeleteLockOperation(queue), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - - // Gathering queues monitoring informations - router.getWithRegex(prefix + "/monitoring/[^/]*").handler(ctx -> { - int numQueues = extractNumOfQueuesValue(ctx.request().path(), "/"); - boolean showEmptyQueues = showEmptyQueues(ctx.request().params()); - monitoringHandler.updateQueuesSizesInformation(numQueues, showEmptyQueues, new MonitoringHandler.MonitoringCallback() { - @Override - public void onDone(JsonObject result) { - jsonResponse(ctx.response(), result); - } - - @Override - public void onFail(String errorMessage, int statusCode) { - ctx.response().setStatusCode(statusCode); - ctx.response().setStatusMessage(errorMessage); - ctx.response().end(); - } - }); - }); - } - - private String extractUser(HttpServerRequest request) { - String user = request.headers().get("x-rp-usr"); - if (user == null) { - user = "Unknown"; - } - return user; - } - - private void checkReply(Message reply, HttpServerRequest request, StatusCode statusCode) { - if (OK.equals(reply.body().getString(STATUS))) { - request.response().end(); - } else { - request.response().setStatusCode(statusCode.getStatusCode()); - request.response().setStatusMessage(statusCode.getStatusMessage()); - request.response().end(statusCode.getStatusMessage()); - } - } - - private String lastPart(String source, String separator) { - String[] tokens = source.split(separator); - return tokens[tokens.length - 1]; - } - - private String part(String source, String separator, int pos) { - String[] tokens = source.split(separator); - return tokens[tokens.length - pos]; - } - - private boolean showEmptyQueues(MultiMap requestParams) { - String showEmptyQueues = StringUtils.getStringOrEmpty(requestParams.get(SHOW_EMPTY_QUEUES_PARAM)); - return showEmptyQueues.equalsIgnoreCase("true") || showEmptyQueues.equals("1"); - } - - private int getMaxQueueItemCountIndex(HttpServerRequest request) { - int defaultMaxIndex = DEFAULT_MAX_QUEUEITEM_COUNT; - if (request != null && request.params().contains("limit")) { - String limitParam = request.params().get("limit"); - try { - int maxIndex = Integer.parseInt(limitParam) - 1; - if (maxIndex >= 0) { - defaultMaxIndex = maxIndex; - } - } catch (NumberFormatException ex) { - log.warn("Invalid limit parameter '{}' configured for max queue item count. Using default {}", limitParam, DEFAULT_MAX_QUEUEITEM_COUNT); - } - } - return defaultMaxIndex; - } - - private int extractNumOfQueuesValue(String source, String separator) { - String numberOfQueuesStr = lastPart(source, separator); - Integer numQueues; - try { - numQueues = Integer.parseInt(numberOfQueuesStr); - } catch (Exception e) { - numQueues = DEFAULT_QUEUE_NUM; - log.warn("Queue size monitoring url was used with wrong or without number of queues param. Using default {}", DEFAULT_QUEUE_NUM); - } - - return numQueues; - } - - public void handle(HttpServerRequest request) { - router.handle(request); - } - - /** - * Encode the payload from a payloadString or payloadObjet. - * - * @param decoded decoded - * @return String - */ - public String encode(String decoded) { - JsonObject object = new JsonObject(decoded); - - String payloadString; - JsonObject payloadObject = object.getJsonObject("payloadObject"); - if (payloadObject != null) { - payloadString = payloadObject.encode(); - } else { - payloadString = object.getString("payloadString"); - } - - if (payloadString != null) { - object.put(PAYLOAD, payloadString.getBytes(Charset.forName(UTF_8))); - object.remove("payloadString"); - object.remove("payloadObject"); - } - - // update the content-length - int length = 0; - if (object.containsKey(PAYLOAD)) { - length = object.getBinary(PAYLOAD).length; - } - JsonArray newHeaders = new JsonArray(); - for (Object headerObj : object.getJsonArray("headers")) { - JsonArray header = (JsonArray) headerObj; - String key = header.getString(0); - if (key.equalsIgnoreCase("content-length")) { - JsonArray contentLengthHeader = new JsonArray(); - contentLengthHeader.add("Content-Length"); - contentLengthHeader.add(Integer.toString(length)); - newHeaders.add(contentLengthHeader); - } else { - newHeaders.add(header); - } - } - object.put("headers", newHeaders); - - return object.toString(); - } - - /** - * Decode the payload if the content-type is text or json. - * - * @param encoded encoded - * @return String - */ - public String decode(String encoded) { - JsonObject object = new JsonObject(encoded); - JsonArray headers = object.getJsonArray("headers"); - for (Object headerObj : headers) { - JsonArray header = (JsonArray) headerObj; - String key = header.getString(0); - String value = header.getString(1); - if (key.equalsIgnoreCase(CONTENT_TYPE) && (value.contains("text/") || value.contains(APPLICATION_JSON))) { - try { - object.put("payloadObject", new JsonObject(new String(object.getBinary(PAYLOAD), Charset.forName(UTF_8)))); - } catch (DecodeException e) { - object.put("payloadString", new String(object.getBinary(PAYLOAD), Charset.forName(UTF_8))); - } - object.remove(PAYLOAD); - break; - } - } - return object.toString(); - } - - private void checkLocked(String queue, final HttpServerRequest request, final Handler handler) { - request.pause(); - eb.request(redisquesAddress, buildGetLockOperation(queue), (Handler>>) reply -> { - if (NO_SUCH_LOCK.equals(reply.result().body().getString(STATUS))) { - request.resume(); - request.response().setStatusCode(StatusCode.CONFLICT.getStatusCode()); - request.response().setStatusMessage("Queue must be locked to perform this operation"); - request.response().end("Queue must be locked to perform this operation"); - } else { - handler.handle(null); - request.resume(); - } - }); - } - - private void jsonResponse(HttpServerResponse response, JsonObject object) { - response.putHeader(CONTENT_TYPE, APPLICATION_JSON); - response.end(object.encode()); - } -} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/QueueProcessorTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/QueueProcessorTest.java index c05d9d3e3..b9c0c0c19 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/QueueProcessorTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/QueueProcessorTest.java @@ -217,7 +217,7 @@ private void setHttpClientRespondStatusCode(StatusCode statusCode) { String url = (String) invocation.getArguments()[1]; LocalHttpClientRequest request = new LocalHttpClientRequest(httpMethod, url, vertx, event -> { - }, exceptionFactory, new LocalHttpServerResponse(vertx)) { + }, exceptionFactory, new LocalHttpServerResponse(vertx, exceptionFactory)) { @Override public HttpClientRequest response(Handler> handler) { FastFaiHttpClientResponse response = new FastFaiHttpClientResponse() { diff --git a/gateleen-routing/pom.xml b/gateleen-routing/pom.xml index 07b9b4dbe..85ab49b46 100644 --- a/gateleen-routing/pom.xml +++ b/gateleen-routing/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-routing diff --git a/gateleen-runconfig/pom.xml b/gateleen-runconfig/pom.xml index 8d4324df2..c6e4daa80 100644 --- a/gateleen-runconfig/pom.xml +++ b/gateleen-runconfig/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-runconfig diff --git a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java index 2d9043015..a6f9f273c 100755 --- a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java +++ b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java @@ -35,7 +35,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.packing.PackingHandler; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueuingHandler; import org.swisspush.gateleen.queue.queuing.circuitbreaker.configuration.QueueCircuitBreakerConfigurationResourceManager; import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter; @@ -101,7 +100,6 @@ public class RunConfig { private final ExpansionHandler expansionHandler; private final DeltaHandler deltaHandler; private final MonitoringHandler monitoringHandler; - private final QueueBrowser queueBrowser; private final Authorizer authorizer; private final CopyResourceHandler copyResourceHandler; private final QoSHandler qosHandler; @@ -113,7 +111,7 @@ public class RunConfig { private final CustomHttpResponseHandler customHttpResponseHandler; public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler, - QueueBrowser queueBrowser, CORSHandler corsHandler, SchedulerResourceManager schedulerResourceManager, + CORSHandler corsHandler, SchedulerResourceManager schedulerResourceManager, ValidationResourceManager validationResourceManager, LoggingResourceManager loggingResourceManager, ConfigurationResourceManager configurationResourceManager, QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager, @@ -129,7 +127,6 @@ public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, this.verticleClass = verticleClass; this.router = router; this.monitoringHandler = monitoringHandler; - this.queueBrowser = queueBrowser; this.corsHandler = corsHandler; this.schedulerResourceManager = schedulerResourceManager; this.validationResourceManager = validationResourceManager; @@ -164,7 +161,6 @@ private RunConfig(RunConfigBuilder builder) { builder.verticleClass, builder.router, builder.monitoringHandler, - builder.queueBrowser, builder.corsHandler, builder.schedulerResourceManager, builder.validationResourceManager, @@ -222,7 +218,6 @@ public static class RunConfigBuilder { private Class verticleClass; private Router router; private MonitoringHandler monitoringHandler; - private QueueBrowser queueBrowser; private CORSHandler corsHandler; private SchedulerResourceManager schedulerResourceManager; private ValidationResourceManager validationResourceManager; @@ -377,13 +372,12 @@ public RunConfigBuilder cacheHandler(CacheHandler cacheHandler) { return this; } - public RunConfig build(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler, QueueBrowser queueBrowser) { + public RunConfig build(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler) { this.vertx = vertx; this.redisProvider = redisProvider; this.verticleClass = verticleClass; this.router = router; this.monitoringHandler = monitoringHandler; - this.queueBrowser = queueBrowser; return new RunConfig(this); } } @@ -633,10 +627,6 @@ private void handleRequest(final RoutingContext ctx) { if (copyResourceHandler != null && copyResourceHandler.handle(request)) { return; } - if (request.path().startsWith(SERVER_ROOT + "/queuing/")) { - queueBrowser.handle(request); - return; - } if (hookHandler != null && hookHandler.handle(ctx)) { return; } diff --git a/gateleen-scheduler/pom.xml b/gateleen-scheduler/pom.xml index 1fcfbf677..25367dc43 100644 --- a/gateleen-scheduler/pom.xml +++ b/gateleen-scheduler/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-scheduler diff --git a/gateleen-security/pom.xml b/gateleen-security/pom.xml index 0fc3067c4..9173fc016 100644 --- a/gateleen-security/pom.xml +++ b/gateleen-security/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-security diff --git a/gateleen-test/pom.xml b/gateleen-test/pom.xml index 8611850de..79bc6c1a2 100644 --- a/gateleen-test/pom.xml +++ b/gateleen-test/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-test jar diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java b/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java index b02253c5d..20f2480b4 100755 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java @@ -57,7 +57,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.monitoring.ResetMetricsController; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueueClient; import org.swisspush.gateleen.queue.queuing.QueueProcessor; import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker; @@ -105,6 +104,7 @@ public abstract class AbstractTest { protected static final int REDIS_PORT = 6379; protected static final int STORAGE_PORT = 8989; private static final int CIRCUIT_BREAKER_REST_API_PORT = 7014; + protected static final int REDISQUES_API_PORT = 7015; /** * Basis configuration for RestAssured @@ -219,8 +219,6 @@ public static void setupBeforeClass(TestContext context) { CIRCUIT_BREAKER_REST_API_PORT); new QueueProcessor(vertx, selfClient, monitoringHandler, queueCircuitBreaker); - final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", - Address.redisquesAddress(), monitoringHandler); new CustomRedisMonitor(vertx, redisProvider, "main", "rest-storage", 10).start(); Router router = Router.builder() @@ -249,7 +247,7 @@ public static void setupBeforeClass(TestContext context) { RunConfig.with() .cacheHandler(cacheHandler) .corsHandler(new CORSHandler()) - .deltaHandler(new DeltaHandler(redisProvider, selfClient, ruleProvider)) + .deltaHandler(new DeltaHandler(vertx, redisProvider, selfClient, ruleProvider, loggingResourceManager, logAppenderRepository)) .expansionHandler(new ExpansionHandler(vertx, storage, selfClient, props, ROOT, RULES_ROOT)) .hookHandler(hookHandler) .qosHandler(qosHandler) @@ -266,7 +264,7 @@ public static void setupBeforeClass(TestContext context) { .delegateHandler(delegateHandler) .mergeHandler(mergeHandler) .customHttpResponseHandler(customHttpResponseHandler) - .build(vertx, redisProvider, AbstractTest.class, router, monitoringHandler, queueBrowser); + .build(vertx, redisProvider, AbstractTest.class, router, monitoringHandler); Handler routingContextHandlerrNew = runConfig.buildRoutingContextHandler(); selfClient.setRoutingContexttHandler(routingContextHandlerrNew); mainServer = vertx.createHttpServer(); diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/TestUtils.java b/gateleen-test/src/test/java/org/swisspush/gateleen/TestUtils.java index 0100ae71b..b2fef8f7e 100644 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/TestUtils.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/TestUtils.java @@ -8,10 +8,8 @@ import org.swisspush.gateleen.hook.HookHandler; import org.swisspush.gateleen.hook.HookTriggerType; -import java.util.Arrays; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; import java.util.regex.Pattern; import static org.awaitility.Awaitility.await; @@ -114,6 +112,23 @@ public static void waitSomeTime(int seconds) { } } + /** + * Adds the routing rule for queuing to the given routing rules. + * + * @param rules current rules. + */ + public static JsonObject addRoutingRuleQueuing(JsonObject rules) { + + JsonObject queuing = createRoutingRule(ImmutableMap.of( + "description", + "vertx-redisques API", + "url", + "http://localhost:" + AbstractTest.REDISQUES_API_PORT + "/queuing/$1")); + + rules = addRoutingRule(rules, AbstractTest.SERVER_ROOT + "/queuing/(.*)", queuing); + return rules; + } + /** * Adds the routing rule for cleanup to the given routing rules. * @@ -348,9 +363,29 @@ public static void registerListener(final String requestUrl, final String target public static void registerListener(final String requestUrl, final String target, String[] methods, String filter, Integer queueExpireTime, Map staticHeaders, HookTriggerType type, String headersFilter) { + registerListener(requestUrl, target, methods, filter, queueExpireTime, staticHeaders, type, headersFilter, null); + } + + /** + * Registers a listener with a filter, static headers, a queue header and a event trigger. + * + * @param requestUrl + * @param target + * @param methods + * @param filter + * @param queueExpireTime + * @param staticHeaders + * @param queueHeader + */ + public static void registerListener(final String requestUrl, final String target, String[] methods, String filter, + Integer queueExpireTime, Map staticHeaders, HookTriggerType type, + String headersFilter, String queueHeader) { JsonObject route = new JsonObject(); route.put("destination", target); + if(queueHeader != null) { + route.put("headers", new JsonArray(List.of(new JsonObject().put("header", "x-queue").put("value", queueHeader)))); + } if(methods != null){ route.put("methods", new JsonArray(Arrays.asList(methods))); } diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/hook/ListenerTest.java b/gateleen-test/src/test/java/org/swisspush/gateleen/hook/ListenerTest.java index 2ef58ec1b..4282fe13e 100755 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/hook/ListenerTest.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/hook/ListenerTest.java @@ -3,7 +3,6 @@ import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.google.common.collect.ImmutableMap; -import org.awaitility.Awaitility; import io.restassured.RestAssured; import io.restassured.http.Header; import io.restassured.http.Headers; @@ -11,6 +10,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.awaitility.Awaitility; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -23,8 +23,8 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; -import static org.awaitility.Awaitility.await; import static io.restassured.RestAssured.*; +import static org.awaitility.Awaitility.await; import static org.awaitility.Durations.FIVE_SECONDS; import static org.awaitility.Durations.TEN_SECONDS; import static org.hamcrest.CoreMatchers.*; @@ -63,6 +63,7 @@ private void initRoutingRules() { // add a routing JsonObject rules = new JsonObject(); rules = TestUtils.addRoutingRuleMainStorage(rules); + rules = TestUtils.addRoutingRuleQueuing(rules); rules = TestUtils.addRoutingRuleHooks(rules); TestUtils.putRoutingRules(rules); } @@ -678,7 +679,7 @@ public void testHookQueueExpiryOverride(TestContext context) { String requestUrl = sourceUrl + TestUtils.getHookListenersUrlSuffix() + "testservice" + "/" + 1; String targetUrl = targetUrlBase + "/result"; - String queueName = HookHandler.LISTENER_QUEUE_PREFIX + "-" + hookHandler.getUniqueListenerId(SERVER_ROOT + requestUrl); + String queueName = "hook-queue-expiry-test"; String putRequest = sourceUrl + "/test1"; String putTarget = targetUrl + "/test1"; @@ -689,7 +690,8 @@ public void testHookQueueExpiryOverride(TestContext context) { // ---- // register Listener - TestUtils.registerListener(requestUrl, targetUrl, null, null, 5); + TestUtils.registerListener(requestUrl, targetUrl, null, null, 5, + null, null, null, queueName); // lock queue String lockRequestUrl = "queuing/locks/" + queueName; diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/queue/QueueTest.java b/gateleen-test/src/test/java/org/swisspush/gateleen/queue/QueueTest.java index de9f8007d..cecb34778 100644 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/queue/QueueTest.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/queue/QueueTest.java @@ -14,8 +14,8 @@ import static org.awaitility.Awaitility.await; import static io.restassured.RestAssured.*; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.awaitility.Durations.*; import static org.hamcrest.CoreMatchers.*; @@ -30,6 +30,7 @@ public void init() { // add a routing JsonObject rules = new JsonObject(); rules = TestUtils.addRoutingRuleMainStorage(rules); + rules = TestUtils.addRoutingRuleQueuing(rules); rules = TestUtils.addRoutingRuleCleanup(rules); TestUtils.putRoutingRules(rules); } diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/queue/expiry/ResourceQueueExpiryTest.java b/gateleen-test/src/test/java/org/swisspush/gateleen/queue/expiry/ResourceQueueExpiryTest.java index 06f3c173f..37d605386 100644 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/queue/expiry/ResourceQueueExpiryTest.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/queue/expiry/ResourceQueueExpiryTest.java @@ -1,6 +1,7 @@ package org.swisspush.gateleen.queue.expiry; import io.restassured.RestAssured; +import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -36,6 +37,14 @@ public void initRestAssured() { RestAssured.requestSpecification.basePath(SERVER_ROOT + "/"); } + private void initRoutingRules() { + // add a routing + JsonObject rules = new JsonObject(); + rules = TestUtils.addRoutingRuleMainStorage(rules); + rules = TestUtils.addRoutingRuleQueuing(rules); + TestUtils.putRoutingRules(rules); + } + /** * Checks if the GET request of the * given resource returns the wished body. @@ -55,6 +64,7 @@ private void checkGETBodyWithAwait(final String requestUrl, final String body) { public void testQueueExpiry(TestContext context) { Async async = context.async(); delete(); + initRoutingRules(); System.out.println("testQueueExpiry"); @@ -165,6 +175,7 @@ public void testQueueExpiry(TestContext context) { public void testQueueExpiryOverride_requestIsExpired_beforeRegularExpiryTime(TestContext context) { Async async = context.async(); delete(); + initRoutingRules(); System.out.println("testQueueExpiry"); @@ -213,6 +224,7 @@ public void testQueueExpiryOverride_requestIsExpired_beforeRegularExpiryTime(Tes public void testQueueExpiryOverride_requestIsNotExpired_regularResourceExpiry(TestContext context) { Async async = context.async(); delete(); + initRoutingRules(); System.out.println("testQueueExpiry"); diff --git a/gateleen-testhelper/pom.xml b/gateleen-testhelper/pom.xml index 05882f11d..ec439ba52 100644 --- a/gateleen-testhelper/pom.xml +++ b/gateleen-testhelper/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-testhelper diff --git a/gateleen-user/pom.xml b/gateleen-user/pom.xml index 4f69e7b6d..92cc2d300 100644 --- a/gateleen-user/pom.xml +++ b/gateleen-user/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-user diff --git a/gateleen-validation/pom.xml b/gateleen-validation/pom.xml index 391d4c537..5afcddce0 100644 --- a/gateleen-validation/pom.xml +++ b/gateleen-validation/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT gateleen-validation diff --git a/pom.xml b/pom.xml index 5cecd732f..19ae1663a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.7-SNAPSHOT + 2.1.8-SNAPSHOT pom gateleen Middleware library based on Vert.x to build advanced JSON/REST communication servers