From 24492599bd37ec67cb480a6440808133e56f153e Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 16:29:32 +0100 Subject: [PATCH] added support for values longer than one tcp data packet, also added decode setting which defaults to true and converts incoming values toString() --- examples/consumer.js | 6 ++-- lib/kestrelClient.js | 85 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/examples/consumer.js b/examples/consumer.js index c417faf..e663323 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -4,14 +4,16 @@ var kestrel = require('..'); var consumer = new kestrel.kestrelConsumer( 'test', { connectionType: kestrel.connectionType.ROUND_ROBIN, - servers: ['127.0.0.1:22133'] + servers: ['127.0.0.1:22133'], + decode: true }); var counter = 0; function consumed(message,cb){ - console.log('Consumed Message:'); + console.log('Consumed Message:',message.data.length); console.dir(message); + //console.log('.'); var errorConsumingMessage = null; counter++; diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index b732fa6..c800be6 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -11,11 +11,13 @@ var kestrel = function( options ){ servers: ['127.0.0.1:22133'], connectionType: types.ROUND_ROBIN, reconnect: false, - reconnectDelay: 200 + reconnectDelay: 200, + decode: true }; this._currentConnection = 0; this._connections = []; this._openConnection = null; + this._receivingValue = null; if( options instanceof Object ){ for( var key in options ){ @@ -92,7 +94,7 @@ function _handleData(data, self){ return _handleGet(data,self); } - data = data.toString(); + var stringData = data.toString(); if( isResponse(data,'STORED') ) { self.emit('stored', true); @@ -104,11 +106,11 @@ function _handleData(data, self){ } else if( isResponse(data,'STAT') ){ - _handleStats(data,self); + _handleStats(stringData,self); } else if( isResponse(data,'queue') ){ - _handleDumpStats(data, self); + _handleDumpStats(stringData, self); } else if( isResponse(data,'DELETED\r\n') ){ @@ -116,7 +118,7 @@ function _handleData(data, self){ } else if( isResponse(data,'VERSION') ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); + self.emit('version', stringData.split(' ')[1].replace('\r\n','')); } else if( isResponse(data,'Reloaded config.\r\n') ){ @@ -129,7 +131,7 @@ function _handleData(data, self){ } else { - console.log('unknown data from server',data, data.toString()); + _handleGet(data,self); } } @@ -167,39 +169,84 @@ function bufferStartsWith(buffer,prefixBuffer){ } function _handleGet(data, self){ - var msg = {}; + + if(self._receivingValue){ + var buffer = data.slice(0,self._receivingValue.bytesMissing); + self._receivingValue.bytesMissing -= buffer.length; + self._receivingValue.dataBuffers.push(buffer); + + if(self._receivingValue.bytesMissing === 0){ + + var msg = { + queue: self._receivingValue.header.queue, + data: Buffer.concat(self._receivingValue.dataBuffers) + }; + + self._receivingValue = null; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } + return; + } var header = parseValueHeader(data); - - msg.queue = header.queue; - msg.data = data.slice(header.dataStart,header.dataEnd); - - self.emit('message', msg); + + if(header.incomplete){ + var partialData = data.slice(header.dataStart); + + self._receivingValue = { + header: header, + dataBuffers: [partialData], + bytesMissing: header.dataLength - partialData.length + }; + + return; + + } else { + + var msg = { + queue: header.queue, + data: data.slice(header.dataStart,header.dataEnd+1) + }; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } } -function parseValueHeader(data){ + +function parseValueHeader(buffer){ var endOfLineMarker = '\r\n'; - var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker); + var indexOfEndOfLineMarker = findEndOfLine(buffer,endOfLineMarker); if(indexOfEndOfLineMarker<=0){ throw new Error('incorrect \r\n in VALUE header'); } - var headerString = data.slice(0,indexOfEndOfLineMarker).toString(); + var headerString = buffer.slice(0,indexOfEndOfLineMarker).toString(); var headerParts = headerString.split(' '); - + if(headerParts[0] != 'VALUE'){ throw new Error('Invalid VALUE header', headerString); } var dataLength = parseInt(headerParts[3],10); var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length; + var dataEnd = headerLength+dataLength-1; return { queue: headerParts[1], - length: dataLength, + dataLength: dataLength, dataStart: headerLength, - dataEnd: headerLength+dataLength + dataEnd: dataEnd, + incomplete: (dataEnd >= buffer.length) }; } @@ -217,7 +264,7 @@ function findEndOfLine(buffer,endOfLineMarker){ } var comparisonBuffer = buffer.slice(i,i+eol.length); - console.log(i,eol,comparisonBuffer); + if(bufferStartsWith(comparisonBuffer,eol)){ return i; }