Skip to content

Commit

Permalink
#654 collecting kafka metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Jan 8, 2025
1 parent a5b4a31 commit 1607db9
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 9 deletions.
28 changes: 27 additions & 1 deletion gateleen-kafka/README_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,4 +448,30 @@ This sequence diagrams shows the process when messages are sent to Kafka:
│ <────────────────────│ │ │ │ │ │
│ │ │ │ │ │ │
│ └┬┘ │ │ │ │
```
```

### Micrometer metrics
The kafka feature is monitored with micrometer. The following metrics are available:
* gateleen_kafka_send_success_messages_total
* gateleen_kafka_send_fail_messages_total
* gateleen_kafka_validation_fail_messages_total

Additional tags are provided to specify the topic.

Example metrics:

```
# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
# TYPE gateleen_kafka_send_success_messages_total counter
gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
# TYPE gateleen_kafka_validation_fail_messages_total counter
gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
```

To enable `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageSender` class.
To enable `gateleen_kafka_validation_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageValidator` class.
4 changes: 4 additions & 0 deletions gateleen-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
<artifactId>gateleen-validation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- TEST dependencies -->
<dependency>
<groupId>org.swisspush.gateleen</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;
private final KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

Expand Down Expand Up @@ -140,8 +140,6 @@ private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);



repository.closeAll().future().onComplete((event -> {
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
repository.addKafkaProducer(kafkaConfiguration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -9,7 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
Expand All @@ -18,6 +22,22 @@ public class KafkaMessageSender {

private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> successSendCounterMap = new HashMap<>();
private final Map<String, Counter> failSendCounterMap = new HashMap<>();

public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
public static final String TOPIC = "topic";

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
successSendCounterMap.clear();
failSendCounterMap.clear();
}

Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
List<KafkaProducerRecord<String, String>> messages) {
Promise<Void> promise = Promise.promise();
Expand All @@ -44,7 +64,45 @@ private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, Ka
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
incrementSuccessCount(metadata.getTopic());
return Future.succeededFuture();
}).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
}).onFailure(throwable -> {
log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
incrementFailCount1(message.topic());
});
}

private void incrementSuccessCount(String topic) {
Counter counter = successSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
.description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
successSendCounterMap.put(topic, newCounter);
}
}

private void incrementFailCount1(String topic) {
Counter counter = failSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
.description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failSendCounterMap.put(topic, newCounter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
Expand All @@ -14,6 +16,7 @@
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,11 +29,23 @@ public class KafkaMessageValidator {
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();

public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
public static final String TOPIC = "topic";

public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
failedToValidateCounterMap.clear();
}

public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
Expand All @@ -49,6 +64,8 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List

SchemaLocation schemaLocation = optionalSchemaLocation.get();

String topic = kafkaProducerRecords.get(0).topic();

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
Expand All @@ -57,10 +74,31 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
incrementValidationFailCount(topic);
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}, throwable -> {
incrementValidationFailCount(topic);
return Future.failedFuture(throwable);
});
}

private void incrementValidationFailCount(String topic) {
Counter counter = failedToValidateCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
.description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failedToValidateCounterMap.put(topic, newCounter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
Expand Down Expand Up @@ -35,11 +37,14 @@ public class KafkaMessageSenderTest {

private KafkaProducer<String, String> producer;
private KafkaMessageSender kafkaMessageSender;
private SimpleMeterRegistry meterRegistry;

@Before
public void setUp() {
producer = Mockito.mock(KafkaProducer.class);
kafkaMessageSender = new KafkaMessageSender();
meterRegistry = new SimpleMeterRegistry();
kafkaMessageSender.setMeterRegistry(meterRegistry);
}

@Test
Expand All @@ -57,6 +62,9 @@ public void sendSingleMessage(TestContext context) throws ValidationException {
});

Mockito.verify(producer, times(1)).send(eq(records.get(0)));

Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
}

@Test
Expand All @@ -74,6 +82,9 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx
});

Mockito.verify(producer, times(1)).send(eq(records.get(0)));

Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
}

@Test
Expand All @@ -98,6 +109,9 @@ public void sendMultipleMessages(TestContext context) throws ValidationException
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));

Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
context.assertEquals(3.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 3");
}

@Test
Expand All @@ -124,6 +138,12 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));

Counter successCounter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
context.assertEquals(2.0, successCounter.count(), "Success counter for topic `myTopic` should have been incremented by 2");

Counter failCounter = meterRegistry.get(KafkaMessageSender.FAIL_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
context.assertEquals(1.0, failCounter.count(), "Fail counter for topic `myTopic` should have been incremented by 1");
}

private JsonObject buildSingleRecordPayload(String key){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -36,18 +37,18 @@
@RunWith(VertxUnitRunner.class)
public class KafkaMessageValidatorTest {

private Vertx vertx;
private KafkaMessageValidator messageValidator;
private Validator validator;
private ValidationResourceManager validationResourceManager;
private SimpleMeterRegistry meterRegistry;

@Before
public void setUp() {
vertx = Vertx.vertx();
validationResourceManager = Mockito.mock(ValidationResourceManager.class);
validator = Mockito.mock(Validator.class);

meterRegistry = new SimpleMeterRegistry();
messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
messageValidator.setMeterRegistry(meterRegistry);
}

@Test
Expand Down Expand Up @@ -141,6 +142,10 @@ public void testValidateMessagesMatchingValidationResourceEntry(TestContext cont
context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus());
verify(validationResourceManager, times(2)).getValidationResource();
verify(validator, times(1)).validateWithSchemaLocation(any(), any(), any());

Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");

async.complete();
});

Expand Down Expand Up @@ -176,6 +181,10 @@ public void testValidateMessagesWithFailInValidator(TestContext context) {
context.assertTrue(event.failed());
verify(validationResourceManager, times(2)).getValidationResource();
verify(validator, times(2)).validateWithSchemaLocation(any(), any(), any());

Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");

async.complete();
});

Expand Down Expand Up @@ -217,4 +226,47 @@ public void testValidateMultipleMessages(TestContext context) {
});

}

@Test
public void testValidateMultipleMessagesWithValidatedNegative(TestContext context) {
Async async = context.async();

ValidationResource validationResource = new ValidationResource();
validationResource.addResource(
Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
ValidationResource.URL_PROPERTY, "/path/to/myTopic",
ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
));

when(validationResourceManager.getValidationResource()).thenReturn(validationResource);

HttpServerResponse response = spy(new StreamingResponse(new HeadersMultiMap()));
StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new HeadersMultiMap(), response);

String payload_1 = new JsonObject().encode();
String payload_2 = new JsonObject().put("foo", "bar").encode();
String payload_3 = new JsonObject().put("abc", "def").encode();
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1));
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2));
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_3));

when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn(
Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)));
when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_2)), any())).thenReturn(
Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV)));

messageValidator.validateMessages(request, kafkaProducerRecords).onComplete(event -> {
context.assertTrue(event.succeeded());
context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, event.result().getValidationStatus());
verify(validationResourceManager, times(2)).getValidationResource();
verify(validator, times(3)).validateWithSchemaLocation(any(), any(), any());

Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");

async.complete();
});

}
}

0 comments on commit 1607db9

Please sign in to comment.