From 3243c9102a808fe624df5ae4a48aed17baba52ee Mon Sep 17 00:00:00 2001 From: Tim Honders Date: Wed, 25 Jan 2017 15:54:20 +0100 Subject: [PATCH] Added messages buffer & messages listener --- .../reactnativerabbitmq/RabbitMqQueue.java | 3 +- lib/Exchange.js | 4 ++ lib/Queue.js | 45 ++++++++++++++----- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/android/src/main/java/nl/kega/reactnativerabbitmq/RabbitMqQueue.java b/android/src/main/java/nl/kega/reactnativerabbitmq/RabbitMqQueue.java index 22267a5..b01ebe4 100644 --- a/android/src/main/java/nl/kega/reactnativerabbitmq/RabbitMqQueue.java +++ b/android/src/main/java/nl/kega/reactnativerabbitmq/RabbitMqQueue.java @@ -41,7 +41,7 @@ public RabbitMqQueue (ReactApplicationContext context, Channel channel, Readable this.autodelete = (queue_condig.hasKey("autoDelete") ? queue_condig.getBoolean("autoDelete") : false); this.consumer_arguments = (queue_condig.hasKey("consumer_arguments") ? queue_condig.getMap("consumer_arguments") : null); - + Map args = toHashMap(arguments); try { @@ -163,7 +163,6 @@ private Map toHashMap(ReadableMap data){ } else { args.put(key, tmp); } - Log.e("RabbitMqQueue", "***"); break; case String: Log.e("RabbitMqQueue", data.getString(key)); diff --git a/lib/Exchange.js b/lib/Exchange.js index 7ed8f52..42e616f 100644 --- a/lib/Exchange.js +++ b/lib/Exchange.js @@ -27,6 +27,10 @@ export class Exchange { this.callbacks[event] = callback; } + removeon(event){ + delete this.callbacks[event]; + } + publish(message, routing_key = ''){ console.log('[Exchange] Send: ' + message + ' To: ' + this.name); diff --git a/lib/Queue.js b/lib/Queue.js index a73d5a0..f0ef710 100644 --- a/lib/Queue.js +++ b/lib/Queue.js @@ -11,21 +11,54 @@ export class Queue { this.queue_condig = queue_condig; this.arguments = args || {}; + this.message_buffer = []; + this.message_buffer_delay = (queue_condig.buffer_delay ? queue_condig.buffer_delay : 1000); + this.message_buffer_timeout = null; + DeviceEventEmitter.addListener('RabbitMqQueueEvent', this.handleEvent.bind(this)); this.rabbitmqconnection.addQueue(queue_condig, this.arguments); } handleEvent(event){ - if (event.queue_name == this.name && this.callbacks.hasOwnProperty(event.name)){ - this.callbacks[event.name](event) + + if (event.queue_name != this.name){ return; } + + if (event.name == 'message'){ + + if (this.callbacks.hasOwnProperty(event.name)){ + this.callbacks['message'](event); + } + + if (this.callbacks.hasOwnProperty('messages')){ + + this.message_buffer.push(event); + + clearTimeout(this.message_buffer_timeout); + + this.message_buffer_timeout = setTimeout(() => { + if (this.message_buffer.length > 0){ + this.callbacks['messages'](this.message_buffer); + this.message_buffer = []; + } + }, this.message_buffer_delay); + + } + + }else if (this.callbacks.hasOwnProperty(event.name)){ + this.callbacks[event.name](event); } + } on(event, callback){ this.callbacks[event] = callback; } + removeon(event){ + delete this.callbacks[event]; + } + bind(exchange, routing_key = ''){ console.log('[Queue] Bind'); this.rabbitmqconnection.bindQueue(exchange.name, this.name, routing_key); @@ -37,14 +70,6 @@ export class Queue { this.rabbitmqconnection.unbindQueue(exchange.name, this.name, routing_key); } - /* - publish(message, routing_key){ - - console.log('[Queue] Send: ' + message + ' To: ' + this.name); - - this.rabbitmqconnection.publishToQueue(message, this.exchange_condig.name, routing_key); - } - */ } export default Queue; \ No newline at end of file