From 5e23dc3980999941ff21c1ac29262cd116d7bfdd Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Mon, 10 Mar 2014 14:34:12 +0100 Subject: [PATCH] added callback to client.set and producer.send --- examples/producer.js | 12 +++++++++--- lib/kestrelClient.js | 29 ++++++++++++++++++++++++++--- lib/kestrelProducer.js | 8 ++++++-- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/examples/producer.js b/examples/producer.js index c94031f..74996ff 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -16,12 +16,18 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message' ); -}, 100); + producer.send( (new Date().getTime()) + ' - New Message', function(err){ + if(err){ + console.log("ERR",err); + } else { + console.log("STORED"); + } + } ); +}, 0); //close connection setTimeout( function(){ clearInterval(interval); producer.close(); -}, 6000); \ No newline at end of file +}, 500); \ No newline at end of file diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 579e5f5..cd82d54 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -20,6 +20,7 @@ var kestrel = function( options ){ } } + this._pendingSetCallback = null; ee2.call(this); } @@ -75,7 +76,12 @@ function _handleData(data, self){ data = data.toString(); if( data.match('STORED\r\n') != null ){ - self.emit('stored', true); + self.emit('stored', true); + if(self._pendingSetCallback){ + var callback = self._pendingSetCallback; + self._pendingSetCallback = null; + callback(null); + } } if( data.match(/^VALUE/) != null ){ @@ -189,17 +195,34 @@ kestrel.prototype.close = function(){ } -kestrel.prototype.set = function( queue, value, lifetime ){ +kestrel.prototype.set = function( queue, value, lifetime, callback){ + if( typeof(lifetime) === "function" ){ + callback = lifetime; + lifetime = null; + } if( lifetime == undefined || lifetime == null ){ lifetime = 0; } + if(this._pendingSetCallback && callback){ + return callback("Cannot write again. Still waiting for previous write to be stored"); + } + var command = "SET " + queue + " 0 " + lifetime + " "; command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n"; var connection = this._getConnection(); if( connection != null ){ - connection.write(command); + if(callback){ + this._pendingSetCallback = callback; + } + connection.write(command); + } else { + if(callback){ + callback("No connection"); + } else { + throw new Error("No connection"); + } } return this; diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index 8a26de6..a95a13d 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -18,13 +18,17 @@ util.inherits(producer,ee2); -producer.prototype.send = function(message, lifetime){ +producer.prototype.send = function(message, lifetime, callback){ + if( typeof(lifetime) === "function" ){ + callback = lifetime; + lifetime = null; + } if( lifetime == null || lifetime == undefined ){ lifetime = 0; } lifetime = parseInt(lifetime); - this._client.set(this._queue, message, lifetime); + this._client.set(this._queue, message, lifetime, callback); } producer.prototype.close = function(){