From 5e23dc3980999941ff21c1ac29262cd116d7bfdd Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 14:34:12 +0100 Subject: [PATCH 1/5] added callback to client.set and producer.send --- examples/producer.js | 12 +++++++++--- lib/kestrelClient.js | 29 ++++++++++++++++++++++++++--- lib/kestrelProducer.js | 8 ++++++-- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/examples/producer.js b/examples/producer.js index c94031f..74996ff 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -16,12 +16,18 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message' ); -}, 100); + producer.send( (new Date().getTime()) + ' - New Message', function(err){ + if(err){ + console.log("ERR",err); + } else { + console.log("STORED"); + } + } ); +}, 0); //close connection setTimeout( function(){ clearInterval(interval); producer.close(); -}, 6000); \ No newline at end of file +}, 500); \ No newline at end of file diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 579e5f5..cd82d54 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -20,6 +20,7 @@ var kestrel = function( options ){ } } + this._pendingSetCallback = null; ee2.call(this); } @@ -75,7 +76,12 @@ function _handleData(data, self){ data = data.toString(); if( data.match('STORED\r\n') != null ){ - self.emit('stored', true); + self.emit('stored', true); + if(self._pendingSetCallback){ + var callback = self._pendingSetCallback; + self._pendingSetCallback = null; + callback(null); + } } if( data.match(/^VALUE/) != null ){ @@ -189,17 +195,34 @@ kestrel.prototype.close = function(){ } -kestrel.prototype.set = function( queue, value, lifetime ){ +kestrel.prototype.set = function( queue, value, lifetime, callback){ + if( typeof(lifetime) === "function" ){ + callback = lifetime; + lifetime = null; + } if( lifetime == undefined || lifetime == null ){ lifetime = 0; } + if(this._pendingSetCallback && callback){ + return callback("Cannot write again. Still waiting for previous write to be stored"); + } + var command = "SET " + queue + " 0 " + lifetime + " "; command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n"; var connection = this._getConnection(); if( connection != null ){ - connection.write(command); + if(callback){ + this._pendingSetCallback = callback; + } + connection.write(command); + } else { + if(callback){ + callback("No connection"); + } else { + throw new Error("No connection"); + } } return this; diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index 8a26de6..a95a13d 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -18,13 +18,17 @@ util.inherits(producer,ee2); -producer.prototype.send = function(message, lifetime){ +producer.prototype.send = function(message, lifetime, callback){ + if( typeof(lifetime) === "function" ){ + callback = lifetime; + lifetime = null; + } if( lifetime == null || lifetime == undefined ){ lifetime = 0; } lifetime = parseInt(lifetime); - this._client.set(this._queue, message, lifetime); + this._client.set(this._queue, message, lifetime, callback); } producer.prototype.close = function(){ From 19a3ac684837f9e5e3603c88b636f4e409f793b6 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 14:36:38 +0100 Subject: [PATCH 2/5] added set/send callbacks to README --- README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index b2b6d87..f53cb6d 100644 --- a/README.md +++ b/README.md @@ -30,14 +30,16 @@ Opens all server connections. #### close() Closes all server connections. -#### set(queue, value, lifetime) +#### set(queue, value, lifetime, callback) Send sthe Kestrel `SET` command `queue` is the string name of the queue you wish to append to. `value` is the string message that you wish to append to `queue`. -`lifetime` is an interger value to represent the TTL of the message in seconds. +`lifetime` is an integer value to represent the TTL of the message in seconds. + +`callback` when specified is being called once kestrel responded with STORED. No other set calls are possible until the callback has been called though. #### get(queue, timeout) Tries to get a message from `queue` @@ -96,7 +98,9 @@ Appends a message onto the queue provided in the constructor `message` string message to append to the queue -`lifetime` is the TTL for the message in seconds. +`lifetime` is the TTL for the message in seconds (OPTIONAL). + +`callback` when specified is being called once kestrel responded with STORED. No other set calls are possible until the callback has been called though. #### close() Closes all server connections. From 2e804cc21f41ff40da71e84ed9f426f0ec40e62c Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 14:38:57 +0100 Subject: [PATCH 3/5] changed producer example back to 6s --- examples/producer.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/producer.js b/examples/producer.js index 74996ff..91efc72 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -23,11 +23,11 @@ var interval = setInterval( function(){ console.log("STORED"); } } ); -}, 0); +}, 100); //close connection setTimeout( function(){ clearInterval(interval); producer.close(); -}, 500); \ No newline at end of file +}, 6000); \ No newline at end of file From 925b17d61a1386413fe2d378874fa4633a4dee0d Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 18:01:23 +0100 Subject: [PATCH 4/5] made files pass jslint, set replaced tabs with spaces (tabs and spaces where mixed before), set indentation to 2 (as per rdahl convention) --- lib/kestrelClient.js | 484 +++++++++++++++++++++-------------------- lib/kestrelConsumer.js | 78 +++---- lib/kestrelProducer.js | 45 ++-- 3 files changed, 305 insertions(+), 302 deletions(-) diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index cd82d54..cf894a1 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -1,3 +1,5 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var net = require('net'); var util = require('util'); @@ -5,331 +7,331 @@ var types = require('./connectionType.js'); var kestrel = function( options ){ - this._settings = { - servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN - } - this._currentConnection = 0; - this._connections = []; - - if( options instanceof Object ){ - for( var key in options ){ - if( this._settings[key] != undefined ){ - this._settings[key] = options[key]; - } - } + this._settings = { + servers: ['127.0.0.1:22133'], + connectionType: types.ROUND_ROBIN + }; + this._currentConnection = 0; + this._connections = []; + + if( options instanceof Object ){ + for( var key in options ){ + if( this._settings[key] !== undefined ){ + this._settings[key] = options[key]; + } } + } - this._pendingSetCallback = null; + this._pendingSetCallback = null; + + ee2.call(this); +}; - ee2.call(this); -} util.inherits(kestrel,ee2); //open connections to kestrel server(s) kestrel.prototype.connect = function(){ - var type = this._settings.connectionType; + var type = this._settings.connectionType; - if( types[type] == undefined || types[type] == null ){ - throw "Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type"; - } + if( types[type] === undefined || types[type] === null ){ + throw 'Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type'; + } + + var parts,port,host; - switch( type ){ + switch( type ){ case types.RANDOM: case types.ROUND_ROBIN: - for(var i in this._settings.servers){ - var parts = this._settings.servers[i].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - } - break; + for(var i in this._settings.servers){ + parts = this._settings.servers[i].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + } + break; case types.FAILOVER: - var rand = Math.floor( Math.random()*this._settings.servers.length ); - var parts = this._settings.servers[rand].split(':'); - var port = (parts.length>1)?parts[1]:'22133'; - var host = parts[0]; - this._connections.push( _createConnection(port,host,this) ); - break; + var rand = Math.floor( Math.random()*this._settings.servers.length ); + parts = this._settings.servers[rand].split(':'); + port = (parts.length>1)?parts[1]:'22133'; + host = parts[0]; + this._connections.push( _createConnection(port,host,this) ); + break; default: - throw "Kestrel Client Connection: Unknown Connection Type"; - break; - } - -} + throw 'Kestrel Client Connection: Unknown Connection Type'; + } +}; function _createConnection(port, host, self){ - var connection = net.connect(port,host); + var connection = net.connect(port,host); - connection.on('error', function(err){ - console.dir(err); - }); - connection.on('data', function(data){ - _handleData(data,self); - self.emit('data', data); - }); + connection.on('error', function(err){ + console.dir(err); + }); + connection.on('data', function(data){ + _handleData(data,self); + self.emit('data', data); + }); - return connection; + return connection; } function _handleData(data, self){ - data = data.toString(); - - if( data.match('STORED\r\n') != null ){ - self.emit('stored', true); - if(self._pendingSetCallback){ - var callback = self._pendingSetCallback; - self._pendingSetCallback = null; - callback(null); - } - } - - if( data.match(/^VALUE/) != null ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) != null ){ - _handleStats(data,self); - } - - if( data.match(/^queue/) != null ){ - _handleDumpStats(data, self); - } - - if( data == 'DELETED\r\n' ){ - self.emit('deleted', true); - } - - if( data.match(/^VERSION/) != null ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); - } - - if( data == 'Reloaded config.\r\n' ){ - self.emit('reloaded', true); + data = data.toString(); + + if( data.match('STORED\r\n') ){ + self.emit('stored', true); + if(self._pendingSetCallback){ + var callback = self._pendingSetCallback; + self._pendingSetCallback = null; + callback(null); } - - - if( data == 'END\r\n' ){ - self.emit('empty', null); - } - + } + + if( data.match(/^VALUE/) ){ + _handleGet(data,self); + } + + if( data.match(/^STAT/) ){ + _handleStats(data,self); + } + + if( data.match(/^queue/) ){ + _handleDumpStats(data, self); + } + + if( data == 'DELETED\r\n' ){ + self.emit('deleted', true); + } + + if( data.match(/^VERSION/) ){ + self.emit('version', data.split(' ')[1].replace('\r\n','')); + } + + if( data == 'Reloaded config.\r\n' ){ + self.emit('reloaded', true); + } + + + if( data == 'END\r\n' ){ + self.emit('empty', null); + } + } function _handleGet(data, self){ - var msg = {}; - - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); - - self.emit('message', msg); + var msg = {}; + + var parts = data.split('\r\n'); + msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; + msg.data = parts[1].replace('\r\n',''); + + self.emit('message', msg); } function _handleStats(data, self){ - var stats = {}; - - var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); + var stats = {}; - for( var i in parts ){ - var part = parts[i].split(' '); - stats[part[1]] = part[2].replace('\r\n', ''); - } + var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g); + for( var i in parts ){ + var part = parts[i].split(' '); + stats[part[1]] = part[2].replace('\r\n', ''); + } - self.emit('stats', stats); + self.emit('stats', stats); } function _handleDumpStats(data, self){ - var stats = {}; - - var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); - - for( var i in parts ){ - var part = parts[i].split('\r\n'); - var queue = part[0].match(/[a-zA-Z_]+/g)[1]; - stats[queue] = {}; - for( var lcv = 1; lcv < part.length-1; ++lcv ){ - var p = part[lcv].split('='); - stats[queue][ p[0].replace(/ /g,'') ] = p[1]; - } + var stats = {}; + + var parts = data.replace('\r\n}\r\nEND','').split('}\r\n'); + + for( var i in parts ){ + var part = parts[i].split('\r\n'); + var queue = part[0].match(/[a-zA-Z_]+/g)[1]; + stats[queue] = {}; + for( var lcv = 1; lcv < part.length-1; ++lcv ){ + var p = part[lcv].split('='); + stats[queue][ p[0].replace(/ /g,'') ] = p[1]; } + } - self.emit('dump_stats', stats); + self.emit('dump_stats', stats); } kestrel.prototype._getConnection = function(){ - if( this._connections.length == 1 ){ - return this._connections[0]; - }else if( this._connections.length == 0 ){ - return null; - } + if( this._connections.length == 1 ) { + return this._connections[0]; + } else if( this._connections.length === 0 ) { + return null; + } - var connection = null; + var connection = null; - switch(this._settings.connectionType){ + switch(this._settings.connectionType) { case types.RANDOM: - var rand = Math.floor( Math.random()*this._connections.length ); - connection = this._connections[rand]; - break; + var rand = Math.floor( Math.random()*this._connections.length ); + connection = this._connections[rand]; + break; case types.ROUND_ROBIN: - this._currentConnection += 1; - if( this._currentConnection > this._connections.length ){ - this._currentConnection = 0; - } - connection = this._connections[this._currentConnection]; - break; + this._currentConnection += 1; + if( this._currentConnection > this._connections.length ){ + this._currentConnection = 0; + } + connection = this._connections[this._currentConnection]; + break; case types.FAILOVER: - connection = this._connections[0]; - break; - } + connection = this._connections[0]; + break; + } - return connection; -} + return connection; +}; kestrel.prototype.close = function(){ - //close any open connections - for( var i in this._connections ){ - this._connections[i].destroy(); - } -} - + //close any open connections + for( var i in this._connections ){ + this._connections[i].destroy(); + } +}; kestrel.prototype.set = function( queue, value, lifetime, callback){ - if( typeof(lifetime) === "function" ){ + if( typeof(lifetime) === 'function' ){ callback = lifetime; lifetime = null; + } + if( lifetime === undefined || lifetime === null ){ + lifetime = 0; + } + + if(this._pendingSetCallback && callback){ + return callback('Cannot write again. Still waiting for previous write to be stored'); + } + + var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; + command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + + var connection = this._getConnection(); + if( connection ){ + if(callback){ + this._pendingSetCallback = callback; } - if( lifetime == undefined || lifetime == null ){ - lifetime = 0; - } - - if(this._pendingSetCallback && callback){ - return callback("Cannot write again. Still waiting for previous write to be stored"); - } - - var command = "SET " + queue + " 0 " + lifetime + " "; - command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n"; - - var connection = this._getConnection(); - if( connection != null ){ - if(callback){ - this._pendingSetCallback = callback; - } - connection.write(command); + connection.write(command); + } else { + if(callback){ + callback('No connection'); } else { - if(callback){ - callback("No connection"); - } else { - throw new Error("No connection"); - } + throw new Error('No connection'); } + } - return this; -} + return this; +}; kestrel.prototype.get = function(queue, timeout){ - var command = "GET " + queue; - - timeout = parseInt(timeout); - if( timeout > 0 ){ - command += "/t="+timeout; - } - - var connection = this._getConnection(); - if( connection != null ){ - connection.write(command + '\r\n'); - } - - return this; -} + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._getConnection(); + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; kestrel.prototype.delete = function(queue){ - //delete given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DELETE ' + queue + '\r\n'); - } + //delete given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('DELETE ' + queue + '\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.flush = function(queue){ - //flush given queue - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH ' + queue + '\r\n'); - } - - return this; -} + //flush given queue + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH ' + queue + '\r\n'); + } + + return this; +}; kestrel.prototype.flushAll = function(){ - //flush all queues - var connection = this._getConnection(); - if( connection != null ){ - connection.write('FLUSH_ALL\r\n'); - } + //flush all queues + var connection = this._getConnection(); + if( connection ){ + connection.write('FLUSH_ALL\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.version = function(){ - //get version of server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('VERSION\r\n'); - } + //get version of server + var connection = this._getConnection(); + if( connection ){ + connection.write('VERSION\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.shutdown = function(){ - //shutdown server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('SHUTDOWN\r\n'); - } + //shutdown server + var connection = this._getConnection(); + if( connection ){ + connection.write('SHUTDOWN\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.reload = function(){ - //reload the server - var connection = this._getConnection(); - if( connection != null ){ - connection.write('RELOAD\r\n'); - } + //reload the server + var connection = this._getConnection(); + if( connection ){ + connection.write('RELOAD\r\n'); + } - return this; -} + return this; +}; -kestrel.prototype.stats = function( callback ){ - //get server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('STATS\r\n'); - } +kestrel.prototype.stats = function(){ + //get server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('STATS\r\n'); + } - return this; -} + return this; +}; kestrel.prototype.dumpStats = function(){ - //dump server stats - var connection = this._getConnection(); - if( connection != null ){ - connection.write('DUMP_STATS\r\n'); - } + //dump server stats + var connection = this._getConnection(); + if( connection ){ + connection.write('DUMP_STATS\r\n'); + } + + return this; +}; - return this; -} kestrel.prototype.monitor = function(queue, seconds, maxItems){ //monitor the given queue -} +}; kestrel.prototype.confirm = function(queue, count){ //confirm received items -} +}; module.exports = kestrel; \ No newline at end of file diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 49245b6..4324763 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,62 +1,62 @@ +'use strict'; + var client = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); - var consumer = function( queue, options ){ - this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._options = new client(options); + this._client = new client(options); + this._client.connect(); - this._callback = null; - this._consume = false; - this._timeout = 3000; + this._callback = null; + this._consume = false; + this._timeout = 3000; - var self = this; - this._client.on('message', function(message){ + var self = this; - if( typeof self._callback == 'function' ){ - self._callback(message); - } + this._client.on('message', function(message){ + if( typeof self._callback == 'function' ){ + self._callback(message); + } - if( self._consume ){ - self.get(self._timeout); - } + if( self._consume ){ + self.get(self._timeout); + } - self.emit('message', message); - }); + self.emit('message', message); + }); - this._client.on('empty', function(){ - self.get(self._timeout); - }); + this._client.on('empty', function(){ + self.get(self._timeout); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(consumer,ee2); - consumer.prototype.get = function( timeout ){ this._client.get(this._queue, timeout); -} +}; consumer.prototype.consume = function( callback ){ - if(!this._client){ - this._client = client(this._options); - this.client.connect(); - } - if( typeof(callback) == 'function' ){ - this._callback = callback; - } - this._consume = true; - this.get(this._timeout); -} + if(!this._client){ + this._client = client(this._options); + this.client.connect(); + } + if( typeof(callback) == 'function' ){ + this._callback = callback; + } + this._consume = true; + this.get(this._timeout); +}; consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; -} + this._consume = false; + this._client.close(); + this._client = null; +}; module.exports = consumer; diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index a95a13d..4725c41 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -1,39 +1,40 @@ +'use strict'; + var ee2 = require('eventemitter2').EventEmitter2; var client = require('./kestrelClient.js'); var util = require('util'); var producer = function( queue, options ){ - this._queue = queue; - this._client = new client(options); - this._client.connect(); + this._queue = queue; + this._client = new client(options); + this._client.connect(); - var self = this; - this._client.on('stored', function(stored){ - self.emit('stored', stored); - }); + var self = this; + this._client.on('stored', function(stored){ + self.emit('stored', stored); + }); - ee2.call(this); -} + ee2.call(this); +}; util.inherits(producer,ee2); - - producer.prototype.send = function(message, lifetime, callback){ - if( typeof(lifetime) === "function" ){ + if( typeof(lifetime) === 'function' ){ callback = lifetime; lifetime = null; - } - if( lifetime == null || lifetime == undefined ){ - lifetime = 0; - } - lifetime = parseInt(lifetime); + } - this._client.set(this._queue, message, lifetime, callback); -} + if( lifetime === null || lifetime === undefined ){ + lifetime = 0; + } -producer.prototype.close = function(){ - this._client.close(); -} + lifetime = parseInt(lifetime,10); + + this._client.set(this._queue, message, lifetime, callback); +}; +producer.prototype.close = function(){ + this._client.close(); +}; module.exports = producer; From 2fb899b69cd2c3e756e3e12096dcd6627af2d490 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 18:04:41 +0100 Subject: [PATCH 5/5] added m.goetzke as contributor: --- package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 2e3c520..0265a98 100644 --- a/package.json +++ b/package.json @@ -7,14 +7,15 @@ , "author": "brett_langdon (http://brett.is)" , "contributors": [ { "name": "brett_langdon", "email": "brett@blangdon.com" }, - { "name": "pablo_casado", "email": "p.casado.arias@gmail.com" } + { "name": "pablo_casado", "email": "p.casado.arias@gmail.com" }, + { "name: "Matthias Goetzke", "email": "m.goetzke@curasystems.com", "url": "https://twitter.com/mgoetzke" }, ] , "repository":{ "type": "git" , "url": "github.com/brettlangdon/node-kestrel.git" } , "dependencies": { - "eventemitter2": ">=0.4.9" + "eventemitter2": ">=0.4.9" } , "main": "./lib/index.js" , "engines": { "node": ">= 0.6.0" }