Skip to content

Commit

Permalink
new version
Browse files Browse the repository at this point in the history
  • Loading branch information
stongo committed Jul 28, 2015
1 parent e380893 commit 460f38a
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 437 deletions.
45 changes: 13 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# nsquishy

A **[Hapi](http://github.com/hapijs/hapijis)** plugin that creates a wrapper of **[nsqjs](https://github.com/dudleycarr/nsqjs)** to simplify microservice workers using NSQ
A wrapper of **[nsqjs](https://github.com/dudleycarr/nsqjs)** to simplify microservice workers using NSQ

## Example

```
var Hapi = require('hapi');
var server = new Hapi.Server();
server.connection();
var Nsquishy = require('nsquishy');
// specify nsqlookupd host yourself
var options = {
Expand Down Expand Up @@ -39,47 +36,35 @@ var optionsNoTopic = {
nsqdPort: '4150'
}
server.register([
{
register: require('nsquishy'),
options: options
//options: optionsEtcd
//options: optionsNsqd
//options: optiionsNoTopic
}
], function (err) {
var nsquishy = new Nsquishy(options)
nsquishy.squish(function (err) {
if (err) {
console.error('Failed to load a plugin:', err);
throw err;
}
server.app.nsqReader.init(function (err, callback) {
nsquishy.nsqReader.init(function (err, callback) {
if (err) {
throw err;
}
server.app.nsqReader.on('message', function(msg) {
nsquishy.nsqReader.on('message', function(msg) {
console.log('received message: %j', msg);
});
});
server.app.nsqWriter.init(function (err, callback) {
nsquishy.nsqWriter.init(function (err, callback) {
if (err) {
throw err;
}
setInterval(function () {
server.app.nsqWriter.publish('test', 'hello world');
nsquishy.nsqWriter.publish('test', 'hello world');
}, 30000);
});
});
server.start(function () {
console.log('Server running at:', server.info.uri);
});
```

## Plugin Options
## Options

The following options are available when registering the plugin
The following options are available when instantiating Nsquishy

### nsqlookupd

Expand All @@ -105,7 +90,7 @@ The following options are available when registering the plugin

## Writer

On registration, `server.app.nsqWriter` is assigned an initialized instance of a **[nsqjs writer](https://github.com/dudleycarr/nsqjs)**
On calling `nsquishy.squish()`, `nsquishy.nsqWriter` is assigned an initialized instance of a **[nsqjs writer](https://github.com/dudleycarr/nsqjs)**

### Methods:

Expand All @@ -115,14 +100,10 @@ See nsqjs for full writer documentation

## Reader

On registration, `server.app.nsqReader` is assigned an initialized instance of a **[nsqjs reader](https://github.com/dudleycarr/nsqjs)**
On calling `nsquishy.squish()`, `nsquishy.nsqReader` is assigned an initialized instance of a **[nsqjs reader](https://github.com/dudleycarr/nsqjs)**

### Methods:

* `init` - initializes connection to nsq. Callback fires on `nsqd_connected` event. Automatically handles `err` and `nsqd_connected` events

See nsqjs for full reader documentation

## Testing

Install https://github.com/stongo/nsq-vagrant to run tests and develop your workers
263 changes: 253 additions & 10 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,263 @@
var Nsquishy = require('./lib');
var nsq = require('nsqjs');
var Wreck = require('wreck');
var Utils = require('hoek');
var Schema = require('./schema');

exports.register = function (server, options, next) {
// Declare internals

var nsquishy = new Nsquishy(options);
nsquishy.squish(function squishCallback (err) {
var internals = {};

// Defaults

internals.defaults = {
topic: null,
channel: null,
nsqlookupd: [],
nsqlookupdHttpProtocol: 'http://',
etcd: null,
etcdNsqlookupdKey: null,
nsqdHost: null,
nsqdPort: null,
readerOptions: {},
writerOptions: {}
};

// Helpers to start and log nsq reader and writer

nsq.Reader.prototype.init = function (callback) {

this.connect();

this.on('nsqd_connected', function _nsqConect () {
console.log(['nsq', 'startup'], 'nsq reader connection established');
callback(null);
});

this.on('error', function _nsqError (err) {
throw err;
});

this.on('nsqd_closed', function _nsqClosed () {
throw new Error('nsqd closed connection');
});
};

nsq.Writer.prototype.init = function (callback) {

this.connect();

this.on('ready', function _nsqReady () {
console.log(['nsq', 'startup'], 'nsq writer connection established');
callback(null);
});

this.on('error', function _nsqWriterError (err) {
throw err;
});

this.on('closed', function _nsqWriterClosed () {
throw new Error('nsqd closed connection');
});
};

internals.getNsqlookupd = function (settings, cb) {

// If nsqlookupd registered in etcd
if (settings.nsqlookupd && settings.nsqlookupd.length === 0 && settings.etcd && settings.etcdNsqlookupdKey) {

this._url = [settings.etcd, '/v2/keys', settings.etcdNsqlookupdKey].join('');

Wreck.get(this._url, function (err, res, payload) {

if (err) {
return cb(err);
}

var body;
try {
body = JSON.parse(payload);
}
catch (e) {
return cb('error parsing etcd payload: ' + e);
}

if (!body.node.nodes || body.node.nodes.length === 0) {
return cb('etcd key not set');
}

var addresses = body.node.nodes.map(function _mapCallback (node) {
return node.value;
});

cb(null, addresses);
});
}
else {

return cb('cannot determine nsqlookupd location');
}
};

internals.getNsqd = function (nsqlookupd, settings, cb) {

// Fetch random nsqd if not specified in setings

if (!nsqlookupd) {
throw new Error('nsqlookupd needed to lookup nsqd');
}

var url = internals._urlNsqloolkupdAll = [settings.nsqlookupdHttpProtocol, nsqlookupd, '/nodes'].join('');

if (settings.topic) {
url = [settings.nsqlookupdHttpProtocol, nsqlookupd, '/lookup?topic=', settings.topic].join('');
}

Wreck.get(url, function (err, res, payload) {

if (err) {
return next(err);
return cb(err);
}

server.app.nsquishy = nsquishy;
console.log('server.app.nsquishy: %j', Object.keys(server.app.nsquishy));
next();
var body;
try {
body = JSON.parse(payload);
}
catch (e) {
return cb('error parsing nsqlookupd payload: ' + e);
}

// If topic specific nsqd search fails, get a list of all nsqd producers
if (settings.topic && !body.data || body.data.producers.length === 0) {
Wreck.get(internals._urlNsqloolkupdAll, function (err, res, payload) {

if (err) {
return cb(err);
}
var body;
try {
body = JSON.parse(payload);
}
catch (e) {
return cb('error parsing nsqlookupd payload: ' + e);
}

return cb(null, body.data.producers[Math.floor(Math.random()*body.data.producers.length)]);
});
}
else {
// return a random nsqd
return cb(null, body.data.producers[Math.floor(Math.random()*body.data.producers.length)]);
}
});
};

exports.register.attributes = {
pkg: require('../package.json')
function Nsquishy (options) {

if (!options) {
throw new Error('Nsquishy must be instantiated with a valid "options" object');
}

var self = this;

// Init reader and writer

return Schema.validate(options, function (err) {

if (err) {
throw err;
}

self.settings = Utils.applyToDefaults(internals.defaults, options);
});
}

Nsquishy.prototype.squish = function (next) {

var settings = this.settings;
var self = this;

internals.initNsqlookupd = function(addresses, settings, cb) {

var nsqOptions = {};

if (settings.topic && settings.channel) {
delete settings.readerOptions.nsqdTCPAddresses;
settings.readerOptions.lookupdHTTPAddresses = addresses;
self.nsqReader = new nsq.Writer(settings.topic, settings.channel, settings.readerOptions);
}
else {
console.log(['nsq', 'warn'], 'nsq reader not loaded - please set topic and channel to load reader');
}

var address = addresses[Math.floor(Math.random() * addresses.length)];

return internals.getNsqd(address, settings, function (err, producer) {

console.log(['nsq', 'writer'], 'creating writer for: ', producer);

if (err) {
return cb(err);
}

if (!producer) {
return cb(new Error('no nsqd instance found'));
}

self.nsqWriter = new nsq.Writer(producer.broadcast_address, producer.tcp_port, settings.writerOptions);

return cb(null);
});
};

// if setings.nsqlookups specified as string, convert to array
if (settings.nsqlookupd && !Array.isArray(settings.nsqlookupd) && settings.nsqlookupd.length !== 0) {
settings.nsqlookupd = [settings.nsqlookupd];
}

// Use nsqd host if specified
if (settings.nsqdHost && settings.nsqdPort) {

if (settings.topic && settings.channel) {
delete settings.readerOptions.lookupdHTTPAddresses;
settings.readerOptions.nsqdTCPAddresses = [settings.nsqdHost, settings.nsqdPort].join(':');
self.nsqReader = new nsq.Reader(settings.topic, settings.channel, settings.readerOptions);
}
else {
console.log(['nsq', 'warn'], 'nsq reader not loaded - please set topic and channel to load reader');
}
self.nsqWriter = new nsq.Writer(settings.nsqdHost, settings.nsqdPort, settings.writerOptions);

next();
}

// Use provided nsqdlookupd setting
else if (settings.nsqlookupd && settings.nsqlookupd.length !== 0) {
internals.initNsqlookupd(settings.nsqlookupd, settings, function (err) {

if (err) {
return next(err);
}
next();
});
}

//Fetch nsqlookupd host from etcd
else {
internals.getNsqlookupd(settings, function (err, address) {

if (err) {
return next(err);
}

internals.initNsqlookupd(address, settings, function (err) {

if (err) {
next(err);
}
next();
});
});
}
};

module.exports = Nsquishy;
Loading

0 comments on commit 460f38a

Please sign in to comment.