From d706c44898f30b92cf6b1ae00ae8ed94e8877b23 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 16:00:07 +0100 Subject: [PATCH] messages should be read as buffers and not try to split data based on \r\n but on actual data length --- lib/kestrelClient.js | 123 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 18 deletions(-) diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 2acbf20..b732fa6 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -70,7 +70,7 @@ function _createConnection(port, host, self){ connection.on('error', function(err){ self._openConnection = null; self.emit('error', err); - if(self._settings.reconnect){ + if(self._settings.reconnect){ setTimeout(function(){ self.connect(); },self._settings.reconnectDelay); @@ -86,9 +86,15 @@ function _createConnection(port, host, self){ } function _handleData(data, self){ - data = data.toString(); + - if( data.match('STORED\r\n') ){ + if( isResponse(data,'VALUE') ){ + return _handleGet(data,self); + } + + data = data.toString(); + + if( isResponse(data,'STORED') ) { self.emit('stored', true); if(self._pendingSetCallback){ var callback = self._pendingSetCallback; @@ -97,48 +103,129 @@ function _handleData(data, self){ } } - if( data.match(/^VALUE/) ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) ){ + else if( isResponse(data,'STAT') ){ _handleStats(data,self); } - if( data.match(/^queue/) ){ + else if( isResponse(data,'queue') ){ _handleDumpStats(data, self); } - if( data == 'DELETED\r\n' ){ + else if( isResponse(data,'DELETED\r\n') ){ self.emit('deleted', true); } - if( data.match(/^VERSION/) ){ + else if( isResponse(data,'VERSION') ){ self.emit('version', data.split(' ')[1].replace('\r\n','')); } - if( data == 'Reloaded config.\r\n' ){ + else if( isResponse(data,'Reloaded config.\r\n') ){ self.emit('reloaded', true); } - - if( data == 'END\r\n' ){ + else if( isResponse(data,'END\r\n') ){ self._openConnection = null; self.emit('empty', null); } + else { + console.log('unknown data from server',data, data.toString()); + } +} + +function isResponse(data,responseType){ + var buffer = null; + + if(Buffer.isBuffer(data)){ + buffer = data; + } else { + buffer = new Buffer(data,'utf8'); + } + return bufferStartsWithString(buffer,responseType); +} + +function bufferStartsWithString(buffer,responseType){ + var responseTypeAsBuffer = new Buffer(responseType,'utf8'); + return bufferStartsWith(buffer,responseTypeAsBuffer); +} + +function bufferStartsWith(buffer,prefixBuffer){ + if(!buffer) + return false; + + if( buffer.length < prefixBuffer.length){ + return false; + } + + for(var i = 0; i < prefixBuffer.length; i++) { + if(prefixBuffer[i] != buffer[i]){ + return false; + } + } + + return true; } function _handleGet(data, self){ var msg = {}; + + var header = parseValueHeader(data); - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); + msg.queue = header.queue; + msg.data = data.slice(header.dataStart,header.dataEnd); self.emit('message', msg); } +function parseValueHeader(data){ + var endOfLineMarker = '\r\n'; + var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker); + + if(indexOfEndOfLineMarker<=0){ + throw new Error('incorrect \r\n in VALUE header'); + } + + var headerString = data.slice(0,indexOfEndOfLineMarker).toString(); + var headerParts = headerString.split(' '); + + if(headerParts[0] != 'VALUE'){ + throw new Error('Invalid VALUE header', headerString); + } + + var dataLength = parseInt(headerParts[3],10); + var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length; + + return { + queue: headerParts[1], + length: dataLength, + dataStart: headerLength, + dataEnd: headerLength+dataLength + }; +} + +function findEndOfLine(buffer,endOfLineMarker){ + var eol = new Buffer(endOfLineMarker,'utf8'); + var firstEolCharacter = eol[0]; + + if(buffer.length < eol.length){ + return; + } + + for (var i = 0; i < buffer.length-eol.length; i++) { + if(buffer[i] !== firstEolCharacter){ + continue; + } + + var comparisonBuffer = buffer.slice(i,i+eol.length); + console.log(i,eol,comparisonBuffer); + if(bufferStartsWith(comparisonBuffer,eol)){ + return i; + } + } + + return -1; +} + function _handleStats(data, self){ var stats = {}; @@ -221,7 +308,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ } var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; - command += Buffer.byteLength(value, 'utf8') + '\r\n'; + command += value.length + '\r\n'; var connection = this._getConnection(); if( connection ){