diff --git a/README.md b/README.md index b2b6d87..f53cb6d 100644 --- a/README.md +++ b/README.md @@ -30,14 +30,16 @@ Opens all server connections. #### close() Closes all server connections. -#### set(queue, value, lifetime) +#### set(queue, value, lifetime, callback) Send sthe Kestrel `SET` command `queue` is the string name of the queue you wish to append to. `value` is the string message that you wish to append to `queue`. -`lifetime` is an interger value to represent the TTL of the message in seconds. +`lifetime` is an integer value to represent the TTL of the message in seconds. + +`callback` when specified is being called once kestrel responded with STORED. No other set calls are possible until the callback has been called though. #### get(queue, timeout) Tries to get a message from `queue` @@ -96,7 +98,9 @@ Appends a message onto the queue provided in the constructor `message` string message to append to the queue -`lifetime` is the TTL for the message in seconds. +`lifetime` is the TTL for the message in seconds (OPTIONAL). + +`callback` when specified is being called once kestrel responded with STORED. No other set calls are possible until the callback has been called though. #### close() Closes all server connections. diff --git a/examples/producer.js b/examples/producer.js index c94031f..91efc72 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -16,7 +16,13 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message' ); + producer.send( (new Date().getTime()) + ' - New Message', function(err){ + if(err){ + console.log("ERR",err); + } else { + console.log("STORED"); + } + } ); }, 100); diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 579e5f5..cf894a1 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -1,3 +1,5 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var net = require('net'); var util = require('util'); @@ -5,308 +7,331 @@ var types = require('./connectionType.js'); var kestrel = function( options ){ - this._settings = { - servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN - } - this._currentConnection = 0; - this._connections = []; - - if( options instanceof Object ){ - for( var key in options ){ - if( this._settings[key] != undefined ){ - this._settings[key] = options[key]; - } - } + this._settings = { + servers: ['127.0.0.1:22133'], + connectionType: types.ROUND_ROBIN + }; + this._currentConnection = 0; + this._connections = []; + + if( options instanceof Object ){ + for( var key in options ){ + if( this._settings[key] !== undefined ){ + this._settings[key] = options[key]; + } } + } + this._pendingSetCallback = null; + + ee2.call(this); +}; - ee2.call(this); -} util.inherits(kestrel,ee2); //open connections to kestrel server(s) kestrel.prototype.connect = function(){ - var type = this._settings.connectionType; + var type = this._settings.connectionType; - if( types[type] == undefined || types[type] == null ){ - throw "Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type"; - } + if( types[type] === undefined || types[type] === null ){ + throw 'Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type'; + } + + var parts,port,host; - switch( type ){ + switch( type ){ case types.RANDOM: case types.ROUND_ROBIN: - for(var i in this._settings.servers){ - var parts = this._settings.servers[i].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - } - break; + for(var i in this._settings.servers){ + parts = this._settings.servers[i].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + } + break; case types.FAILOVER: - var rand = Math.floor( Math.random()*this._settings.servers.length ); - var parts = this._settings.servers[rand].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - break; + var rand = Math.floor( Math.random()*this._settings.servers.length ); + parts = this._settings.servers[rand].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + break; default: - throw "Kestrel Client Connection: Unknown Connection Type"; - break; - } - -} + throw 'Kestrel Client Connection: Unknown Connection Type'; + } +}; function _createConnection(port, host, self){ - var connection = net.connect(port,host); + var connection = net.connect(port,host); - connection.on('error', function(err){ - console.dir(err); - }); - connection.on('data', function(data){ - _handleData(data,self); - self.emit('data', data); - }); + connection.on('error', function(err){ + console.dir(err); + }); + connection.on('data', function(data){ + _handleData(data,self); + self.emit('data', data); + }); - return connection; + return connection; } function _handleData(data, self){ - data = data.toString(); - - if( data.match('STORED\r\n') != null ){ - self.emit('stored', true); - } - - if( data.match(/^VALUE/) != null ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) != null ){ - _handleStats(data,self); - } - - if( data.match(/^queue/) != null ){ - _handleDumpStats(data, self); - } - - if( data == 'DELETED\r\n' ){ - self.emit('deleted', true); + data = data.toString(); + + if( data.match('STORED\r\n') ){ + self.emit('stored', true); + if(self._pendingSetCallback){ + var callback = self._pendingSetCallback; + self._pendingSetCallback = null; + callback(null); } - - if( data.match(/^VERSION/) != null ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); - } - - if( data == 'Reloaded config.\r\n' ){ - self.emit('reloaded', true); - } - - - if( data == 'END\r\n' ){ - self.emit('empty', null); - } - + } + + if( data.match(/^VALUE/) ){ + _handleGet(data,self); + } + + if( data.match(/^STAT/) ){ + _handleStats(data,self); + } + + if( data.match(/^queue/) ){ + _handleDumpStats(data, self); + } + + if( data == 'DELETED\r\n' ){ + self.emit('deleted', true); + } + + if( data.match(/^VERSION/) ){ + self.emit('version', data.split(' ')[1].replace('\r\n','')); + } + + if( data == 'Reloaded config.\r\n' ){ + self.emit('reloaded', true); + } + + + if( data == 'END\r\n' ){ + self.emit('empty', null); + } + } function _handleGet(data, self){ - var msg = {}; - - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); - - self.emit('message', msg); + var msg = {}; + + var parts = data.split('\r\n'); + msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; + msg.data = parts[1].replace('\r\n',''); + + self.emit('message', msg); } function _handleStats(data, self){ - var stats = {}; + var stats = {}; - var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); - - for( var i in parts ){ - var part = parts[i].split(' '); - stats[part[1]] = part[2].replace('\r\n', ''); - } + var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); + for( var i in parts ){ + var part = parts[i].split(' '); + stats[part[1]] = part[2].replace('\r\n', ''); + } - self.emit('stats', stats); + self.emit('stats', stats); } function _handleDumpStats(data, self){ - var stats = {}; - - var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); - - for( var i in parts ){ - var part = parts[i].split('\r\n'); - var queue = part[0].match(/[a-zA-Z_]+/g)[1]; - stats[queue] = {}; - for( var lcv = 1; lcv < part.length-1; ++lcv ){ - var p = part[lcv].split('='); - stats[queue][ p[0].replace(/ /g,'') ] = p[1]; - } + var stats = {}; + + var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); + + for( var i in parts ){ + var part = parts[i].split('\r\n'); + var queue = part[0].match(/[a-zA-Z_]+/g)[1]; + stats[queue] = {}; + for( var lcv = 1; lcv < part.length-1; ++lcv ){ + var p = part[lcv].split('='); + stats[queue][ p[0].replace(/ /g,'') ] = p[1]; } + } - self.emit('dump_stats', stats); + self.emit('dump_stats', stats); } kestrel.prototype._getConnection = function(){ - if( this._connections.length == 1 ){ - return this._connections[0]; - }else if( this._connections.length == 0 ){ - return null; - } + if( this._connections.length == 1 ) { + return this._connections[0]; + } else if( this._connections.length === 0 ) { + return null; + } - var connection = null; + var connection = null; - switch(this._settings.connectionType){ + switch(this._settings.connectionType) { case types.RANDOM: - var rand = Math.floor( Math.random()*this._connections.length ); - connection = this._connections[rand]; - break; + var rand = Math.floor( Math.random()*this._connections.length ); + connection = this._connections[rand]; + break; case types.ROUND_ROBIN: - this._currentConnection += 1; - if( this._currentConnection > this._connections.length ){ - this._currentConnection = 0; - } - connection = this._connections[this._currentConnection]; - break; + this._currentConnection += 1; + if( this._currentConnection > this._connections.length ){ + this._currentConnection = 0; + } + connection = this._connections[this._currentConnection]; + break; case types.FAILOVER: - connection = this._connections[0]; - break; - } + connection = this._connections[0]; + break; + } - return connection; -} + return connection; +}; kestrel.prototype.close = function(){ - //close any open connections - for( var i in this._connections ){ - this._connections[i].destroy(); + //close any open connections + for( var i in this._connections ){ + this._connections[i].destroy(); + } +}; + +kestrel.prototype.set = function( queue, value, lifetime, callback){ + if( typeof(lifetime) === 'function' ){ + callback = lifetime; + lifetime = null; + } + if( lifetime === undefined || lifetime === null ){ + lifetime = 0; + } + + if(this._pendingSetCallback && callback){ + return callback('Cannot write again. Still waiting for previous write to be stored'); + } + + var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; + command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + + var connection = this._getConnection(); + if( connection ){ + if(callback){ + this._pendingSetCallback = callback; } -} - - -kestrel.prototype.set = function( queue, value, lifetime ){ - if( lifetime == undefined || lifetime == null ){ - lifetime = 0; - } - - var command = "SET " + queue + " 0 " + lifetime + " "; - command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n"; - - var connection = this._getConnection(); - if( connection != null ){ - connection.write(command); + connection.write(command); + } else { + if(callback){ + callback('No connection'); + } else { + throw new Error('No connection'); } + } - return this; -} + return this; +}; kestrel.prototype.get = function(queue, timeout){ - var command = "GET " + queue; - - timeout = parseInt(timeout); - if( timeout > 0 ){ - command += "/t="+timeout; - } - - var connection = this._getConnection(); - if( connection != null ){ - connection.write(command + '\r\n'); - } - - return this; -} + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._getConnection(); + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; kestrel.prototype.delete = function(queue){ - //delete given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DELETE ' + queue + '\r\n'); - } + //delete given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('DELETE ' + queue + '\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.flush = function(queue){ - //flush given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH ' + queue + '\r\n'); - } - - return this; -} + //flush given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH ' + queue + '\r\n'); + } + + return this; +}; kestrel.prototype.flushAll = function(){ - //flush all queues - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH_ALL\r\n'); - } + //flush all queues + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH_ALL\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.version = function(){ - //get version of server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('VERSION\r\n'); - } + //get version of server + var connection = this._getConnection(); + if( connection ){ + connection.write('VERSION\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.shutdown = function(){ - //shutdown server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('SHUTDOWN\r\n'); - } + //shutdown server + var connection = this._getConnection(); + if( connection ){ + connection.write('SHUTDOWN\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.reload = function(){ - //reload the server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('RELOAD\r\n'); - } + //reload the server + var connection = this._getConnection(); + if( connection ){ + connection.write('RELOAD\r\n'); + } - return this; -} + return this; +}; -kestrel.prototype.stats = function( callback ){ - //get server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('STATS\r\n'); - } +kestrel.prototype.stats = function(){ + //get server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('STATS\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.dumpStats = function(){ - //dump server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DUMP_STATS\r\n'); - } + //dump server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('DUMP_STATS\r\n'); + } + + return this; +}; - return this; -} kestrel.prototype.monitor = function(queue, seconds, maxItems){ //monitor the given queue -} +}; kestrel.prototype.confirm = function(queue, count){ //confirm received items -} +}; module.exports = kestrel; \ No newline at end of file diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 49245b6..4324763 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,62 +1,62 @@ +'use strict'; + var client = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); - var consumer = function( queue, options ){ - this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._options = new client(options); + this._client = new client(options); + this._client.connect(); - this._callback = null; - this._consume = false; - this._timeout = 3000; + this._callback = null; + this._consume = false; + this._timeout = 3000; - var self = this; - this._client.on('message', function(message){ + var self = this; - if( typeof self._callback == 'function' ){ - self._callback(message); - } + this._client.on('message', function(message){ + if( typeof self._callback == 'function' ){ + self._callback(message); + } - if( self._consume ){ - self.get(self._timeout); - } + if( self._consume ){ + self.get(self._timeout); + } - self.emit('message', message); - }); + self.emit('message', message); + }); - this._client.on('empty', function(){ - self.get(self._timeout); - }); + this._client.on('empty', function(){ + self.get(self._timeout); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(consumer,ee2); - consumer.prototype.get = function( timeout ){ this._client.get(this._queue, timeout); -} +}; consumer.prototype.consume = function( callback ){ - if(!this._client){ - this._client = client(this._options); - this.client.connect(); - } - if( typeof(callback) == 'function' ){ - this._callback = callback; - } - this._consume = true; - this.get(this._timeout); -} + if(!this._client){ + this._client = client(this._options); + this.client.connect(); + } + if( typeof(callback) == 'function' ){ + this._callback = callback; + } + this._consume = true; + this.get(this._timeout); +}; consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; -} + this._consume = false; + this._client.close(); + this._client = null; +}; module.exports = consumer; diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index 8a26de6..4725c41 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -1,35 +1,40 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var client = require('./kestrelClient.js'); var util = require('util'); var producer = function( queue, options ){ - this._queue = queue; - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._client = new client(options); + this._client.connect(); - var self = this; - this._client.on('stored', function(stored){ - self.emit('stored', stored); - }); + var self = this; + this._client.on('stored', function(stored){ + self.emit('stored', stored); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(producer,ee2); +producer.prototype.send = function(message, lifetime, callback){ + if( typeof(lifetime) === 'function' ){ + callback = lifetime; + lifetime = null; + } + if( lifetime === null || lifetime === undefined ){ + lifetime = 0; + } -producer.prototype.send = function(message, lifetime){ - if( lifetime == null || lifetime == undefined ){ - lifetime = 0; - } - lifetime = parseInt(lifetime); + lifetime = parseInt(lifetime,10); - this._client.set(this._queue, message, lifetime); -} + this._client.set(this._queue, message, lifetime, callback); +}; producer.prototype.close = function(){ - this._client.close(); -} - + this._client.close(); +}; module.exports = producer; diff --git a/package.json b/package.json index 2e3c520..0265a98 100644 --- a/package.json +++ b/package.json @@ -7,14 +7,15 @@ , "author": "brett_langdon (http://brett.is)" , "contributors": [ { "name": "brett_langdon", "email": "brett@blangdon.com" }, - { "name": "pablo_casado", "email": "p.casado.arias@gmail.com" } + { "name": "pablo_casado", "email": "p.casado.arias@gmail.com" }, + { "name: "Matthias Goetzke", "email": "m.goetzke@curasystems.com", "url": "https://twitter.com/mgoetzke" }, ] , "repository":{ "type": "git" , "url": "github.com/brettlangdon/node-kestrel.git" } , "dependencies": { - "eventemitter2": ">=0.4.9" + "eventemitter2": ">=0.4.9" } , "main": "./lib/index.js" , "engines": { "node": ">= 0.6.0" }