diff --git a/build.xml b/build.xml
index eab911d..e6d73d9 100644
--- a/build.xml
+++ b/build.xml
@@ -16,6 +16,7 @@
+
diff --git a/src/com/blangdon/flume/kestrel/KestrelSink.java b/src/com/blangdon/flume/kestrel/KestrelSink.java
index 442c1ab..6140713 100644
--- a/src/com/blangdon/flume/kestrel/KestrelSink.java
+++ b/src/com/blangdon/flume/kestrel/KestrelSink.java
@@ -1,6 +1,8 @@
package com.blangdon.flume.Kestrel;
+import iinteractive.kestrel.*;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -19,24 +21,39 @@ import com.google.common.base.Preconditions;
public class KestrelSink extends EventSink.Base {
- public KestrelSink(){
+ private String queue = null;
+ private String server = null;
+ private Client client = null;
+
+ public KestrelSink(String queue, String server){
//constructor
+ this.queue = queue;
+ this.server = server;
+ this.client = new Client(this.server);
}
@Override
public void open() throws IOException {
// Initialized the sink
+ this.client.connect();
}
@Override
public void append(Event e) throws IOException {
//send to Kestrel
+ String message = new String(e.getBody());
+ try{
+ this.client.put(this.queue, message);
+ }catch( KestrelException ke ){
+ throw new IOException(ke.getMessage());
+ }
}
@Override
public void close() throws IOException {
// Cleanup
+ this.client.disconnect();
}
public static SinkBuilder builder() {
@@ -47,7 +64,7 @@ public class KestrelSink extends EventSink.Base {
Preconditions.checkArgument(argv.length > 1,
"usage: kestrelSink(queueName, server:port, [server2:port, server3:port,...])");
- return new KestrelSink();
+ return new KestrelSink(argv[0], argv[1]);
}
};
}