Skip to content

Commit

Permalink
Added messages buffer & messages listener
Browse files Browse the repository at this point in the history
  • Loading branch information
timhonders committed Jan 25, 2017
1 parent d4cd6b1 commit 3243c91
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public RabbitMqQueue (ReactApplicationContext context, Channel channel, Readable
this.autodelete = (queue_condig.hasKey("autoDelete") ? queue_condig.getBoolean("autoDelete") : false);

this.consumer_arguments = (queue_condig.hasKey("consumer_arguments") ? queue_condig.getMap("consumer_arguments") : null);

Map<String, Object> args = toHashMap(arguments);

try {
Expand Down Expand Up @@ -163,7 +163,6 @@ private Map<String, Object> toHashMap(ReadableMap data){
} else {
args.put(key, tmp);
}
Log.e("RabbitMqQueue", "***");
break;
case String:
Log.e("RabbitMqQueue", data.getString(key));
Expand Down
4 changes: 4 additions & 0 deletions lib/Exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export class Exchange {
this.callbacks[event] = callback;
}

removeon(event){
delete this.callbacks[event];
}

publish(message, routing_key = ''){

console.log('[Exchange] Send: ' + message + ' To: ' + this.name);
Expand Down
45 changes: 35 additions & 10 deletions lib/Queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,54 @@ export class Queue {
this.queue_condig = queue_condig;
this.arguments = args || {};

this.message_buffer = [];
this.message_buffer_delay = (queue_condig.buffer_delay ? queue_condig.buffer_delay : 1000);
this.message_buffer_timeout = null;

DeviceEventEmitter.addListener('RabbitMqQueueEvent', this.handleEvent.bind(this));

this.rabbitmqconnection.addQueue(queue_condig, this.arguments);
}

handleEvent(event){
if (event.queue_name == this.name && this.callbacks.hasOwnProperty(event.name)){
this.callbacks[event.name](event)

if (event.queue_name != this.name){ return; }

if (event.name == 'message'){

if (this.callbacks.hasOwnProperty(event.name)){
this.callbacks['message'](event);
}

if (this.callbacks.hasOwnProperty('messages')){

this.message_buffer.push(event);

clearTimeout(this.message_buffer_timeout);

this.message_buffer_timeout = setTimeout(() => {
if (this.message_buffer.length > 0){
this.callbacks['messages'](this.message_buffer);
this.message_buffer = [];
}
}, this.message_buffer_delay);

}

}else if (this.callbacks.hasOwnProperty(event.name)){
this.callbacks[event.name](event);
}

}

on(event, callback){
this.callbacks[event] = callback;
}

removeon(event){
delete this.callbacks[event];
}

bind(exchange, routing_key = ''){
console.log('[Queue] Bind');
this.rabbitmqconnection.bindQueue(exchange.name, this.name, routing_key);
Expand All @@ -37,14 +70,6 @@ export class Queue {
this.rabbitmqconnection.unbindQueue(exchange.name, this.name, routing_key);
}

/*
publish(message, routing_key){
console.log('[Queue] Send: ' + message + ' To: ' + this.name);
this.rabbitmqconnection.publishToQueue(message, this.exchange_condig.name, routing_key);
}
*/
}

export default Queue;

0 comments on commit 3243c91

Please sign in to comment.