Browse Source

minimal change to add /open/close reliable fetch

pull/6/head
Matthias Goetzke 12 years ago
parent
commit
a9e74dc3ed
4 changed files with 125 additions and 30 deletions
  1. +22
    -8
      examples/consumer.js
  2. +11
    -7
      examples/producer.js
  3. +44
    -4
      lib/kestrelClient.js
  4. +48
    -11
      lib/kestrelConsumer.js

+ 22
- 8
examples/consumer.js View File

@ -1,17 +1,31 @@
var kestrel = require('../');
'use strict';
var kestrel = require('..');
var consumer = new kestrel.kestrelConsumer( 'test', {
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133']
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133']
});
function consumed(message){
console.log('Consumed Message:');
console.dir(message);
var counter = 0;
function consumed(message,cb){
console.log('Consumed Message:');
console.dir(message);
if(counter++>1){
//produce failure;
return process.exit(1);
}
if(cb){
cb();
}
}
consumer.consume( consumed );
consumer.consume( {reliable:true}, consumed );
//unreliable still possible as before: consumer.consume( consumed );
setTimeout( function(){
consumer.stopConsuming();
consumer.stopConsuming();
}, 6000);

+ 11
- 7
examples/producer.js View File

@ -1,3 +1,5 @@
'use strict';
var kestrel = require('../');
@ -16,13 +18,15 @@ producer.on('stored', function(stored){
//lets input some data
var interval = setInterval( function(){
producer.send( (new Date().getTime()) + ' - New Message', function(err){
if(err){
console.log("ERR",err);
} else {
console.log("STORED");
}
} );
var message = (new Date().getTime()) + ' - New Message';
producer.send( message , function(err){
if(err){
console.log('ERR',err);
} else {
console.log('STORED:' + message);
}
} );
}, 100);


+ 44
- 4
lib/kestrelClient.js View File

@ -9,10 +9,13 @@ var kestrel = function( options ){
this._settings = {
servers: ['127.0.0.1:22133'],
connectionType: types.ROUND_ROBIN
connectionType: types.ROUND_ROBIN,
reconnect: false,
reconnectDelay: 200
};
this._currentConnection = 0;
this._connections = [];
this._openConnection = null;
if( options instanceof Object ){
for( var key in options ){
@ -65,8 +68,15 @@ function _createConnection(port, host, self){
var connection = net.connect(port,host);
connection.on('error', function(err){
console.dir(err);
self._openConnection = null;
self.emit('error', err);
if(self._settings.reconnect){
setTimeout(function(){
self.connect();
},self._settings.reconnectDelay);
}
});
connection.on('data', function(data){
_handleData(data,self);
self.emit('data', data);
@ -77,7 +87,8 @@ function _createConnection(port, host, self){
function _handleData(data, self){
data = data.toString();
console.log(data.substr(0,20)+'...')
if( data.match('STORED\r\n') ){
self.emit('stored', true);
if(self._pendingSetCallback){
@ -231,7 +242,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
kestrel.prototype.get = function(queue, timeout){
var command = 'GET ' + queue;
timeout = parseInt(timeout,10);
if( timeout > 0 ){
command += '/t='+timeout;
@ -245,6 +256,35 @@ kestrel.prototype.get = function(queue, timeout){
return this;
};
kestrel.prototype.getNextOpen = function(queue, timeout){
var command = 'GET ' + queue;
timeout = parseInt(timeout,10);
if( timeout > 0 ){
command += '/t='+timeout;
}
var connection = this._openConnection;
if(connection) {
command += '/close';
} else {
connection = this._getConnection();
this._openConnection = connection;
}
command += '/open';
if( connection ){
connection.write(command + '\r\n');
}
return this;
};
kestrel.prototype.delete = function(queue){
//delete given queue
var connection = this._getConnection();


+ 48
- 11
lib/kestrelConsumer.js View File

@ -18,12 +18,20 @@ var consumer = function( queue, options ){
var self = this;
this._client.on('message', function(message){
if( typeof self._callback == 'function' ){
self._callback(message);
}
if(self._consume){
if( self._consume ){
self.get(self._timeout);
if(self._consume.reliable){
self._callback(message, function(err){
if(err){
self.stopConsuming();
} else {
self._consumeNextMessage();
}
});
} else {
self._callback(message);
self._consumeNextMessage();
}
}
self.emit('message', message);
@ -38,25 +46,54 @@ var consumer = function( queue, options ){
util.inherits(consumer,ee2);
consumer.prototype.get = function( timeout ){
this._client.get(this._queue, timeout);
this._client.get(this._queue, timeout);
};
consumer.prototype.getNextOpen = function( timeout ){
this._client.getNextOpen(this._queue, timeout);
};
consumer.prototype.consume = function( callback ){
consumer.prototype.consume = function( options, callback ){
if(typeof(options) == 'function'){
callback = options;
options = {
reliable: false
};
}
if(!options){
throw new Error('options must be an object or left out');
}
if(!callback){
throw new Error('callback to consume messages required');
}
if(!this._client){
this._client = client(this._options);
this.client.connect();
}
if( typeof(callback) == 'function' ){
this._callback = callback;
this._callback = callback;
this._consume = options;
this._consumeNextMessage();
};
consumer.prototype._consumeNextMessage = function(){
if(this._consume.reliable){
this.getNextOpen(this._timeout);
} else {
this.get(this._timeout);
}
this._consume = true;
this.get(this._timeout);
};
consumer.prototype.stopConsuming = function(){
this._consume = false;
this._client.close();
this._client = null;
this.emit('stop');
};
module.exports = consumer;

Loading…
Cancel
Save