diff --git a/examples/consumer.js b/examples/consumer.js index 798728c..9598c12 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -1,17 +1,31 @@ -var kestrel = require('../'); +'use strict'; + +var kestrel = require('..'); var consumer = new kestrel.kestrelConsumer( 'test', { - connectionType: kestrel.connectionType.ROUND_ROBIN, - servers: ['127.0.0.1:22133'] + connectionType: kestrel.connectionType.ROUND_ROBIN, + servers: ['127.0.0.1:22133'] }); -function consumed(message){ - console.log('Consumed Message:'); - console.dir(message); +var counter = 0; + +function consumed(message,cb){ + console.log('Consumed Message:'); + console.dir(message); + + if(counter++>1){ + //produce failure; + return process.exit(1); + } + + if(cb){ + cb(); + } } -consumer.consume( consumed ); +consumer.consume( {reliable:true}, consumed ); +//unreliable still possible as before: consumer.consume( consumed ); setTimeout( function(){ - consumer.stopConsuming(); + consumer.stopConsuming(); }, 6000); \ No newline at end of file diff --git a/examples/producer.js b/examples/producer.js index 91efc72..b2979a4 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -1,3 +1,5 @@ +'use strict'; + var kestrel = require('../'); @@ -16,13 +18,15 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message', function(err){ - if(err){ - console.log("ERR",err); - } else { - console.log("STORED"); - } - } ); + var message = (new Date().getTime()) + ' - New Message'; + + producer.send( message , function(err){ + if(err){ + console.log('ERR',err); + } else { + console.log('STORED:' + message); + } + } ); }, 100); diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index cf894a1..b7b4392 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -9,10 +9,13 @@ var kestrel = function( options ){ this._settings = { servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN + connectionType: types.ROUND_ROBIN, + reconnect: false, + reconnectDelay: 200 }; this._currentConnection = 0; this._connections = []; + this._openConnection = null; if( options instanceof Object ){ for( var key in options ){ @@ -65,8 +68,15 @@ function _createConnection(port, host, self){ var connection = net.connect(port,host); connection.on('error', function(err){ - console.dir(err); + self._openConnection = null; + self.emit('error', err); + if(self._settings.reconnect){ + setTimeout(function(){ + self.connect(); + },self._settings.reconnectDelay); + } }); + connection.on('data', function(data){ _handleData(data,self); self.emit('data', data); @@ -77,7 +87,8 @@ function _createConnection(port, host, self){ function _handleData(data, self){ data = data.toString(); - + console.log(data.substr(0,20)+'...') + if( data.match('STORED\r\n') ){ self.emit('stored', true); if(self._pendingSetCallback){ @@ -231,7 +242,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ kestrel.prototype.get = function(queue, timeout){ var command = 'GET ' + queue; - + timeout = parseInt(timeout,10); if( timeout > 0 ){ command += '/t='+timeout; @@ -245,6 +256,35 @@ kestrel.prototype.get = function(queue, timeout){ return this; }; + +kestrel.prototype.getNextOpen = function(queue, timeout){ + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._openConnection; + + if(connection) { + command += '/close'; + } else { + connection = this._getConnection(); + this._openConnection = connection; + } + + command += '/open'; + + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; + + + kestrel.prototype.delete = function(queue){ //delete given queue var connection = this._getConnection(); diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 4324763..d728d5d 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -18,12 +18,20 @@ var consumer = function( queue, options ){ var self = this; this._client.on('message', function(message){ - if( typeof self._callback == 'function' ){ - self._callback(message); - } + if(self._consume){ - if( self._consume ){ - self.get(self._timeout); + if(self._consume.reliable){ + self._callback(message, function(err){ + if(err){ + self.stopConsuming(); + } else { + self._consumeNextMessage(); + } + }); + } else { + self._callback(message); + self._consumeNextMessage(); + } } self.emit('message', message); @@ -38,25 +46,54 @@ var consumer = function( queue, options ){ util.inherits(consumer,ee2); consumer.prototype.get = function( timeout ){ - this._client.get(this._queue, timeout); + this._client.get(this._queue, timeout); +}; + +consumer.prototype.getNextOpen = function( timeout ){ + this._client.getNextOpen(this._queue, timeout); }; -consumer.prototype.consume = function( callback ){ +consumer.prototype.consume = function( options, callback ){ + + if(typeof(options) == 'function'){ + callback = options; + options = { + reliable: false + }; + } + + if(!options){ + throw new Error('options must be an object or left out'); + } + + if(!callback){ + throw new Error('callback to consume messages required'); + } + if(!this._client){ this._client = client(this._options); this.client.connect(); } - if( typeof(callback) == 'function' ){ - this._callback = callback; + + this._callback = callback; + this._consume = options; + + this._consumeNextMessage(); +}; + +consumer.prototype._consumeNextMessage = function(){ + if(this._consume.reliable){ + this.getNextOpen(this._timeout); + } else { + this.get(this._timeout); } - this._consume = true; - this.get(this._timeout); }; consumer.prototype.stopConsuming = function(){ this._consume = false; this._client.close(); this._client = null; + this.emit('stop'); }; module.exports = consumer;