diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bcb813..0e97304 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,24 @@
# Changelog
All notable changes to this project will be documented in this file.
-## [Unreleased]
+## 2.0.0
+### Changed
+- Bumped the graylog image to 3.3
+- Increased the version of some of the images
+- Updated the services to fall in line with how the offical graylog documentation has them
+- Set the container names explicitly
+- Mounting the /usr/share/graylog/plugin volume explicitly so the jar can be manually built and copied into place
+
+### Added
+- Added some logging
+- Modified the configuration options. The 'queue' has been removed in favor of an 'exchange'. This is because the core model in RMQ is "a producer never sends any messages directly to a queue".
+- Added an optional routing_key configuration option.
+- Removed the Configuration check. This is done within the Graylog library itself so I wasn't sure that it was warranted.
+- Added a mandatory configuration item of 'vhost'. This will allow connecting to the non-default vhost.
+- Added new configuration options.
+- Increased the logging verbosity
+- Create a dummy channel so that we can validate an exchange exists. If it doesn't then we'll use the proper channel to create it, otherwise we just move on.
+- Modified the basicPublish to use the routing_key
## 1.4.1
### Fixed
diff --git a/Dockerfile b/Dockerfile
index f1992ff..1f08626 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,6 +6,6 @@ RUN cd /project
RUN mvn package
-FROM graylog/graylog:3.0
+FROM graylog/graylog:3.3
COPY --from=build /project/target/original-* /usr/share/graylog/plugin/
diff --git a/docker-compose.yml b/docker-compose.yml
index 3808390..48866a0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,31 +3,58 @@ version: '3'
services:
mongodb:
image: mongo:3
+ container_name: mongo
+ networks:
+ - graylog
+
elasticsearch:
- image: elasticsearch:6.6.1
+ image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.10
+ container_name: elasticsearch
environment:
- http.host: 0.0.0.0
- transport.host: localhost
- network.host: 0.0.0.0
- ES_JAVA_OPTS: -Xms512m -Xmx512m
+ - http.host=0.0.0.0
+ - transport.host=localhost
+ - network.host=0.0.0.0
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ networks:
+ - graylog
+
graylog:
- build: .
+ image: graylog/graylog:3.3
+ container_name: graylog
environment:
- GRAYLOG_PASSWORD_SECRET: somepasswordpepper
+ # CHANGE ME (must be at least 16 characters)!
+ - GRAYLOG_PASSWORD_SECRET=somepasswordpepper
# Password: admin
- GRAYLOG_ROOT_PASSWORD_SHA2: 8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
- GRAYLOG_HTTP_BIND_ADDRESS: 0.0.0.0:9000
- GRAYLOG_HTTP_EXTERNAL_URI: http://0.0.0.0:9000/
+ - GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
+ - GRAYLOG_HTTP_EXTERNAL_URI=http://127.0.0.1:9000/
+ networks:
+ - graylog
volumes:
+ - "./dist:/usr/share/graylog/plugin"
- "./dist:/out-plugin"
- links:
- - mongodb:mongo
+ ports:
+ - 9000:9000
+ - 12201:12201
+ - 1514:1514
depends_on:
- mongodb
- elasticsearch
+
rabbitmq:
image: rabbitmq:3-management
- hostname: rabbit
+ container_name: rabbitmq
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
+ ports:
+ - 15672:15672
+ networks:
+ - graylog
+
+networks:
+ graylog:
+ driver: bridge
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6b3796b..5b91bd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
net.nexylan.graylog.plugins
graylog-rabbitmq
- 1.4.0
+ 2.0.0
jar
${project.artifactId}
diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java
index 197c1c9..5c8eb04 100644
--- a/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java
+++ b/src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java
@@ -19,6 +19,9 @@
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This is the plugin. Your class should implement one of the existing plugin
* interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput)
@@ -26,23 +29,22 @@
public class RabbitMq implements MessageOutput{
private static final String RABBIT_HOST = "rabbit_host";
private static final String RABBIT_PORT = "rabbit_port";
- private static final String RABBIT_QUEUE = "rabbit_queue";
+ private static final String RABBIT_VHOST = "rabbit_vhost";
+ private static final String RABBIT_EXCHANGE = "rabbit_exchange";
private static final String RABBIT_USER = "rabbit_user";
private static final String RABBIT_PASSWORD = "rabbit_password";
private static final String RABBIT_TTL = "rabbit_ttl";
- private static final String RABBIT_DURABLE = "rabbit_durable";
+ private static final String RABBIT_ROUTING_KEY = "rabbit_routing_key";
private static final String RABBIT_MESSAGE_FORMAT = "rabbit_message_format";
+ private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSender.class);
+
private boolean running;
private final Sender sender;
@Inject
public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfigurationException {
- // Check configuration.
- if (!checkConfiguration(configuration)) {
- throw new MessageOutputConfigurationException("Missing configuration.");
- }
//Convertir format to integer
final Map message_formats = ImmutableMap.of("message", 0, "json", 1);
@@ -51,11 +53,12 @@ public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfi
sender = new RabbitMQSender(
configuration.getString(RABBIT_HOST),
configuration.getInt(RABBIT_PORT),
- configuration.getString(RABBIT_QUEUE),
+ configuration.getString(RABBIT_VHOST),
+ configuration.getString(RABBIT_EXCHANGE),
configuration.getString(RABBIT_USER),
configuration.getString(RABBIT_PASSWORD),
configuration.getInt(RABBIT_TTL),
- configuration.getBoolean(RABBIT_DURABLE),
+ configuration.getString(RABBIT_ROUTING_KEY),
message_formats.get(configuration.getString(RABBIT_MESSAGE_FORMAT))
);
@@ -97,10 +100,6 @@ public void write(List list) {
}
}
- private boolean checkConfiguration(Configuration c) {
- return c.stringIsSet(RABBIT_HOST) && c.intIsSet(RABBIT_PORT) && c.stringIsSet(RABBIT_QUEUE) && c.stringIsSet(RABBIT_USER) && c.stringIsSet(RABBIT_PASSWORD) && c.intIsSet(RABBIT_TTL);
- }
-
@FactoryClass
public interface Factory extends MessageOutput.Factory {
@Override
@@ -132,8 +131,14 @@ public ConfigurationRequest getRequestedConfiguration() {
);
configurationRequest.addField(new TextField(
- RABBIT_QUEUE, "RabbitMQ Queue", "my_queue",
- "Name of the RabbitMQ Queue to push messages",
+ RABBIT_VHOST, "RabbitMQ Vhost", "",
+ "Name of the RabbitMQ vhost to connect to",
+ ConfigurationField.Optional.NOT_OPTIONAL)
+ );
+
+ configurationRequest.addField(new TextField(
+ RABBIT_EXCHANGE, "RabbitMQ Exchange", "",
+ "Name of the RabbitMQ Exchange to publish messages to",
ConfigurationField.Optional.NOT_OPTIONAL)
);
@@ -145,21 +150,22 @@ public ConfigurationRequest getRequestedConfiguration() {
configurationRequest.addField(new TextField(
RABBIT_PASSWORD, "RabbitMQ Password", "guest",
- "Password of the rabbitMQ user",
- ConfigurationField.Optional.NOT_OPTIONAL)
+ "Password of the rabbitMQ user. Default: guest",
+ ConfigurationField.Optional.NOT_OPTIONAL,
+ TextField.Attribute.IS_PASSWORD)
);
configurationRequest.addField(new NumberField(
RABBIT_TTL, "RabbitMQ TTL", -1,
- "The TTL of a message set -1 to disable",
+ "The TTL of a message. Set the value to any negative number to disable. Values are in milliseconds.",
ConfigurationField.Optional.NOT_OPTIONAL)
);
- configurationRequest.addField(new BooleanField(RABBIT_DURABLE,
- "RabbitMQ Durable",
- true,
- "May this queue must be durable ?"
- ));
+ configurationRequest.addField(new TextField(
+ RABBIT_ROUTING_KEY, "Queue Routing Key", "",
+ "The queue routing key.",
+ ConfigurationField.Optional.OPTIONAL)
+ );
final Map formats = ImmutableMap.of("message", "Message", "json", "JSON ( all fields )");
configurationRequest.addField(new DropdownField(
diff --git a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java
index 573fe5a..69b81de 100644
--- a/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java
+++ b/src/main/java/net/nexylan/graylog/rabbitmq/senders/RabbitMQSender.java
@@ -22,11 +22,12 @@ public class RabbitMQSender implements Sender {
//Queue properties
private String host;
private int port;
- private String queue;
+ private String vhost;
+ private String exchange;
private String user;
private String password;
private int ttl;
- private boolean durable;
+ private String routing_key;
//Message properties
private int message_format;
@@ -42,15 +43,16 @@ public class RabbitMQSender implements Sender {
private boolean is_initialized = false;
- public RabbitMQSender(String host, int port, String queue, String user, String password, int ttl, boolean durable, int message_format)
+ public RabbitMQSender(String host, int port, String vhost, String exchange, String user, String password, int ttl, String routing_key, int message_format)
{
this.host = host;
this.port = port;
- this.queue = queue;
+ this.vhost = vhost;
+ this.exchange = exchange;
this.user = user;
this.password = password;
this.ttl = ttl;
- this.durable = durable;
+ this.routing_key = routing_key;
this.message_format = message_format;
initialize();
@@ -68,39 +70,72 @@ public void initialize()
factory = new ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
+ factory.setVirtualHost(this.vhost);
factory.setUsername(this.user);
factory.setPassword(this.password);
try {
this.connection = factory.newConnection();
- LOG.info("[RabbitMQ] Successfully connected to the server.");
+ LOG.info("[RabbitMQ] Successfully connected to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
} catch (IOException e) {
- LOG.error("[RabbitMQ] Failed to connect to RabbitMQ Server.");
+ LOG.error("[RabbitMQ] Failed to connect to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
- LOG.error("[RabbitMQ] The RabbitMQ Server timed out.");
+ LOG.error("[RabbitMQ] Time out connecting to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
}
+
try {
this.channel = this.connection.createChannel();
- LOG.info("[RabbitMQ] The channel have been successfully created.");
+ LOG.info("[RabbitMQ] The channel has been successfully created.");
} catch (IOException e) {
LOG.error("[RabbitMQ] An error occurred during the channel creation.");
e.printStackTrace();
}
+ // Check to see if the exchange exists. If it doesn't then try to declare it.
+ // If it does exist then just move along.
+ Channel dummy_channel = null;
try {
- this.channel.queueDeclare(this.queue, false, this.durable, false, null);
- LOG.info("[RabbitMQ] The queue have been successfully created.");
- } catch (IOException e) {
- LOG.error("[RabbitMQ] Impossible to declare the queue.");
- e.printStackTrace();
+ // Create a dummy channel.
+ dummy_channel = this.connection.createChannel();
+ LOG.info("[RabbitMQ] " + this.exchange + " exists. We'll use it.");
+
+ // exchangeDeclarePassive throws IOException -
+ // the server will raise a 404 channel exception if the named exchange does not exist.
+ dummy_channel.exchangeDeclarePassive(this.exchange);
+
+ // Close the channel
+ try {
+ dummy_channel.close();
+ } catch (Exception e) {
+ LOG.error("[RabbitMQ] Exception occurred closing the dummy channel.", e);
+ }
+ } catch (IOException oe) {
+ // The exchange doesn't exist. Try to declare it.
+ try {
+ LOG.info("[RabbitMQ] " + this.exchange + " does not exist. Will attempt to declare as a direct, durable exchange");
+ this.channel.exchangeDeclare(this.exchange, "direct", true);
+ } catch (IOException e) {
+ LOG.error("[RabbitMQ] An error occurred declaring the exchange." + this.exchange, e);
+ }
+ }
+
+ if (this.routing_key == "") {
+ LOG.info("[RabbitMQ] Routing key was not specified. Defaulting to an empty value.");
+ this.routing_key = "";
+ } else {
+ LOG.info("[RabbitMQ] Using routing key " + this.routing_key);
}
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- if(this.ttl != -1)
+ if (this.ttl >= 0) {
+ LOG.info("[RabbitMQ] Setting TTL to " + this.ttl);
builder.expiration(Integer.toString(this.ttl));
+ } else {
+ LOG.info("[RabbitMQ] Disabling TTL");
+ }
this.sendProperties = builder.build();
this.is_initialized = true;
@@ -113,7 +148,7 @@ public void stop()
try {
this.channel.close();
} catch (TimeoutException e) {
- LOG.error("[RabbitMQ] An error occurred while closing the channel.");
+ LOG.error("[RabbitMQ] Timeout while closing the channel.");
e.printStackTrace();
} catch (IOException e) {
LOG.error("[RabbitMQ] An error occurred.");
@@ -134,10 +169,10 @@ public void send(Message message)
try {
switch(this.message_format){
case 0:
- this.channel.basicPublish("", this.queue, this.sendProperties, message.getMessage().getBytes());
+ this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, message.getMessage().getBytes());
break;
case 1:
- this.channel.basicPublish("", this.queue, this.sendProperties, this.formatToJson(message.getFields()).getBytes());
+ this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, this.formatToJson(message.getFields()).getBytes());
break;
}
} catch (JsonProcessingException exception) {