diff --git a/examples/consumer.js b/examples/consumer.js index 798728c..e663323 100644 --- a/examples/consumer.js +++ b/examples/consumer.js @@ -1,17 +1,39 @@ -var kestrel = require('../'); +'use strict'; + +var kestrel = require('..'); var consumer = new kestrel.kestrelConsumer( 'test', { - connectionType: kestrel.connectionType.ROUND_ROBIN, - servers: ['127.0.0.1:22133'] + connectionType: kestrel.connectionType.ROUND_ROBIN, + servers: ['127.0.0.1:22133'], + decode: true }); -function consumed(message){ - console.log('Consumed Message:'); - console.dir(message); +var counter = 0; + +function consumed(message,cb){ + console.log('Consumed Message:',message.data.length); + console.dir(message); + //console.log('.'); + + var errorConsumingMessage = null; + counter++; + + if(counter==2){ + errorConsumingMessage = new Error('oops'); + } + else if(counter==5){ + //simulate failure by never answering; + return process.exit(1); + } + + if(cb){ + cb(errorConsumingMessage); + } } -consumer.consume( consumed ); +consumer.consume( {reliable:true}, consumed ); +//unreliable still possible as before: consumer.consume( consumed ); setTimeout( function(){ - consumer.stopConsuming(); + consumer.stopConsuming(); }, 6000); \ No newline at end of file diff --git a/examples/producer.js b/examples/producer.js index 91efc72..b2979a4 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -1,3 +1,5 @@ +'use strict'; + var kestrel = require('../'); @@ -16,13 +18,15 @@ producer.on('stored', function(stored){ //lets input some data var interval = setInterval( function(){ - producer.send( (new Date().getTime()) + ' - New Message', function(err){ - if(err){ - console.log("ERR",err); - } else { - console.log("STORED"); - } - } ); + var message = (new Date().getTime()) + ' - New Message'; + + producer.send( message , function(err){ + if(err){ + console.log('ERR',err); + } else { + console.log('STORED:' + message); + } + } ); }, 100); diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index cf894a1..c800be6 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -9,10 +9,15 @@ var kestrel = function( options ){ this._settings = { servers: ['127.0.0.1:22133'], - connectionType: types.ROUND_ROBIN + connectionType: types.ROUND_ROBIN, + reconnect: false, + reconnectDelay: 200, + decode: true }; this._currentConnection = 0; this._connections = []; + this._openConnection = null; + this._receivingValue = null; if( options instanceof Object ){ for( var key in options ){ @@ -65,8 +70,15 @@ function _createConnection(port, host, self){ var connection = net.connect(port,host); connection.on('error', function(err){ - console.dir(err); + self._openConnection = null; + self.emit('error', err); + if(self._settings.reconnect){ + setTimeout(function(){ + self.connect(); + },self._settings.reconnectDelay); + } }); + connection.on('data', function(data){ _handleData(data,self); self.emit('data', data); @@ -76,9 +88,15 @@ function _createConnection(port, host, self){ } function _handleData(data, self){ - data = data.toString(); + - if( data.match('STORED\r\n') ){ + if( isResponse(data,'VALUE') ){ + return _handleGet(data,self); + } + + var stringData = data.toString(); + + if( isResponse(data,'STORED') ) { self.emit('stored', true); if(self._pendingSetCallback){ var callback = self._pendingSetCallback; @@ -87,45 +105,172 @@ function _handleData(data, self){ } } - if( data.match(/^VALUE/) ){ - _handleGet(data,self); - } - - if( data.match(/^STAT/) ){ - _handleStats(data,self); + else if( isResponse(data,'STAT') ){ + _handleStats(stringData,self); } - if( data.match(/^queue/) ){ - _handleDumpStats(data, self); + else if( isResponse(data,'queue') ){ + _handleDumpStats(stringData, self); } - if( data == 'DELETED\r\n' ){ + else if( isResponse(data,'DELETED\r\n') ){ self.emit('deleted', true); } - if( data.match(/^VERSION/) ){ - self.emit('version', data.split(' ')[1].replace('\r\n','')); + else if( isResponse(data,'VERSION') ){ + self.emit('version', stringData.split(' ')[1].replace('\r\n','')); } - if( data == 'Reloaded config.\r\n' ){ + else if( isResponse(data,'Reloaded config.\r\n') ){ self.emit('reloaded', true); } - - if( data == 'END\r\n' ){ + else if( isResponse(data,'END\r\n') ){ + self._openConnection = null; self.emit('empty', null); } + else { + _handleGet(data,self); + } +} + +function isResponse(data,responseType){ + var buffer = null; + + if(Buffer.isBuffer(data)){ + buffer = data; + } else { + buffer = new Buffer(data,'utf8'); + } + return bufferStartsWithString(buffer,responseType); +} + +function bufferStartsWithString(buffer,responseType){ + var responseTypeAsBuffer = new Buffer(responseType,'utf8'); + return bufferStartsWith(buffer,responseTypeAsBuffer); +} + +function bufferStartsWith(buffer,prefixBuffer){ + if(!buffer) + return false; + + if( buffer.length < prefixBuffer.length){ + return false; + } + + for(var i = 0; i < prefixBuffer.length; i++) { + if(prefixBuffer[i] != buffer[i]){ + return false; + } + } + + return true; } function _handleGet(data, self){ - var msg = {}; - var parts = data.split('\r\n'); - msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1]; - msg.data = parts[1].replace('\r\n',''); + if(self._receivingValue){ + var buffer = data.slice(0,self._receivingValue.bytesMissing); + self._receivingValue.bytesMissing -= buffer.length; + self._receivingValue.dataBuffers.push(buffer); + + if(self._receivingValue.bytesMissing === 0){ + + var msg = { + queue: self._receivingValue.header.queue, + data: Buffer.concat(self._receivingValue.dataBuffers) + }; + + self._receivingValue = null; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } + return; + } + + var header = parseValueHeader(data); + + if(header.incomplete){ + var partialData = data.slice(header.dataStart); + + self._receivingValue = { + header: header, + dataBuffers: [partialData], + bytesMissing: header.dataLength - partialData.length + }; + + return; + + } else { + + var msg = { + queue: header.queue, + data: data.slice(header.dataStart,header.dataEnd+1) + }; + + if(self._settings.decode){ + msg.data = msg.data.toString(); + } + + self.emit('message', msg); + } +} + + +function parseValueHeader(buffer){ + var endOfLineMarker = '\r\n'; + var indexOfEndOfLineMarker = findEndOfLine(buffer,endOfLineMarker); + + if(indexOfEndOfLineMarker<=0){ + throw new Error('incorrect \r\n in VALUE header'); + } + + var headerString = buffer.slice(0,indexOfEndOfLineMarker).toString(); + var headerParts = headerString.split(' '); + + if(headerParts[0] != 'VALUE'){ + throw new Error('Invalid VALUE header', headerString); + } + + var dataLength = parseInt(headerParts[3],10); + var headerLength = indexOfEndOfLineMarker + endOfLineMarker.length; + var dataEnd = headerLength+dataLength-1; + + return { + queue: headerParts[1], + dataLength: dataLength, + dataStart: headerLength, + dataEnd: dataEnd, + incomplete: (dataEnd >= buffer.length) + }; +} + +function findEndOfLine(buffer,endOfLineMarker){ + var eol = new Buffer(endOfLineMarker,'utf8'); + var firstEolCharacter = eol[0]; - self.emit('message', msg); + if(buffer.length < eol.length){ + return; + } + + for (var i = 0; i < buffer.length-eol.length; i++) { + if(buffer[i] !== firstEolCharacter){ + continue; + } + + var comparisonBuffer = buffer.slice(i,i+eol.length); + + if(bufferStartsWith(comparisonBuffer,eol)){ + return i; + } + } + + return -1; } function _handleStats(data, self){ @@ -210,7 +355,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ } var command = 'SET ' + queue + ' 0 ' + lifetime + ' '; - command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n'; + command += value.length + '\r\n'; var connection = this._getConnection(); if( connection ){ @@ -218,6 +363,8 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ this._pendingSetCallback = callback; } connection.write(command); + connection.write(value); + connection.write('\r\n'); } else { if(callback){ callback('No connection'); @@ -231,7 +378,7 @@ kestrel.prototype.set = function( queue, value, lifetime, callback){ kestrel.prototype.get = function(queue, timeout){ var command = 'GET ' + queue; - + timeout = parseInt(timeout,10); if( timeout > 0 ){ command += '/t='+timeout; @@ -245,6 +392,35 @@ kestrel.prototype.get = function(queue, timeout){ return this; }; + +kestrel.prototype.getNextOpen = function(queue, timeout){ + var command = 'GET ' + queue; + + timeout = parseInt(timeout,10); + if( timeout > 0 ){ + command += '/t='+timeout; + } + + var connection = this._openConnection; + + if(connection) { + command += '/close'; + } else { + connection = this._getConnection(); + this._openConnection = connection; + } + + command += '/open'; + + if( connection ){ + connection.write(command + '\r\n'); + } + + return this; +}; + + + kestrel.prototype.delete = function(queue){ //delete given queue var connection = this._getConnection(); diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 4324763..6c70d1c 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1,62 +1,115 @@ 'use strict'; -var client = require('./kestrelClient.js'); +var KestrelClient = require('./kestrelClient.js'); var ee2 = require('eventemitter2').EventEmitter2; var util = require('util'); var consumer = function( queue, options ){ this._queue = queue; - this._options = new client(options); - this._client = new client(options); - this._client.connect(); - + this._options = options; + this._callback = null; this._consume = false; this._timeout = 3000; - var self = this; + this._client = connectToClient(this); - this._client.on('message', function(message){ - if( typeof self._callback == 'function' ){ - self._callback(message); - } + ee2.call(this); +}; +util.inherits(consumer,ee2); + +function connectToClient(self){ + + var client = new KestrelClient(self._options); + client.connect(); - if( self._consume ){ - self.get(self._timeout); + client.on('message', function(message){ + if(self._consume){ + + if(self._consume.reliable){ + self._callback(message, function(err){ + if(err){ + var consumeOptions = self._consume; + var consumeCallback = self._callback; + + self.stopConsuming(function(){ + self.consume(consumeOptions,consumeCallback); + }); + } else { + self._getNextMessage(); + } + }); + } else { + self._callback(message); + self._getNextMessage(); + } } self.emit('message', message); }); - this._client.on('empty', function(){ - self.get(self._timeout); + client.on('empty', function(){ + self._getNextMessage(); }); - ee2.call(this); -}; -util.inherits(consumer,ee2); + return client; +} consumer.prototype.get = function( timeout ){ - this._client.get(this._queue, timeout); + this._client.get(this._queue, timeout); +}; + +consumer.prototype.getNextOpen = function( timeout ){ + this._client.getNextOpen(this._queue, timeout); }; -consumer.prototype.consume = function( callback ){ +consumer.prototype.consume = function( options, callback ){ + + if(typeof(options) == 'function'){ + callback = options; + options = { + reliable: false + }; + } + + if(!options){ + throw new Error('options must be an object or left out'); + } + + if(!callback){ + throw new Error('callback to consume messages required'); + } + if(!this._client){ - this._client = client(this._options); - this.client.connect(); + this._client = connectToClient(this); } - if( typeof(callback) == 'function' ){ - this._callback = callback; + + this._callback = callback; + this._consume = options; + + this._getNextMessage(); +}; + +consumer.prototype._getNextMessage = function(){ + if(this._consume.reliable){ + this.getNextOpen(this._timeout); + } else { + this.get(this._timeout); } - this._consume = true; - this.get(this._timeout); }; -consumer.prototype.stopConsuming = function(){ - this._consume = false; - this._client.close(); - this._client = null; +consumer.prototype.stopConsuming = function(cb){ + if(this._consume){ + this._consume = false; + this._client.close(); + this._client = null; + this.emit('stop'); + } + + if(cb){ + cb(); + } }; module.exports = consumer;