Browse Source

added reconnect when reliable read returns error

pull/6/head
Matthias Goetzke 12 years ago
parent
commit
0520ae6607
3 changed files with 48 additions and 24 deletions
  1. +9
    -3
      examples/consumer.js
  2. +4
    -2
      lib/kestrelClient.js
  3. +35
    -19
      lib/kestrelConsumer.js

+ 9
- 3
examples/consumer.js View File

@ -13,13 +13,19 @@ function consumed(message,cb){
console.log('Consumed Message:');
console.dir(message);
if(counter++>1){
//produce failure;
var errorConsumingMessage = null;
counter++;
if(counter==2){
errorConsumingMessage = new Error('oops');
}
else if(counter==5){
//simulate failure by never answering;
return process.exit(1);
}
if(cb){
cb();
cb(errorConsumingMessage);
}
}


+ 4
- 2
lib/kestrelClient.js View File

@ -87,7 +87,7 @@ function _createConnection(port, host, self){
function _handleData(data, self){
data = data.toString();
if( data.match('STORED\r\n') ){
self.emit('stored', true);
if(self._pendingSetCallback){
@ -220,7 +220,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
}
var command = 'SET ' + queue + ' 0 ' + lifetime + ' ';
command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n';
command += Buffer.byteLength(value, 'utf8') + '\r\n';
var connection = this._getConnection();
if( connection ){
@ -228,6 +228,8 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
this._pendingSetCallback = callback;
}
connection.write(command);
connection.write(value);
connection.write('\r\n');
} else {
if(callback){
callback('No connection');


+ 35
- 19
lib/kestrelConsumer.js View File

@ -1,29 +1,41 @@
'use strict';
var client = require('./kestrelClient.js');
var KestrelClient = require('./kestrelClient.js');
var ee2 = require('eventemitter2').EventEmitter2;
var util = require('util');
var consumer = function( queue, options ){
this._queue = queue;
this._options = new client(options);
this._client = new client(options);
this._client.connect();
this._options = options;
this._callback = null;
this._consume = false;
this._timeout = 3000;
var self = this;
this._client = connectToClient(this);
ee2.call(this);
};
util.inherits(consumer,ee2);
function connectToClient(self){
var client = new KestrelClient(self._options);
client.connect();
this._client.on('message', function(message){
client.on('message', function(message){
if(self._consume){
if(self._consume.reliable){
self._callback(message, function(err){
if(err){
self.stopConsuming();
var consumeOptions = self._consume;
var consumeCallback = self._callback;
self.stopConsuming(function(){
self.consume(consumeOptions,consumeCallback);
});
} else {
self._consumeNextMessage();
}
@ -37,13 +49,12 @@ var consumer = function( queue, options ){
self.emit('message', message);
});
this._client.on('empty', function(){
client.on('empty', function(){
self.get(self._timeout);
});
ee2.call(this);
};
util.inherits(consumer,ee2);
return client;
}
consumer.prototype.get = function( timeout ){
this._client.get(this._queue, timeout);
@ -71,8 +82,7 @@ consumer.prototype.consume = function( options, callback ){
}
if(!this._client){
this._client = client(this._options);
this.client.connect();
this._client = connectToClient(this);
}
this._callback = callback;
@ -89,11 +99,17 @@ consumer.prototype._consumeNextMessage = function(){
}
};
consumer.prototype.stopConsuming = function(){
this._consume = false;
this._client.close();
this._client = null;
this.emit('stop');
consumer.prototype.stopConsuming = function(cb){
if(this._consume){
this._consume = false;
this._client.close();
this._client = null;
this.emit('stop');
}
if(cb){
cb();
}
};
module.exports = consumer;

Loading…
Cancel
Save