From d28a2e4de77e5cb4a7d2dce1be6e4bdc23d22518 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Thu, 14 Jun 2012 13:02:10 -0400 Subject: [PATCH] client protoype with kestrel-client --- build.xml | 1 + .../blangdon/flume/kestrel/KestrelSink.java | 21 +++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/build.xml b/build.xml index eab911d..e6d73d9 100644 --- a/build.xml +++ b/build.xml @@ -16,6 +16,7 @@ + diff --git a/src/com/blangdon/flume/kestrel/KestrelSink.java b/src/com/blangdon/flume/kestrel/KestrelSink.java index 442c1ab..6140713 100644 --- a/src/com/blangdon/flume/kestrel/KestrelSink.java +++ b/src/com/blangdon/flume/kestrel/KestrelSink.java @@ -1,6 +1,8 @@ package com.blangdon.flume.Kestrel; +import iinteractive.kestrel.*; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -19,24 +21,39 @@ import com.google.common.base.Preconditions; public class KestrelSink extends EventSink.Base { - public KestrelSink(){ + private String queue = null; + private String server = null; + private Client client = null; + + public KestrelSink(String queue, String server){ //constructor + this.queue = queue; + this.server = server; + this.client = new Client(this.server); } @Override public void open() throws IOException { // Initialized the sink + this.client.connect(); } @Override public void append(Event e) throws IOException { //send to Kestrel + String message = new String(e.getBody()); + try{ + this.client.put(this.queue, message); + }catch( KestrelException ke ){ + throw new IOException(ke.getMessage()); + } } @Override public void close() throws IOException { // Cleanup + this.client.disconnect(); } public static SinkBuilder builder() { @@ -47,7 +64,7 @@ public class KestrelSink extends EventSink.Base { Preconditions.checkArgument(argv.length > 1, "usage: kestrelSink(queueName, server:port, [server2:port, server3:port,...])"); - return new KestrelSink(); + return new KestrelSink(argv[0], argv[1]); } }; }