From 1b7f5c258b10a9dd958aa8cf6027c7096b8872cd Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:15:02 -0600 Subject: [PATCH 01/10] Dockerfile * Bumped the graylog image to 3.3 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f1992ff..1f08626 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,6 @@ RUN cd /project RUN mvn package -FROM graylog/graylog:3.0 +FROM graylog/graylog:3.3 COPY --from=build /project/target/original-* /usr/share/graylog/plugin/ From 492d2f04d3dfe4e2ce005c5c55ad2ec2b34e062b Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:17:59 -0600 Subject: [PATCH 02/10] docker-compose.yml * Increased the version of some of the images * Updated the services to fall in line with how the offical graylog documentation has them * Set the container names explicitly * Mounting the /usr/share/graylog/plugin volume explicitly so the jar can be manually built and copied into place --- docker-compose.yml | 53 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 3808390..48866a0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,31 +3,58 @@ version: '3' services: mongodb: image: mongo:3 + container_name: mongo + networks: + - graylog + elasticsearch: - image: elasticsearch:6.6.1 + image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.10 + container_name: elasticsearch environment: - http.host: 0.0.0.0 - transport.host: localhost - network.host: 0.0.0.0 - ES_JAVA_OPTS: -Xms512m -Xmx512m + - http.host=0.0.0.0 + - transport.host=localhost + - network.host=0.0.0.0 + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + networks: + - graylog + graylog: - build: . + image: graylog/graylog:3.3 + container_name: graylog environment: - GRAYLOG_PASSWORD_SECRET: somepasswordpepper + # CHANGE ME (must be at least 16 characters)! + - GRAYLOG_PASSWORD_SECRET=somepasswordpepper # Password: admin - GRAYLOG_ROOT_PASSWORD_SHA2: 8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918 - GRAYLOG_HTTP_BIND_ADDRESS: 0.0.0.0:9000 - GRAYLOG_HTTP_EXTERNAL_URI: http://0.0.0.0:9000/ + - GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918 + - GRAYLOG_HTTP_EXTERNAL_URI=http://127.0.0.1:9000/ + networks: + - graylog volumes: + - "./dist:/usr/share/graylog/plugin" - "./dist:/out-plugin" - links: - - mongodb:mongo + ports: + - 9000:9000 + - 12201:12201 + - 1514:1514 depends_on: - mongodb - elasticsearch + rabbitmq: image: rabbitmq:3-management - hostname: rabbit + container_name: rabbitmq environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin + ports: + - 15672:15672 + networks: + - graylog + +networks: + graylog: + driver: bridge \ No newline at end of file From b7082813c343fc952dfd1cec63d3bf7c53a69e9f Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:23:47 -0600 Subject: [PATCH 03/10] RabbitMQ.java * Added some logging * Modified the configuration options. The 'queue' has been removed in favor of an 'exchange'. This is because the core model in RMQ is "a producer never sends any messages directly to a queue". * Added an optional routing_key configuration option. * Removed the Configuration check. This is done within the Graylog library itself so I wasn't sure that it was warranted. * Added a mandatory configuration item of 'vhost'. This will allow connecting to the non-default vhost. --- .../nexylan/graylog/rabbitmq/RabbitMq.java | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java index 197c1c9..884ca11 100644 --- a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java +++ b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java @@ -19,6 +19,9 @@ import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This is the plugin. Your class should implement one of the existing plugin * interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput) @@ -26,23 +29,22 @@ public class RabbitMq implements MessageOutput{ private static final String RABBIT_HOST = "rabbit_host"; private static final String RABBIT_PORT = "rabbit_port"; - private static final String RABBIT_QUEUE = "rabbit_queue"; + private static final String RABBIT_VHOST = "rabbit_vhost"; + private static final String RABBIT_EXCHANGE = "rabbit_exchange"; private static final String RABBIT_USER = "rabbit_user"; private static final String RABBIT_PASSWORD = "rabbit_password"; private static final String RABBIT_TTL = "rabbit_ttl"; - private static final String RABBIT_DURABLE = "rabbit_durable"; + private static final String RABBIT_ROUTING_KEY = "rabbit_routing_key"; private static final String RABBIT_MESSAGE_FORMAT = "rabbit_message_format"; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSender.class); + private boolean running; private final Sender sender; @Inject public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfigurationException { - // Check configuration. - if (!checkConfiguration(configuration)) { - throw new MessageOutputConfigurationException("Missing configuration."); - } //Convertir format to integer final Map message_formats = ImmutableMap.of("message", 0, "json", 1); @@ -51,11 +53,12 @@ public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfi sender = new RabbitMQSender( configuration.getString(RABBIT_HOST), configuration.getInt(RABBIT_PORT), - configuration.getString(RABBIT_QUEUE), + configuration.getString(RABBIT_VHOST), + configuration.getString(RABBIT_EXCHANGE), configuration.getString(RABBIT_USER), configuration.getString(RABBIT_PASSWORD), configuration.getInt(RABBIT_TTL), - configuration.getBoolean(RABBIT_DURABLE), + configuration.getString(RABBIT_ROUTING_KEY), message_formats.get(configuration.getString(RABBIT_MESSAGE_FORMAT)) ); @@ -97,10 +100,6 @@ public void write(List list) { } } - private boolean checkConfiguration(Configuration c) { - return c.stringIsSet(RABBIT_HOST) && c.intIsSet(RABBIT_PORT) && c.stringIsSet(RABBIT_QUEUE) && c.stringIsSet(RABBIT_USER) && c.stringIsSet(RABBIT_PASSWORD) && c.intIsSet(RABBIT_TTL); - } - @FactoryClass public interface Factory extends MessageOutput.Factory { @Override @@ -132,8 +131,14 @@ public ConfigurationRequest getRequestedConfiguration() { ); configurationRequest.addField(new TextField( - RABBIT_QUEUE, "RabbitMQ Queue", "my_queue", - "Name of the RabbitMQ Queue to push messages", + RABBIT_VHOST, "RabbitMQ Vhost", "", + "Name of the RabbitMQ vhost to connect to", + ConfigurationField.Optional.NOT_OPTIONAL) + ); + + configurationRequest.addField(new TextField( + RABBIT_EXCHANGE, "RabbitMQ Exchange", "", + "Name of the RabbitMQ Exchange to publish messages to", ConfigurationField.Optional.NOT_OPTIONAL) ); @@ -151,15 +156,15 @@ public ConfigurationRequest getRequestedConfiguration() { configurationRequest.addField(new NumberField( RABBIT_TTL, "RabbitMQ TTL", -1, - "The TTL of a message set -1 to disable", + "The TTL of a message. Set the value to any negative number to disable. Values are in milliseconds.", ConfigurationField.Optional.NOT_OPTIONAL) ); - configurationRequest.addField(new BooleanField(RABBIT_DURABLE, - "RabbitMQ Durable", - true, - "May this queue must be durable ?" - )); + configurationRequest.addField(new TextField( + RABBIT_ROUTING_KEY, "Queue Routing Key", "", + "The queue routing key.", + ConfigurationField.Optional.OPTIONAL) + ); final Map formats = ImmutableMap.of("message", "Message", "json", "JSON ( all fields )"); configurationRequest.addField(new DropdownField( From 064cf44ef74ecd9b5e47dbe4f5eb9a9d54ab2c51 Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:25:54 -0600 Subject: [PATCH 04/10] RabbitMQSender.java * Added new configuration options. * Increased the logging verbosity * Create a dummy channel so that we can validate an exchange exists. If it doesn't then we'll use the proper channel to create it, otherwise we just move on. * Modified the basicPublish to use the routing_key --- .../rabbitmq/senders/RabbitMQSender.java | 63 ++++++++++++++----- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java index 573fe5a..8d83882 100644 --- a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java +++ b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java @@ -22,11 +22,12 @@ public class RabbitMQSender implements Sender { //Queue properties private String host; private int port; - private String queue; + private String vhost; + private String exchange; private String user; private String password; private int ttl; - private boolean durable; + private String routing_key; //Message properties private int message_format; @@ -34,6 +35,7 @@ public class RabbitMQSender implements Sender { //RabbitMQ objects private Connection connection; private Channel channel; + private Channel dummy_channel; private boolean lock; private AMQP.BasicProperties sendProperties; @@ -42,15 +44,16 @@ public class RabbitMQSender implements Sender { private boolean is_initialized = false; - public RabbitMQSender(String host, int port, String queue, String user, String password, int ttl, boolean durable, int message_format) + public RabbitMQSender(String host, int port, String vhost, String exchange, String user, String password, int ttl, String routing_key, int message_format) { this.host = host; this.port = port; - this.queue = queue; + this.vhost = vhost; + this.exchange = exchange; this.user = user; this.password = password; this.ttl = ttl; - this.durable = durable; + this.routing_key = routing_key; this.message_format = message_format; initialize(); @@ -68,19 +71,21 @@ public void initialize() factory = new ConnectionFactory(); factory.setHost(this.host); factory.setPort(this.port); + factory.setVirtualHost(this.vhost); factory.setUsername(this.user); factory.setPassword(this.password); try { this.connection = factory.newConnection(); - LOG.info("[RabbitMQ] Successfully connected to the server."); + LOG.info("[RabbitMQ] Successfully connected to vhost " + this.vhost + " on server " + this.host + ":" + this.port); } catch (IOException e) { - LOG.error("[RabbitMQ] Failed to connect to RabbitMQ Server."); + LOG.error("[RabbitMQ] Failed to connect to vhost " + this.vhost + " on server " + this.host + ":" + this.port); e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); - LOG.error("[RabbitMQ] The RabbitMQ Server timed out."); + LOG.error("[RabbitMQ] Time out connecting to vhost " + this.vhost + " on server " + this.host + ":" + this.port); } + try { this.channel = this.connection.createChannel(); LOG.info("[RabbitMQ] The channel have been successfully created."); @@ -89,18 +94,42 @@ public void initialize() e.printStackTrace(); } + // Attempt to declare the exchange. If it already exists then capture the exception try { - this.channel.queueDeclare(this.queue, false, this.durable, false, null); - LOG.info("[RabbitMQ] The queue have been successfully created."); - } catch (IOException e) { - LOG.error("[RabbitMQ] Impossible to declare the queue."); - e.printStackTrace(); + // Create a dummy channel. + this.dummy_channel = this.connection.createChannel(); + this.dummy_channel.exchangeDeclarePassive(this.exchange); + } catch (IOException oe) { + try { + LOG.info("[RabbitMQ] Attempting to declare " + this.exchange + " as a direct, durable exchange (if it doesn't already exist)"); + this.channel.exchangeDeclare(this.exchange, "direct", true); + try { + this.dummy_channel.close(); + } catch (TimeoutException ie) { + LOG.error("[RabbitMQ] Timeout occurred while closing the channel."); + ie.printStackTrace(); + } + } catch (IOException e) { + LOG.error("[RabbitMQ] An error occurred declaring the exchange."); + e.printStackTrace(); + } + } + + if (this.routing_key == "") { + LOG.info("[RabbitMQ] Routing key was not specified. Defaulting to an empty value."); + this.routing_key = ""; + } else { + LOG.info("[RabbitMQ] Using routing key " + this.routing_key); } AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - if(this.ttl != -1) + if (this.ttl >= 0) { + LOG.info("[RabbitMQ] Setting TTL to " + this.ttl); builder.expiration(Integer.toString(this.ttl)); + } else { + LOG.info("[RabbitMQ] Disabling TTL"); + } this.sendProperties = builder.build(); this.is_initialized = true; @@ -113,7 +142,7 @@ public void stop() try { this.channel.close(); } catch (TimeoutException e) { - LOG.error("[RabbitMQ] An error occurred while closing the channel."); + LOG.error("[RabbitMQ] Timeout while closing the channel."); e.printStackTrace(); } catch (IOException e) { LOG.error("[RabbitMQ] An error occurred."); @@ -134,10 +163,10 @@ public void send(Message message) try { switch(this.message_format){ case 0: - this.channel.basicPublish("", this.queue, this.sendProperties, message.getMessage().getBytes()); + this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, message.getMessage().getBytes()); break; case 1: - this.channel.basicPublish("", this.queue, this.sendProperties, this.formatToJson(message.getFields()).getBytes()); + this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, this.formatToJson(message.getFields()).getBytes()); break; } } catch (JsonProcessingException exception) { From 89764df6178b401e8583eda97a3e181247f5eadc Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:26:28 -0600 Subject: [PATCH 05/10] CHANGELOG.md * Updating. --- CHANGELOG.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bcb813..6ac76c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,24 @@ # Changelog All notable changes to this project will be documented in this file. -## [Unreleased] +## 1.5.0 +### Changed +- Bumped the graylog image to 3.3 +- Increased the version of some of the images +- Updated the services to fall in line with how the offical graylog documentation has them +- Set the container names explicitly +- Mounting the /usr/share/graylog/plugin volume explicitly so the jar can be manually built and copied into place + +### Added +- Added some logging +- Modified the configuration options. The 'queue' has been removed in favor of an 'exchange'. This is because the core model in RMQ is "a producer never sends any messages directly to a queue". +- Added an optional routing_key configuration option. +- Removed the Configuration check. This is done within the Graylog library itself so I wasn't sure that it was warranted. +- Added a mandatory configuration item of 'vhost'. This will allow connecting to the non-default vhost. +- Added new configuration options. +- Increased the logging verbosity +- Create a dummy channel so that we can validate an exchange exists. If it doesn't then we'll use the proper channel to create it, otherwise we just move on. +- Modified the basicPublish to use the routing_key ## 1.4.1 ### Fixed From 8eea2f29fad0db9d600dfe50890163861f57289a Mon Sep 17 00:00:00 2001 From: mjtice Date: Thu, 17 Sep 2020 11:27:37 -0600 Subject: [PATCH 06/10] pom.xml * Updating ver. to 1.5.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b3796b..368ed75 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.nexylan.graylog.plugins graylog-rabbitmq - 1.4.0 + 1.5.0 jar ${project.artifactId} From b34ba58e8c129bdbcc6ce25aaf8926138385c77a Mon Sep 17 00:00:00 2001 From: mjtice Date: Fri, 18 Sep 2020 08:23:06 -0600 Subject: [PATCH 07/10] RabbitMQSender.java * Closing the dummy channel after a successful exchange validation. * Adding some additional logging. --- .../rabbitmq/senders/RabbitMQSender.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java index 8d83882..69b81de 100644 --- a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java +++ b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java @@ -35,7 +35,6 @@ public class RabbitMQSender implements Sender { //RabbitMQ objects private Connection connection; private Channel channel; - private Channel dummy_channel; private boolean lock; private AMQP.BasicProperties sendProperties; @@ -88,30 +87,37 @@ public void initialize() try { this.channel = this.connection.createChannel(); - LOG.info("[RabbitMQ] The channel have been successfully created."); + LOG.info("[RabbitMQ] The channel has been successfully created."); } catch (IOException e) { LOG.error("[RabbitMQ] An error occurred during the channel creation."); e.printStackTrace(); } - // Attempt to declare the exchange. If it already exists then capture the exception + // Check to see if the exchange exists. If it doesn't then try to declare it. + // If it does exist then just move along. + Channel dummy_channel = null; try { // Create a dummy channel. - this.dummy_channel = this.connection.createChannel(); - this.dummy_channel.exchangeDeclarePassive(this.exchange); + dummy_channel = this.connection.createChannel(); + LOG.info("[RabbitMQ] " + this.exchange + " exists. We'll use it."); + + // exchangeDeclarePassive throws IOException - + // the server will raise a 404 channel exception if the named exchange does not exist. + dummy_channel.exchangeDeclarePassive(this.exchange); + + // Close the channel + try { + dummy_channel.close(); + } catch (Exception e) { + LOG.error("[RabbitMQ] Exception occurred closing the dummy channel.", e); + } } catch (IOException oe) { + // The exchange doesn't exist. Try to declare it. try { - LOG.info("[RabbitMQ] Attempting to declare " + this.exchange + " as a direct, durable exchange (if it doesn't already exist)"); + LOG.info("[RabbitMQ] " + this.exchange + " does not exist. Will attempt to declare as a direct, durable exchange"); this.channel.exchangeDeclare(this.exchange, "direct", true); - try { - this.dummy_channel.close(); - } catch (TimeoutException ie) { - LOG.error("[RabbitMQ] Timeout occurred while closing the channel."); - ie.printStackTrace(); - } } catch (IOException e) { - LOG.error("[RabbitMQ] An error occurred declaring the exchange."); - e.printStackTrace(); + LOG.error("[RabbitMQ] An error occurred declaring the exchange." + this.exchange, e); } } From 30cc7276681a1456d64d642df5d88af06227277e Mon Sep 17 00:00:00 2001 From: mjtice Date: Fri, 18 Sep 2020 08:23:42 -0600 Subject: [PATCH 08/10] RabbitMq.java * Setting the RMQ Password config with the IS_PASSWORD attribute. --- src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java index 884ca11..5c8eb04 100644 --- a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java +++ b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java @@ -150,8 +150,9 @@ public ConfigurationRequest getRequestedConfiguration() { configurationRequest.addField(new TextField( RABBIT_PASSWORD, "RabbitMQ Password", "guest", - "Password of the rabbitMQ user", - ConfigurationField.Optional.NOT_OPTIONAL) + "Password of the rabbitMQ user. Default: guest", + ConfigurationField.Optional.NOT_OPTIONAL, + TextField.Attribute.IS_PASSWORD) ); configurationRequest.addField(new NumberField( From a02e577b7c0a1b6f16f10bf78d4d5ba567d275df Mon Sep 17 00:00:00 2001 From: mjtice Date: Fri, 18 Sep 2020 08:24:35 -0600 Subject: [PATCH 09/10] pom.xml * I feel this is a pretty big departure from 1.4.1 so I'm going to bump this up a major release. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 368ed75..5b91bd6 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.nexylan.graylog.plugins graylog-rabbitmq - 1.5.0 + 2.0.0 jar ${project.artifactId} From 2163ec47170025afced793499f8f1aeebd206554 Mon Sep 17 00:00:00 2001 From: mjtice Date: Fri, 18 Sep 2020 08:26:38 -0600 Subject: [PATCH 10/10] CHANGELOG.md * Updating latest version to 2.0.0 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ac76c9..0e97304 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog All notable changes to this project will be documented in this file. -## 1.5.0 +## 2.0.0 ### Changed - Bumped the graylog image to 3.3 - Increased the version of some of the images