diff --git a/index.js b/index.js index 9ee7b7f..f9bfeb3 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,9 @@ Metroplex.server = function server(primus, options) { primus.on('connection', function connection(spark) { metroplex.connect(spark); + spark.on('incoming::ping', function heartbeat() { + metroplex.heartbeat(spark); + }); }).on('disconnection', function disconnection(spark) { metroplex.disconnect(spark); }).on('close', function close(options, next) { diff --git a/metroplex.js b/metroplex.js index abaab89..bb867a7 100644 --- a/metroplex.js +++ b/metroplex.js @@ -16,10 +16,10 @@ Leverage.scripts = Leverage.scripts.concat( * Add defaults to the supplied options. The following options are available: * * - redis: The Redis instance we should use to store data - * - namespace: The namespace prefix to prevent collision's. - * - interval: Expire interval to keep the server alive in Redis - * - timeout: Timeout for sparks who are alive. - * - latency: Time it takes for our Redis commands to execute. + * - namespace: The namespace prefix to prevent collisions (default: 'metroplex'). + * - interval: Expire interval to keep the server alive in Redis (default: 5 minutes) + * - latency: Time it takes for our Redis commands to execute. (default: 500 ms) + * - sparkTimeout: Timeout for sparks who are alive. This should be greater than the client ping interval. (default: 1 minute) * * @param {Primus} primus The Primus instance that received the plugin. * @param {Object} options Configuration. @@ -39,8 +39,8 @@ function Metroplex(primus, options) { this.redis = options.redis || require('redis').createClient(); this.namespace = (options.namespace || 'metroplex') +':'; this.interval = options.interval || 5 * 60 * 1000; - this.timeout = options.timeout || 30 * 60; - this.latency = options.latency || 2000; + this.sparkTimeout = options.sparkTimeout || 60 * 1000; + this.latency = options.latency || 500; this.leverage = new Leverage(this.redis, { namespace: this.namespace }); @@ -92,32 +92,16 @@ Metroplex.readable('parse', function parse(server) { Metroplex.readable('register', function register(address, fn) { var metroplex = this; - metroplex.address = this.parse(address || metroplex.address); + metroplex.address = metroplex.parse(address || metroplex.address); if (!metroplex.address) { if (fn) fn(); return this; } - metroplex.leverage.annihilate(metroplex.address, function annihilate(err) { - if (err) { - if (fn) return fn(err); - return metroplex.emit('error', err); - } - - metroplex.redis.multi() - .setex(metroplex.namespace + metroplex.address, metroplex.interval, Date.now()) - .sadd(metroplex.namespace +'servers', metroplex.address) - .exec(function register(err) { - if (err) { - if (fn) return fn(err); - return metroplex.emit('error', err); - } - - metroplex.emit('register', metroplex.address); - metroplex.setInterval(); - - if (fn) fn(err, metroplex.address); - }); + metroplex.redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now(), function(err, result) { + metroplex.emit('register', metroplex.address); + metroplex.setInterval(); + if (fn) fn(err, metroplex.address); }); }); @@ -132,23 +116,17 @@ Metroplex.readable('register', function register(address, fn) { Metroplex.readable('unregister', function unregister(address, fn) { var metroplex = this; - address = this.parse(address || metroplex.address); + address = metroplex.parse(address || metroplex.address); if (!metroplex.address) { if (fn) fn(); return this; } - metroplex.leverage.annihilate(address, function annihilate(err) { - if (err) { - if (fn) return fn(err); - return metroplex.emit('error', err); - } + metroplex.redis.del(metroplex.namespace + 'server:' + metroplex.address); + metroplex.emit('unregister', address); - metroplex.emit('unregister', address); - - if (metroplex.timer) clearInterval(metroplex.timer); - if (fn) fn(err, address); - }); + if (metroplex.timer) clearInterval(metroplex.timer); + if (fn) fn(null, address); return this; }); @@ -161,11 +139,7 @@ Metroplex.readable('unregister', function unregister(address, fn) { * @api public */ Metroplex.readable('connect', function connect(spark) { - this.redis.multi() - .hset(this.namespace +'sparks', spark.id, this.address) - .sadd(this.namespace + this.address +':sparks', spark.id) - .exec(); - + this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address); return this; }); @@ -177,11 +151,7 @@ Metroplex.readable('connect', function connect(spark) { * @api public */ Metroplex.readable('disconnect', function disconnect(spark) { - this.redis.multi() - .hdel(this.namespace +'sparks', spark.id) - .srem(this.namespace + this.address +':sparks', spark.id) - .exec(); - + this.redis.del(this.namespace + 'spark:' + spark.id); return this; }); @@ -192,18 +162,13 @@ Metroplex.readable('disconnect', function disconnect(spark) { * @returns {Metroplex} * @api public */ -Metroplex.readable('servers', function servers(self, fn) { +Metroplex.readable('servers', function servers(fn) { var metroplex = this; - if ('boolean' !== typeof self) { - fn = self; - self = 0; - } - - this.redis.smembers(this.namespace +'servers', function smembers(err, members) { - if (self) return fn(err, members); - - fn(err, (members || []).filter(function filter(address) { + metroplex.redis.keys(metroplex.namespace + 'server:*', function keyList(err, list) { + fn(err, (list || []).map(function(key) { + return key.replace(metroplex.namespace + 'server:', ''); + }).filter(function filter(address) { return address !== metroplex.address; })); }); @@ -211,6 +176,18 @@ Metroplex.readable('servers', function servers(self, fn) { return this; }); +/** + * Reset the time to live for a registered spark. + * + * @param {Spark} spark The connection/spark from Primus. + * @returns {Metroplex} + * @api private +**/ +Metroplex.readable('heartbeat', function heartbeat(spark) { + this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address); + return this; +}); + /** * Get the server address for a given spark id. * @@ -220,23 +197,23 @@ Metroplex.readable('servers', function servers(self, fn) { * @api public */ Metroplex.readable('spark', function spark(id, fn) { - this.redis.hget(this.namespace +'sparks', id, fn); + this.redis.get(this.namespace + 'spark:' + id, fn); return this; }); /** * Get all server addresses for the given spark ids. * - * @param {Array} args The spark id's we need to look up + * @param {Array} ids The spark id's we need to look up * @param {Function} fn Callback. * @returns {Metroplex} * @api public */ -Metroplex.readable('sparks', function sparks(args, fn) { - args.push(fn); - args.shift(this.namespace +'sparks'); - this.redis.hmget.apply(this.redis, args); - +Metroplex.readable('sparks', function sparks(ids, fn) { + var metroplex = this; + metroplex.leverage.multiget('spark:', ids, function(err, result) { + fn(err, JSON.parse(result)); + }); return this; }); @@ -252,31 +229,12 @@ Metroplex.readable('sparks', function sparks(args, fn) { Metroplex.readable('setInterval', function setIntervals() { if (this.timer) clearInterval(this.timer); - var alive = this.namespace + this.address +':alive' - , redis = this.redis + var redis = this.redis , metroplex = this; - this.timer = setInterval(function interval() { - // - // Redis expects the expire value in seconds instead of milliseconds so we - // need to correct our interval. - // - redis.setex(alive, metroplex.interval / 1000, Date.now()); - - metroplex.servers(function servers(err, list) { - if (err) return metroplex.emit('error', err); - - list.forEach(function expired(address) { - redis.get(metroplex.namespace + address, function get(err, stamp) { - if (err || Date.now() - +stamp < metroplex.interval) return; - - metroplex.leverage.annihilate(address, function murdered(err) { - if (err) return metroplex.emit('error', err); - }); - }); - }); - }); - }, this.interval - this.latency); + metroplex.timer = setInterval(function interval() { + redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now()); + }, metroplex.interval - metroplex.latency); }); // diff --git a/redis/annihilate.lua b/redis/annihilate.lua deleted file mode 100644 index 0d33fa6..0000000 --- a/redis/annihilate.lua +++ /dev/null @@ -1,32 +0,0 @@ --- --- Gather all the information. --- -local namespace = '{leverage::namespace}' -local address = assert(KEYS[1], 'The server address is missing') - --- --- Get all the sparks for our given address so we can nuke them from our "global" --- spark registry> --- -local sparks = redis.call('SMEMBERS', namespace .. address ..':sparks') - --- --- Iterate over all the sparks in our collection and completely nuke every spark --- which is connected on the given server address as it's dead. --- -for i = 1, #sparks do - redis.call('HDEL', namespace ..'sparks', sparks[i]) -end - --- --- Delete all left over references to this server address which are: --- --- 1. Our dedicated sparks set --- 2. Our server in the servers list --- 3. The keep alive server update --- -redis.call('DEL', namespace .. address ..':sparks'); -redis.call('SREM', namespace ..'servers', address); -redis.call('DEL', namespace .. address); - -return 1 diff --git a/redis/multiget.lua b/redis/multiget.lua new file mode 100644 index 0000000..8e173ca --- /dev/null +++ b/redis/multiget.lua @@ -0,0 +1,10 @@ +local namespace = '{leverage::namespace}' +local prefix = KEYS[1] +local keys = KEYS[2] +local result = { } + +for key in string.gmatch(keys, '([^,]+)') do + result[key] = redis.call('GET', namespace .. prefix .. key) +end + +return cjson.encode(result) diff --git a/test/integration.test.js b/test/integration.test.js index 4e93ca5..d5dd9c5 100644 --- a/test/integration.test.js +++ b/test/integration.test.js @@ -55,11 +55,11 @@ describe('plugin', function () { it('has added server to redis after the register event', function (next) { server.use('metroplex', metroplex); server.once('register', function (address) { - redis.smembers('metroplex:servers', function (err, servers) { + redis.keys('metroplex:server:*', function (err, servers) { if (err) return next(err); assume(servers).to.be.a('array'); - assume(!!~servers.indexOf(address)).to.be.true(); + assume(!!~servers.indexOf('metroplex:server:' + address)).to.be.true(); next(); }); }); @@ -73,7 +73,7 @@ describe('plugin', function () { server.once('unregister', function (address) { assume(address).to.equal(addr); - redis.smembers('metroplex:servers', function (err, servers) { + redis.get('metroplex:server:' + addr, function (err, servers) { if (err) return next(err); assume(!!~servers).to.be.true(); @@ -91,71 +91,150 @@ describe('plugin', function () { }); }); - it('stores and removes the spark in the sparks hash', function (next) { + it('stores and removes the spark', function (next) { server.use('metroplex', metroplex); var client = server.Socket('http://localhost:'+ http.port); client.id(function (id) { - redis.hget('metroplex:sparks', id, function canihas(err, address) { + redis.get('metroplex:spark:' + id, function(err, address) { if (err) return next(err); - assume(address).to.contain(http.port); client.end(); }); }); server.once('disconnection', function (spark) { - redis.hget('metroplex:sparks', spark.id, function rmshit(err, address) { + redis.get('metroplex:spark:' + spark.id, function(err, address) { if (err) return next(err); - assume(!address).to.be.true(); next(); }); }); }); - it('also stores the spark under the server address', function (next) { + it('generates address only once the server is started', function (next) { + var http = require('http').createServer() + , primus = new Primus(http, { redis: redis }) + , portnumber = port++; + + primus.use('metroplex', metroplex); + assume(primus.metroplex.address).to.be.falsey(); + + http.once('listening', function () { + assume(primus.metroplex.address).to.contain(portnumber); + next(); + }); + + http.listen(portnumber); + }); + + it('updates the spark TTL when on connection heartbeats', function(next) { server.use('metroplex', metroplex); + var client = server.Socket('http://localhost:'+ http.port); - var client = server.Socket(server.metroplex.address); + client.id(function(id) { + // set the spark TTL to 500 ms + redis.pexpire('metroplex:spark:' + id, 500, function(err) { + if(err) return next(err); - client.id(function (id) { - redis.smembers('metroplex:'+ server.metroplex.address +':sparks', function (err, sparks) { - if (err) return next(err); + // ping the server + client.write('primus::ping::' + (+new Date())); - assume(sparks).is.a('array'); - assume(id).to.equal(sparks[0]); + // wait for the ping to be received + server.spark(id).once('incoming::ping', function() { - client.end(); + // fetch the TTL again + redis.pttl('metroplex:spark:' + id, function(err, ttlAfter) { + if(err) return next(err); + + // ensure that the TTL has been reset + assume(ttlAfter).is.greaterThan(500); + client.end(); + next(); + }); + }); }); }); + }); - server.once('disconnection', function (spark) { - redis.smembers('metroplex:'+ server.metroplex.address +':sparks', function (err, sparks) { - if (err) return next(err); - - assume(sparks).is.a('array'); - assume(!~sparks.indexOf(spark.id)).to.be.true(); + it('finds the server for a spark', function(next) { + server.use('metroplex', metroplex); + var client = server.Socket('http://localhost:'+ http.port); + client.id(function(id) { + server.metroplex.spark(id, function(err, address) { + if(err) return next(err); + assume(address).equals(server.metroplex.address); next(); }); }); }); - it('generates address only once the server is started', function (next) { - var http = require('http').createServer() - , primus = new Primus(http, { redis: redis }) - , portnumber = port++; - - primus.use('metroplex', metroplex); - assume(primus.metroplex.address).to.be.falsey(); + it('finds servers for a list of sparks', function(next) { + server.use('metroplex', metroplex); + var clients = [], + numClients = 5; + + for(var i = 0; i < numClients; i++) { + server.Socket('http://localhost:'+ http.port).id(function(id) { + clients.push(id); + if(clients.length == numClients) { + server.metroplex.sparks(clients, function(err, addresses) { + if(err) return next(err); + assume(Object.keys(addresses).length).equals(clients.length); + for(var id in addresses) { + assume(addresses[id]).equals(server.metroplex.address); + } + + next(); + }); + } + }); + } + }); - http.once('listening', function () { - assume(primus.metroplex.address).to.contain(portnumber); - next(); + it('resets server TTL periodically', function(next) { + server.use('metroplex', metroplex); + server.once('register', function() { + redis.pexpire('metroplex:server:' + server.metroplex.address, 500, function(err) { + if(err) return next(error); + + // force the timer to fire every 1ms + server.metroplex.latency = server.metroplex.interval - 1; + server.metroplex.setInterval(); + setTimeout(function() { + redis.pttl('metroplex:server:' + server.metroplex.address, function(err, ttl) { + if(err) return next(err); + assume(ttl).is.greaterThan(500); + next(); + }); + }, 5); + }); }); + }); - http.listen(portnumber); + it('finds a list of active servers', function(next) { + server.use('metroplex', metroplex); + server.once('register', function(address) { + server2.use('metroplex', metroplex); + + server2.once('register', function(address2) { + server.metroplex.servers(function(err, list) { + if(err) return next(err); + // ensure server1 reports only server2 + assume(list.length).equals(1); + assume(list[0]).equals(address2); + + server2.metroplex.servers(function(err, list2) { + if(err) return next(err); + // ensure server2 reports only server1 + assume(list2.length).equals(1); + assume(list2[0]).equals(address); + next(); + }); + }); + }); + }); }); });