Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated to reflect RMQ message model #13

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
# Changelog
All notable changes to this project will be documented in this file.

## [Unreleased]
## 2.0.0
### Changed
- Bumped the graylog image to 3.3
- Increased the version of some of the images
- Updated the services to fall in line with how the offical graylog documentation has them
- Set the container names explicitly
- Mounting the /usr/share/graylog/plugin volume explicitly so the jar can be manually built and copied into place

### Added
- Added some logging
- Modified the configuration options. The 'queue' has been removed in favor of an 'exchange'. This is because the core model in RMQ is "a producer never sends any messages directly to a queue".
- Added an optional routing_key configuration option.
- Removed the Configuration check. This is done within the Graylog library itself so I wasn't sure that it was warranted.
- Added a mandatory configuration item of 'vhost'. This will allow connecting to the non-default vhost.
- Added new configuration options.
- Increased the logging verbosity
- Create a dummy channel so that we can validate an exchange exists. If it doesn't then we'll use the proper channel to create it, otherwise we just move on.
- Modified the basicPublish to use the routing_key

## 1.4.1
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ RUN cd /project

RUN mvn package

FROM graylog/graylog:3.0
FROM graylog/graylog:3.3

COPY --from=build /project/target/original-* /usr/share/graylog/plugin/
53 changes: 40 additions & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,58 @@ version: '3'
services:
mongodb:
image: mongo:3
container_name: mongo
networks:
- graylog

elasticsearch:
image: elasticsearch:6.6.1
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.10
container_name: elasticsearch
environment:
http.host: 0.0.0.0
transport.host: localhost
network.host: 0.0.0.0
ES_JAVA_OPTS: -Xms512m -Xmx512m
- http.host=0.0.0.0
- transport.host=localhost
- network.host=0.0.0.0
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
networks:
- graylog

graylog:
build: .
image: graylog/graylog:3.3
container_name: graylog
environment:
GRAYLOG_PASSWORD_SECRET: somepasswordpepper
# CHANGE ME (must be at least 16 characters)!
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
# Password: admin
GRAYLOG_ROOT_PASSWORD_SHA2: 8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
GRAYLOG_HTTP_BIND_ADDRESS: 0.0.0.0:9000
GRAYLOG_HTTP_EXTERNAL_URI: http://0.0.0.0:9000/
- GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
- GRAYLOG_HTTP_EXTERNAL_URI=http://127.0.0.1:9000/
networks:
- graylog
volumes:
- "./dist:/usr/share/graylog/plugin"
- "./dist:/out-plugin"
links:
- mongodb:mongo
ports:
- 9000:9000
- 12201:12201
- 1514:1514
depends_on:
- mongodb
- elasticsearch

rabbitmq:
image: rabbitmq:3-management
hostname: rabbit
container_name: rabbitmq
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
ports:
- 15672:15672
networks:
- graylog

networks:
graylog:
driver: bridge
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.nexylan.graylog.plugins</groupId>
<artifactId>graylog-rabbitmq</artifactId>
<version>1.4.0</version>
<version>2.0.0</version>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
Expand Down
50 changes: 28 additions & 22 deletions src/main/java/net/nexylan/graylog/rabbitmq/RabbitMq.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,32 @@
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This is the plugin. Your class should implement one of the existing plugin
* interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput)
*/
public class RabbitMq implements MessageOutput{
private static final String RABBIT_HOST = "rabbit_host";
private static final String RABBIT_PORT = "rabbit_port";
private static final String RABBIT_QUEUE = "rabbit_queue";
private static final String RABBIT_VHOST = "rabbit_vhost";
private static final String RABBIT_EXCHANGE = "rabbit_exchange";
private static final String RABBIT_USER = "rabbit_user";
private static final String RABBIT_PASSWORD = "rabbit_password";
private static final String RABBIT_TTL = "rabbit_ttl";
private static final String RABBIT_DURABLE = "rabbit_durable";
private static final String RABBIT_ROUTING_KEY = "rabbit_routing_key";
private static final String RABBIT_MESSAGE_FORMAT = "rabbit_message_format";

private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSender.class);

private boolean running;

private final Sender sender;

@Inject
public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfigurationException {
// Check configuration.
if (!checkConfiguration(configuration)) {
throw new MessageOutputConfigurationException("Missing configuration.");
}

//Convertir format to integer
final Map<String, Integer> message_formats = ImmutableMap.of("message", 0, "json", 1);
Expand All @@ -51,11 +53,12 @@ public RabbitMq(@Assisted Configuration configuration) throws MessageOutputConfi
sender = new RabbitMQSender(
configuration.getString(RABBIT_HOST),
configuration.getInt(RABBIT_PORT),
configuration.getString(RABBIT_QUEUE),
configuration.getString(RABBIT_VHOST),
configuration.getString(RABBIT_EXCHANGE),
configuration.getString(RABBIT_USER),
configuration.getString(RABBIT_PASSWORD),
configuration.getInt(RABBIT_TTL),
configuration.getBoolean(RABBIT_DURABLE),
configuration.getString(RABBIT_ROUTING_KEY),
message_formats.get(configuration.getString(RABBIT_MESSAGE_FORMAT))
);

Expand Down Expand Up @@ -97,10 +100,6 @@ public void write(List<Message> list) {
}
}

private boolean checkConfiguration(Configuration c) {
return c.stringIsSet(RABBIT_HOST) && c.intIsSet(RABBIT_PORT) && c.stringIsSet(RABBIT_QUEUE) && c.stringIsSet(RABBIT_USER) && c.stringIsSet(RABBIT_PASSWORD) && c.intIsSet(RABBIT_TTL);
}

@FactoryClass
public interface Factory extends MessageOutput.Factory<RabbitMq> {
@Override
Expand Down Expand Up @@ -132,8 +131,14 @@ public ConfigurationRequest getRequestedConfiguration() {
);

configurationRequest.addField(new TextField(
RABBIT_QUEUE, "RabbitMQ Queue", "my_queue",
"Name of the RabbitMQ Queue to push messages",
RABBIT_VHOST, "RabbitMQ Vhost", "",
"Name of the RabbitMQ vhost to connect to",
ConfigurationField.Optional.NOT_OPTIONAL)
);

configurationRequest.addField(new TextField(
RABBIT_EXCHANGE, "RabbitMQ Exchange", "",
"Name of the RabbitMQ Exchange to publish messages to",
ConfigurationField.Optional.NOT_OPTIONAL)
);

Expand All @@ -145,21 +150,22 @@ public ConfigurationRequest getRequestedConfiguration() {

configurationRequest.addField(new TextField(
RABBIT_PASSWORD, "RabbitMQ Password", "guest",
"Password of the rabbitMQ user",
ConfigurationField.Optional.NOT_OPTIONAL)
"Password of the rabbitMQ user. Default: guest",
ConfigurationField.Optional.NOT_OPTIONAL,
TextField.Attribute.IS_PASSWORD)
);

configurationRequest.addField(new NumberField(
RABBIT_TTL, "RabbitMQ TTL", -1,
"The TTL of a message set -1 to disable",
"The TTL of a message. Set the value to any negative number to disable. Values are in milliseconds.",
ConfigurationField.Optional.NOT_OPTIONAL)
);

configurationRequest.addField(new BooleanField(RABBIT_DURABLE,
"RabbitMQ Durable",
true,
"May this queue must be durable ?"
));
configurationRequest.addField(new TextField(
RABBIT_ROUTING_KEY, "Queue Routing Key", "",
"The queue routing key.",
ConfigurationField.Optional.OPTIONAL)
);

final Map<String, String> formats = ImmutableMap.of("message", "Message", "json", "JSON ( all fields )");
configurationRequest.addField(new DropdownField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ public class RabbitMQSender implements Sender {
//Queue properties
private String host;
private int port;
private String queue;
private String vhost;
private String exchange;
private String user;
private String password;
private int ttl;
private boolean durable;
private String routing_key;

//Message properties
private int message_format;
Expand All @@ -42,15 +43,16 @@ public class RabbitMQSender implements Sender {

private boolean is_initialized = false;

public RabbitMQSender(String host, int port, String queue, String user, String password, int ttl, boolean durable, int message_format)
public RabbitMQSender(String host, int port, String vhost, String exchange, String user, String password, int ttl, String routing_key, int message_format)
{
this.host = host;
this.port = port;
this.queue = queue;
this.vhost = vhost;
this.exchange = exchange;
this.user = user;
this.password = password;
this.ttl = ttl;
this.durable = durable;
this.routing_key = routing_key;
this.message_format = message_format;

initialize();
Expand All @@ -68,39 +70,72 @@ public void initialize()
factory = new ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setVirtualHost(this.vhost);
factory.setUsername(this.user);
factory.setPassword(this.password);

try {
this.connection = factory.newConnection();
LOG.info("[RabbitMQ] Successfully connected to the server.");
LOG.info("[RabbitMQ] Successfully connected to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
} catch (IOException e) {
LOG.error("[RabbitMQ] Failed to connect to RabbitMQ Server.");
LOG.error("[RabbitMQ] Failed to connect to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
LOG.error("[RabbitMQ] The RabbitMQ Server timed out.");
LOG.error("[RabbitMQ] Time out connecting to vhost " + this.vhost + " on server " + this.host + ":" + this.port);
}

try {
this.channel = this.connection.createChannel();
LOG.info("[RabbitMQ] The channel have been successfully created.");
LOG.info("[RabbitMQ] The channel has been successfully created.");
} catch (IOException e) {
LOG.error("[RabbitMQ] An error occurred during the channel creation.");
e.printStackTrace();
}

// Check to see if the exchange exists. If it doesn't then try to declare it.
// If it does exist then just move along.
Channel dummy_channel = null;
try {
this.channel.queueDeclare(this.queue, false, this.durable, false, null);
LOG.info("[RabbitMQ] The queue have been successfully created.");
} catch (IOException e) {
LOG.error("[RabbitMQ] Impossible to declare the queue.");
e.printStackTrace();
// Create a dummy channel.
dummy_channel = this.connection.createChannel();
LOG.info("[RabbitMQ] " + this.exchange + " exists. We'll use it.");

// exchangeDeclarePassive throws IOException -
// the server will raise a 404 channel exception if the named exchange does not exist.
dummy_channel.exchangeDeclarePassive(this.exchange);

// Close the channel
try {
dummy_channel.close();
} catch (Exception e) {
LOG.error("[RabbitMQ] Exception occurred closing the dummy channel.", e);
}
} catch (IOException oe) {
// The exchange doesn't exist. Try to declare it.
try {
LOG.info("[RabbitMQ] " + this.exchange + " does not exist. Will attempt to declare as a direct, durable exchange");
this.channel.exchangeDeclare(this.exchange, "direct", true);
} catch (IOException e) {
LOG.error("[RabbitMQ] An error occurred declaring the exchange." + this.exchange, e);
}
}

if (this.routing_key == "") {
LOG.info("[RabbitMQ] Routing key was not specified. Defaulting to an empty value.");
this.routing_key = "";
} else {
LOG.info("[RabbitMQ] Using routing key " + this.routing_key);
}

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

if(this.ttl != -1)
if (this.ttl >= 0) {
LOG.info("[RabbitMQ] Setting TTL to " + this.ttl);
builder.expiration(Integer.toString(this.ttl));
} else {
LOG.info("[RabbitMQ] Disabling TTL");
}

this.sendProperties = builder.build();
this.is_initialized = true;
Expand All @@ -113,7 +148,7 @@ public void stop()
try {
this.channel.close();
} catch (TimeoutException e) {
LOG.error("[RabbitMQ] An error occurred while closing the channel.");
LOG.error("[RabbitMQ] Timeout while closing the channel.");
e.printStackTrace();
} catch (IOException e) {
LOG.error("[RabbitMQ] An error occurred.");
Expand All @@ -134,10 +169,10 @@ public void send(Message message)
try {
switch(this.message_format){
case 0:
this.channel.basicPublish("", this.queue, this.sendProperties, message.getMessage().getBytes());
this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, message.getMessage().getBytes());
break;
case 1:
this.channel.basicPublish("", this.queue, this.sendProperties, this.formatToJson(message.getFields()).getBytes());
this.channel.basicPublish(this.exchange, this.routing_key, this.sendProperties, this.formatToJson(message.getFields()).getBytes());
break;
}
} catch (JsonProcessingException exception) {
Expand Down