Browse Source

messages should be read as buffers and not try to split data based on \r\n but on actual data length

pull/6/head
Matthias Goetzke 12 years ago
parent
commit
d706c44898
1 changed files with 105 additions and 18 deletions
  1. +105
    -18
      lib/kestrelClient.js

+ 105
- 18
lib/kestrelClient.js View File

@ -70,7 +70,7 @@ function _createConnection(port, host, self){
connection.on('error', function(err){ connection.on('error', function(err){
self._openConnection = null; self._openConnection = null;
self.emit('error', err); self.emit('error', err);
if(self._settings.reconnect){
if(self._settings.reconnect){
setTimeout(function(){ setTimeout(function(){
self.connect(); self.connect();
},self._settings.reconnectDelay); },self._settings.reconnectDelay);
@ -86,9 +86,15 @@ function _createConnection(port, host, self){
} }
function _handleData(data, self){ function _handleData(data, self){
data = data.toString();
if( data.match('STORED\r\n') ){
if( isResponse(data,'VALUE') ){
return _handleGet(data,self);
}
data = data.toString();
if( isResponse(data,'STORED') ) {
self.emit('stored', true); self.emit('stored', true);
if(self._pendingSetCallback){ if(self._pendingSetCallback){
var callback = self._pendingSetCallback; var callback = self._pendingSetCallback;
@ -97,48 +103,129 @@ function _handleData(data, self){
} }
} }
if( data.match(/^VALUE/) ){
_handleGet(data,self);
}
if( data.match(/^STAT/) ){
else if( isResponse(data,'STAT') ){
_handleStats(data,self); _handleStats(data,self);
} }
if( data.match(/^queue/) ){
else if( isResponse(data,'queue') ){
_handleDumpStats(data, self); _handleDumpStats(data, self);
} }
if( data == 'DELETED\r\n' ){
else if( isResponse(data,'DELETED\r\n') ){
self.emit('deleted', true); self.emit('deleted', true);
} }
if( data.match(/^VERSION/) ){
else if( isResponse(data,'VERSION') ){
self.emit('version', data.split(' ')[1].replace('\r\n','')); self.emit('version', data.split(' ')[1].replace('\r\n',''));
} }
if( data == 'Reloaded config.\r\n' ){
else if( isResponse(data,'Reloaded config.\r\n') ){
self.emit('reloaded', true); self.emit('reloaded', true);
} }
if( data == 'END\r\n' ){
else if( isResponse(data,'END\r\n') ){
self._openConnection = null; self._openConnection = null;
self.emit('empty', null); self.emit('empty', null);
} }
else {
console.log('unknown data from server',data, data.toString());
}
}
function isResponse(data,responseType){
var buffer = null;
if(Buffer.isBuffer(data)){
buffer = data;
} else {
buffer = new Buffer(data,'utf8');
}
return bufferStartsWithString(buffer,responseType);
}
function bufferStartsWithString(buffer,responseType){
var responseTypeAsBuffer = new Buffer(responseType,'utf8');
return bufferStartsWith(buffer,responseTypeAsBuffer);
}
function bufferStartsWith(buffer,prefixBuffer){
if(!buffer)
return false;
if( buffer.length < prefixBuffer.length){
return false;
}
for(var i = 0; i < prefixBuffer.length; i++) {
if(prefixBuffer[i] != buffer[i]){
return false;
}
}
return true;
} }
function _handleGet(data, self){ function _handleGet(data, self){
var msg = {}; var msg = {};
var header = parseValueHeader(data);
var parts = data.split('\r\n');
msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1];
msg.data = parts[1].replace('\r\n','');
msg.queue = header.queue;
msg.data = data.slice(header.dataStart,header.dataEnd);
self.emit('message', msg); self.emit('message', msg);
} }
function parseValueHeader(data){
var endOfLineMarker = '\r\n';
var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker);
if(indexOfEndOfLineMarker<=0){
throw new Error('incorrect \r\n in VALUE header');
}
var headerString = data.slice(0,indexOfEndOfLineMarker).toString();
var headerParts = headerString.split(' ');
if(headerParts[0] != 'VALUE'){
throw new Error('Invalid VALUE header', headerString);
}
var dataLength = parseInt(headerParts[3],10);
var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length;
return {
queue: headerParts[1],
length: dataLength,
dataStart: headerLength,
dataEnd: headerLength+dataLength
};
}
function findEndOfLine(buffer,endOfLineMarker){
var eol = new Buffer(endOfLineMarker,'utf8');
var firstEolCharacter = eol[0];
if(buffer.length < eol.length){
return;
}
for (var i = 0; i < buffer.length-eol.length; i++) {
if(buffer[i] !== firstEolCharacter){
continue;
}
var comparisonBuffer = buffer.slice(i,i+eol.length);
console.log(i,eol,comparisonBuffer);
if(bufferStartsWith(comparisonBuffer,eol)){
return i;
}
}
return -1;
}
function _handleStats(data, self){ function _handleStats(data, self){
var stats = {}; var stats = {};
@ -221,7 +308,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){
} }
var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; var command = 'SET ' + queue + ' 0 ' + lifetime + ' ';
command += Buffer.byteLength(value, 'utf8') + '\r\n';
command += value.length + '\r\n';
var connection = this._getConnection(); var connection = this._getConnection();
if( connection ){ if( connection ){


Loading…
Cancel
Save