From 5c738b7f373017e0075fc1b40d2470d67b5271a2 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Sun, 28 Jul 2013 21:45:35 -0400 Subject: [PATCH] initial rewrite of go --- README.md | 2 + commands/commands.go | 157 +++++++++++++++++++++++++++++++++++++++++++ common/common.go | 13 ++++ main.go | 82 ++++++++++++++++++++++ proxy/proxy.go | 29 ++++++++ 5 files changed, 283 insertions(+) create mode 100644 README.md create mode 100644 commands/commands.go create mode 100644 common/common.go create mode 100644 main.go create mode 100644 proxy/proxy.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..d6b48f6 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +ferrite +======= diff --git a/commands/commands.go b/commands/commands.go new file mode 100644 index 0000000..a1371c4 --- /dev/null +++ b/commands/commands.go @@ -0,0 +1,157 @@ +package commands + +import ( + "bitbucket.org/ww/cabinet" + "bytes" + "fmt" + "github.com/brettlangdon/ferrite/common" + "github.com/brettlangdon/ferrite/proxy" + "net/url" + "os" + "strconv" + "strings" + "sync/atomic" + "time" +) + +func ParseCommand(request string) (string, []string) { + request = strings.ToLower(request) + tokens := strings.Split(strings.Trim(request, " \r\n"), " ") + var command string + if len(tokens) > 0 { + command = tokens[0] + } + return command, tokens +} + +func HandleCommand(command string, tokens []string, db *cabinet.KCDB) string { + var result string + switch command { + case "get": + result = HandleGet(tokens[1:], db) + break + case "stats": + result = HandleStats(db) + break + case "flush_all": + result = HandleFlushAll(db) + break + case "delete": + result = HandleDelete(tokens[1:], db) + break + case "version": + result = HandleVersion() + break + default: + result = "ERROR\r\n" + break + } + + return result +} + +func HandleGet(tokens []string, db *cabinet.KCDB) string { + var out string + if len(tokens) >= 1 { + key := tokens[0] + if strings.HasPrefix(key, "http%3a%2f%2f") || strings.HasPrefix(key, "https%3a%2f%2f") { + key, _ = url.QueryUnescape(key) + } + result, err := db.Get([]byte(key)) + if err != nil { + atomic.AddInt32(&common.MISSES, 1) + err := db.Set([]byte(key), []byte("0")) + if err == nil { + go proxy.CallProxy(key, db) + } else { + fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error()) + } + out = "END\r\n" + } else if bytes.Equal(result, []byte("0")) { + atomic.AddInt32(&common.MISSES, 1) + out = "END\r\n" + } else { + parts := strings.SplitN(string(result), ":", 2) + var expire int64 = 0 + if len(parts) == 2 { + expire, err = strconv.ParseInt(parts[0], 10, 32) + if err != nil { + expire = 0 + fmt.Fprintf(os.Stderr, "Error Parsing Expiration \"%s\": %s\r\n", parts[0], err.Error()) + } + result = []byte(parts[1]) + } + now := int64(time.Now().Unix()) + if expire > 0 && expire < now { + err := db.Set([]byte(key), []byte("0")) + if err == nil { + go proxy.CallProxy(key, db) + } else { + fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error()) + } + atomic.AddInt32(&common.MISSES, 1) + out = "END\r\n" + } else { + atomic.AddInt32(&common.HITS, 1) + out = fmt.Sprintf("VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, len(result), result) + } + } + } else { + out = "ERROR\r\n" + } + return out +} + +func HandleStats(db *cabinet.KCDB) string { + hits := atomic.LoadInt32(&common.HITS) + misses := atomic.LoadInt32(&common.MISSES) + connections := atomic.LoadInt32(&common.CONNECTIONS) + var hit_ratio float32 = 0 + if hits > 0 { + hit_ratio = float32(hits) / float32(hits+misses) + } + + db_stats := "" + status, err := db.Status() + if err == nil { + lines := strings.Split(status, "\n") + for _, line := range lines { + parts := strings.SplitN(strings.Trim(line, "\r\n"), "\t", 2) + if len(parts) == 2 { + db_stats += fmt.Sprintf("STAT %s %s\r\n", parts[0], parts[1]) + } + } + } else { + fmt.Fprintf(os.Stderr, "Error Retrieving DB Status: %s\r\n", err.Error()) + } + return fmt.Sprintf("STAT hits %d\r\n"+ + "STAT misses %d\r\n"+ + "STAT hit_ratio %1.4f\r\n"+ + "STAT connections %d\r\n"+ + "%s"+ + "END\r\n", + hits, misses, hit_ratio, connections, db_stats, + ) +} + +func HandleFlushAll(db *cabinet.KCDB) string { + db.Clear() + return "OK\r\n" +} + +func HandleDelete(tokens []string, db *cabinet.KCDB) string { + if len(tokens) <= 0 { + return "ERROR\r\n" + } + + key := tokens[0] + err := db.Remove([]byte(key)) + if err != nil { + return "NOT_FOUND\r\n" + } + return "DELETED\r\n" +} + +func HandleVersion() string { + return fmt.Sprintf("VERSION %s\r\n", common.VERSION) +} diff --git a/common/common.go b/common/common.go new file mode 100644 index 0000000..c1c265c --- /dev/null +++ b/common/common.go @@ -0,0 +1,13 @@ +package common + +import ( + "flag" +) + +var HITS int32 = 0 +var MISSES int32 = 0 +var CONNECTIONS int32 = 0 +var TTL *int = flag.Int("ttl", 3600, "the TTL in seconds for a newly cached object") +var BIND *string = flag.String("bind", "0.0.0.0:7000", "the [
]: to bind to") +var CACHE *string = flag.String("cache", "*", "the kyoto cabinet cache path") +var VERSION string = "0.0.1" diff --git a/main.go b/main.go new file mode 100644 index 0000000..ee9c21a --- /dev/null +++ b/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "bitbucket.org/ww/cabinet" + "flag" + "fmt" + "github.com/brettlangdon/ferrite/commands" + "github.com/brettlangdon/ferrite/common" + "net" + "os" + "os/signal" + "sync/atomic" +) + +func handler(client net.Conn, db *cabinet.KCDB) { + atomic.AddInt32(&common.CONNECTIONS, 1) + defer func() { + client.Close() + atomic.AddInt32(&common.CONNECTIONS, -1) + }() + buffer := make([]byte, 1024) + for { + n, err := client.Read(buffer) + if err != nil { + break + } + if n <= 0 { + continue + } + + command, tokens := commands.ParseCommand(string(buffer[:n])) + if len(command) == 0 { + continue + } + if command == "quit" || command == "exit" { + break + } + response := commands.HandleCommand(command, tokens, db) + _, err = client.Write([]byte(response)) + } +} + +func main() { + flag.Parse() + + fmt.Printf("Using TTL: %d (sec)\r\n", *common.TTL) + fmt.Printf("Opening Cache: %s\r\n", *common.CACHE) + db := cabinet.New() + err := db.Open(*common.CACHE, cabinet.KCOWRITER|cabinet.KCOCREATE) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not open cache \"%s\": %s\r\n", *common.CACHE, err.Error()) + os.Exit(-1) + } + + listener, err := net.Listen("tcp", *common.BIND) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not bind to \"%s\": %s\r\n", *common.BIND, err.Error()) + os.Exit(-1) + } + fmt.Printf("Listening at %s\t\n", *common.BIND) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + for _ = range c { + fmt.Println("") + fmt.Printf("Shutting down ferrite\r\n") + db.Close() + db.Del() + os.Exit(0) + } + }() + + for { + client, err := listener.Accept() + if err == nil { + go handler(client, db) + } else { + fmt.Fprintf(os.Stderr, "Error Accepting Client Connection: %s\r\n", err.Error()) + } + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..e00977b --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,29 @@ +package proxy + +import ( + "bitbucket.org/ww/cabinet" + "fmt" + "github.com/brettlangdon/ferrite/common" + "io/ioutil" + "net/http" + "os" + "time" +) + +func CallProxy(url string, db *cabinet.KCDB) { + response, err := http.Get(url) + if err == nil { + defer response.Body.Close() + result, err := ioutil.ReadAll(response.Body) + if err == nil { + now := int32(time.Now().Unix()) + ttl := int32(*common.TTL) + result = []byte(fmt.Sprintf("%d:%s", now+ttl, result)) + db.Set([]byte(url), result) + } else { + fmt.Fprintf(os.Stderr, "Error Reading Response for \"%s\": %s\r\n", url, err.Error()) + } + } else { + fmt.Fprintf(os.Stderr, "Error Retrieving Proxy Results for \"%s\": %s\r\n", url, err.Error()) + } +}