Browse Source

made files pass jslint, set replaced tabs with spaces (tabs and spaces where mixed before), set indentation to 2 (as per rdahl convention)

pull/5/head
Matthias Goetzke 12 years ago
parent
commit
925b17d61a
3 changed files with 305 additions and 302 deletions
  1. +243
    -241
      lib/kestrelClient.js
  2. +39
    -39
      lib/kestrelConsumer.js
  3. +23
    -22
      lib/kestrelProducer.js

+ 243
- 241
lib/kestrelClient.js View File

@ -1,3 +1,5 @@
'use strict';
var ee2 = require('eventemitter2').EventEmitter2;
var net = require('net');
var util = require('util');
@ -5,331 +7,331 @@ var types = require('./connectionType.js');
var kestrel = function( options ){
this._settings = {
servers: ['127.0.0.1:22133'],
connectionType: types.ROUND_ROBIN
}
this._currentConnection = 0;
this._connections = [];
if( options instanceof Object ){
for( var key in options ){
if( this._settings[key] != undefined ){
this._settings[key] = options[key];
}
}
this._settings = {
servers: ['127.0.0.1:22133'],
connectionType: types.ROUND_ROBIN
};
this._currentConnection = 0;
this._connections = [];
if( options instanceof Object ){
for( var key in options ){
if( this._settings[key] !== undefined ){
this._settings[key] = options[key];
}
}
}
this._pendingSetCallback = null;
this._pendingSetCallback = null;
ee2.call(this);
};
ee2.call(this);
}
util.inherits(kestrel,ee2);
//open connections to kestrel server(s)
kestrel.prototype.connect = function(){
var type = this._settings.connectionType;
var type = this._settings.connectionType;
if( types[type] == undefined || types[type] == null ){
throw "Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type";
}
if( types[type] === undefined || types[type] === null ){
throw 'Kestrel Client Connection: Argument 1 Must Be A Valid Kestrel Connection Type';
}
var parts,port,host;
switch( type ){
switch( type ){
case types.RANDOM:
case types.ROUND_ROBIN:
for(var i in this._settings.servers){
var parts = this._settings.servers[i].split(':');
var port = (parts.length>1)?parts[1]:'22133';
var host = parts[0];
this._connections.push( _createConnection(port,host,this) );
}
break;
for(var i in this._settings.servers){
parts = this._settings.servers[i].split(':');
port = (parts.length>1)?parts[1]:'22133';
host = parts[0];
this._connections.push( _createConnection(port,host,this) );
}
break;
case types.FAILOVER:
var rand = Math.floor( Math.random()*this._settings.servers.length );
var parts = this._settings.servers[rand].split(':');
var port = (parts.length>1)?parts[1]:'22133';
var host = parts[0];
this._connections.push( _createConnection(port,host,this) );
break;
var rand = Math.floor( Math.random()*this._settings.servers.length );
parts = this._settings.servers[rand].split(':');
port = (parts.length>1)?parts[1]:'22133';
host = parts[0];
this._connections.push( _createConnection(port,host,this) );
break;
default:
throw "Kestrel Client Connection: Unknown Connection Type";
break;
}
}
throw 'Kestrel Client Connection: Unknown Connection Type';
}
};
function _createConnection(port, host, self){
var connection = net.connect(port,host);
var connection = net.connect(port,host);
connection.on('error', function(err){
console.dir(err);
});
connection.on('data', function(data){
_handleData(data,self);
self.emit('data', data);
});
connection.on('error', function(err){
console.dir(err);
});
connection.on('data', function(data){
_handleData(data,self);
self.emit('data', data);
});
return connection;
return connection;
}
function _handleData(data, self){
data = data.toString();
if( data.match('STORED\r\n') != null ){
self.emit('stored', true);
if(self._pendingSetCallback){
var callback = self._pendingSetCallback;
self._pendingSetCallback = null;
callback(null);
}
}
if( data.match(/^VALUE/) != null ){
_handleGet(data,self);
}
if( data.match(/^STAT/) != null ){
_handleStats(data,self);
}
if( data.match(/^queue/) != null ){
_handleDumpStats(data, self);
}
if( data == 'DELETED\r\n' ){
self.emit('deleted', true);
}
if( data.match(/^VERSION/) != null ){
self.emit('version', data.split(' ')[1].replace('\r\n',''));
}
if( data == 'Reloaded config.\r\n' ){
self.emit('reloaded', true);
data = data.toString();
if( data.match('STORED\r\n') ){
self.emit('stored', true);
if(self._pendingSetCallback){
var callback = self._pendingSetCallback;
self._pendingSetCallback = null;
callback(null);
}
if( data == 'END\r\n' ){
self.emit('empty', null);
}
}
if( data.match(/^VALUE/) ){
_handleGet(data,self);
}
if( data.match(/^STAT/) ){
_handleStats(data,self);
}
if( data.match(/^queue/) ){
_handleDumpStats(data, self);
}
if( data == 'DELETED\r\n' ){
self.emit('deleted', true);
}
if( data.match(/^VERSION/) ){
self.emit('version', data.split(' ')[1].replace('\r\n',''));
}
if( data == 'Reloaded config.\r\n' ){
self.emit('reloaded', true);
}
if( data == 'END\r\n' ){
self.emit('empty', null);
}
}
function _handleGet(data, self){
var msg = {};
var parts = data.split('\r\n');
msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1];
msg.data = parts[1].replace('\r\n','');
self.emit('message', msg);
var msg = {};
var parts = data.split('\r\n');
msg.queue = parts[0].match(/([a-zA-Z_]+)/mg)[1];
msg.data = parts[1].replace('\r\n','');
self.emit('message', msg);
}
function _handleStats(data, self){
var stats = {};
var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g);
var stats = {};
for( var i in parts ){
var part = parts[i].split(' ');
stats[part[1]] = part[2].replace('\r\n', '');
}
var parts = data.match(/STAT ([a-zA-Z_]+) (.*?)\r\n/g);
for( var i in parts ){
var part = parts[i].split(' ');
stats[part[1]] = part[2].replace('\r\n', '');
}
self.emit('stats', stats);
self.emit('stats', stats);
}
function _handleDumpStats(data, self){
var stats = {};
var parts = data.replace('\r\n}\r\nEND','').split('}\r\n');
for( var i in parts ){
var part = parts[i].split('\r\n');
var queue = part[0].match(/[a-zA-Z_]+/g)[1];
stats[queue] = {};
for( var lcv = 1; lcv < part.length-1; ++lcv ){
var p = part[lcv].split('=');
stats[queue][ p[0].replace(/ /g,'') ] = p[1];
}
var stats = {};
var parts = data.replace('\r\n}\r\nEND','').split('}\r\n');
for( var i in parts ){
var part = parts[i].split('\r\n');
var queue = part[0].match(/[a-zA-Z_]+/g)[1];
stats[queue] = {};
for( var lcv = 1; lcv < part.length-1; ++lcv ){
var p = part[lcv].split('=');
stats[queue][ p[0].replace(/ /g,'') ] = p[1];
}
}
self.emit('dump_stats', stats);
self.emit('dump_stats', stats);
}
kestrel.prototype._getConnection = function(){
if( this._connections.length == 1 ){
return this._connections[0];
}else if( this._connections.length == 0 ){
return null;
}
if( this._connections.length == 1 ) {
return this._connections[0];
} else if( this._connections.length === 0 ) {
return null;
}
var connection = null;
var connection = null;
switch(this._settings.connectionType){
switch(this._settings.connectionType) {
case types.RANDOM:
var rand = Math.floor( Math.random()*this._connections.length );
connection = this._connections[rand];
break;
var rand = Math.floor( Math.random()*this._connections.length );
connection = this._connections[rand];
break;
case types.ROUND_ROBIN:
this._currentConnection += 1;
if( this._currentConnection > this._connections.length ){
this._currentConnection = 0;
}
connection = this._connections[this._currentConnection];
break;
this._currentConnection += 1;
if( this._currentConnection > this._connections.length ){
this._currentConnection = 0;
}
connection = this._connections[this._currentConnection];
break;
case types.FAILOVER:
connection = this._connections[0];
break;
}
connection = this._connections[0];
break;
}
return connection;
}
return connection;
};
kestrel.prototype.close = function(){
//close any open connections
for( var i in this._connections ){
this._connections[i].destroy();
}
}
//close any open connections
for( var i in this._connections ){
this._connections[i].destroy();
}
};
kestrel.prototype.set = function( queue, value, lifetime, callback){
if( typeof(lifetime) === "function" ){
if( typeof(lifetime) === 'function' ){
callback = lifetime;
lifetime = null;
}
if( lifetime === undefined || lifetime === null ){
lifetime = 0;
}
if(this._pendingSetCallback && callback){
return callback('Cannot write again. Still waiting for previous write to be stored');
}
var command = 'SET ' + queue + ' 0 ' + lifetime + ' ';
command += Buffer.byteLength(value, 'utf8') + '\r\n' + value + '\r\n';
var connection = this._getConnection();
if( connection ){
if(callback){
this._pendingSetCallback = callback;
}
if( lifetime == undefined || lifetime == null ){
lifetime = 0;
}
if(this._pendingSetCallback && callback){
return callback("Cannot write again. Still waiting for previous write to be stored");
}
var command = "SET " + queue + " 0 " + lifetime + " ";
command += Buffer.byteLength(value, 'utf8') + "\r\n" + value + "\r\n";
var connection = this._getConnection();
if( connection != null ){
if(callback){
this._pendingSetCallback = callback;
}
connection.write(command);
connection.write(command);
} else {
if(callback){
callback('No connection');
} else {
if(callback){
callback("No connection");
} else {
throw new Error("No connection");
}
throw new Error('No connection');
}
}
return this;
}
return this;
};
kestrel.prototype.get = function(queue, timeout){
var command = "GET " + queue;
timeout = parseInt(timeout);
if( timeout > 0 ){
command += "/t="+timeout;
}
var connection = this._getConnection();
if( connection != null ){
connection.write(command + '\r\n');
}
return this;
}
var command = 'GET ' + queue;
timeout = parseInt(timeout,10);
if( timeout > 0 ){
command += '/t='+timeout;
}
var connection = this._getConnection();
if( connection ){
connection.write(command + '\r\n');
}
return this;
};
kestrel.prototype.delete = function(queue){
//delete given queue
var connection = this._getConnection();
if( connection != null ){
connection.write('DELETE ' + queue + '\r\n');
}
//delete given queue
var connection = this._getConnection();
if( connection ){
connection.write('DELETE ' + queue + '\r\n');
}
return this;
}
return this;
};
kestrel.prototype.flush = function(queue){
//flush given queue
var connection = this._getConnection();
if( connection != null ){
connection.write('FLUSH ' + queue + '\r\n');
}
return this;
}
//flush given queue
var connection = this._getConnection();
if( connection ){
connection.write('FLUSH ' + queue + '\r\n');
}
return this;
};
kestrel.prototype.flushAll = function(){
//flush all queues
var connection = this._getConnection();
if( connection != null ){
connection.write('FLUSH_ALL\r\n');
}
//flush all queues
var connection = this._getConnection();
if( connection ){
connection.write('FLUSH_ALL\r\n');
}
return this;
}
return this;
};
kestrel.prototype.version = function(){
//get version of server
var connection = this._getConnection();
if( connection != null ){
connection.write('VERSION\r\n');
}
//get version of server
var connection = this._getConnection();
if( connection ){
connection.write('VERSION\r\n');
}
return this;
}
return this;
};
kestrel.prototype.shutdown = function(){
//shutdown server
var connection = this._getConnection();
if( connection != null ){
connection.write('SHUTDOWN\r\n');
}
//shutdown server
var connection = this._getConnection();
if( connection ){
connection.write('SHUTDOWN\r\n');
}
return this;
}
return this;
};
kestrel.prototype.reload = function(){
//reload the server
var connection = this._getConnection();
if( connection != null ){
connection.write('RELOAD\r\n');
}
//reload the server
var connection = this._getConnection();
if( connection ){
connection.write('RELOAD\r\n');
}
return this;
}
return this;
};
kestrel.prototype.stats = function( callback ){
//get server stats
var connection = this._getConnection();
if( connection != null ){
connection.write('STATS\r\n');
}
kestrel.prototype.stats = function(){
//get server stats
var connection = this._getConnection();
if( connection ){
connection.write('STATS\r\n');
}
return this;
}
return this;
};
kestrel.prototype.dumpStats = function(){
//dump server stats
var connection = this._getConnection();
if( connection != null ){
connection.write('DUMP_STATS\r\n');
}
//dump server stats
var connection = this._getConnection();
if( connection ){
connection.write('DUMP_STATS\r\n');
}
return this;
};
return this;
}
kestrel.prototype.monitor = function(queue, seconds, maxItems){
//monitor the given queue
}
};
kestrel.prototype.confirm = function(queue, count){
//confirm received items
}
};
module.exports = kestrel;

+ 39
- 39
lib/kestrelConsumer.js View File

@ -1,62 +1,62 @@
'use strict';
var client = require('./kestrelClient.js');
var ee2 = require('eventemitter2').EventEmitter2;
var util = require('util');
var consumer = function( queue, options ){
this._queue = queue;
this._options = new client(options);
this._client = new client(options);
this._client.connect();
this._queue = queue;
this._options = new client(options);
this._client = new client(options);
this._client.connect();
this._callback = null;
this._consume = false;
this._timeout = 3000;
this._callback = null;
this._consume = false;
this._timeout = 3000;
var self = this;
this._client.on('message', function(message){
var self = this;
if( typeof self._callback == 'function' ){
self._callback(message);
}
this._client.on('message', function(message){
if( typeof self._callback == 'function' ){
self._callback(message);
}
if( self._consume ){
self.get(self._timeout);
}
if( self._consume ){
self.get(self._timeout);
}
self.emit('message', message);
});
self.emit('message', message);
});
this._client.on('empty', function(){
self.get(self._timeout);
});
this._client.on('empty', function(){
self.get(self._timeout);
});
ee2.call(this);
}
ee2.call(this);
};
util.inherits(consumer,ee2);
consumer.prototype.get = function( timeout ){
this._client.get(this._queue, timeout);
}
};
consumer.prototype.consume = function( callback ){
if(!this._client){
this._client = client(this._options);
this.client.connect();
}
if( typeof(callback) == 'function' ){
this._callback = callback;
}
this._consume = true;
this.get(this._timeout);
}
if(!this._client){
this._client = client(this._options);
this.client.connect();
}
if( typeof(callback) == 'function' ){
this._callback = callback;
}
this._consume = true;
this.get(this._timeout);
};
consumer.prototype.stopConsuming = function(){
this._consume = false;
this._client.close();
this._client = null;
}
this._consume = false;
this._client.close();
this._client = null;
};
module.exports = consumer;

+ 23
- 22
lib/kestrelProducer.js View File

@ -1,39 +1,40 @@
'use strict';
var ee2 = require('eventemitter2').EventEmitter2;
var client = require('./kestrelClient.js');
var util = require('util');
var producer = function( queue, options ){
this._queue = queue;
this._client = new client(options);
this._client.connect();
this._queue = queue;
this._client = new client(options);
this._client.connect();
var self = this;
this._client.on('stored', function(stored){
self.emit('stored', stored);
});
var self = this;
this._client.on('stored', function(stored){
self.emit('stored', stored);
});
ee2.call(this);
}
ee2.call(this);
};
util.inherits(producer,ee2);
producer.prototype.send = function(message, lifetime, callback){
if( typeof(lifetime) === "function" ){
if( typeof(lifetime) === 'function' ){
callback = lifetime;
lifetime = null;
}
if( lifetime == null || lifetime == undefined ){
lifetime = 0;
}
lifetime = parseInt(lifetime);
}
this._client.set(this._queue, message, lifetime, callback);
}
if( lifetime === null || lifetime === undefined ){
lifetime = 0;
}
producer.prototype.close = function(){
this._client.close();
}
lifetime = parseInt(lifetime,10);
this._client.set(this._queue, message, lifetime, callback);
};
producer.prototype.close = function(){
this._client.close();
};
module.exports = producer;

Loading…
Cancel
Save