diff --git a/build.xml b/build.xml index e6d73d9..c64e16b 100644 --- a/build.xml +++ b/build.xml @@ -16,7 +16,7 @@ - + diff --git a/src/com/blangdon/flume/kestrel/KestrelSink.java b/src/com/blangdon/flume/kestrel/KestrelSink.java index 6140713..d331d36 100644 --- a/src/com/blangdon/flume/kestrel/KestrelSink.java +++ b/src/com/blangdon/flume/kestrel/KestrelSink.java @@ -1,12 +1,17 @@ -package com.blangdon.flume.Kestrel; +package com.blangdon.flume.kestrel; -import iinteractive.kestrel.*; +import net.rubyeye.xmemcached.MemcachedClient; +import net.rubyeye.xmemcached.XMemcachedClientBuilder; +import net.rubyeye.xmemcached.MemcachedClientBuilder; +import net.rubyeye.xmemcached.command.KestrelCommandFactory; +import net.rubyeye.xmemcached.utils.AddrUtil; +import net.rubyeye.xmemcached.exception.MemcachedException; import java.io.IOException; import java.util.ArrayList; import java.util.List; - +import java.util.concurrent.TimeoutException; import com.cloudera.flume.conf.Context; import com.cloudera.flume.conf.SinkFactory.SinkBuilder; @@ -22,30 +27,39 @@ public class KestrelSink extends EventSink.Base { private String queue = null; - private String server = null; - private Client client = null; + private String servers = null; + private MemcachedClient client = null; + private MemcachedClientBuilder builder = null; - public KestrelSink(String queue, String server){ + public KestrelSink(String queue, String servers){ //constructor this.queue = queue; - this.server = server; - this.client = new Client(this.server); + this.servers = servers; + + this.builder = new XMemcachedClientBuilder( AddrUtil.getAddresses(this.servers) ); + builder.setCommandFactory(new KestrelCommandFactory()); + } @Override public void open() throws IOException { // Initialized the sink - this.client.connect(); + this.client = this.builder.build(); } @Override public void append(Event e) throws IOException { //send to Kestrel - String message = new String(e.getBody()); + try{ - this.client.put(this.queue, message); - }catch( KestrelException ke ){ - throw new IOException(ke.getMessage()); + String message = new String(e.getBody()); + this.client.set(this.queue, 0, message); + }catch(MemcachedException ex){ + throw new IOException("Kestrel Command Failed: " + ex.getMessage()); + }catch(TimeoutException ex){ + throw new IOException("Kestrel Command Timeout: " + ex.getMessage()); + }catch(InterruptedException ex){ + //meh } } @@ -53,7 +67,7 @@ public class KestrelSink extends EventSink.Base { @Override public void close() throws IOException { // Cleanup - this.client.disconnect(); + this.client.shutdown(); } public static SinkBuilder builder() { @@ -64,7 +78,15 @@ public class KestrelSink extends EventSink.Base { Preconditions.checkArgument(argv.length > 1, "usage: kestrelSink(queueName, server:port, [server2:port, server3:port,...])"); - return new KestrelSink(argv[0], argv[1]); + String servers = ""; + for( int i = 1; i < argv.length; ++i ){ + if( i > 1 ){ + servers += " "; + } + servers += argv[i]; + } + + return new KestrelSink(argv[0], servers); } }; }