From 38048d55aca222ffc615d2253d54ef5c90f71ce6 Mon Sep 17 00:00:00 2001 From: brett_langdon Date: Thu, 14 Jun 2012 22:43:59 -0400 Subject: [PATCH] wrote functions for kestrel producers and consumers, added examples --- examples/client.js | 50 +++++++++++++++++++++++++++++++++++++ examples/consumer.js | 17 +++++++++++++ examples/producer.js | 27 ++++++++++++++++++++ lib/kestrelClient.js | 11 ++++++--- lib/kestrelConsumer.js | 56 +++++++++++++++++++++++++++++++++++++++++- lib/kestrelProducer.js | 36 ++++++++++++++++++++++++++- 6 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 examples/client.js create mode 100644 examples/consumer.js create mode 100644 examples/producer.js diff --git a/examples/client.js b/examples/client.js new file mode 100644 index 0000000..52bff4b --- /dev/null +++ b/examples/client.js @@ -0,0 +1,50 @@ +var kestrel = require('../'); + + +//setup our client +var client = new kestrel.kestrelClient( { + connectionType: kestrel.connectionType.RANDOM, + servers: ['127.0.0.1:22133'] +}); + +//make our connection(s) +client.connect(); + + +//get the server version, same as running 'VERSION' +client.version().once('version', function(version){ + console.log('Version: ' + version); +}); + +//get the server stats, same as running 'STATS' +client.stats().once('stats', function(stats){ + console.dir(stats); +}); + + +//input some data into the 'test' queue +client.set('test', 'some data'); +console.log('Message Sent'); + +//do this for every message that we 'GET' +client.on('message', function(message){ + console.log('New Message:'); + console.dir(message); + + //make sure to get more if the queue has them + client.get('test', 3000); +}); + + +//get from the queue, wait up to 3 seconds for a response +client.get('test', 3000); + + +setTimeout(function(){ + //this will SHUTDOWN the server! + //client.shutdown(); + + //close all client connections to server(s) + client.close(); + +}, 3000); \ No newline at end of file diff --git a/examples/consumer.js b/examples/consumer.js new file mode 100644 index 0000000..798728c --- /dev/null +++ b/examples/consumer.js @@ -0,0 +1,17 @@ +var kestrel = require('../'); + +var consumer = new kestrel.kestrelConsumer( 'test', { + connectionType: kestrel.connectionType.ROUND_ROBIN, + servers: ['127.0.0.1:22133'] +}); + +function consumed(message){ + console.log('Consumed Message:'); + console.dir(message); +} + +consumer.consume( consumed ); + +setTimeout( function(){ + consumer.stopConsuming(); +}, 6000); \ No newline at end of file diff --git a/examples/producer.js b/examples/producer.js new file mode 100644 index 0000000..c94031f --- /dev/null +++ b/examples/producer.js @@ -0,0 +1,27 @@ +var kestrel = require('../'); + + +//create our producer +var producer = new kestrel.kestrelProducer( 'test', { + connectionType: kestrel.connectionType.FAILOVER, + servers: ['127.0.0.1:22133'] +}); + + +//capture all 'stored' events +producer.on('stored', function(stored){ + console.log('Stored: ' + stored); +}); + + +//lets input some data +var interval = setInterval( function(){ + producer.send( (new Date().getTime()) + ' - New Message' ); +}, 100); + + +//close connection +setTimeout( function(){ + clearInterval(interval); + producer.close(); +}, 6000); \ No newline at end of file diff --git a/lib/kestrelClient.js b/lib/kestrelClient.js index 645b666..ac689a7 100644 --- a/lib/kestrelClient.js +++ b/lib/kestrelClient.js @@ -44,7 +44,7 @@ kestrel.prototype.connect = function(){ break; case types.FAILOVER: var rand = Math.floor( Math.random()*this._settings.servers.length ); - var parts = this._settings.servers[rand]; + var parts = this._settings.servers[rand].split(':'); var port = (parts.length>1)?parts[1]:'22133'; var host = parts[0]; this._connections.push( _createConnection(port,host,this) ); @@ -96,6 +96,11 @@ function _handleData(data, self){ if( data == 'Reloaded config.\r\n' ){ self.emit('reloaded', true); } + + + if( data == 'END\r\n' ){ + self.emit('empty', null); + } } @@ -174,13 +179,13 @@ kestrel.prototype._getConnection = function(){ kestrel.prototype.close = function(){ //close any open connections for( var i in this._connections ){ - this._connections.destroy(); + this._connections[i].destroy(); } } kestrel.prototype.set = function( queue, value, lifetime ){ - if( lifetime == undefined || lieftime == null ){ + if( lifetime == undefined || lifetime == null ){ lifetime = 0; } diff --git a/lib/kestrelConsumer.js b/lib/kestrelConsumer.js index 53feab8..b7e37be 100644 --- a/lib/kestrelConsumer.js +++ b/lib/kestrelConsumer.js @@ -1 +1,55 @@ -var kestrel = require('./kestrelClient.js'); \ No newline at end of file +var client = require('./kestrelClient.js'); +var ee2 = require('eventemitter2').EventEmitter2; +var util = require('util'); + + + +var consumer = function( queue, options ){ + this._queue = queue; + this._client = new client(options); + this._client.connect(); + + this._callback = null; + this._consume = false; + this._timeout = 3000; + + var self = this; + this._client.on('message', function(message){ + + if( typeof self._callback == 'function' ){ + self._callback(message); + } + + if( self._consume ){ + self.get(self._timeout); + } + + self.emit('message', message); + }); + + this._client.on('empty', function(){ + self.get(self._timeout); + }); + + ee2.call(this); +} +util.inherits(consumer,ee2); + + +consumer.prototype.get = function( timeout ){ + this._client.get(this._queue, timeout); +} + +consumer.prototype.consume = function( callback ){ + if( typeof(callback) == 'function' ){ + this._callback = callback; + } + this._consume = true; + this.get(this._timeout); +} + +consumer.prototype.stopConsuming = function(){ + this._consume = false; +} + +module.exports = consumer; \ No newline at end of file diff --git a/lib/kestrelProducer.js b/lib/kestrelProducer.js index 53feab8..8a26de6 100644 --- a/lib/kestrelProducer.js +++ b/lib/kestrelProducer.js @@ -1 +1,35 @@ -var kestrel = require('./kestrelClient.js'); \ No newline at end of file +var ee2 = require('eventemitter2').EventEmitter2; +var client = require('./kestrelClient.js'); +var util = require('util'); + +var producer = function( queue, options ){ + this._queue = queue; + this._client = new client(options); + this._client.connect(); + + var self = this; + this._client.on('stored', function(stored){ + self.emit('stored', stored); + }); + + ee2.call(this); +} +util.inherits(producer,ee2); + + + +producer.prototype.send = function(message, lifetime){ + if( lifetime == null || lifetime == undefined ){ + lifetime = 0; + } + lifetime = parseInt(lifetime); + + this._client.set(this._queue, message, lifetime); +} + +producer.prototype.close = function(){ + this._client.close(); +} + + +module.exports = producer;