diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index cd82d54..cf894a1 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -1,3 +1,5 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var net = require('net'); var util = require('util'); @@ -5,331 +7,331 @@ var types = require('./connectionType.js'); var kestrel = function( options ){ - this._settings = { - servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN - } - this._currentConnection = 0; - this._connections = []; - - if( options instanceof Object ){ - for( var key in options ){ - if( this._settings[key] != undefined ){ - this._settings[key] = options[key]; - } - } + this._settings = { + servers: ['127.0.0.1:22133'], + connectionType: types.ROUND_ROBIN + }; + this._currentConnection = 0; + this._connections = []; + + if( options instanceof Object ){ + for( var key in options ){ + if( this._settings[key] !== undefined ){ + this._settings[key] = options[key]; + } } + } - this._pendingSetCallback = null; + this._pendingSetCallback = null; + + ee2.call(this); +}; - ee2.call(this); -} util.inherits(kestrel,ee2); //open connections to kestrel server(s) kestrel.prototype.connect = function(){ - var type = this._settings.connectionType; + var type = this._settings.connectionType; - if( types[type] == undefined || types[type] == null ){ - throw "Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type"; - } + if( types[type] === undefined || types[type] === null ){ + throw 'Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type'; + } + + var parts,port,host; - switch( type ){ + switch( type ){ case types.RANDOM: case types.ROUND_ROBIN: - for(var i in this._settings.servers){ - var parts = this._settings.servers[i].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - } - break; + for(var i in this._settings.servers){ + parts = this._settings.servers[i].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + } + break; case types.FAILOVER: - var rand = Math.floor( Math.random()*this._settings.servers.length ); - var parts = this._settings.servers[rand].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - break; + var rand = Math.floor( Math.random()*this._settings.servers.length ); + parts = this._settings.servers[rand].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + break; default: - throw "Kestrel Client Connection: Unknown Connection Type"; - break; - } - -} + throw 'Kestrel Client Connection: Unknown Connection Type'; + } +}; function _createConnection(port, host, self){ - var connection = net.connect(port,host); + var connection = net.connect(port,host); - connection.on('error', function(err){ - console.dir(err); - }); - connection.on('data', function(data){ - _handleData(data,self); - self.emit('data', data); - }); + connection.on('error', function(err){ + console.dir(err); + }); + connection.on('data', function(data){ + _handleData(data,self); + self.emit('data', data); + }); - return connection; + return connection; } function _handleData(data, self){ - data = data.toString(); - - if( data.match('STORED\r\n') != null ){ - self.emit('stored', true); - if(self._pendingSetCallback){ - var callback = self._pendingSetCallback; - self._pendingSetCallback = null; - callback(null); - } - } - - if( data.match(/^VALUE/) != null ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) != null ){ - _handleStats(data,self); - } - - if( data.match(/^queue/) != null ){ - _handleDumpStats(data, self); - } - - if( data == 'DELETED\r\n' ){ - self.emit('deleted', true); - } - - if( data.match(/^VERSION/) != null ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); - } - - if( data == 'Reloaded config.\r\n' ){ - self.emit('reloaded', true); + data = data.toString(); + + if( data.match('STORED\r\n') ){ + self.emit('stored', true); + if(self._pendingSetCallback){ + var callback = self._pendingSetCallback; + self._pendingSetCallback = null; + callback(null); } - - - if( data == 'END\r\n' ){ - self.emit('empty', null); - } - + } + + if( data.match(/^VALUE/) ){ + _handleGet(data,self); + } + + if( data.match(/^STAT/) ){ + _handleStats(data,self); + } + + if( data.match(/^queue/) ){ + _handleDumpStats(data, self); + } + + if( data == 'DELETED\r\n' ){ + self.emit('deleted', true); + } + + if( data.match(/^VERSION/) ){ + self.emit('version', data.split(' ')[1].replace('\r\n','')); + } + + if( data == 'Reloaded config.\r\n' ){ + self.emit('reloaded', true); + } + + + if( data == 'END\r\n' ){ + self.emit('empty', null); + } + } function _handleGet(data, self){ - var msg = {}; - - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); - - self.emit('message', msg); + var msg = {}; + + var parts = data.split('\r\n'); + msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; + msg.data = parts[1].replace('\r\n',''); + + self.emit('message', msg); } function _handleStats(data, self){ - var stats = {}; - - var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); + var stats = {}; - for( var i in parts ){ - var part = parts[i].split(' '); - stats[part[1]] = part[2].replace('\r\n', ''); - } + var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); + for( var i in parts ){ + var part = parts[i].split(' '); + stats[part[1]] = part[2].replace('\r\n', ''); + } - self.emit('stats', stats); + self.emit('stats', stats); } function _handleDumpStats(data, self){ - var stats = {}; - - var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); - - for( var i in parts ){ - var part = parts[i].split('\r\n'); - var queue = part[0].match(/[a-zA-Z_]+/g)[1]; - stats[queue] = {}; - for( var lcv = 1; lcv < part.length-1; ++lcv ){ - var p = part[lcv].split('='); - stats[queue][ p[0].replace(/ /g,'') ] = p[1]; - } + var stats = {}; + + var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); + + for( var i in parts ){ + var part = parts[i].split('\r\n'); + var queue = part[0].match(/[a-zA-Z_]+/g)[1]; + stats[queue] = {}; + for( var lcv = 1; lcv < part.length-1; ++lcv ){ + var p = part[lcv].split('='); + stats[queue][ p[0].replace(/ /g,'') ] = p[1]; } + } - self.emit('dump_stats', stats); + self.emit('dump_stats', stats); } kestrel.prototype._getConnection = function(){ - if( this._connections.length == 1 ){ - return this._connections[0]; - }else if( this._connections.length == 0 ){ - return null; - } + if( this._connections.length == 1 ) { + return this._connections[0]; + } else if( this._connections.length === 0 ) { + return null; + } - var connection = null; + var connection = null; - switch(this._settings.connectionType){ + switch(this._settings.connectionType) { case types.RANDOM: - var rand = Math.floor( Math.random()*this._connections.length ); - connection = this._connections[rand]; - break; + var rand = Math.floor( Math.random()*this._connections.length ); + connection = this._connections[rand]; + break; case types.ROUND_ROBIN: - this._currentConnection += 1; - if( this._currentConnection > this._connections.length ){ - this._currentConnection = 0; - } - connection = this._connections[this._currentConnection]; - break; + this._currentConnection += 1; + if( this._currentConnection > this._connections.length ){ + this._currentConnection = 0; + } + connection = this._connections[this._currentConnection]; + break; case types.FAILOVER: - connection = this._connections[0]; - break; - } + connection = this._connections[0]; + break; + } - return connection; -} + return connection; +}; kestrel.prototype.close = function(){ - //close any open connections - for( var i in this._connections ){ - this._connections[i].destroy(); - } -} - + //close any open connections + for( var i in this._connections ){ + this._connections[i].destroy(); + } +}; kestrel.prototype.set = function( queue, value, lifetime, callback){ - if( typeof(lifetime) === "function" ){ + if( typeof(lifetime) === 'function' ){ callback = lifetime; lifetime = null; + } + if( lifetime === undefined || lifetime === null ){ + lifetime = 0; + } + + if(this._pendingSetCallback && callback){ + return callback('Cannot write again. Still waiting for previous write to be stored'); + } + + var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; + command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + + var connection = this._getConnection(); + if( connection ){ + if(callback){ + this._pendingSetCallback = callback; } - if( lifetime == undefined || lifetime == null ){ - lifetime = 0; - } - - if(this._pendingSetCallback && callback){ - return callback("Cannot write again. Still waiting for previous write to be stored"); - } - - var command = "SET " + queue + " 0 " + lifetime + " "; - command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n"; - - var connection = this._getConnection(); - if( connection != null ){ - if(callback){ - this._pendingSetCallback = callback; - } - connection.write(command); + connection.write(command); + } else { + if(callback){ + callback('No connection'); } else { - if(callback){ - callback("No connection"); - } else { - throw new Error("No connection"); - } + throw new Error('No connection'); } + } - return this; -} + return this; +}; kestrel.prototype.get = function(queue, timeout){ - var command = "GET " + queue; - - timeout = parseInt(timeout); - if( timeout > 0 ){ - command += "/t="+timeout; - } - - var connection = this._getConnection(); - if( connection != null ){ - connection.write(command + '\r\n'); - } - - return this; -} + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._getConnection(); + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; kestrel.prototype.delete = function(queue){ - //delete given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DELETE ' + queue + '\r\n'); - } + //delete given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('DELETE ' + queue + '\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.flush = function(queue){ - //flush given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH ' + queue + '\r\n'); - } - - return this; -} + //flush given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH ' + queue + '\r\n'); + } + + return this; +}; kestrel.prototype.flushAll = function(){ - //flush all queues - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH_ALL\r\n'); - } + //flush all queues + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH_ALL\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.version = function(){ - //get version of server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('VERSION\r\n'); - } + //get version of server + var connection = this._getConnection(); + if( connection ){ + connection.write('VERSION\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.shutdown = function(){ - //shutdown server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('SHUTDOWN\r\n'); - } + //shutdown server + var connection = this._getConnection(); + if( connection ){ + connection.write('SHUTDOWN\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.reload = function(){ - //reload the server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('RELOAD\r\n'); - } + //reload the server + var connection = this._getConnection(); + if( connection ){ + connection.write('RELOAD\r\n'); + } - return this; -} + return this; +}; -kestrel.prototype.stats = function( callback ){ - //get server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('STATS\r\n'); - } +kestrel.prototype.stats = function(){ + //get server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('STATS\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.dumpStats = function(){ - //dump server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DUMP_STATS\r\n'); - } + //dump server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('DUMP_STATS\r\n'); + } + + return this; +}; - return this; -} kestrel.prototype.monitor = function(queue, seconds, maxItems){ //monitor the given queue -} +}; kestrel.prototype.confirm = function(queue, count){ //confirm received items -} +}; module.exports = kestrel; \ No newline at end of file diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 49245b6..4324763 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,62 +1,62 @@ +'use strict'; + var client = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); - var consumer = function( queue, options ){ - this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._options = new client(options); + this._client = new client(options); + this._client.connect(); - this._callback = null; - this._consume = false; - this._timeout = 3000; + this._callback = null; + this._consume = false; + this._timeout = 3000; - var self = this; - this._client.on('message', function(message){ + var self = this; - if( typeof self._callback == 'function' ){ - self._callback(message); - } + this._client.on('message', function(message){ + if( typeof self._callback == 'function' ){ + self._callback(message); + } - if( self._consume ){ - self.get(self._timeout); - } + if( self._consume ){ + self.get(self._timeout); + } - self.emit('message', message); - }); + self.emit('message', message); + }); - this._client.on('empty', function(){ - self.get(self._timeout); - }); + this._client.on('empty', function(){ + self.get(self._timeout); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(consumer,ee2); - consumer.prototype.get = function( timeout ){ this._client.get(this._queue, timeout); -} +}; consumer.prototype.consume = function( callback ){ - if(!this._client){ - this._client = client(this._options); - this.client.connect(); - } - if( typeof(callback) == 'function' ){ - this._callback = callback; - } - this._consume = true; - this.get(this._timeout); -} + if(!this._client){ + this._client = client(this._options); + this.client.connect(); + } + if( typeof(callback) == 'function' ){ + this._callback = callback; + } + this._consume = true; + this.get(this._timeout); +}; consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; -} + this._consume = false; + this._client.close(); + this._client = null; +}; module.exports = consumer; diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index a95a13d..4725c41 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -1,39 +1,40 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var client = require('./kestrelClient.js'); var util = require('util'); var producer = function( queue, options ){ - this._queue = queue; - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._client = new client(options); + this._client.connect(); - var self = this; - this._client.on('stored', function(stored){ - self.emit('stored', stored); - }); + var self = this; + this._client.on('stored', function(stored){ + self.emit('stored', stored); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(producer,ee2); - - producer.prototype.send = function(message, lifetime, callback){ - if( typeof(lifetime) === "function" ){ + if( typeof(lifetime) === 'function' ){ callback = lifetime; lifetime = null; - } - if( lifetime == null || lifetime == undefined ){ - lifetime = 0; - } - lifetime = parseInt(lifetime); + } - this._client.set(this._queue, message, lifetime, callback); -} + if( lifetime === null || lifetime === undefined ){ + lifetime = 0; + } -producer.prototype.close = function(){ - this._client.close(); -} + lifetime = parseInt(lifetime,10); + + this._client.set(this._queue, message, lifetime, callback); +}; +producer.prototype.close = function(){ + this._client.close(); +}; module.exports = producer;