Skip to content

Commit

Permalink
#615 collect circuit breaker metrics [final commit]
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Dec 23, 2024
1 parent 28ed0c0 commit 197836d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public Future<Long> unlockSampleQueues() {
failedFutures.add(event1.cause().getMessage());
}
if (futureCounter.get() == 0) {
if (failedFutures.size() > 0) {
if (!failedFutures.isEmpty()) {
promise.fail("The following queues could not be unlocked: " + failedFutures);
} else {
promise.complete((long) queuesToUnlock.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.gateleen.queue.queuing.circuitbreaker.impl;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -247,7 +246,7 @@ public Future<Void> closeAllCircuits() {
Future<Void> closeOpenCircuitsFuture = closeCircuitsByKey(STORAGE_OPEN_CIRCUITS);
Future<Void> closeHalfOpenCircuitsFuture = closeCircuitsByKey(STORAGE_HALFOPEN_CIRCUITS);

CompositeFuture.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> {
Future.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> {
if (event.succeeded()) {
promise.complete();
} else {
Expand All @@ -262,14 +261,14 @@ private Future<Void> closeCircuitsByKey(String key) {
Promise<Void> promise = Promise.promise();
redisProvider.redis().onSuccess(redisAPI -> redisAPI.smembers(key, event -> {
if (event.succeeded()) {
List<Future> promises = new ArrayList<>();
List<Future<Void>> promises = new ArrayList<>();
for (Response circuit : event.result()) {
promises.add(closeCircuit(circuit.toString(), false));
}
if (promises.size() == 0) {
if (promises.isEmpty()) {
promise.complete();
} else {
CompositeFuture.all(promises).onComplete(event1 -> {
Future.all(promises).onComplete(event1 -> {
if (event1.succeeded()) {
promise.complete();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import static org.swisspush.gateleen.core.util.LockUtil.calcLockExpiry;
import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*;

/**
* Class responsible for collecting metrics for the Queue Circuit Breaker.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class QueueCircuitBreakerMetricsCollector {

private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerMetricsCollector.class);
Expand All @@ -40,6 +45,16 @@ public class QueueCircuitBreakerMetricsCollector {
private final Map<String, AtomicInteger> circuitStateMap = new HashMap<>();
private final Map<String, AtomicInteger> circuitFailRatioMap = new HashMap<>();

/**
* Constructor for QueueCircuitBreakerMetricsCollector.
*
* @param vertx Vertx instance
* @param lock Lock instance
* @param queueCircuitBreakerStorage Storage for circuit breaker data
* @param meterRegistry Meter registry for metrics
* @param exceptionFactory Exception factory
* @param metricCollectionIntervalSeconds Interval for metric collection in seconds
*/
public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitBreakerStorage queueCircuitBreakerStorage,
MeterRegistry meterRegistry, GateleenExceptionFactory exceptionFactory,
long metricCollectionIntervalSeconds) {
Expand All @@ -50,15 +65,19 @@ public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitB

this.metricCollectionIntervalMs = metricCollectionIntervalSeconds * 1000;

vertx.setPeriodic(metricCollectionIntervalMs, event -> {
collectMetrics().onFailure(event1 -> log.error("Could not collect metrics. Message: {}", event1.getMessage()));
});
vertx.setPeriodic(metricCollectionIntervalMs, event -> collectMetrics()
.onFailure(event1 -> log.error("Could not collect metrics. Message: {}", event1.getMessage())));
}

/**
* Collects metrics for the Queue Circuit Breaker.
*
* @return Future representing the completion of the metric collection
*/
public Future<Void> collectMetrics() {
log.debug("Collecting metrics");
Promise<Void> promise = Promise.promise();
final String token = createToken(COLLECT_METRICS_TASK_LOCK);
final String token = createToken();
acquireLock(lock, COLLECT_METRICS_TASK_LOCK, token, calcLockExpiry(metricCollectionIntervalMs), log).onComplete(lockEvent -> {
if (lockEvent.succeeded()) {
if (lockEvent.result()) {
Expand Down Expand Up @@ -116,8 +135,8 @@ private void publishMetric(String metricName, QueueCircuitState queueCircuitStat
getCircuitFailRatioMeter(metricName).set(failRatio);
}

private String createToken(String appendix) {
return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + appendix;
private String createToken() {
return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + COLLECT_METRICS_TASK_LOCK;
}

private AtomicInteger getCircuitStateMeter(String metricName) {
Expand All @@ -143,9 +162,6 @@ private AtomicInteger getCircuitFailRatioMeter(String metricName) {
}

private Integer circuitStateToValue(QueueCircuitState queueCircuitState) {
if (queueCircuitState == null) {
return null;
}
switch (queueCircuitState) {
case CLOSED:
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = pattern.hashCode();
result = 31 * result + circuitHash.hashCode();
return result;
return Objects.hash(pattern, circuitHash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,46 @@ public void testCollectMetricsSuccess(TestContext context) {
});
}

@Test
public void testCollectMetricsSuccessUpdatedMetrics(TestContext context) {
Async async = context.async();

JsonObject allCircuits = new JsonObject();
allCircuits.put("5645745t43f54gf", createCircuitInfo("closed", "M1", 0));

Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
.thenReturn(Future.succeededFuture(allCircuits));

collector.collectMetrics().onComplete(event -> {
context.assertTrue(event.succeeded());

// verify status gauge
context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED");

// verify fail ratio gauge
context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0");

allCircuits.put("5645745t43f54gf", createCircuitInfo("half_open", "M1", 55));

Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
.thenReturn(Future.succeededFuture(allCircuits));

collector.collectMetrics().onComplete(event1 -> {
context.assertTrue(event1.succeeded());

// verify status gauge
context.assertEquals(1.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 1.0 -> HALF_OPEN");

// verify fail ratio gauge
context.assertEquals(55.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 55.0");

verify(lock, Mockito.times(2)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());

async.complete();
});
});
}

@Test
public void testCollectMetricsStorageFailure(TestContext context) {
Async async = context.async();
Expand All @@ -102,6 +142,15 @@ public void testCollectMetricsStorageFailure(TestContext context) {
collector.collectMetrics().onComplete(event -> {
context.assertTrue(event.failed());

context.assertFalse(statusGaugeExists("M1"));
context.assertFalse(failRatioGaugeExists("M1"));

context.assertFalse(statusGaugeExists("M2"));
context.assertFalse(failRatioGaugeExists("M2"));

context.assertFalse(statusGaugeExists("M3"));
context.assertFalse(failRatioGaugeExists("M3"));

verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());

async.complete();
Expand All @@ -117,6 +166,8 @@ public void testCollectMetricsIgnoreEntries(TestContext context) {
allCircuits.put("12rt878665f54gf", createCircuitInfo("foobar_state", "M2", 35));
allCircuits.put("8789jz45745t4g8", createCircuitInfo(null, "M3", 100));
allCircuits.put("8634662437g894c", createCircuitInfo("open", null, 100));
allCircuits.put("125645t43f5465", createCircuitInfo("half_open", "M5", 20));
allCircuits.put("f6545745t43f5465", createCircuitInfo("open", "M6", 90));

Mockito.when(queueCircuitBreakerStorage.getAllCircuits())
.thenReturn(Future.succeededFuture(allCircuits));
Expand All @@ -129,24 +180,23 @@ public void testCollectMetricsIgnoreEntries(TestContext context) {
context.assertFalse(statusGaugeExists("M2"));
context.assertFalse(statusGaugeExists("M3"));
context.assertFalse(statusGaugeExists("M4"));
context.assertEquals(1.0, getStatusGauge("M5").value(), "Status of circuit M5 should be 1.0 -> HALF_OPEN");
context.assertEquals(2.0, getStatusGauge("M6").value(), "Status of circuit M6 should be 2.0 -> OPEN");

// verify fail ratio gauges
context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0");
context.assertFalse(failRatioGaugeExists("M2"));
context.assertFalse(failRatioGaugeExists("M3"));
context.assertFalse(failRatioGaugeExists("M4"));
context.assertEquals(20.0, getFailRatioGauge("M5").value(), "Fail ratio of circuit M5 should be 20.0");
context.assertEquals(90.0, getFailRatioGauge("M6").value(), "Fail ratio of circuit M6 should be 90.0");

verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());

async.complete();
});
}

public boolean gaugeExists(String metricName) {
meterRegistry.get(CIRCUIT_BREAKER_STATUS_METRIC).tag("metricName", metricName).gauge();
return true;
}

@Test
public void testCollectMetricsFailedToAcquireLock(TestContext context) {
Async async = context.async();
Expand All @@ -157,6 +207,7 @@ public void testCollectMetricsFailedToAcquireLock(TestContext context) {
context.assertTrue(event.failed());
context.assertEquals("Boooom", event.cause().getMessage());
Mockito.verifyNoInteractions(queueCircuitBreakerStorage);
verify(lock, Mockito.never()).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString());
async.complete();
});
}
Expand Down

0 comments on commit 197836d

Please sign in to comment.