Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Siembol common: reworking storm kafka writer (#525)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariannovotny authored Feb 3, 2022
1 parent 76a9366 commit 8d2905a
Show file tree
Hide file tree
Showing 43 changed files with 320 additions and 353 deletions.
4 changes: 2 additions & 2 deletions alerting/alerting-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand All @@ -35,7 +35,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
4 changes: 2 additions & 2 deletions alerting/alerting-spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand All @@ -23,7 +23,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions alerting/alerting-storm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down Expand Up @@ -51,7 +51,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package uk.co.gresearch.siembol.alerts.storm;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;

import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.alerts.common.AlertingEngineType;
import uk.co.gresearch.siembol.common.error.ErrorMessage;
import uk.co.gresearch.siembol.common.error.ErrorType;
import uk.co.gresearch.siembol.alerts.common.AlertingResult;
import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystem;
import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystemImpl;
import uk.co.gresearch.siembol.alerts.storm.model.*;
import uk.co.gresearch.siembol.common.model.AlertingStormAttributesDto;
import uk.co.gresearch.siembol.common.storm.KafkaWriterAnchor;
import uk.co.gresearch.siembol.common.storm.KafkaWriterBoltBase;
import uk.co.gresearch.siembol.common.storm.KafkaWriterMessage;

public class AlertingKafkaWriterBolt extends KafkaWriterBoltBase {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String WRONG_ALERTS_FIELD_MESSAGE = "Wrong alerts type in tuple";
private static final String WRONG_EXCEPTION_FIELD_MESSAGE = "Wrong exceptions type in tuple";
private static final String RULE_PROTECTION_ERROR_MESSAGE =
"The rule: %s reaches the limit\n hourly matches: %d, daily matches: %d, alert: %s";
private static final String SEND_MSG_LOG = "Sending message {}\n to {} topic";
private static final String MISSING_CORRELATION_KEY_MSG = "Missing key in correlation alert %s";

private final String errorSensorType;
private final String errorTopic;
private final String outputTopic;
private final String correlationTopic;
private RuleProtectionSystem ruleProtection;

public AlertingKafkaWriterBolt(AlertingStormAttributesDto attributes) {
super(attributes.getKafkaProducerProperties().getProperties());
this.outputTopic = attributes.getOutputTopic();
this.errorTopic = attributes.getKafkaErrorTopic();
this.correlationTopic = attributes.getCorrelationOutputTopic();
AlertingEngineType engineType = AlertingEngineType.valueOfName(attributes.getAlertingEngine());
errorSensorType = engineType.toString();
}

@Override
public void execute(Tuple tuple) {
Object matchesObject = tuple.getValueByField(TupleFieldNames.ALERTING_MATCHES.toString());
if (!(matchesObject instanceof AlertMessages)) {
LOG.error(WRONG_ALERTS_FIELD_MESSAGE);
throw new IllegalStateException(WRONG_ALERTS_FIELD_MESSAGE);
}
AlertMessages matches = (AlertMessages)matchesObject;

Object exceptionsObject = tuple.getValueByField(TupleFieldNames.ALERTING_EXCEPTIONS.toString());
if (!(exceptionsObject instanceof ExceptionMessages)) {
LOG.error(WRONG_EXCEPTION_FIELD_MESSAGE);
throw new IllegalStateException(WRONG_EXCEPTION_FIELD_MESSAGE);
}
ExceptionMessages exceptions = (ExceptionMessages)exceptionsObject;
var messages = new ArrayList<KafkaWriterMessage>();

for (var match : matches) {
AlertingResult matchesInfo = ruleProtection.incrementRuleMatches(match.getFullRuleName());
int hourlyMatches = matchesInfo.getAttributes().getHourlyMatches();
int dailyMatches = matchesInfo.getAttributes().getDailyMatches();

if (match.getMaxHourMatches().intValue() < hourlyMatches
|| match.getMaxDayMatches().intValue() < dailyMatches) {
String msg = String.format(RULE_PROTECTION_ERROR_MESSAGE,
match.getFullRuleName(), hourlyMatches, dailyMatches, match.getAlertJson());
LOG.debug(msg);
exceptions.add(msg);
continue;
}

if (match.isVisibleAlert()) {
LOG.debug(SEND_MSG_LOG, match.getAlertJson(), outputTopic);
messages.add(new KafkaWriterMessage(outputTopic, match.getAlertJson()));
}

if (match.isCorrelationAlert()) {
if (match.getCorrelationKey().isEmpty()) {
String errorMsg = String.format(MISSING_CORRELATION_KEY_MSG, match.getAlertJson());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}

LOG.debug(SEND_MSG_LOG, match.getAlertJson(), correlationTopic);
messages.add(new KafkaWriterMessage(correlationTopic,
match.getCorrelationKey().get(),
match.getAlertJson()));
}
}

for (var exception : exceptions) {
String errorMsgToSend = getErrorMessageToSend(exception);
LOG.debug(SEND_MSG_LOG, errorMsgToSend, errorTopic);
messages.add(new KafkaWriterMessage(errorTopic, errorMsgToSend));
}

var anchor = new KafkaWriterAnchor(tuple);
super.writeMessages(messages, anchor);
}

@Override
public void prepareInternally() {
ruleProtection = new RuleProtectionSystemImpl();
}

private String getErrorMessageToSend(String errorMsg) {
ErrorMessage error = new ErrorMessage();
error.setErrorType(ErrorType.ALERTING_ERROR);
error.setFailedSensorType(errorSensorType);
error.setMessage(errorMsg);
return error.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static StormTopology createTopology(AlertingStormAttributesDto attributes
.localOrShuffleGrouping(KAFKA_SPOUT);

builder.setBolt(KAFKA_WRITER,
new KafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors())
new AlertingKafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors())
.localOrShuffleGrouping(AlertingEngineType.SIEMBOL_ALERTS.getEngineName());

return builder.createTopology();
Expand All @@ -84,7 +84,7 @@ public static StormTopology createCorrelationAlertingTopology(AlertingStormAttri
.fieldsGrouping(KAFKA_SPOUT, new Fields(TupleFieldNames.CORRELATION_KEY.toString()));

builder.setBolt(KAFKA_WRITER,
new KafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors())
new AlertingKafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors())
.localOrShuffleGrouping(AlertingEngineType.SIEMBOL_CORRELATION_ALERTS.getEngineName());

return builder.createTopology();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

public class KafkaWriterBoltTest {
public class AlertingKafkaWriterBoltTest {
private static final ObjectReader JSON_PARSERS_CONFIG_READER = new ObjectMapper()
.readerFor(AlertingStormAttributesDto.class);
private static final ObjectReader JSON_MAP_READER = new ObjectMapper()
Expand Down Expand Up @@ -89,7 +89,7 @@ public class KafkaWriterBoltTest {
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());

private AlertingStormAttributesDto attributes;
private KafkaWriterBolt writerBolt;
private AlertingKafkaWriterBolt writerBolt;
private String bootstrapServer;
private Tuple tuple;
private OutputCollector collector;
Expand All @@ -113,7 +113,7 @@ public void setUp() throws Exception {
when(tuple.getValueByField(eq(TupleFieldNames.ALERTING_EXCEPTIONS.toString()))).thenReturn(exceptionMessages);

kafkaRule.waitForStartup();
writerBolt = new KafkaWriterBolt(attributes);
writerBolt = new AlertingKafkaWriterBolt(attributes);
writerBolt.prepare(null, null, collector);
}

Expand Down
2 changes: 1 addition & 1 deletion alerting/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</parent>
<modules>
<module>alerting-core</module>
Expand Down
4 changes: 2 additions & 2 deletions config-editor/config-editor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.2.12-SNAPSHOT</version>
<version>2.2.13-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
Loading

0 comments on commit 8d2905a

Please sign in to comment.