Browse Source

Merge 24492599bd into 0b3f91b848

pull/6/merge
matthiasg 12 years ago
parent
commit
59c4833cf5
4 changed files with 324 additions and 69 deletions
  1. +30
    -8
      examples/consumer.js
  2. +11
    -7
      examples/producer.js
  3. +201
    -25
      lib/kestrelClient.js
  4. +82
    -29
      lib/kestrelConsumer.js

+ 30
- 8
examples/consumer.js View File

@ -1,17 +1,39 @@
var kestrel = require('../');
'use strict';
var kestrel = require('..');
var consumer = new kestrel.kestrelConsumer( 'test', {
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133']
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133'],
decode: true
});
function consumed(message){
console.log('Consumed Message:');
console.dir(message);
var counter = 0;
function consumed(message,cb){
console.log('Consumed Message:',message.data.length);
console.dir(message);
//console.log('.');
var errorConsumingMessage = null;
counter++;
if(counter==2){
errorConsumingMessage = new Error('oops');
}
else if(counter==5){
//simulate failure by never answering;
return process.exit(1);
}
if(cb){
cb(errorConsumingMessage);
}
}
consumer.consume( consumed );
consumer.consume( {reliable:true}, consumed );
//unreliable still possible as before: consumer.consume( consumed );
setTimeout( function(){
consumer.stopConsuming();
consumer.stopConsuming();
}, 6000);

+ 11
- 7
examples/producer.js View File

@ -1,3 +1,5 @@
'use strict';
var kestrel = require('../');
@ -16,13 +18,15 @@ producer.on('stored', function(stored){
//lets input some data
var interval = setInterval( function(){
producer.send( (new Date().getTime()) + ' - New Message', function(err){
if(err){
console.log("ERR",err);
} else {
console.log("STORED");
}
} );
var message = (new Date().getTime()) + ' - New Message';
producer.send( message , function(err){
if(err){
console.log('ERR',err);
} else {
console.log('STORED:' + message);
}
} );
}, 100);


+ 201
- 25
lib/kestrelClient.js View File

@ -9,10 +9,15 @@ var kestrel = function( options ){
this._settings = {
servers: ['127.0.0.1:22133'],
connectionType: types.ROUND_ROBIN
connectionType: types.ROUND_ROBIN,
reconnect: false,
reconnectDelay: 200,
decode: true
};
this._currentConnection = 0;
this._connections = [];
this._openConnection = null;
this._receivingValue = null;
if( options instanceof Object ){
for( var key in options ){
@ -65,8 +70,15 @@ function _createConnection(port, host, self){
var connection = net.connect(port,host);
connection.on('error', function(err){
console.dir(err);
self._openConnection = null;
self.emit('error', err);
if(self._settings.reconnect){
setTimeout(function(){
self.connect();
},self._settings.reconnectDelay);
}
});
connection.on('data', function(data){
_handleData(data,self);
self.emit('data', data);
@ -76,9 +88,15 @@ function _createConnection(port, host, self){
}
function _handleData(data, self){
data = data.toString();
if( data.match('STORED\r\n') ){
if( isResponse(data,'VALUE') ){
return _handleGet(data,self);
}
var stringData = data.toString();
if( isResponse(data,'STORED') ) {
self.emit('stored', true);
if(self._pendingSetCallback){
var callback = self._pendingSetCallback;
@ -87,45 +105,172 @@ function _handleData(data, self){
}
}
if( data.match(/^VALUE/) ){
_handleGet(data,self);
}
if( data.match(/^STAT/) ){
_handleStats(data,self);
else if( isResponse(data,'STAT') ){
_handleStats(stringData,self);
}
if( data.match(/^queue/) ){
_handleDumpStats(data, self);
else if( isResponse(data,'queue') ){
_handleDumpStats(stringData, self);
}
if( data == 'DELETED\r\n' ){
else if( isResponse(data,'DELETED\r\n') ){
self.emit('deleted', true);
}
if( data.match(/^VERSION/) ){
self.emit('version', data.split(' ')[1].replace('\r\n',''));
else if( isResponse(data,'VERSION') ){
self.emit('version', stringData.split(' ')[1].replace('\r\n',''));
}
if( data == 'Reloaded config.\r\n' ){
else if( isResponse(data,'Reloaded config.\r\n') ){
self.emit('reloaded', true);
}
if( data == 'END\r\n' ){
else if( isResponse(data,'END\r\n') ){
self._openConnection = null;
self.emit('empty', null);
}
else {
_handleGet(data,self);
}
}
function isResponse(data,responseType){
var buffer = null;
if(Buffer.isBuffer(data)){
buffer = data;
} else {
buffer = new Buffer(data,'utf8');
}
return bufferStartsWithString(buffer,responseType);
}
function bufferStartsWithString(buffer,responseType){
var responseTypeAsBuffer = new Buffer(responseType,'utf8');
return bufferStartsWith(buffer,responseTypeAsBuffer);
}
function bufferStartsWith(buffer,prefixBuffer){
if(!buffer)
return false;
if( buffer.length < prefixBuffer.length){
return false;
}
for(var i = 0; i < prefixBuffer.length; i++) {
if(prefixBuffer[i] != buffer[i]){
return false;
}
}
return true;
}
function _handleGet(data, self){
var msg = {};
var parts = data.split('\r\n');
msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1];
msg.data = parts[1].replace('\r\n','');
if(self._receivingValue){
var buffer = data.slice(0,self._receivingValue.bytesMissing);
self._receivingValue.bytesMissing -= buffer.length;
self._receivingValue.dataBuffers.push(buffer);
if(self._receivingValue.bytesMissing === 0){
var msg = {
queue: self._receivingValue.header.queue,
data: Buffer.concat(self._receivingValue.dataBuffers)
};
self._receivingValue = null;
if(self._settings.decode){
msg.data = msg.data.toString();
}
self.emit('message', msg);
}
return;
}
var header = parseValueHeader(data);
if(header.incomplete){
var partialData = data.slice(header.dataStart);
self._receivingValue = {
header: header,
dataBuffers: [partialData],
bytesMissing: header.dataLength - partialData.length
};
return;
} else {
var msg = {
queue: header.queue,
data: data.slice(header.dataStart,header.dataEnd+1)
};
if(self._settings.decode){
msg.data = msg.data.toString();
}
self.emit('message', msg);
}
}
function parseValueHeader(buffer){
var endOfLineMarker = '\r\n';
var indexOfEndOfLineMarker = findEndOfLine(buffer,endOfLineMarker);
if(indexOfEndOfLineMarker<=0){
throw new Error('incorrect \r\n in VALUE header');
}
var headerString = buffer.slice(0,indexOfEndOfLineMarker).toString();
var headerParts = headerString.split(' ');
if(headerParts[0] != 'VALUE'){
throw new Error('Invalid VALUE header', headerString);
}
var dataLength = parseInt(headerParts[3],10);
var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length;
var dataEnd = headerLength+dataLength-1;
return {
queue: headerParts[1],
dataLength: dataLength,
dataStart: headerLength,
dataEnd: dataEnd,
incomplete: (dataEnd >= buffer.length)
};
}
function findEndOfLine(buffer,endOfLineMarker){
var eol = new Buffer(endOfLineMarker,'utf8');
var firstEolCharacter = eol[0];
self.emit('message', msg);
if(buffer.length < eol.length){
return;
}
for (var i = 0; i < buffer.length-eol.length; i++) {
if(buffer[i] !== firstEolCharacter){
continue;
}
var comparisonBuffer = buffer.slice(i,i+eol.length);
if(bufferStartsWith(comparisonBuffer,eol)){
return i;
}
}
return -1;
}
function _handleStats(data, self){
@ -210,7 +355,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
}
var command = 'SET ' + queue + ' 0 ' + lifetime + ' ';
command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n';
command += value.length + '\r\n';
var connection = this._getConnection();
if( connection ){
@ -218,6 +363,8 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
this._pendingSetCallback = callback;
}
connection.write(command);
connection.write(value);
connection.write('\r\n');
} else {
if(callback){
callback('No connection');
@ -231,7 +378,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
kestrel.prototype.get = function(queue, timeout){
var command = 'GET ' + queue;
timeout = parseInt(timeout,10);
if( timeout > 0 ){
command += '/t='+timeout;
@ -245,6 +392,35 @@ kestrel.prototype.get = function(queue, timeout){
return this;
};
kestrel.prototype.getNextOpen = function(queue, timeout){
var command = 'GET ' + queue;
timeout = parseInt(timeout,10);
if( timeout > 0 ){
command += '/t='+timeout;
}
var connection = this._openConnection;
if(connection) {
command += '/close';
} else {
connection = this._getConnection();
this._openConnection = connection;
}
command += '/open';
if( connection ){
connection.write(command + '\r\n');
}
return this;
};
kestrel.prototype.delete = function(queue){
//delete given queue
var connection = this._getConnection();


+ 82
- 29
lib/kestrelConsumer.js View File

@ -1,62 +1,115 @@
'use strict';
var client = require('./kestrelClient.js');
var KestrelClient = require('./kestrelClient.js');
var ee2 = require('eventemitter2').EventEmitter2;
var util = require('util');
var consumer = function( queue, options ){
this._queue = queue;
this._options = new client(options);
this._client = new client(options);
this._client.connect();
this._options = options;
this._callback = null;
this._consume = false;
this._timeout = 3000;
var self = this;
this._client = connectToClient(this);
this._client.on('message', function(message){
if( typeof self._callback == 'function' ){
self._callback(message);
}
ee2.call(this);
};
util.inherits(consumer,ee2);
function connectToClient(self){
var client = new KestrelClient(self._options);
client.connect();
if( self._consume ){
self.get(self._timeout);
client.on('message', function(message){
if(self._consume){
if(self._consume.reliable){
self._callback(message, function(err){
if(err){
var consumeOptions = self._consume;
var consumeCallback = self._callback;
self.stopConsuming(function(){
self.consume(consumeOptions,consumeCallback);
});
} else {
self._getNextMessage();
}
});
} else {
self._callback(message);
self._getNextMessage();
}
}
self.emit('message', message);
});
this._client.on('empty', function(){
self.get(self._timeout);
client.on('empty', function(){
self._getNextMessage();
});
ee2.call(this);
};
util.inherits(consumer,ee2);
return client;
}
consumer.prototype.get = function( timeout ){
this._client.get(this._queue, timeout);
this._client.get(this._queue, timeout);
};
consumer.prototype.getNextOpen = function( timeout ){
this._client.getNextOpen(this._queue, timeout);
};
consumer.prototype.consume = function( callback ){
consumer.prototype.consume = function( options, callback ){
if(typeof(options) == 'function'){
callback = options;
options = {
reliable: false
};
}
if(!options){
throw new Error('options must be an object or left out');
}
if(!callback){
throw new Error('callback to consume messages required');
}
if(!this._client){
this._client = client(this._options);
this.client.connect();
this._client = connectToClient(this);
}
if( typeof(callback) == 'function' ){
this._callback = callback;
this._callback = callback;
this._consume = options;
this._getNextMessage();
};
consumer.prototype._getNextMessage = function(){
if(this._consume.reliable){
this.getNextOpen(this._timeout);
} else {
this.get(this._timeout);
}
this._consume = true;
this.get(this._timeout);
};
consumer.prototype.stopConsuming = function(){
this._consume = false;
this._client.close();
this._client = null;
consumer.prototype.stopConsuming = function(cb){
if(this._consume){
this._consume = false;
this._client.close();
this._client = null;
this.emit('stop');
}
if(cb){
cb();
}
};
module.exports = consumer;

Loading…
Cancel
Save