diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java
index 705e03d2..85f8e214 100644
--- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java
+++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java
@@ -80,4 +80,14 @@ public void releaseLock(Lock lockImpl, String lock, String token, Logger log){
}
});
}
+
+ /**
+  * Derives the lock expiry from a task interval: half of the interval (integer division), but at least 1.
+  *
+  * @param taskInterval the interval of the task; may be zero or negative, which results in an expiry of 1
+  * @return the lock expiry, never smaller than 1
+  */
+ public static long calcLockExpiry(long taskInterval) {
+     return Math.max(1L, taskInterval / 2);
+ }
}
diff --git a/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java b/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java
index bb059875..293010da 100644
--- a/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java
+++ b/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java
@@ -33,6 +33,20 @@ public void setUp(){
lockUtil = new LockUtil(newGateleenWastefulExceptionFactory());
}
+ @Test
+ public void testCalculateLockExpiry(TestContext context) {
+ context.assertEquals(1L, LockUtil.calcLockExpiry(1));
+ context.assertEquals(1L, LockUtil.calcLockExpiry(0));
+ context.assertEquals(1L, LockUtil.calcLockExpiry(-20));
+ context.assertEquals(1L, LockUtil.calcLockExpiry(2));
+ context.assertEquals(1L, LockUtil.calcLockExpiry(3));
+ context.assertEquals(2L, LockUtil.calcLockExpiry(4));
+ context.assertEquals(4L, LockUtil.calcLockExpiry(8));
+ context.assertEquals(32L, LockUtil.calcLockExpiry(64));
+ context.assertEquals(750L, LockUtil.calcLockExpiry(1500));
+ context.assertEquals(5000L, LockUtil.calcLockExpiry(10001));
+ }
+
@Test
public void testAcquireLockWithoutLockImplementationDefined(TestContext context) {
Async async = context.async();
diff --git a/gateleen-playground/pom.xml b/gateleen-playground/pom.xml
index 8b2ccea3..010778a0 100644
--- a/gateleen-playground/pom.xml
+++ b/gateleen-playground/pom.xml
@@ -103,6 +103,21 @@
gateleen-kafka
${project.version}
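+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-micrometer-metrics</artifactId>
+            <version>${vertx.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-core</artifactId>
+            <version>${micrometer.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <version>${micrometer.version}</version>
+        </dependency>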
org.swisspush
redisques
diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java
index 7833d4fd..39a1f245 100644
--- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java
+++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java
@@ -38,6 +38,7 @@ public class RedisQueueCircuitBreakerStorage implements QueueCircuitBreakerStora
public static final String STORAGE_OPEN_CIRCUITS = STORAGE_PREFIX + "open-circuits";
public static final String STORAGE_QUEUES_TO_UNLOCK = STORAGE_PREFIX + "queues-to-unlock";
public static final String FIELD_STATE = "state";
+ public static final String FIELD_STATUS = "status";
public static final String FIELD_FAILRATIO = "failRatio";
public static final String FIELD_CIRCUIT = "circuit";
public static final String FIELD_METRICNAME = "metric";
@@ -111,7 +112,7 @@ public Future getQueueCircuitInformation(String circuitHash) {
String circuit = Objects.toString(event.result().get(2), null);
String metric = Objects.toString(event.result().get(3), null);
JsonObject result = new JsonObject();
- result.put("status", state.name().toLowerCase());
+ result.put(FIELD_STATUS, state.name().toLowerCase());
JsonObject info = new JsonObject();
if (failRatioStr != null) {
info.put(FIELD_FAILRATIO, Integer.valueOf(failRatioStr));
diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java
new file mode 100644
index 00000000..82bfecb0
--- /dev/null
+++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java
@@ -0,0 +1,160 @@
+package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.vertx.core.*;
+import io.vertx.core.json.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
+import org.swisspush.gateleen.core.lock.Lock;
+import org.swisspush.gateleen.core.util.Address;
+import org.swisspush.gateleen.core.util.LockUtil;
+import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage;
+import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import static org.swisspush.gateleen.core.util.LockUtil.acquireLock;
+import static org.swisspush.gateleen.core.util.LockUtil.calcLockExpiry;
+import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*;
+
+public class QueueCircuitBreakerMetricsCollector {
+
+ private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerMetricsCollector.class);
+
+ private final Lock lock;
+ private final LockUtil lockUtil;
+
+ public static final String COLLECT_METRICS_TASK_LOCK = "collectCircuitBreakerMetrics";
+ public static final String CIRCUIT_BREAKER_STATUS_METRIC = "gateleen.circuitbreaker.status";
+ public static final String CIRCUIT_BREAKER_FAILRATIO_METRIC = "gateleen.circuitbreaker.failratio";
+
+ private final QueueCircuitBreakerStorage queueCircuitBreakerStorage;
+ private final MeterRegistry meterRegistry;
+ private final long metricCollectionIntervalMs;
+
+ private final Map<String, AtomicInteger> circuitStateMap = new HashMap<>(); // metric name -> value behind its status gauge
+ private final Map<String, AtomicInteger> circuitFailRatioMap = new HashMap<>(); // metric name -> value behind its fail ratio gauge
+
+ // Creates the collector and schedules a metrics collection run every metricCollectionIntervalSeconds seconds.
+ public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitBreakerStorage queueCircuitBreakerStorage,
+                                            MeterRegistry meterRegistry, GateleenExceptionFactory exceptionFactory,
+                                            long metricCollectionIntervalSeconds) {
+     this.lock = lock;
+     this.lockUtil = new LockUtil(exceptionFactory);
+     this.queueCircuitBreakerStorage = queueCircuitBreakerStorage;
+     this.meterRegistry = meterRegistry;
+     // the interval is configured in seconds but kept in milliseconds for the timer and the lock expiry
+     this.metricCollectionIntervalMs = metricCollectionIntervalSeconds * 1000;
+
+     vertx.setPeriodic(metricCollectionIntervalMs, timerId -> collectMetrics()
+             .onFailure(cause -> log.error("Could not collect metrics. Message: {}", cause.getMessage())));
+ }
+
+ public Future collectMetrics() {
+ log.debug("Collecting metrics");
+ Promise promise = Promise.promise();
+ final String token = createToken(COLLECT_METRICS_TASK_LOCK);
+ acquireLock(lock, COLLECT_METRICS_TASK_LOCK, token, calcLockExpiry(metricCollectionIntervalMs), log).onComplete(lockEvent -> {
+ if (lockEvent.succeeded()) {
+ if (lockEvent.result()) {
+ handleMetricsCollection(token).onComplete(event -> {
+ if (event.succeeded()) {
+ promise.complete();
+ } else {
+ promise.fail(event.cause());
+ }
+ });
+ } else {
+ promise.complete();
+ }
+ } else {
+ log.error("Could not acquire lock '{}'. Message: {}", COLLECT_METRICS_TASK_LOCK, lockEvent.cause().getMessage());
+ promise.fail(lockEvent.cause().getMessage());
+ }
+ });
+ return promise.future();
+ }
+
+ private Future<Void> handleMetricsCollection(String token) { // loads all circuits, publishes their metrics, then frees the lock
+     return queueCircuitBreakerStorage.getAllCircuits().<Void>compose(entries -> {
+         extractMetricsFromCircuitsObject(entries);
+         return Future.succeededFuture();
+     }).andThen(event -> lockUtil.releaseLock(lock, COLLECT_METRICS_TASK_LOCK, token, log)); // released on success and on failure
+ }
+
+ // Publishes status and fail ratio of every circuit entry that has a known status, a metric name and a fail ratio.
+ private void extractMetricsFromCircuitsObject(JsonObject circuits) {
+     for (Map.Entry<String, Object> circuitEntry : circuits) {
+         JsonObject circuit = (JsonObject) circuitEntry.getValue();
+         QueueCircuitState state = QueueCircuitState.fromString(circuit.getString(FIELD_STATUS), null);
+         if (state == null) {
+             log.warn("No status found for circuit '{}'", circuitEntry.getKey());
+             continue;
+         }
+         JsonObject infos = circuit.getJsonObject("infos");
+         if (infos == null) {
+             continue;
+         }
+         String metricName = infos.getString(FIELD_METRICNAME);
+         Integer failRatio = infos.getInteger(FIELD_FAILRATIO);
+         if (metricName != null && failRatio != null) {
+             publishMetric(metricName, state, failRatio);
+         }
+     }
+ }
+
+ private void publishMetric(String metricName, QueueCircuitState queueCircuitState, int failRatio) { // pushes one circuit's values into its gauges
+     Integer stateValue = circuitStateToValue(queueCircuitState); // null when the state has no numeric mapping
+     if(stateValue != null) {
+         getCircuitStateMeter(metricName).set(stateValue);
+     }
+     getCircuitFailRatioMeter(metricName).set(failRatio); // updated regardless of the state value
+ }
+
+ private String createToken(String appendix) { // lock token layout: <instance address>_<current millis>_<appendix>
+     return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + appendix;
+ }
+
+ // Looks up the holder backing the status gauge of the metric; the first lookup creates and registers the gauge.
+ private AtomicInteger getCircuitStateMeter(String metricName) {
+     return circuitStateMap.computeIfAbsent(metricName, name -> {
+         AtomicInteger holder = new AtomicInteger();
+         Gauge.builder(CIRCUIT_BREAKER_STATUS_METRIC, holder, AtomicInteger::get)
+                 .description("Status of the circuit, 0=CLOSED, 1=HALF_OPEN, 2=OPEN").tag("metricName", name)
+                 .register(meterRegistry);
+         return holder;
+     });
+ }
+
+ // Looks up the holder backing the fail ratio gauge of the metric; the first lookup creates and registers the gauge.
+ private AtomicInteger getCircuitFailRatioMeter(String metricName) {
+     return circuitFailRatioMap.computeIfAbsent(metricName, name -> {
+         AtomicInteger holder = new AtomicInteger();
+         Gauge.builder(CIRCUIT_BREAKER_FAILRATIO_METRIC, holder, AtomicInteger::get)
+                 .description("Fail ratio of the circuit in percentage").tag("metricName", name)
+                 .register(meterRegistry);
+         return holder;
+     });
+ }
+
+ // Translates a circuit state into the number exposed by the status gauge:
+ // CLOSED=0, HALF_OPEN=1, OPEN=2. A null state and any other state yield null.
+ // The reference comparisons below are null-safe, so no separate null check is required.
+ private Integer circuitStateToValue(QueueCircuitState queueCircuitState) {
+     if (queueCircuitState == QueueCircuitState.CLOSED) {
+         return 0;
+     }
+     if (queueCircuitState == QueueCircuitState.HALF_OPEN) {
+         return 1;
+     }
+     if (queueCircuitState == QueueCircuitState.OPEN) {
+         return 2;
+     }
+     return null;
+ }
+}
diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java
new file mode 100644
index 00000000..046d8862
--- /dev/null
+++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java
@@ -0,0 +1,220 @@
+package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.search.MeterNotFoundException;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.Timeout;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.swisspush.gateleen.core.lock.Lock;
+import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.verify;
+import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenWastefulExceptionFactory;
+import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*;
+import static org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring.QueueCircuitBreakerMetricsCollector.*;
+
+/**
+ * Tests for the {@link QueueCircuitBreakerMetricsCollector} class
+ *
+ * @author https://github.com/mcweba [Marc-Andre Weber]
+ */
+@RunWith(VertxUnitRunner.class)
+public class QueueCircuitBreakerMetricsCollectorTest {
+
+ private Vertx vertx;
+ private Lock lock;
+ private QueueCircuitBreakerStorage queueCircuitBreakerStorage;
+ private MeterRegistry meterRegistry;
+ private QueueCircuitBreakerMetricsCollector collector;
+
+ @org.junit.Rule
+ public Timeout rule = Timeout.seconds(50);
+
+ @Before
+ public void setUp() {
+     vertx = Vertx.vertx(); // NOTE(review): no method of this test class closes this Vertx instance
+     // by default the lock mock grants and releases the lock; single tests re-stub acquireLock
+     lock = Mockito.mock(Lock.class);
+     Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.succeededFuture(Boolean.TRUE));
+     Mockito.when(lock.releaseLock(anyString(), anyString())).thenReturn(Future.succeededFuture(Boolean.TRUE));
+     // a new registry per test, so every test starts without any registered gauge
+     meterRegistry = new SimpleMeterRegistry();
+     queueCircuitBreakerStorage = Mockito.mock(QueueCircuitBreakerStorage.class);
+     // the last argument is the collection interval in seconds
+     collector = new QueueCircuitBreakerMetricsCollector(vertx, lock, queueCircuitBreakerStorage, meterRegistry,
+             newGateleenWastefulExceptionFactory(), 5);
+ }
+
+ @Test
+ public void testCollectMetricsSuccess(TestContext context) {
+     Async async = context.async();
+     // one circuit per state, each with its own metric name and fail ratio
+     JsonObject allCircuits = new JsonObject();
+     allCircuits.put("5645745t43f54gf", createCircuitInfo("closed", "M1", 0));
+     allCircuits.put("12rt878665f54gf", createCircuitInfo("half_open", "M2", 35));
+     allCircuits.put("8789jz45745t43f54gf", createCircuitInfo("open", "M3", 100));
+
+     Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
+             .thenReturn(Future.succeededFuture(allCircuits));
+
+     collector.collectMetrics().onComplete(event -> {
+         context.assertTrue(event.succeeded());
+
+         // verify status gauges (0=CLOSED, 1=HALF_OPEN, 2=OPEN)
+         context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED");
+         context.assertEquals(1.0, getStatusGauge("M2").value(), "Status of circuit M2 should be 1.0 -> HALF_OPEN");
+         context.assertEquals(2.0, getStatusGauge("M3").value(), "Status of circuit M3 should be 2.0 -> OPEN");
+
+         // verify fail ratio gauges
+         context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0");
+         context.assertEquals(35.0, getFailRatioGauge("M2").value(), "Fail ratio of circuit M2 should be 35.0");
+         context.assertEquals(100.0, getFailRatioGauge("M3").value(), "Fail ratio of circuit M3 should be 100.0");
+         // the task lock has to be released exactly once
+         verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());
+
+         async.complete();
+     });
+ }
+
+ @Test
+ public void testCollectMetricsStorageFailure(TestContext context) {
+     Async async = context.async();
+
+     // no circuit fixture is needed in this test: the storage call fails,
+     // so there is nothing the collector could read
+     Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
+             .thenReturn(Future.failedFuture("Boooom"));
+
+     collector.collectMetrics().onComplete(event -> {
+         context.assertTrue(event.failed());
+         // the storage failure has to reach the caller
+         context.assertEquals("Boooom", event.cause().getMessage());
+
+         // the lock has to be released although the collection failed
+         verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());
+
+         async.complete();
+     });
+ }
+
+ @Test
+ public void testCollectMetricsIgnoreEntries(TestContext context) {
+     Async async = context.async();
+
+     JsonObject allCircuits = new JsonObject();
+     allCircuits.put("5645745t43f5465", createCircuitInfo("closed", "M1", 0)); // the only complete entry
+     allCircuits.put("12rt878665f54gf", createCircuitInfo("foobar_state", "M2", 35)); // unknown status
+     allCircuits.put("8789jz45745t4g8", createCircuitInfo(null, "M3", 100)); // no status
+     allCircuits.put("8634662437g894c", createCircuitInfo("open", null, 100)); // no metric name
+
+     Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
+             .thenReturn(Future.succeededFuture(allCircuits));
+
+     collector.collectMetrics().onComplete(event -> {
+         context.assertTrue(event.succeeded());
+
+         // verify status gauges: M1 must be the only one, which also covers the entry without metric name
+         context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED");
+         context.assertFalse(statusGaugeExists("M2"));
+         context.assertFalse(statusGaugeExists("M3"));
+         context.assertEquals(1, meterRegistry.find(CIRCUIT_BREAKER_STATUS_METRIC).gauges().size());
+
+         // verify fail ratio gauges: again M1 must be the only one
+         context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0");
+         context.assertFalse(failRatioGaugeExists("M2"));
+         context.assertFalse(failRatioGaugeExists("M3"));
+         context.assertEquals(1, meterRegistry.find(CIRCUIT_BREAKER_FAILRATIO_METRIC).gauges().size());
+
+         verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());
+
+         async.complete();
+     });
+ }
+
+ public boolean gaugeExists(String metricName) {
+     // a missing status gauge is reported as false; the bare required lookup used here before threw instead
+     return statusGaugeExists(metricName);
+ }
+
+ @Test
+ public void testCollectMetricsFailedToAcquireLock(TestContext context) {
+     Async async = context.async();
+     // replaces the default stubbing from setUp: acquiring the lock itself fails
+     Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.failedFuture("Boooom"));
+     // expected: the failure is passed on and the storage is never queried
+     collector.collectMetrics().onComplete(event -> {
+         context.assertTrue(event.failed());
+         context.assertEquals("Boooom", event.cause().getMessage());
+         Mockito.verifyNoInteractions(queueCircuitBreakerStorage);
+         async.complete();
+     });
+ }
+
+ @Test
+ public void testCollectMetricsLockAlreadyAcquired(TestContext context) {
+     Async async = context.async();
+     // replaces the default stubbing from setUp: the acquire call succeeds but does not grant the lock
+     Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.succeededFuture(Boolean.FALSE));
+     // expected: the run completes successfully without querying the storage
+     collector.collectMetrics().onComplete(event -> {
+         context.assertTrue(event.succeeded());
+         Mockito.verifyNoInteractions(queueCircuitBreakerStorage);
+         async.complete();
+     });
+ }
+
+ private Gauge getStatusGauge(String metricName){ // required lookup of the status gauge tagged with metricName
+     return meterRegistry.get(CIRCUIT_BREAKER_STATUS_METRIC).tag("metricName", metricName).gauge();
+ }
+
+ // Tells whether a status gauge carrying the given metricName tag is registered. The optional search
+ // (find) is used, which reports a missing gauge as null rather than by exception.
+ private boolean statusGaugeExists(String metricName){
+     Gauge statusGauge = meterRegistry.find(CIRCUIT_BREAKER_STATUS_METRIC)
+             .tag("metricName", metricName)
+             .gauge();
+     return statusGauge != null;
+ }
+
+ private Gauge getFailRatioGauge(String metricName){ // required lookup of the fail ratio gauge tagged with metricName
+     return meterRegistry.get(CIRCUIT_BREAKER_FAILRATIO_METRIC).tag("metricName", metricName).gauge();
+ }
+
+ // Tells whether a fail ratio gauge carrying the given metricName tag is registered. The optional search
+ // (find) is used, which reports a missing gauge as null rather than by exception.
+ private boolean failRatioGaugeExists(String metricName){
+     Gauge failRatioGauge = meterRegistry.find(CIRCUIT_BREAKER_FAILRATIO_METRIC)
+             .tag("metricName", metricName)
+             .gauge();
+     return failRatioGauge != null;
+ }
+
+ // Builds one circuit entry: an "infos" object that always holds the circuit url, plus the optional
+ // status, metric name and fail ratio. A null argument leaves the respective field out.
+ private JsonObject createCircuitInfo(String status, String metricName, Integer failRatio){
+     JsonObject details = new JsonObject().put(FIELD_CIRCUIT, "/some/circuit/url");
+     JsonObject entry = new JsonObject().put("infos", details);
+     if(status != null) {
+         entry.put(FIELD_STATUS, status);
+     }
+     if(metricName != null) {
+         details.put(FIELD_METRICNAME, metricName);
+     }
+     if(failRatio != null) {
+         details.put(FIELD_FAILRATIO, failRatio);
+     }
+
+     return entry;
+ }
+}