Browse Source

wrote functions for kestrel producers and consumers, added examples

pull/3/head
Brett Langdon 14 years ago
parent
commit
38048d55ac
6 changed files with 192 additions and 5 deletions
  1. +50
    -0
      examples/client.js
  2. +17
    -0
      examples/consumer.js
  3. +27
    -0
      examples/producer.js
  4. +8
    -3
      lib/kestrelClient.js
  5. +55
    -1
      lib/kestrelConsumer.js
  6. +35
    -1
      lib/kestrelProducer.js

+ 50
- 0
examples/client.js View File

@ -0,0 +1,50 @@
var kestrel = require('../');
//setup our client
var client = new kestrel.kestrelClient( {
connectionType: kestrel.connectionType.RANDOM,
servers: ['127.0.0.1:22133']
});
//make our connection(s)
client.connect();
//get the server version, same as running 'VERSION'
client.version().once('version', function(version){
console.log('Version: ' + version);
});
//get the server stats, same as running 'STATS'
client.stats().once('stats', function(stats){
console.dir(stats);
});
//input some data into the 'test' queue
client.set('test', 'some data');
console.log('Message Sent');
//do this for every message that we 'GET'
client.on('message', function(message){
console.log('New Message:');
console.dir(message);
//make sure to get more if the queue has them
client.get('test', 3000);
});
//get from the queue, wait up to 3 seconds for a response
client.get('test', 3000);
setTimeout(function(){
//this will SHUTDOWN the server!
//client.shutdown();
//close all client connections to server(s)
client.close();
}, 3000);

+ 17
- 0
examples/consumer.js View File

@ -0,0 +1,17 @@
var kestrel = require('../');
var consumer = new kestrel.kestrelConsumer( 'test', {
connectionType: kestrel.connectionType.ROUND_ROBIN,
servers: ['127.0.0.1:22133']
});
function consumed(message){
console.log('Consumed Message:');
console.dir(message);
}
consumer.consume( consumed );
setTimeout( function(){
consumer.stopConsuming();
}, 6000);

+ 27
- 0
examples/producer.js View File

@ -0,0 +1,27 @@
var kestrel = require('../');
//create our producer
var producer = new kestrel.kestrelProducer( 'test', {
connectionType: kestrel.connectionType.FAILOVER,
servers: ['127.0.0.1:22133']
});
//capture all 'stored' events
producer.on('stored', function(stored){
console.log('Stored: ' + stored);
});
//lets input some data
var interval = setInterval( function(){
producer.send( (new Date().getTime()) + ' - New Message' );
}, 100);
//close connection
setTimeout( function(){
clearInterval(interval);
producer.close();
}, 6000);

+ 8
- 3
lib/kestrelClient.js View File

@ -44,7 +44,7 @@ kestrel.prototype.connect = function(){
break;
case types.FAILOVER:
var rand = Math.floor( Math.random()*this._settings.servers.length );
var parts = this._settings.servers[rand];
var parts = this._settings.servers[rand].split(':');
var port = (parts.length>1)?parts[1]:'22133';
var host = parts[0];
this._connections.push( _createConnection(port,host,this) );
@ -96,6 +96,11 @@ function _handleData(data, self){
if( data == 'Reloaded config.\r\n' ){
self.emit('reloaded', true);
}
if( data == 'END\r\n' ){
self.emit('empty', null);
}
}
@ -174,13 +179,13 @@ kestrel.prototype._getConnection = function(){
kestrel.prototype.close = function(){
//close any open connections
for( var i in this._connections ){
this._connections.destroy();
this._connections[i].destroy();
}
}
kestrel.prototype.set = function( queue, value, lifetime ){
if( lifetime == undefined || lieftime == null ){
if( lifetime == undefined || lifetime == null ){
lifetime = 0;
}


+ 55
- 1
lib/kestrelConsumer.js View File

@ -1 +1,55 @@
var kestrel = require('./kestrelClient.js');
var client = require('./kestrelClient.js');
var ee2 = require('eventemitter2').EventEmitter2;
var util = require('util');
var consumer = function( queue, options ){
this._queue = queue;
this._client = new client(options);
this._client.connect();
this._callback = null;
this._consume = false;
this._timeout = 3000;
var self = this;
this._client.on('message', function(message){
if( typeof self._callback == 'function' ){
self._callback(message);
}
if( self._consume ){
self.get(self._timeout);
}
self.emit('message', message);
});
this._client.on('empty', function(){
self.get(self._timeout);
});
ee2.call(this);
}
util.inherits(consumer,ee2);
consumer.prototype.get = function( timeout ){
this._client.get(this._queue, timeout);
}
consumer.prototype.consume = function( callback ){
if( typeof(callback) == 'function' ){
this._callback = callback;
}
this._consume = true;
this.get(this._timeout);
}
consumer.prototype.stopConsuming = function(){
this._consume = false;
}
module.exports = consumer;

+ 35
- 1
lib/kestrelProducer.js View File

@ -1 +1,35 @@
var kestrel = require('./kestrelClient.js');
var ee2 = require('eventemitter2').EventEmitter2;
var client = require('./kestrelClient.js');
var util = require('util');
var producer = function( queue, options ){
this._queue = queue;
this._client = new client(options);
this._client.connect();
var self = this;
this._client.on('stored', function(stored){
self.emit('stored', stored);
});
ee2.call(this);
}
util.inherits(producer,ee2);
producer.prototype.send = function(message, lifetime){
if( lifetime == null || lifetime == undefined ){
lifetime = 0;
}
lifetime = parseInt(lifetime);
this._client.set(this._queue, message, lifetime);
}
producer.prototype.close = function(){
this._client.close();
}
module.exports = producer;

Loading…
Cancel
Save