|
|
|
@ -1,12 +1,17 @@ |
|
|
|
package com.blangdon.flume.Kestrel; |
|
|
|
package com.blangdon.flume.kestrel; |
|
|
|
|
|
|
|
|
|
|
|
import iinteractive.kestrel.*; |
|
|
|
import net.rubyeye.xmemcached.MemcachedClient; |
|
|
|
import net.rubyeye.xmemcached.XMemcachedClientBuilder; |
|
|
|
import net.rubyeye.xmemcached.MemcachedClientBuilder; |
|
|
|
import net.rubyeye.xmemcached.command.KestrelCommandFactory; |
|
|
|
import net.rubyeye.xmemcached.utils.AddrUtil; |
|
|
|
import net.rubyeye.xmemcached.exception.MemcachedException; |
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
|
|
|
|
import com.cloudera.flume.conf.Context; |
|
|
|
import com.cloudera.flume.conf.SinkFactory.SinkBuilder; |
|
|
|
@ -22,30 +27,39 @@ public class KestrelSink extends EventSink.Base { |
|
|
|
|
|
|
|
|
|
|
|
private String queue = null; |
|
|
|
private String server = null; |
|
|
|
private Client client = null; |
|
|
|
private String servers = null; |
|
|
|
private MemcachedClient client = null; |
|
|
|
private MemcachedClientBuilder builder = null; |
|
|
|
|
|
|
|
public KestrelSink(String queue, String server){ |
|
|
|
public KestrelSink(String queue, String servers){ |
|
|
|
//constructor |
|
|
|
this.queue = queue; |
|
|
|
this.server = server; |
|
|
|
this.client = new Client(this.server); |
|
|
|
this.servers = servers; |
|
|
|
|
|
|
|
this.builder = new XMemcachedClientBuilder( AddrUtil.getAddresses(this.servers) ); |
|
|
|
builder.setCommandFactory(new KestrelCommandFactory()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open() throws IOException { |
|
|
|
// Initialized the sink |
|
|
|
this.client.connect(); |
|
|
|
this.client = this.builder.build(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void append(Event e) throws IOException { |
|
|
|
//send to Kestrel |
|
|
|
String message = new String(e.getBody()); |
|
|
|
|
|
|
|
try{ |
|
|
|
this.client.put(this.queue, message); |
|
|
|
}catch( KestrelException ke ){ |
|
|
|
throw new IOException(ke.getMessage()); |
|
|
|
String message = new String(e.getBody()); |
|
|
|
this.client.set(this.queue, 0, message); |
|
|
|
}catch(MemcachedException ex){ |
|
|
|
throw new IOException("Kestrel Command Failed: " + ex.getMessage()); |
|
|
|
}catch(TimeoutException ex){ |
|
|
|
throw new IOException("Kestrel Command Timeout: " + ex.getMessage()); |
|
|
|
}catch(InterruptedException ex){ |
|
|
|
//meh |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -53,7 +67,7 @@ public class KestrelSink extends EventSink.Base { |
|
|
|
@Override |
|
|
|
public void close() throws IOException { |
|
|
|
// Cleanup |
|
|
|
this.client.disconnect(); |
|
|
|
this.client.shutdown(); |
|
|
|
} |
|
|
|
|
|
|
|
public static SinkBuilder builder() { |
|
|
|
@ -64,7 +78,15 @@ public class KestrelSink extends EventSink.Base { |
|
|
|
Preconditions.checkArgument(argv.length > 1, |
|
|
|
"usage: kestrelSink(queueName, server:port, [server2:port, server3:port,...])"); |
|
|
|
|
|
|
|
return new KestrelSink(argv[0], argv[1]); |
|
|
|
String servers = ""; |
|
|
|
for( int i = 1; i < argv.length; ++i ){ |
|
|
|
if( i > 1 ){ |
|
|
|
servers += " "; |
|
|
|
} |
|
|
|
servers += argv[i]; |
|
|
|
} |
|
|
|
|
|
|
|
return new KestrelSink(argv[0], servers); |
|
|
|
} |
|
|
|
}; |
|
|
|
} |
|
|
|
|