From e7a11219054d3b01c10a018aa83f51d1c27a7f92 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 09:50:48 +0100 Subject: [PATCH 1/8] fixed typo in package.json --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0265a98..d045626 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ , "contributors": [ { "name": "brett_langdon", "email": "brett@blangdon.com" }, { "name": "pablo_casado", "email": "p.casado.arias@gmail.com" }, - { "name: "Matthias Goetzke", "email": "m.goetzke@curasystems.com", "url": "https://twitter.com/mgoetzke" }, + { "name": "Matthias Goetzke", "email": "m.goetzke@curasystems.com", "url": "https://twitter.com/mgoetzke" } ] , "repository":{ "type": "git" From a9e74dc3edf10d162821ab0d2aa21c64a8687f6a Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 10:13:26 +0100 Subject: [PATCH 2/8] minimal change to add /open/close reliable fetch --- examples/consumer.js | 30 +++++++++++++++------ examples/producer.js | 18 ++++++++----- lib/kestrelClient.js | 48 +++++++++++++++++++++++++++++++--- lib/kestrelConsumer.js | 59 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 125 insertions(+), 30 deletions(-) diff --git a/examples/consumer.js b/examples/consumer.js index 798728c..9598c12 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -1,17 +1,31 @@ -var kestrel = require('../'); +'use strict'; + +var kestrel = require('..'); var consumer = new kestrel.kestrelConsumer( 'test', { - connectionType: kestrel.connectionType.ROUND_ROBIN, - servers: ['127.0.0.1:22133'] + connectionType: kestrel.connectionType.ROUND_ROBIN, + servers: ['127.0.0.1:22133'] }); -function consumed(message){ - console.log('Consumed Message:'); - console.dir(message); +var counter = 0; + +function consumed(message,cb){ + console.log('Consumed Message:'); + console.dir(message); + + if(counter++>1){ + //produce failure; + return process.exit(1); + } + + if(cb){ + cb(); + } } -consumer.consume( consumed ); +consumer.consume( {reliable:true}, consumed ); +//unreliable still possible as before: consumer.consume( consumed ); setTimeout( function(){ - consumer.stopConsuming(); + consumer.stopConsuming(); }, 6000); \ No newline at end of file diff --git a/examples/producer.js b/examples/producer.js index 91efc72..b2979a4 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -1,3 +1,5 @@ +'use strict'; + var kestrel = require('../'); @@ -16,13 +18,15 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message', function(err){ - if(err){ - console.log("ERR",err); - } else { - console.log("STORED"); - } - } ); + var message = (new Date().getTime()) + ' - New Message'; + + producer.send( message , function(err){ + if(err){ + console.log('ERR',err); + } else { + console.log('STORED:' + message); + } + } ); }, 100); diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index cf894a1..b7b4392 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -9,10 +9,13 @@ var kestrel = function( options ){ this._settings = { servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN + connectionType: types.ROUND_ROBIN, + reconnect: false, + reconnectDelay: 200 }; this._currentConnection = 0; this._connections = []; + this._openConnection = null; if( options instanceof Object ){ for( var key in options ){ @@ -65,8 +68,15 @@ function _createConnection(port, host, self){ var connection = net.connect(port,host); connection.on('error', function(err){ - console.dir(err); + self._openConnection = null; + self.emit('error', err); + if(self._settings.reconnect){ + setTimeout(function(){ + self.connect(); + },self._settings.reconnectDelay); + } }); + connection.on('data', function(data){ _handleData(data,self); self.emit('data', data); @@ -77,7 +87,8 @@ function _createConnection(port, host, self){ function _handleData(data, self){ data = data.toString(); - + console.log(data.substr(0,20)+'...') + if( data.match('STORED\r\n') ){ self.emit('stored', true); if(self._pendingSetCallback){ @@ -231,7 +242,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ kestrel.prototype.get = function(queue, timeout){ var command = 'GET ' + queue; - + timeout = parseInt(timeout,10); if( timeout > 0 ){ command += '/t='+timeout; @@ -245,6 +256,35 @@ kestrel.prototype.get = function(queue, timeout){ return this; }; + +kestrel.prototype.getNextOpen = function(queue, timeout){ + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._openConnection; + + if(connection) { + command += '/close'; + } else { + connection = this._getConnection(); + this._openConnection = connection; + } + + command += '/open'; + + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; + + + kestrel.prototype.delete = function(queue){ //delete given queue var connection = this._getConnection(); diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 4324763..d728d5d 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -18,12 +18,20 @@ var consumer = function( queue, options ){ var self = this; this._client.on('message', function(message){ - if( typeof self._callback == 'function' ){ - self._callback(message); - } + if(self._consume){ - if( self._consume ){ - self.get(self._timeout); + if(self._consume.reliable){ + self._callback(message, function(err){ + if(err){ + self.stopConsuming(); + } else { + self._consumeNextMessage(); + } + }); + } else { + self._callback(message); + self._consumeNextMessage(); + } } self.emit('message', message); @@ -38,25 +46,54 @@ var consumer = function( queue, options ){ util.inherits(consumer,ee2); consumer.prototype.get = function( timeout ){ - this._client.get(this._queue, timeout); + this._client.get(this._queue, timeout); +}; + +consumer.prototype.getNextOpen = function( timeout ){ + this._client.getNextOpen(this._queue, timeout); }; -consumer.prototype.consume = function( callback ){ +consumer.prototype.consume = function( options, callback ){ + + if(typeof(options) == 'function'){ + callback = options; + options = { + reliable: false + }; + } + + if(!options){ + throw new Error('options must be an object or left out'); + } + + if(!callback){ + throw new Error('callback to consume messages required'); + } + if(!this._client){ this._client = client(this._options); this.client.connect(); } - if( typeof(callback) == 'function' ){ - this._callback = callback; + + this._callback = callback; + this._consume = options; + + this._consumeNextMessage(); +}; + +consumer.prototype._consumeNextMessage = function(){ + if(this._consume.reliable){ + this.getNextOpen(this._timeout); + } else { + this.get(this._timeout); } - this._consume = true; - this.get(this._timeout); }; consumer.prototype.stopConsuming = function(){ this._consume = false; this._client.close(); this._client = null; + this.emit('stop'); }; module.exports = consumer; From 51393062983e6628799418dcc254c65d35f9505d Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 10:14:15 +0100 Subject: [PATCH 3/8] removed debug log --- lib/kestrelClient.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index b7b4392..beedf01 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -87,7 +87,6 @@ function _createConnection(port, host, self){ function _handleData(data, self){ data = data.toString(); - console.log(data.substr(0,20)+'...') if( data.match('STORED\r\n') ){ self.emit('stored', true); From 7a6c40e8a315d186c25460d998923ee0e48ba212 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 10:20:13 +0100 Subject: [PATCH 4/8] removed whitespaces --- lib/kestrelConsumer.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index d728d5d..fa5d047 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -30,7 +30,7 @@ var consumer = function( queue, options ){ }); } else { self._callback(message); - self._consumeNextMessage(); + self._consumeNextMessage(); } } @@ -81,7 +81,7 @@ consumer.prototype.consume = function( options, callback ){ this._consumeNextMessage(); }; -consumer.prototype._consumeNextMessage = function(){ +consumer.prototype._consumeNextMessage = function(){ if(this._consume.reliable){ this.getNextOpen(this._timeout); } else { From 0520ae660705272521a3b692f47b2f0af352f435 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 10:46:47 +0100 Subject: [PATCH 5/8] added reconnect when reliable read returns error --- examples/consumer.js | 12 +++++++--- lib/kestrelClient.js | 6 +++-- lib/kestrelConsumer.js | 54 +++++++++++++++++++++++++++--------------- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/examples/consumer.js b/examples/consumer.js index 9598c12..c417faf 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -13,13 +13,19 @@ function consumed(message,cb){ console.log('Consumed Message:'); console.dir(message); - if(counter++>1){ - //produce failure; + var errorConsumingMessage = null; + counter++; + + if(counter==2){ + errorConsumingMessage = new Error('oops'); + } + else if(counter==5){ + //simulate failure by never answering; return process.exit(1); } if(cb){ - cb(); + cb(errorConsumingMessage); } } diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index beedf01..7a44a64 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -87,7 +87,7 @@ function _createConnection(port, host, self){ function _handleData(data, self){ data = data.toString(); - + if( data.match('STORED\r\n') ){ self.emit('stored', true); if(self._pendingSetCallback){ @@ -220,7 +220,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ } var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; - command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + command += Buffer.byteLength(value, 'utf8') + '\r\n'; var connection = this._getConnection(); if( connection ){ @@ -228,6 +228,8 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ this._pendingSetCallback = callback; } connection.write(command); + connection.write(value); + connection.write('\r\n'); } else { if(callback){ callback('No connection'); diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index fa5d047..8c8c2b9 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,29 +1,41 @@ 'use strict'; -var client = require('./kestrelClient.js'); +var KestrelClient = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); var consumer = function( queue, options ){ this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); - + this._options = options; + this._callback = null; this._consume = false; this._timeout = 3000; - var self = this; + this._client = connectToClient(this); + + ee2.call(this); +}; +util.inherits(consumer,ee2); + +function connectToClient(self){ + + var client = new KestrelClient(self._options); + client.connect(); - this._client.on('message', function(message){ + client.on('message', function(message){ if(self._consume){ if(self._consume.reliable){ self._callback(message, function(err){ if(err){ - self.stopConsuming(); + var consumeOptions = self._consume; + var consumeCallback = self._callback; + + self.stopConsuming(function(){ + self.consume(consumeOptions,consumeCallback); + }); } else { self._consumeNextMessage(); } @@ -37,13 +49,12 @@ var consumer = function( queue, options ){ self.emit('message', message); }); - this._client.on('empty', function(){ + client.on('empty', function(){ self.get(self._timeout); }); - ee2.call(this); -}; -util.inherits(consumer,ee2); + return client; +} consumer.prototype.get = function( timeout ){ this._client.get(this._queue, timeout); @@ -71,8 +82,7 @@ consumer.prototype.consume = function( options, callback ){ } if(!this._client){ - this._client = client(this._options); - this.client.connect(); + this._client = connectToClient(this); } this._callback = callback; @@ -89,11 +99,17 @@ consumer.prototype._consumeNextMessage = function(){ } }; -consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; - this.emit('stop'); +consumer.prototype.stopConsuming = function(cb){ + if(this._consume){ + this._consume = false; + this._client.close(); + this._client = null; + this.emit('stop'); + } + + if(cb){ + cb(); + } }; module.exports = consumer; From 588b48ed3c0e80a40e4def68415b60639612b6b6 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 10:52:57 +0100 Subject: [PATCH 6/8] corrected consume and reliable fetch when hitting end of queue --- lib/kestrelClient.js | 1 + lib/kestrelConsumer.js | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 7a44a64..2acbf20 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -123,6 +123,7 @@ function _handleData(data, self){ if( data == 'END\r\n' ){ + self._openConnection = null; self.emit('empty', null); } diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 8c8c2b9..6c70d1c 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -37,12 +37,12 @@ function connectToClient(self){ self.consume(consumeOptions,consumeCallback); }); } else { - self._consumeNextMessage(); + self._getNextMessage(); } }); } else { self._callback(message); - self._consumeNextMessage(); + self._getNextMessage(); } } @@ -50,7 +50,7 @@ function connectToClient(self){ }); client.on('empty', function(){ - self.get(self._timeout); + self._getNextMessage(); }); return client; @@ -88,10 +88,10 @@ consumer.prototype.consume = function( options, callback ){ this._callback = callback; this._consume = options; - this._consumeNextMessage(); + this._getNextMessage(); }; -consumer.prototype._consumeNextMessage = function(){ +consumer.prototype._getNextMessage = function(){ if(this._consume.reliable){ this.getNextOpen(this._timeout); } else { From d706c44898f30b92cf6b1ae00ae8ed94e8877b23 Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 16:00:07 +0100 Subject: [PATCH 7/8] messages should be read as buffers and not try to split data based on \r\n but on actual data length --- lib/kestrelClient.js | 123 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 18 deletions(-) diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 2acbf20..b732fa6 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -70,7 +70,7 @@ function _createConnection(port, host, self){ connection.on('error', function(err){ self._openConnection = null; self.emit('error', err); - if(self._settings.reconnect){ + if(self._settings.reconnect){ setTimeout(function(){ self.connect(); },self._settings.reconnectDelay); @@ -86,9 +86,15 @@ function _createConnection(port, host, self){ } function _handleData(data, self){ - data = data.toString(); + - if( data.match('STORED\r\n') ){ + if( isResponse(data,'VALUE') ){ + return _handleGet(data,self); + } + + data = data.toString(); + + if( isResponse(data,'STORED') ) { self.emit('stored', true); if(self._pendingSetCallback){ var callback = self._pendingSetCallback; @@ -97,48 +103,129 @@ function _handleData(data, self){ } } - if( data.match(/^VALUE/) ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) ){ + else if( isResponse(data,'STAT') ){ _handleStats(data,self); } - if( data.match(/^queue/) ){ + else if( isResponse(data,'queue') ){ _handleDumpStats(data, self); } - if( data == 'DELETED\r\n' ){ + else if( isResponse(data,'DELETED\r\n') ){ self.emit('deleted', true); } - if( data.match(/^VERSION/) ){ + else if( isResponse(data,'VERSION') ){ self.emit('version', data.split(' ')[1].replace('\r\n','')); } - if( data == 'Reloaded config.\r\n' ){ + else if( isResponse(data,'Reloaded config.\r\n') ){ self.emit('reloaded', true); } - - if( data == 'END\r\n' ){ + else if( isResponse(data,'END\r\n') ){ self._openConnection = null; self.emit('empty', null); } + else { + console.log('unknown data from server',data, data.toString()); + } +} + +function isResponse(data,responseType){ + var buffer = null; + + if(Buffer.isBuffer(data)){ + buffer = data; + } else { + buffer = new Buffer(data,'utf8'); + } + return bufferStartsWithString(buffer,responseType); +} + +function bufferStartsWithString(buffer,responseType){ + var responseTypeAsBuffer = new Buffer(responseType,'utf8'); + return bufferStartsWith(buffer,responseTypeAsBuffer); +} + +function bufferStartsWith(buffer,prefixBuffer){ + if(!buffer) + return false; + + if( buffer.length < prefixBuffer.length){ + return false; + } + + for(var i = 0; i < prefixBuffer.length; i++) { + if(prefixBuffer[i] != buffer[i]){ + return false; + } + } + + return true; } function _handleGet(data, self){ var msg = {}; + + var header = parseValueHeader(data); - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); + msg.queue = header.queue; + msg.data = data.slice(header.dataStart,header.dataEnd); self.emit('message', msg); } +function parseValueHeader(data){ + var endOfLineMarker = '\r\n'; + var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker); + + if(indexOfEndOfLineMarker<=0){ + throw new Error('incorrect \r\n in VALUE header'); + } + + var headerString = data.slice(0,indexOfEndOfLineMarker).toString(); + var headerParts = headerString.split(' '); + + if(headerParts[0] != 'VALUE'){ + throw new Error('Invalid VALUE header', headerString); + } + + var dataLength = parseInt(headerParts[3],10); + var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length; + + return { + queue: headerParts[1], + length: dataLength, + dataStart: headerLength, + dataEnd: headerLength+dataLength + }; +} + +function findEndOfLine(buffer,endOfLineMarker){ + var eol = new Buffer(endOfLineMarker,'utf8'); + var firstEolCharacter = eol[0]; + + if(buffer.length < eol.length){ + return; + } + + for (var i = 0; i < buffer.length-eol.length; i++) { + if(buffer[i] !== firstEolCharacter){ + continue; + } + + var comparisonBuffer = buffer.slice(i,i+eol.length); + console.log(i,eol,comparisonBuffer); + if(bufferStartsWith(comparisonBuffer,eol)){ + return i; + } + } + + return -1; +} + function _handleStats(data, self){ var stats = {}; @@ -221,7 +308,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ } var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; - command += Buffer.byteLength(value, 'utf8') + '\r\n'; + command += value.length + '\r\n'; var connection = this._getConnection(); if( connection ){ From 24492599bd37ec67cb480a6440808133e56f153e Mon Sep 17 00:00:00 2001 From: Matthias Goetzke Date: Tue, 11 Mar 2014 16:29:32 +0100 Subject: [PATCH 8/8] added support for values longer than one tcp data packet, also added decode setting which defaults to true and converts incoming values toString() --- examples/consumer.js | 6 ++-- lib/kestrelClient.js | 85 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/examples/consumer.js b/examples/consumer.js index c417faf..e663323 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -4,14 +4,16 @@ var kestrel = require('..'); var consumer = new kestrel.kestrelConsumer( 'test', { connectionType: kestrel.connectionType.ROUND_ROBIN, - servers: ['127.0.0.1:22133'] + servers: ['127.0.0.1:22133'], + decode: true }); var counter = 0; function consumed(message,cb){ - console.log('Consumed Message:'); + console.log('Consumed Message:',message.data.length); console.dir(message); + //console.log('.'); var errorConsumingMessage = null; counter++; diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index b732fa6..c800be6 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -11,11 +11,13 @@ var kestrel = function( options ){ servers: ['127.0.0.1:22133'], connectionType: types.ROUND_ROBIN, reconnect: false, - reconnectDelay: 200 + reconnectDelay: 200, + decode: true }; this._currentConnection = 0; this._connections = []; this._openConnection = null; + this._receivingValue = null; if( options instanceof Object ){ for( var key in options ){ @@ -92,7 +94,7 @@ function _handleData(data, self){ return _handleGet(data,self); } - data = data.toString(); + var stringData = data.toString(); if( isResponse(data,'STORED') ) { self.emit('stored', true); @@ -104,11 +106,11 @@ function _handleData(data, self){ } else if( isResponse(data,'STAT') ){ - _handleStats(data,self); + _handleStats(stringData,self); } else if( isResponse(data,'queue') ){ - _handleDumpStats(data, self); + _handleDumpStats(stringData, self); } else if( isResponse(data,'DELETED\r\n') ){ @@ -116,7 +118,7 @@ function _handleData(data, self){ } else if( isResponse(data,'VERSION') ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); + self.emit('version', stringData.split(' ')[1].replace('\r\n','')); } else if( isResponse(data,'Reloaded config.\r\n') ){ @@ -129,7 +131,7 @@ function _handleData(data, self){ } else { - console.log('unknown data from server',data, data.toString()); + _handleGet(data,self); } } @@ -167,39 +169,84 @@ function bufferStartsWith(buffer,prefixBuffer){ } function _handleGet(data, self){ - var msg = {}; + + if(self._receivingValue){ + var buffer = data.slice(0,self._receivingValue.bytesMissing); + self._receivingValue.bytesMissing -= buffer.length; + self._receivingValue.dataBuffers.push(buffer); + + if(self._receivingValue.bytesMissing === 0){ + + var msg = { + queue: self._receivingValue.header.queue, + data: Buffer.concat(self._receivingValue.dataBuffers) + }; + + self._receivingValue = null; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } + return; + } var header = parseValueHeader(data); - - msg.queue = header.queue; - msg.data = data.slice(header.dataStart,header.dataEnd); - - self.emit('message', msg); + + if(header.incomplete){ + var partialData = data.slice(header.dataStart); + + self._receivingValue = { + header: header, + dataBuffers: [partialData], + bytesMissing: header.dataLength - partialData.length + }; + + return; + + } else { + + var msg = { + queue: header.queue, + data: data.slice(header.dataStart,header.dataEnd+1) + }; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } } -function parseValueHeader(data){ + +function parseValueHeader(buffer){ var endOfLineMarker = '\r\n'; - var indexOfEndOfLineMarker = findEndOfLine(data,endOfLineMarker); + var indexOfEndOfLineMarker = findEndOfLine(buffer,endOfLineMarker); if(indexOfEndOfLineMarker<=0){ throw new Error('incorrect \r\n in VALUE header'); } - var headerString = data.slice(0,indexOfEndOfLineMarker).toString(); + var headerString = buffer.slice(0,indexOfEndOfLineMarker).toString(); var headerParts = headerString.split(' '); - + if(headerParts[0] != 'VALUE'){ throw new Error('Invalid VALUE header', headerString); } var dataLength = parseInt(headerParts[3],10); var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length; + var dataEnd = headerLength+dataLength-1; return { queue: headerParts[1], - length: dataLength, + dataLength: dataLength, dataStart: headerLength, - dataEnd: headerLength+dataLength + dataEnd: dataEnd, + incomplete: (dataEnd >= buffer.length) }; } @@ -217,7 +264,7 @@ function findEndOfLine(buffer,endOfLineMarker){ } var comparisonBuffer = buffer.slice(i,i+eol.length); - console.log(i,eol,comparisonBuffer); + if(bufferStartsWith(comparisonBuffer,eol)){ return i; }