diff --git a/examples/consumer.js b/examples/consumer.js index 9598c12..c417faf 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -13,13 +13,19 @@ function consumed(message,cb){ console.log('Consumed Message:'); console.dir(message); - if(counter++>1){ - //produce failure; + var errorConsumingMessage = null; + counter++; + + if(counter==2){ + errorConsumingMessage = new Error('oops'); + } + else if(counter==5){ + //simulate failure by never answering; return process.exit(1); } if(cb){ - cb(); + cb(errorConsumingMessage); } } diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index beedf01..7a44a64 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -87,7 +87,7 @@ function _createConnection(port, host, self){ function _handleData(data, self){ data = data.toString(); - + if( data.match('STORED\r\n') ){ self.emit('stored', true); if(self._pendingSetCallback){ @@ -220,7 +220,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ } var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; - command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + command += Buffer.byteLength(value, 'utf8') + '\r\n'; var connection = this._getConnection(); if( connection ){ @@ -228,6 +228,8 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ this._pendingSetCallback = callback; } connection.write(command); + connection.write(value); + connection.write('\r\n'); } else { if(callback){ callback('No connection'); diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index fa5d047..8c8c2b9 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,29 +1,41 @@ 'use strict'; -var client = require('./kestrelClient.js'); +var KestrelClient = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); var consumer = function( queue, options ){ this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); - + this._options = options; + this._callback = null; this._consume = false; this._timeout = 3000; - var self = this; + this._client = connectToClient(this); + + ee2.call(this); +}; +util.inherits(consumer,ee2); + +function connectToClient(self){ + + var client = new KestrelClient(self._options); + client.connect(); - this._client.on('message', function(message){ + client.on('message', function(message){ if(self._consume){ if(self._consume.reliable){ self._callback(message, function(err){ if(err){ - self.stopConsuming(); + var consumeOptions = self._consume; + var consumeCallback = self._callback; + + self.stopConsuming(function(){ + self.consume(consumeOptions,consumeCallback); + }); } else { self._consumeNextMessage(); } @@ -37,13 +49,12 @@ var consumer = function( queue, options ){ self.emit('message', message); }); - this._client.on('empty', function(){ + client.on('empty', function(){ self.get(self._timeout); }); - ee2.call(this); -}; -util.inherits(consumer,ee2); + return client; +} consumer.prototype.get = function( timeout ){ this._client.get(this._queue, timeout); @@ -71,8 +82,7 @@ consumer.prototype.consume = function( options, callback ){ } if(!this._client){ - this._client = client(this._options); - this.client.connect(); + this._client = connectToClient(this); } this._callback = callback; @@ -89,11 +99,17 @@ consumer.prototype._consumeNextMessage = function(){ } }; -consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; - this.emit('stop'); +consumer.prototype.stopConsuming = function(cb){ + if(this._consume){ + this._consume = false; + this._client.close(); + this._client = null; + this.emit('stop'); + } + + if(cb){ + cb(); + } }; module.exports = consumer;