Browse Source

added support for values longer than one tcp data packet, also added decode setting which defaults to true and converts incoming values toString()

pull/6/head
Matthias Goetzke 12 years ago
parent
commit
24492599bd
2 changed files with 70 additions and 21 deletions
  1. +4
    -2
      examples/consumer.js
  2. +66
    -19
      lib/kestrelClient.js

+ 4
- 2
examples/consumer.js View File

@ -4,14 +4,16 @@ var kestrel = require('..');
var consumer = new kestrel.kestrelConsumer( 'test', {
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133']
servers: ['127.0.0.1:22133'],
decode: true
});
var counter = 0;
function consumed(message,cb){
console.log('Consumed Message:');
console.log('Consumed Message:',message.data.length);
console.dir(message);
//console.log('.');
var errorConsumingMessage = null;
counter++;


+ 66
- 19
lib/kestrelClient.js View File

@ -11,11 +11,13 @@ var kestrel = function( options ){
servers: ['127.0.0.1:22133'],
connectionType: types.ROUND_ROBIN,
reconnect: false,
reconnectDelay: 200
reconnectDelay: 200,
decode: true
};
this._currentConnection = 0;
this._connections = [];
this._openConnection = null;
this._receivingValue = null;
if( options instanceof Object ){
for( var key in options ){
@ -92,7 +94,7 @@ function _handleData(data, self){
return _handleGet(data,self);
}
data = data.toString();
var stringData = data.toString();
if( isResponse(data,'STORED') ) {
self.emit('stored', true);
@ -104,11 +106,11 @@ function _handleData(data, self){
}
else if( isResponse(data,'STAT') ){
_handleStats(data,self);
_handleStats(stringData,self);
}
else if( isResponse(data,'queue') ){
_handleDumpStats(data, self);
_handleDumpStats(stringData, self);
}
else if( isResponse(data,'DELETED\r\n') ){
@ -116,7 +118,7 @@ function _handleData(data, self){
}
else if( isResponse(data,'VERSION') ){
self.emit('version', data.split(' ')[1].replace('\r\n',''));
self.emit('version', stringData.split(' ')[1].replace('\r\n',''));
}
else if( isResponse(data,'Reloaded config.\r\n') ){
@ -129,7 +131,7 @@ function _handleData(data, self){
}
else {
console.log('unknown data from server',data, data.toString());
_handleGet(data,self);
}
}
@ -167,39 +169,84 @@ function bufferStartsWith(buffer,prefixBuffer){
}
function _handleGet(data, self){
var msg = {};
if(self._receivingValue){
var buffer = data.slice(0,self._receivingValue.bytesMissing);
self._receivingValue.bytesMissing -= buffer.length;
self._receivingValue.dataBuffers.push(buffer);
if(self._receivingValue.bytesMissing === 0){
var msg = {
queue: self._receivingValue.header.queue,
data: Buffer.concat(self._receivingValue.dataBuffers)
};
self._receivingValue = null;
if(self._settings.decode){
msg.data = msg.data.toString();
}
self.emit('message', msg);
}
return;
}
var header = parseValueHeader(data);
msg.queue = header.queue;
msg.data = data.slice(header.dataStart,header.dataEnd);
self.emit('message', msg);
if(header.incomplete){
var partialData = data.slice(header.dataStart);
self._receivingValue = {
header: header,
dataBuffers: [partialData],
bytesMissing: header.dataLength - partialData.length
};
return;
} else {
var msg = {
queue: header.queue,
data: data.slice(header.dataStart,header.dataEnd+1)
};
if(self._settings.decode){
msg.data = msg.data.toString();
}
self.emit('message', msg);
}
}
function parseValueHeader(data){
function parseValueHeader(buffer){
var endOfLineMarker = '\r\n';
var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker);
var indexOfEndOfLineMarker = findEndOfLine(buffer,endOfLineMarker);
if(indexOfEndOfLineMarker<=0){
throw new Error('incorrect \r\n in VALUE header');
}
var headerString = data.slice(0,indexOfEndOfLineMarker).toString();
var headerString = buffer.slice(0,indexOfEndOfLineMarker).toString();
var headerParts = headerString.split(' ');
if(headerParts[0] != 'VALUE'){
throw new Error('Invalid VALUE header', headerString);
}
var dataLength = parseInt(headerParts[3],10);
var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length;
var dataEnd = headerLength+dataLength-1;
return {
queue: headerParts[1],
length: dataLength,
dataLength: dataLength,
dataStart: headerLength,
dataEnd: headerLength+dataLength
dataEnd: dataEnd,
incomplete: (dataEnd >= buffer.length)
};
}
@ -217,7 +264,7 @@ function findEndOfLine(buffer,endOfLineMarker){
}
var comparisonBuffer = buffer.slice(i,i+eol.length);
console.log(i,eol,comparisonBuffer);
if(bufferStartsWith(comparisonBuffer,eol)){
return i;
}


Loading…
Cancel
Save