diff --git a/alerting/alerting-core/pom.xml b/alerting/alerting-core/pom.xml index ae991174f..706eaa68c 100644 --- a/alerting/alerting-core/pom.xml +++ b/alerting/alerting-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -35,7 +35,7 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT junit diff --git a/alerting/alerting-spark/pom.xml b/alerting/alerting-spark/pom.xml index 6d5a1dd4e..0fe242055 100644 --- a/alerting/alerting-spark/pom.xml +++ b/alerting/alerting-spark/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -23,7 +23,7 @@ uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT jackson-databind diff --git a/alerting/alerting-storm/pom.xml b/alerting/alerting-storm/pom.xml index 2601d2b34..a638d0af4 100644 --- a/alerting/alerting-storm/pom.xml +++ b/alerting/alerting-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol alerting - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -51,7 +51,7 @@ uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j diff --git a/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBolt.java b/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBolt.java new file mode 100644 index 000000000..7df1f47dd --- /dev/null +++ b/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBolt.java @@ -0,0 +1,117 @@ +package uk.co.gresearch.siembol.alerts.storm; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; + +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.co.gresearch.siembol.alerts.common.AlertingEngineType; +import uk.co.gresearch.siembol.common.error.ErrorMessage; +import uk.co.gresearch.siembol.common.error.ErrorType; +import uk.co.gresearch.siembol.alerts.common.AlertingResult; +import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystem; +import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystemImpl; +import uk.co.gresearch.siembol.alerts.storm.model.*; +import uk.co.gresearch.siembol.common.model.AlertingStormAttributesDto; +import uk.co.gresearch.siembol.common.storm.KafkaWriterAnchor; +import uk.co.gresearch.siembol.common.storm.KafkaWriterBoltBase; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessage; + +public class AlertingKafkaWriterBolt extends KafkaWriterBoltBase { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String WRONG_ALERTS_FIELD_MESSAGE = "Wrong alerts type in tuple"; + private static final String WRONG_EXCEPTION_FIELD_MESSAGE = "Wrong exceptions type in tuple"; + private static final String RULE_PROTECTION_ERROR_MESSAGE = + "The rule: %s reaches the limit\n hourly matches: %d, daily matches: %d, alert: %s"; + private static final String SEND_MSG_LOG = "Sending message {}\n to {} topic"; + private static final String MISSING_CORRELATION_KEY_MSG = "Missing key in correlation alert %s"; + + private final String errorSensorType; + private final String errorTopic; + private final String outputTopic; + private final String correlationTopic; + private RuleProtectionSystem ruleProtection; + + public AlertingKafkaWriterBolt(AlertingStormAttributesDto attributes) { + super(attributes.getKafkaProducerProperties().getProperties()); + this.outputTopic = attributes.getOutputTopic(); + this.errorTopic = attributes.getKafkaErrorTopic(); + this.correlationTopic = attributes.getCorrelationOutputTopic(); + AlertingEngineType engineType = AlertingEngineType.valueOfName(attributes.getAlertingEngine()); + errorSensorType = engineType.toString(); + } + + @Override + public void execute(Tuple tuple) { + Object matchesObject = tuple.getValueByField(TupleFieldNames.ALERTING_MATCHES.toString()); + if (!(matchesObject instanceof AlertMessages)) { + LOG.error(WRONG_ALERTS_FIELD_MESSAGE); + throw new IllegalStateException(WRONG_ALERTS_FIELD_MESSAGE); + } + AlertMessages matches = (AlertMessages)matchesObject; + + Object exceptionsObject = tuple.getValueByField(TupleFieldNames.ALERTING_EXCEPTIONS.toString()); + if (!(exceptionsObject instanceof ExceptionMessages)) { + LOG.error(WRONG_EXCEPTION_FIELD_MESSAGE); + throw new IllegalStateException(WRONG_EXCEPTION_FIELD_MESSAGE); + } + ExceptionMessages exceptions = (ExceptionMessages)exceptionsObject; + var messages = new ArrayList(); + + for (var match : matches) { + AlertingResult matchesInfo = ruleProtection.incrementRuleMatches(match.getFullRuleName()); + int hourlyMatches = matchesInfo.getAttributes().getHourlyMatches(); + int dailyMatches = matchesInfo.getAttributes().getDailyMatches(); + + if (match.getMaxHourMatches().intValue() < hourlyMatches + || match.getMaxDayMatches().intValue() < dailyMatches) { + String msg = String.format(RULE_PROTECTION_ERROR_MESSAGE, + match.getFullRuleName(), hourlyMatches, dailyMatches, match.getAlertJson()); + LOG.debug(msg); + exceptions.add(msg); + continue; + } + + if (match.isVisibleAlert()) { + LOG.debug(SEND_MSG_LOG, match.getAlertJson(), outputTopic); + messages.add(new KafkaWriterMessage(outputTopic, match.getAlertJson())); + } + + if (match.isCorrelationAlert()) { + if (match.getCorrelationKey().isEmpty()) { + String errorMsg = String.format(MISSING_CORRELATION_KEY_MSG, match.getAlertJson()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + + LOG.debug(SEND_MSG_LOG, match.getAlertJson(), correlationTopic); + messages.add(new KafkaWriterMessage(correlationTopic, + match.getCorrelationKey().get(), + match.getAlertJson())); + } + } + + for (var exception : exceptions) { + String errorMsgToSend = getErrorMessageToSend(exception); + LOG.debug(SEND_MSG_LOG, errorMsgToSend, errorTopic); + messages.add(new KafkaWriterMessage(errorTopic, errorMsgToSend)); + } + + var anchor = new KafkaWriterAnchor(tuple); + super.writeMessages(messages, anchor); + } + + @Override + public void prepareInternally() { + ruleProtection = new RuleProtectionSystemImpl(); + } + + private String getErrorMessageToSend(String errorMsg) { + ErrorMessage error = new ErrorMessage(); + error.setErrorType(ErrorType.ALERTING_ERROR); + error.setFailedSensorType(errorSensorType); + error.setMessage(errorMsg); + return error.toString(); + } +} diff --git a/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingStorm.java b/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingStorm.java index 74e9c1578..6e5d9f3d3 100644 --- a/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingStorm.java +++ b/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/AlertingStorm.java @@ -64,7 +64,7 @@ public static StormTopology createTopology(AlertingStormAttributesDto attributes .localOrShuffleGrouping(KAFKA_SPOUT); builder.setBolt(KAFKA_WRITER, - new KafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors()) + new AlertingKafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors()) .localOrShuffleGrouping(AlertingEngineType.SIEMBOL_ALERTS.getEngineName()); return builder.createTopology(); @@ -84,7 +84,7 @@ public static StormTopology createCorrelationAlertingTopology(AlertingStormAttri .fieldsGrouping(KAFKA_SPOUT, new Fields(TupleFieldNames.CORRELATION_KEY.toString())); builder.setBolt(KAFKA_WRITER, - new KafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors()) + new AlertingKafkaWriterBolt(attributes), attributes.getKafkaWriterBoltNumExecutors()) .localOrShuffleGrouping(AlertingEngineType.SIEMBOL_CORRELATION_ALERTS.getEngineName()); return builder.createTopology(); diff --git a/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBolt.java b/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBolt.java deleted file mode 100644 index 80bcef874..000000000 --- a/alerting/alerting-storm/src/main/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBolt.java +++ /dev/null @@ -1,149 +0,0 @@ -package uk.co.gresearch.siembol.alerts.storm; -import java.lang.invoke.MethodHandles; -import java.util.*; - -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.AuthorizationException; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import uk.co.gresearch.siembol.alerts.common.AlertingEngineType; -import uk.co.gresearch.siembol.common.error.ErrorMessage; -import uk.co.gresearch.siembol.common.error.ErrorType; -import uk.co.gresearch.siembol.alerts.common.AlertingResult; -import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystem; -import uk.co.gresearch.siembol.alerts.protection.RuleProtectionSystemImpl; -import uk.co.gresearch.siembol.alerts.storm.model.*; -import uk.co.gresearch.siembol.common.model.AlertingStormAttributesDto; - -public class KafkaWriterBolt extends BaseRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String WRONG_ALERTS_FIELD_MESSAGE = "Wrong alerts type in tuple"; - private static final String WRONG_EXCEPTION_FIELD_MESSAGE = "Wrong exceptions type in tuple"; - - private final String errorSensorType; - private final Properties props; - private final String errorTopic; - private final String outputTopic; - private final String correlationTopic; - private OutputCollector collector; - private Producer producer; - private RuleProtectionSystem ruleProtection; - - public KafkaWriterBolt(AlertingStormAttributesDto attributes) { - this.props = new Properties(); - attributes.getKafkaProducerProperties().getRawMap().entrySet().forEach(x -> props.put(x.getKey(), x.getValue())); - this.outputTopic = attributes.getOutputTopic(); - this.errorTopic = attributes.getKafkaErrorTopic(); - this.correlationTopic = attributes.getCorrelationOutputTopic(); - AlertingEngineType engineType = AlertingEngineType.valueOfName(attributes.getAlertingEngine()); - errorSensorType = engineType.toString(); - } - - @Override - public void execute(Tuple tuple) { - Object matchesObject = tuple.getValueByField(TupleFieldNames.ALERTING_MATCHES.toString()); - if (!(matchesObject instanceof AlertMessages)) { - LOG.error(WRONG_ALERTS_FIELD_MESSAGE); - throw new IllegalStateException(WRONG_ALERTS_FIELD_MESSAGE); - } - AlertMessages matches = (AlertMessages)matchesObject; - - Object exceptionsObject = tuple.getValueByField(TupleFieldNames.ALERTING_EXCEPTIONS.toString()); - if (!(exceptionsObject instanceof ExceptionMessages)) { - LOG.error(WRONG_EXCEPTION_FIELD_MESSAGE); - throw new IllegalStateException(WRONG_EXCEPTION_FIELD_MESSAGE); - } - ExceptionMessages exceptions = (ExceptionMessages)exceptionsObject; - - try { - for (AlertMessage match : matches) { - AlertingResult matchesInfo = ruleProtection.incrementRuleMatches(match.getFullRuleName()); - int hourlyMatches = matchesInfo.getAttributes().getHourlyMatches(); - int dailyMatches = matchesInfo.getAttributes().getDailyMatches(); - - if (match.getMaxHourMatches().intValue() < hourlyMatches - || match.getMaxDayMatches().intValue() < dailyMatches) { - String msg = String.format( - "The rule: %s reaches the limit\n hourly matches: %d, daily matches: %d, alert: %s", - match.getFullRuleName(), hourlyMatches, dailyMatches, match.getAlertJson()); - LOG.debug(msg); - exceptions.add(msg); - continue; - } - - if (match.isVisibleAlert()) { - LOG.debug("Sending message {}\n to output topic", match.getAlertJson()); - producer.send(new ProducerRecord<>(outputTopic, - String.valueOf(match.getAlertJson().hashCode()), - match.getAlertJson())); - } - - if (match.isCorrelationAlert()) { - LOG.debug("Sending message {}\n to correlation alerts topic", match.getAlertJson()); - producer.send(new ProducerRecord<>(correlationTopic, - match.getCorrelationKey().get(), - match.getAlertJson())); - } - } - - for (String errorMsg : exceptions) { - String errorMsgToSend = getErrorMessageToSend(errorMsg); - LOG.debug("Sending message {}\n to error topic", errorMsgToSend); - producer.send(new ProducerRecord<>(errorTopic, - String.valueOf(errorMsgToSend.hashCode()), - errorMsgToSend)); - } - - producer.flush(); - } catch (AuthorizationException e) { - LOG.error("Exception {} during writing messages to the kafka", - ExceptionUtils.getStackTrace(e)); - producer.close(); - throw new IllegalStateException(e); - } catch (KafkaException e) { - LOG.error("KafkaException {} during writing messages to the kafka", - ExceptionUtils.getStackTrace(e)); - collector.fail(tuple); - return; - } - - LOG.debug("Acking tuple"); - collector.ack(tuple); - } - - @SuppressWarnings("rawtypes") - @Override - public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { - this.collector = outputCollector; - ruleProtection = new RuleProtectionSystemImpl(); - producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - } - - @Override - public void cleanup() { - producer.close(); - } - - private String getErrorMessageToSend(String errorMsg) { - ErrorMessage error = new ErrorMessage(); - error.setErrorType(ErrorType.ALERTING_ERROR); - error.setFailedSensorType(errorSensorType); - error.setMessage(errorMsg); - return error.toString(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - } -} diff --git a/alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBoltTest.java b/alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBoltTest.java similarity index 98% rename from alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBoltTest.java rename to alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBoltTest.java index e30c99c78..5a955b342 100644 --- a/alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/KafkaWriterBoltTest.java +++ b/alerting/alerting-storm/src/test/java/uk/co/gresearch/siembol/alerts/storm/AlertingKafkaWriterBoltTest.java @@ -24,7 +24,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; -public class KafkaWriterBoltTest { +public class AlertingKafkaWriterBoltTest { private static final ObjectReader JSON_PARSERS_CONFIG_READER = new ObjectMapper() .readerFor(AlertingStormAttributesDto.class); private static final ObjectReader JSON_MAP_READER = new ObjectMapper() @@ -89,7 +89,7 @@ public class KafkaWriterBoltTest { public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create()); private AlertingStormAttributesDto attributes; - private KafkaWriterBolt writerBolt; + private AlertingKafkaWriterBolt writerBolt; private String bootstrapServer; private Tuple tuple; private OutputCollector collector; @@ -113,7 +113,7 @@ public void setUp() throws Exception { when(tuple.getValueByField(eq(TupleFieldNames.ALERTING_EXCEPTIONS.toString()))).thenReturn(exceptionMessages); kafkaRule.waitForStartup(); - writerBolt = new KafkaWriterBolt(attributes); + writerBolt = new AlertingKafkaWriterBolt(attributes); writerBolt.prepare(null, null, collector); } diff --git a/alerting/pom.xml b/alerting/pom.xml index 914e32b22..bf0e53597 100644 --- a/alerting/pom.xml +++ b/alerting/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT alerting-core diff --git a/config-editor/config-editor-core/pom.xml b/config-editor/config-editor-core/pom.xml index fcd553387..20e27c8ba 100644 --- a/config-editor/config-editor-core/pom.xml +++ b/config-editor/config-editor-core/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol config-editor - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.apache.commons diff --git a/config-editor/config-editor-rest/pom.xml b/config-editor/config-editor-rest/pom.xml index 62087eedc..817648274 100644 --- a/config-editor/config-editor-rest/pom.xml +++ b/config-editor/config-editor-rest/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -56,7 +56,7 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j @@ -67,22 +67,22 @@ uk.co.gresearch.siembol config-editor-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol config-editor-services - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol config-editor-sync - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j @@ -93,7 +93,7 @@ uk.co.gresearch.siembol parsing-app - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j @@ -104,7 +104,7 @@ uk.co.gresearch.siembol enriching-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j @@ -115,7 +115,7 @@ uk.co.gresearch.siembol responding-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j diff --git a/config-editor/config-editor-services/pom.xml b/config-editor/config-editor-services/pom.xml index 4d086e9ca..b708457d1 100644 --- a/config-editor/config-editor-services/pom.xml +++ b/config-editor/config-editor-services/pom.xml @@ -10,7 +10,7 @@ uk.co.gresearch.siembol config-editor - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -41,32 +41,32 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol enriching-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol responding-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT junit diff --git a/config-editor/config-editor-sync/pom.xml b/config-editor/config-editor-sync/pom.xml index 1bce29aed..65b51dd65 100644 --- a/config-editor/config-editor-sync/pom.xml +++ b/config-editor/config-editor-sync/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -20,17 +20,17 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT provided diff --git a/config-editor/config-editor-sync/src/test/java/uk/co/gresearch/siembol/configeditor/sync/actions/GetParsingAppStormTopologyActionTest.java b/config-editor/config-editor-sync/src/test/java/uk/co/gresearch/siembol/configeditor/sync/actions/GetParsingAppStormTopologyActionTest.java index 9ef46d774..08f4326bf 100644 --- a/config-editor/config-editor-sync/src/test/java/uk/co/gresearch/siembol/configeditor/sync/actions/GetParsingAppStormTopologyActionTest.java +++ b/config-editor/config-editor-sync/src/test/java/uk/co/gresearch/siembol/configeditor/sync/actions/GetParsingAppStormTopologyActionTest.java @@ -58,7 +58,6 @@ public class GetParsingAppStormTopologyActionTest { { "application.name": "secret", "kafka.batch.writer.attributes": { - "batch.size": 1, "producer.properties": { "bootstrap.servers": "dummy", "security.protocol": "SASL_PLAINTEXT" @@ -92,7 +91,6 @@ public class GetParsingAppStormTopologyActionTest { "zk.max.retries": 3 }, "kafka.batch.writer.attributes": { - "batch.size": 50, "producer.properties": { "bootstrap.servers": "global_servers", "security.protocol": "SASL_PLAINTEXT" @@ -218,11 +216,6 @@ public void getStormTopologyOk() throws IOException { Assert.assertFalse(adminConfigPublicStr.contains("overridden.applications")); Assert.assertFalse(adminConfigPublicStr.contains("config_version")); - Assert.assertEquals(1, adminConfigSecret.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(50, adminConfigPublic.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(1, adminConfigSecret.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(50, adminConfigPublic.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(2, adminConfigSecret.getStormAttributes().getStormConfig().getRawMap().get("num.workers")); Assert.assertEquals(1, @@ -268,11 +261,6 @@ public void getStormTopologyNoOverriddenAppsOk() throws IOException { Assert.assertFalse(adminConfigPublicStr.contains("overridden.applications")); Assert.assertFalse(adminConfigPublicStr.contains("config_version")); - Assert.assertEquals(50, adminConfigSecret.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(50, adminConfigPublic.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(50, adminConfigSecret.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(50, adminConfigPublic.getKafkaBatchWriterAttributes().getBatchSize().intValue()); - Assert.assertEquals(1, adminConfigSecret.getStormAttributes().getStormConfig().getRawMap().get("num.workers")); Assert.assertEquals(1, diff --git a/config-editor/pom.xml b/config-editor/pom.xml index ff9034eda..1e14a2b0f 100644 --- a/config-editor/pom.xml +++ b/config-editor/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT config-editor-core diff --git a/deployment/storm-topology-manager/pom.xml b/deployment/storm-topology-manager/pom.xml index 02e6637db..175b8ed1f 100644 --- a/deployment/storm-topology-manager/pom.xml +++ b/deployment/storm-topology-manager/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT ../../pom.xml @@ -43,7 +43,7 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j diff --git a/enriching/enriching-core/pom.xml b/enriching/enriching-core/pom.xml index 08c2fad46..4d547b8fa 100644 --- a/enriching/enriching-core/pom.xml +++ b/enriching/enriching-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol enriching - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT junit diff --git a/enriching/enriching-storm/pom.xml b/enriching/enriching-storm/pom.xml index 81c9f4eea..6a6a8f021 100644 --- a/enriching/enriching-storm/pom.xml +++ b/enriching/enriching-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol enriching - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -75,7 +75,7 @@ uk.co.gresearch.siembol enriching-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j diff --git a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBolt.java b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBolt.java index 38c5de4d6..5458edc9b 100644 --- a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBolt.java +++ b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBolt.java @@ -12,8 +12,8 @@ import uk.co.gresearch.siembol.common.error.ErrorMessage; import uk.co.gresearch.siembol.common.error.ErrorType; import uk.co.gresearch.siembol.common.constants.SiembolMessageFields; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessage; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessages; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessage; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessages; import uk.co.gresearch.siembol.enrichments.evaluation.EnrichmentEvaluatorLibrary; import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentTuples; import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentPairs; @@ -74,9 +74,9 @@ public void execute(Tuple tuple) { } LOG.debug(EVENT_INFO_LOG, event); - KafkaBatchWriterMessages messages = new KafkaBatchWriterMessages(); - messages.add(new KafkaBatchWriterMessage(outputTopic, event)); - exceptions.forEach(x -> messages.add(new KafkaBatchWriterMessage(errorTopic, x))); + KafkaWriterMessages messages = new KafkaWriterMessages(); + messages.add(new KafkaWriterMessage(outputTopic, event)); + exceptions.forEach(x -> messages.add(new KafkaWriterMessage(errorTopic, x))); collector.emit(tuple, new Values(messages)); collector.ack(tuple); } diff --git a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplication.java b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplication.java index 75cb38c45..5a345a8ff 100644 --- a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplication.java +++ b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplication.java @@ -14,7 +14,7 @@ import org.slf4j.LoggerFactory; import uk.co.gresearch.siembol.common.filesystem.SiembolFileSystemFactory; import uk.co.gresearch.siembol.common.filesystem.SupportedFileSystem; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterBolt; +import uk.co.gresearch.siembol.common.storm.KafkaWriterBolt; import uk.co.gresearch.siembol.common.model.StormAttributesDto; import uk.co.gresearch.siembol.common.storm.StormHelper; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; @@ -79,7 +79,7 @@ public static StormTopology createTopology(StormEnrichmentAttributesDto attribut .localOrShuffleGrouping(MEMORY_ENRICHING_BOLT_NAME); builder.setBolt(KAFKA_WRITER_BOLT_NAME, - new KafkaBatchWriterBolt(attributes.getKafkaBatchWriterAttributes(), + new KafkaWriterBolt(attributes.getKafkaBatchWriterAttributes(), EnrichmentTuples.KAFKA_MESSAGES.toString()), attributes.getKafkaWriterBoltNumExecutors()) .localOrShuffleGrouping(MERGING_BOLT_NAME); diff --git a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBoltTest.java b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBoltTest.java index 8befb8694..81bf0f204 100644 --- a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBoltTest.java +++ b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBoltTest.java @@ -11,7 +11,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessages; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessages; import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentTuples; import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentPairs; import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentExceptions; @@ -68,8 +68,8 @@ public void testEmptyExceptionsEmptyEnrichments() { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(outputTopic, messages.get(0).getTopic()); Assert.assertTrue(messages.get(0).getMessage().contains(enrichedEventPrefix.trim())); @@ -83,8 +83,8 @@ public void testNonEmptyExceptionsEmptyEnrichments() { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(outputTopic, messages.get(0).getTopic()); Assert.assertFalse(messages.get(0).getMessage().isEmpty()); @@ -106,8 +106,8 @@ public void testEmptyExceptionsNonEmptyEnrichments() { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(outputTopic, messages.get(0).getTopic()); @@ -126,8 +126,8 @@ public void testExceptionsNonEmptyEnrichments() { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(outputTopic, messages.get(0).getTopic()); Assert.assertTrue(messages.get(0).getMessage().contains("\"test\":\"enrichment\"")); @@ -141,8 +141,8 @@ public void testInvalidEvent() { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(outputTopic, messages.get(0).getTopic()); Assert.assertEquals("INVALID", messages.get(0).getMessage()); diff --git a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplicationTest.java b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplicationTest.java index 8c90bc7c7..c9b8cb0b7 100644 --- a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplicationTest.java +++ b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/StormEnrichingApplicationTest.java @@ -52,7 +52,6 @@ public class StormEnrichingApplicationTest { "zk.max.retries": 10 }, "kafka.batch.writer.attributes": { - "batch.size": 1, "producer.properties": { "client.id": "writer", "compression.type": "snappy", diff --git a/enriching/pom.xml b/enriching/pom.xml index 53450d973..183184bab 100644 --- a/enriching/pom.xml +++ b/enriching/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT enriching-core diff --git a/parsing/parsing-app/pom.xml b/parsing/parsing-app/pom.xml index 223e2f0be..247dcc009 100644 --- a/parsing/parsing-app/pom.xml +++ b/parsing/parsing-app/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -39,12 +39,12 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol parsing-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT junit diff --git a/parsing/parsing-core/pom.xml b/parsing/parsing-core/pom.xml index 3aee0df27..0449203f2 100644 --- a/parsing/parsing-core/pom.xml +++ b/parsing/parsing-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -45,7 +45,7 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT joda-time diff --git a/parsing/parsing-storm/pom.xml b/parsing/parsing-storm/pom.xml index 4f90eb81f..22fb6458a 100644 --- a/parsing/parsing-storm/pom.xml +++ b/parsing/parsing-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol parsing - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -75,7 +75,7 @@ uk.co.gresearch.siembol parsing-app - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j diff --git a/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBolt.java b/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBolt.java index 8ac6596b1..2d91d8b0c 100644 --- a/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBolt.java +++ b/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBolt.java @@ -13,8 +13,8 @@ import org.slf4j.LoggerFactory; import uk.co.gresearch.siembol.common.constants.SiembolConstants; import uk.co.gresearch.siembol.common.model.StormParsingApplicationAttributesDto; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessage; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessages; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessage; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessages; import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; @@ -127,10 +127,10 @@ public void execute(Tuple tuple) { byte[] log = (byte[])logObj; ArrayList results = currentParser.parse(source, metadata, log); if (!results.isEmpty()) { - KafkaBatchWriterMessages kafkaBatchWriterMessages = new KafkaBatchWriterMessages(); + KafkaWriterMessages kafkaWriterMessages = new KafkaWriterMessages(); results.forEach(x -> x.getMessages().forEach(y -> - kafkaBatchWriterMessages.add(new KafkaBatchWriterMessage(x.getTopic(), y)))); - collector.emit(tuple, new Values(kafkaBatchWriterMessages)); + kafkaWriterMessages.add(new KafkaWriterMessage(x.getTopic(), y)))); + collector.emit(tuple, new Values(kafkaWriterMessages)); } collector.ack(tuple); diff --git a/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplication.java b/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplication.java index 7863fbee4..50a5c1922 100644 --- a/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplication.java +++ b/parsing/parsing-storm/src/main/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplication.java @@ -16,7 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.co.gresearch.siembol.common.model.StormParsingApplicationAttributesDto; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterBolt; +import uk.co.gresearch.siembol.common.storm.KafkaWriterBolt; import uk.co.gresearch.siembol.common.model.StormAttributesDto; import uk.co.gresearch.siembol.common.storm.StormHelper; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; @@ -103,7 +103,7 @@ public static StormTopology createTopology(StormParsingApplicationAttributesDto .localOrShuffleGrouping(KAFKA_SPOUT); builder.setBolt(KAFKA_WRITER, - new KafkaBatchWriterBolt(stormAppAttributes.getKafkaBatchWriterAttributes(), + new KafkaWriterBolt(stormAppAttributes.getKafkaBatchWriterAttributes(), ParsingApplicationTuples.PARSING_MESSAGES.toString()), parsingAttributes.getOutputParallelism()) .localOrShuffleGrouping(parsingAttributes.getName()); diff --git a/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBoltTest.java b/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBoltTest.java index 2a9bde5b3..af4176737 100644 --- a/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBoltTest.java +++ b/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBoltTest.java @@ -13,7 +13,7 @@ import org.mockito.Mockito; import uk.co.gresearch.siembol.common.constants.SiembolMessageFields; import uk.co.gresearch.siembol.common.model.StormParsingApplicationAttributesDto; -import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessages; +import uk.co.gresearch.siembol.common.storm.KafkaWriterMessages; import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; @@ -127,8 +127,8 @@ public void testMatchRule() throws IOException { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(1, messages.size()); Assert.assertEquals("output", messages.get(0).getTopic()); @@ -151,8 +151,8 @@ public void testExceptionMetadata() throws Exception { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(1, messages.size()); Assert.assertEquals("error", messages.get(0).getTopic()); @@ -180,8 +180,8 @@ public void testMetadata() throws Exception { Values values = argumentEmitCaptor.getValue(); Assert.assertNotNull(values); Assert.assertEquals(1, values.size()); - Assert.assertTrue(values.get(0) instanceof KafkaBatchWriterMessages); - KafkaBatchWriterMessages messages = (KafkaBatchWriterMessages)values.get(0); + Assert.assertTrue(values.get(0) instanceof KafkaWriterMessages); + KafkaWriterMessages messages = (KafkaWriterMessages)values.get(0); Assert.assertEquals(1, messages.size()); Assert.assertEquals("output", messages.get(0).getTopic()); diff --git a/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplicationTest.java b/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplicationTest.java index 561a01922..525b38005 100644 --- a/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplicationTest.java +++ b/parsing/parsing-storm/src/test/java/uk/co/gresearch/siembol/parsers/storm/StormParsingApplicationTest.java @@ -84,7 +84,6 @@ public class StormParsingApplicationTest { "zk.max.retries": 10 }, "kafka.batch.writer.attributes": { - "batch.size": 1, "producer.properties": { "security.protocol": "PLAINTEXT" } diff --git a/parsing/pom.xml b/parsing/pom.xml index d0f8da5a4..be0b4d0e5 100644 --- a/parsing/pom.xml +++ b/parsing/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT parsing-core diff --git a/pom.xml b/pom.xml index f683ac943..a663a35e6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ uk.co.gresearch.siembol siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT A scalable, advanced security analytics framework based on open-source big data technologies. 2019 https://siembol.io/ diff --git a/responding/pom.xml b/responding/pom.xml index 0004f8a15..3272869b0 100644 --- a/responding/pom.xml +++ b/responding/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT responding-core diff --git a/responding/responding-core/pom.xml b/responding/responding-core/pom.xml index ad44e2000..f26596f95 100644 --- a/responding/responding-core/pom.xml +++ b/responding/responding-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol responding - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT com.jayway.jsonpath diff --git a/responding/responding-stream/pom.xml b/responding/responding-stream/pom.xml index 75015e9eb..7d7aa4fa4 100644 --- a/responding/responding-stream/pom.xml +++ b/responding/responding-stream/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol responding - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT @@ -51,7 +51,7 @@ uk.co.gresearch.siembol siembol-common - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.slf4j @@ -62,7 +62,7 @@ uk.co.gresearch.siembol responding-core - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT org.apache.kafka diff --git a/siembol-common/pom.xml b/siembol-common/pom.xml index 20d35fef4..19e2b407d 100644 --- a/siembol-common/pom.xml +++ b/siembol-common/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/jsonschema/JsonRawStringDto.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/jsonschema/JsonRawStringDto.java index 8b84f0fe7..2205be3cf 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/jsonschema/JsonRawStringDto.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/jsonschema/JsonRawStringDto.java @@ -2,11 +2,13 @@ import com.fasterxml.jackson.annotation.JsonAnyGetter; import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.github.reinert.jjschema.Attributes; import com.github.reinert.jjschema.SchemaIgnore; import java.util.Map; import java.util.HashMap; +import java.util.Properties; @Attributes(title = "json raw string", description = "An arbitrary json object") public class JsonRawStringDto { @@ -23,4 +25,12 @@ public void set(String fieldName, Object value){ this.rawMap.put(fieldName, value); } + @JsonIgnore + @SchemaIgnore + public Properties getProperties() { + var props = new Properties(); + props.putAll(rawMap); + return props; + } + } diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/model/KafkaBatchWriterAttributesDto.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/model/KafkaBatchWriterAttributesDto.java index f80e2cd29..47a675a36 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/model/KafkaBatchWriterAttributesDto.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/model/KafkaBatchWriterAttributesDto.java @@ -1,26 +1,17 @@ package uk.co.gresearch.siembol.common.model; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.github.reinert.jjschema.Attributes; import uk.co.gresearch.siembol.common.jsonschema.JsonRawStringDto; +@JsonIgnoreProperties(ignoreUnknown = true) @Attributes(title = "kafka batch writer attributes", description = "Attributes for storm configuration") public class KafkaBatchWriterAttributesDto { - @JsonProperty("batch.size") - @Attributes(required = true, description = "The max size of batch for producing messages", minimum = 1) - private Integer batchSize = 1; @JsonProperty("producer.properties") @Attributes(required = true, description = "Defines kafka producer properties") private JsonRawStringDto producerProperties; - public Integer getBatchSize() { - return batchSize; - } - - public void setBatchSize(Integer batchSize) { - this.batchSize = batchSize; - } - public JsonRawStringDto getProducerProperties() { return producerProperties; } diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessage.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessage.java deleted file mode 100644 index 678db9411..000000000 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessage.java +++ /dev/null @@ -1,22 +0,0 @@ -package uk.co.gresearch.siembol.common.storm; - -import java.io.Serializable; - -public class KafkaBatchWriterMessage implements Serializable { - private static final long serialVersionUID = 1L; - private final String topic; - private final String message; - - public KafkaBatchWriterMessage(String topic, String message) { - this.topic = topic; - this.message = message; - } - - public String getTopic() { - return topic; - } - - public String getMessage() { - return message; - } -} diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterAnchor.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterAnchor.java index 20d3145fe..988328960 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterAnchor.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterAnchor.java @@ -16,6 +16,10 @@ public void acquire() { counter.incrementAndGet(); } + public void acquire(int number) { + counter.addAndGet(number); + } + public boolean release() { return counter.decrementAndGet() <= 0; } diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBolt.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBolt.java new file mode 100644 index 000000000..edf79374e --- /dev/null +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBolt.java @@ -0,0 +1,39 @@ +package uk.co.gresearch.siembol.common.storm; + +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.co.gresearch.siembol.common.model.KafkaBatchWriterAttributesDto; + +import java.lang.invoke.MethodHandles; + +public class KafkaWriterBolt extends KafkaWriterBoltBase { + private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String MISSING_MESSAGES_MSG = "Missing messages in tuple"; + private final String fieldName; + + public KafkaWriterBolt(KafkaBatchWriterAttributesDto attributes, String fieldName) { + super(attributes.getProducerProperties().getProperties()); + this.fieldName = fieldName; + } + + @Override + public void execute(Tuple tuple) { + Object messagesObject = tuple.getValueByField(fieldName); + if (!(messagesObject instanceof KafkaWriterMessages)) { + LOG.error(MISSING_MESSAGES_MSG); + throw new IllegalStateException(MISSING_MESSAGES_MSG); + } + + KafkaWriterMessages currentMessages = (KafkaWriterMessages)messagesObject; + if (currentMessages.isEmpty()) { + LOG.error(MISSING_MESSAGES_MSG); + throw new IllegalStateException(MISSING_MESSAGES_MSG); + } + + var anchor = new KafkaWriterAnchor(tuple); + writeMessages(currentMessages, anchor); + } +} diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterBolt.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBoltBase.java similarity index 50% rename from siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterBolt.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBoltBase.java index 1814fa45f..2183c2f11 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterBolt.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterBoltBase.java @@ -1,92 +1,52 @@ package uk.co.gresearch.siembol.common.storm; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import uk.co.gresearch.siembol.common.model.KafkaBatchWriterAttributesDto; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; -import static org.apache.storm.utils.TupleUtils.isTick; -import static org.apache.storm.utils.TupleUtils.putTickFrequencyIntoComponentConfig; - -public class KafkaBatchWriterBolt extends BaseRichBolt { +public abstract class KafkaWriterBoltBase extends BaseRichBolt { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final int ACK_INTERVAL_ACK_IN_SEC = 1; private static final String AUTH_EXCEPTION_MESSAGE = "Authorization exception {} during writing messages to the kafka"; private static final String KAFKA_EXCEPTION_MESSAGE = "Exception {} during writing messages to the kafka"; private static final String SENDING_MESSAGE_LOG = - "Sending message: {} to the topic: {}"; - private static final String MISSING_MESSAGES_MSG = - "Missing messages in tuple"; + "Sending message: {}, key :{} to the topic: {} "; private final Properties props; - private final int batchSize; - private final String fieldName; - private final ArrayList> messages = new ArrayList<>(); private OutputCollector collector; private Producer producer; - public KafkaBatchWriterBolt(KafkaBatchWriterAttributesDto attributes, String fieldName) { - this.props = new Properties(); - props.putAll(attributes.getProducerProperties().getRawMap()); - this.batchSize = attributes.getBatchSize(); - this.messages.ensureCapacity(this.batchSize); - this.fieldName = fieldName; - } - - @Override - public void execute(Tuple tuple) { - if (isTick(tuple)) { - if (!messages.isEmpty()) { - writeTuples(); - } - return; - } - - Object messagesObject = tuple.getValueByField(fieldName); - if (!(messagesObject instanceof KafkaBatchWriterMessages)) { - LOG.error(MISSING_MESSAGES_MSG); - throw new IllegalStateException(MISSING_MESSAGES_MSG); - } - - KafkaBatchWriterMessages currentMessages = (KafkaBatchWriterMessages)messagesObject; - var anchor = new KafkaWriterAnchor(tuple); - currentMessages.forEach(x -> { - anchor.acquire(); - messages.add(Pair.of(x, anchor)); - }); - - if (messages.size() >= batchSize) { - writeTuples(); - } + protected KafkaWriterBoltBase(Properties producerProperties) { + this.props = producerProperties; } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); + prepareInternally(); } + protected void prepareInternally() { + } + @Override public void cleanup() { producer.close(); @@ -96,9 +56,9 @@ public void cleanup() { public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } - @Override - public Map getComponentConfiguration() { - return putTickFrequencyIntoComponentConfig(null, ACK_INTERVAL_ACK_IN_SEC); + protected void writeMessages(List messages, KafkaWriterAnchor anchor) { + anchor.acquire(messages.size()); + messages.forEach(x -> writeMessage(x, anchor)); } private Callback createProducerCallback(final KafkaWriterAnchor anchor) { @@ -116,25 +76,18 @@ private Callback createProducerCallback(final KafkaWriterAnchor anchor) { }; } - private void writeTuples() { + private void writeMessage(KafkaWriterMessage message, KafkaWriterAnchor anchor) { try { - messages.forEach(x -> { - var message = x.getLeft(); - var callBack = createProducerCallback(x.getRight()); - - LOG.debug(SENDING_MESSAGE_LOG, message.getMessage(), message.getTopic()); - producer.send(new ProducerRecord<>(message.getTopic(), message.getMessage()), callBack); - }); - + var callBack = createProducerCallback(anchor); + LOG.debug(SENDING_MESSAGE_LOG, message.getMessage(), message.getKey(), message.getTopic()); + producer.send(message.getProducerRecord(), callBack); } catch (AuthorizationException e) { LOG.error(AUTH_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e)); producer.close(); throw new IllegalStateException(e); } catch (Exception e) { LOG.error(KAFKA_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e)); - messages.forEach(x -> collector.fail(x.getRight().getTuple())); - } finally { - messages.clear(); + collector.fail(anchor.getTuple()); } } } diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessage.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessage.java new file mode 100644 index 000000000..3c37747b1 --- /dev/null +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessage.java @@ -0,0 +1,38 @@ +package uk.co.gresearch.siembol.common.storm; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.Serializable; + +public class KafkaWriterMessage implements Serializable { + private static final long serialVersionUID = 1L; + private final String topic; + private final String message; + private final String key; + + public KafkaWriterMessage(String topic, String key, String message) { + this.topic = topic; + this.message = message; + this.key = key; + } + + public KafkaWriterMessage(String topic, String message) { + this(topic, null, message); + } + + public String getTopic() { + return topic; + } + + public String getMessage() { + return message; + } + + public String getKey() { + return key; + } + + public ProducerRecord getProducerRecord() { + return key == null ? new ProducerRecord<>(topic, message) : new ProducerRecord<>(topic, key, message); + } +} diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessages.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessages.java similarity index 61% rename from siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessages.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessages.java index c6b6d7d42..c29b03785 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaBatchWriterMessages.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/storm/KafkaWriterMessages.java @@ -2,6 +2,6 @@ import java.util.ArrayList; -public class KafkaBatchWriterMessages extends ArrayList { +public class KafkaWriterMessages extends ArrayList { private static final long serialVersionUID = 1L; }