From 102391bee90dcc7c09b8a1a6c38e0f79f92329c8 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Mon, 2 Feb 2015 22:04:10 -0500 Subject: [PATCH] initial commit --- .gitignore | 2 + README.md | 4 + main.go | 68 +++++++++ server/connection.go | 267 ++++++++++++++++++++++++++++++++++ server/experiment.go | 53 +++++++ server/logger.go | 7 + server/server.go | 335 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 736 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 main.go create mode 100644 server/connection.go create mode 100644 server/experiment.go create mode 100644 server/logger.go create mode 100644 server/server.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..42e8517 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +dump.rdb +golab diff --git a/README.md b/README.md new file mode 100644 index 0000000..1387d29 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +GoLab +===== + +A/B Testing server. diff --git a/main.go b/main.go new file mode 100644 index 0000000..0f29e62 --- /dev/null +++ b/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "flag" + "github.com/Sirupsen/logrus" + "github.com/brettlangdon/golab/server" + "github.com/garyburd/redigo/redis" + "net" + "strings" + "time" +) + +func newPool(host string, idle int) *redis.Pool { + return &redis.Pool{ + MaxIdle: idle, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + server.Log.Debugln("Worker Connecting to Redis Host", host) + c, err := redis.Dial("tcp", host) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + server.Log.Debugln("Worker PINGing Redis Host", host) + _, err := c.Do("PING") + return err + }, + } +} + +func main() { + var bind = flag.String("bind", ":11222", "Which [host]:port to bind to [default: ':11222']") + var level = flag.String("log", "INFO", "Set the log level [default: 'INFO']") + var rhost = flag.String("redis", ":6379", "Which redis [host]:port to connect to [default: ':6379']") + var rpool = flag.Int("pool", 3, "How many redis connections to have in the pool [default: 3]") + flag.Parse() + + switch strings.ToLower(*level) { + case "debug": + server.Log.Level = logrus.DebugLevel + case "info": + server.Log.Level = logrus.InfoLevel + case "warn": + server.Log.Level = logrus.WarnLevel + case "error": + server.Log.Level = logrus.ErrorLevel + } + + server.Log.Infoln("Starting Golab Server") + ln, err := net.Listen("tcp", *bind) + if err != nil { + return + } + + server.Log.Debugln("Connecting to Redis Pool", *rhost, "with", *rpool, "Connections") + var pool *redis.Pool = newPool(*rhost, *rpool) + + server.Log.Infoln("Accepting Connections on", *bind) + for { + conn, err := ln.Accept() + if err != nil { + continue + } + go server.HandleConnection(conn, pool) + } +} diff --git a/server/connection.go b/server/connection.go new file mode 100644 index 0000000..5ff645f --- /dev/null +++ b/server/connection.go @@ -0,0 +1,267 @@ +package server + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/garyburd/redigo/redis" + "math/rand" + "net" + "strconv" +) + +type ClientConnection struct { + conn net.Conn + reader *bufio.Reader + pool *redis.Pool + redisConn redis.Conn +} + +func (client *ClientConnection) ObtainRedisConnection() { + client.redisConn = client.pool.Get() +} + +func (client *ClientConnection) ReleaseRedisConnection() { + client.redisConn.Close() + client.redisConn = nil +} + +func (client *ClientConnection) ReadLine() (line []byte, err error) { + line, err = client.reader.ReadBytes('\n') + line = bytes.Trim(line, "\r\n ") + return line, err +} + +func (client *ClientConnection) Error() (num int, err error) { + return client.conn.Write([]byte("ERROR\r\n")) +} + +func (client *ClientConnection) ClientError(msg []byte) (num int, err error) { + response := append([]byte("CLIENT_ERROR "), msg...) + response = append(response, '\r', '\n') + return client.conn.Write(response) +} + +func (client *ClientConnection) ServerError(msg []byte) (num int, err error) { + response := append([]byte("Server_ERROR "), msg...) + response = append(response, '\r', '\n') + return client.conn.Write(response) +} + +func (client *ClientConnection) SendData(data []byte) (num int, err error) { + data = append(data, '\r', '\n') + return client.conn.Write(data) +} + +func (client *ClientConnection) SendValue(key []byte, value []byte) (num int, err error) { + response := append([]byte("VALUE "), key...) + size := strconv.Itoa(len(value)) + + response = append(response, []byte(" 0 ")...) + response = append(response, []byte(size)...) + response = append(response, '\r', '\n') + response = append(response, value...) + response = append(response, '\r', '\n') + return client.conn.Write(response) +} + +func (client *ClientConnection) SendStat(key []byte, value []byte) (num int, err error) { + response := append([]byte("STAT "), key...) + response = append(response, ' ') + response = bytes.Replace(response, []byte(":"), []byte("."), -1) + response = append(response, value...) + response = append(response, '\r', '\n') + return client.conn.Write(response) +} + +func (client *ClientConnection) SendEnd() (num int, err error) { + return client.conn.Write([]byte("END\r\n")) +} + +func (client *ClientConnection) SendStored() (num int, err error) { + return client.conn.Write([]byte("STORED\r\n")) +} + +func (client *ClientConnection) SendNotStored() (num int, err error) { + return client.conn.Write([]byte("NOT_STORED\r\n")) +} + +func (client *ClientConnection) SendDeleted() (num int, err error) { + return client.conn.Write([]byte("DELETED\r\n")) +} + +func (client *ClientConnection) SendTouched() (num int, err error) { + return client.conn.Write([]byte("TOUCHED\r\n")) +} + +func (client *ClientConnection) SendNotFound() (num int, err error) { + return client.conn.Write([]byte("NOT_FOUND\r\n")) +} + +func (client *ClientConnection) GetExperiment(id uint64, active bool) (data Experiment, err error) { + dummy := Experiment{Id: id} + raw, err := client.redisConn.Do("GET", dummy.Key()) + if err != nil { + fmt.Printf("%+v\r\n", err) + } else if raw != nil { + err = json.Unmarshal(raw.([]byte), &data) + if err == nil && data.Id == 0 { + data.Id = id + } + + if err == nil && active { + raw, err := client.redisConn.Do("SISMEMBER", "active_experiments", id) + if err != nil || int(raw.(int64)) == 0 { + return data, errors.New("Experiment is not active") + } + } + + } else { + return data, errors.New("No Experiment Found") + } + return data, err +} + +func (client *ClientConnection) GetUserBucket(experiment Experiment, userId []byte, assign bool) (data Variant, err error) { + userKey := append([]byte("user:"), userId...) + raw, err := client.redisConn.Do("HGET", userKey, experiment.Key()) + if err != nil { + fmt.Printf("%+v\r\n", err) + } else if raw != nil { + variantId, err := strconv.ParseUint(string(raw.([]byte)), 10, 64) + if err == nil { + for _, variant := range experiment.Variants { + if variant.Id == variantId { + data = variant + } + } + } + } + + if assign && data.Id == 0 { + idx := rand.Intn(len(experiment.Variants)) + data = experiment.Variants[idx] + _, err = client.redisConn.Do("HSET", userKey, experiment.Key(), data.Id) + _, err = client.redisConn.Do("SADD", experiment.BucketUniqueKey(data.Id), userId) + } + + _, err = client.redisConn.Do("INCR", experiment.BucketImpressionsKey(data.Id)) + + return data, err +} + +func (client *ClientConnection) ConvertUser(experimentId uint64, userId []byte) (err error) { + experiment, err := client.GetExperiment(experimentId, true) + if err != nil { + return err + } + + bucket, err := client.GetUserBucket(experiment, userId, false) + + if bucket.Id != 0 { + _, err = client.redisConn.Do("SADD", experiment.ConvertUniqueKey(bucket.Id), userId) + _, err = client.redisConn.Do("INCR", experiment.ConvertImpressionsKey(bucket.Id)) + } + + return err +} + +func (client *ClientConnection) GetActiveExperimentIds() (data [][]byte, err error) { + raw, err := client.redisConn.Do("SMEMBERS", "active_experiments") + + for _, id := range raw.([]interface{}) { + data = append(data, id.([]byte)) + } + + return data, err +} + +func (client *ClientConnection) GetAllExperimentIds() (data [][]byte, err error) { + raw, err := client.redisConn.Do("KEYS", "experiment:*") + + for _, key := range raw.([]interface{}) { + parts := bytes.Split(key.([]byte), []byte(":")) + if len(parts) != 2 { + continue + } + data = append(data, parts[1]) + } + + return data, err +} + +func (client *ClientConnection) GetExperimentStats(experimentId uint64) (err error) { + experiment, err := client.GetExperiment(experimentId, false) + + for _, variant := range experiment.Variants { + raw, _ := client.redisConn.Do("SCARD", experiment.BucketUniqueKey(variant.Id)) + if raw == nil { + raw = int64(0) + } + client.SendStat(experiment.BucketUniqueKey(variant.Id), []byte(strconv.Itoa(int(raw.(int64))))) + + raw, _ = client.redisConn.Do("GET", experiment.BucketImpressionsKey(variant.Id)) + if raw == nil { + raw = []uint8{0} + } + client.SendStat(experiment.BucketImpressionsKey(variant.Id), raw.([]uint8)) + + raw, _ = client.redisConn.Do("SCARD", experiment.ConvertUniqueKey(variant.Id)) + if raw == nil { + raw = int64(0) + } + client.SendStat(experiment.ConvertUniqueKey(variant.Id), []byte(strconv.Itoa(int(raw.(int64))))) + + raw, _ = client.redisConn.Do("GET", experiment.ConvertImpressionsKey(variant.Id)) + if raw == nil { + raw = []uint8{0} + } + client.SendStat(experiment.ConvertImpressionsKey(variant.Id), raw.([]uint8)) + } + + return err +} + +func (client *ClientConnection) AddNewExperiment(data []byte) (err error) { + err = json.Unmarshal(data, new(interface{})) + + if err != nil { + return err + } + + raw, err := client.redisConn.Do("INCR", "experiments") + nextId := uint64(raw.(int64)) + + dummy := Experiment{Id: nextId} + _, err = client.redisConn.Do("SET", dummy.Key(), data) + return err +} + +func (client *ClientConnection) UpdateExperiment(experimentId uint64, data []byte) (err error) { + err = json.Unmarshal(data, new(interface{})) + + if err != nil { + return err + } + + experiment, err := client.GetExperiment(experimentId, false) + + if err != nil { + return err + } + + _, err = client.redisConn.Do("SET", experiment.Key(), data) + return err +} + +func (client *ClientConnection) DeactivateExperiment(experimentId uint64) (err error) { + _, err = client.redisConn.Do("SREM", "active_experiments", experimentId) + return err +} + +func (client *ClientConnection) ActivateExperiment(experimentId uint64) (err error) { + _, err = client.redisConn.Do("SADD", "active_experiments", experimentId) + return err +} diff --git a/server/experiment.go b/server/experiment.go new file mode 100644 index 0000000..1437cac --- /dev/null +++ b/server/experiment.go @@ -0,0 +1,53 @@ +package server + +import ( + "strconv" +) + +type Experiment struct { + Id uint64 `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Hypothesis string `json:"hypothesis"` + Variants []Variant `json:"variants"` +} + +func (exp *Experiment) Key() (key []byte) { + return append([]byte("experiment:"), []byte(strconv.FormatUint(exp.Id, 10))...) +} + +func (exp *Experiment) BucketUniqueKey(id uint64) (key []byte) { + key = []byte("bucket-users:") + key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...) + key = append(key, []byte(":")...) + return append(key, []byte(strconv.FormatUint(id, 10))...) +} + +func (exp *Experiment) BucketImpressionsKey(id uint64) (key []byte) { + key = []byte("bucket-impressions:") + key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...) + key = append(key, []byte(":")...) + return append(key, []byte(strconv.FormatUint(id, 10))...) +} + +func (exp *Experiment) ConvertUniqueKey(id uint64) (key []byte) { + key = []byte("convert-users:") + key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...) + key = append(key, []byte(":")...) + return append(key, []byte(strconv.FormatUint(id, 10))...) +} + +func (exp *Experiment) ConvertImpressionsKey(id uint64) (key []byte) { + key = []byte("convert-impressions:") + key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...) + key = append(key, []byte(":")...) + return append(key, []byte(strconv.FormatUint(id, 10))...) +} + +type Variant struct { + Id uint64 `json:"id"` + Name string `json:"name"` + Value string `json:"value"` + Control bool `json:"control"` + Weight int64 `json:"weight"` +} diff --git a/server/logger.go b/server/logger.go new file mode 100644 index 0000000..68ffef4 --- /dev/null +++ b/server/logger.go @@ -0,0 +1,7 @@ +package server + +import ( + "github.com/Sirupsen/logrus" +) + +var Log = logrus.New() diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..531d5a2 --- /dev/null +++ b/server/server.go @@ -0,0 +1,335 @@ +package server + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "github.com/garyburd/redigo/redis" + "net" + "strconv" +) + +func HandleConnection(conn net.Conn, pool *redis.Pool) { + defer conn.Close() + + client := ClientConnection{ + conn: conn, + reader: bufio.NewReader(conn), + pool: pool, + } + Log.Debugln("New Client Connection =>", conn.RemoteAddr()) + + for { + line, err := client.ReadLine() + if err != nil { + break + } + + Log.Debugln("New Message =>", string(line)) + err = HandleLine(line, client) + if err != nil { + break + } + } +} + +func HandleLine(line []byte, client ClientConnection) (err error) { + parts := bytes.Split(line, []byte(" ")) + + if len(parts[0]) > 0 { + command := bytes.ToLower(parts[0]) + args := parts[1:] + switch string(command) { + case "get", "gets": + err = HandleGet(args, client) + case "set", "replace": + err = HandleSet(args, client) + case "add": + err = HandleAdd(args, client) + case "delete": + err = HandleDelete(args, client) + case "incr": + err = HandleIncr(args, client) + case "touch": + err = HandleTouch(args, client) + case "stats": + err = HandleStats(args, client) + case "quit": + err = errors.New("Quitting") + case "decr", "flush", "append", "prepend", "cas": + Log.Debugln("Command", string(command), "Not Implemented") + _, err = client.ClientError([]byte("Command Not Implemented")) + default: + Log.Debugln("Command", string(command), "Unknown") + _, err = client.ClientError([]byte("Unknown Command")) + } + } + + return err +} + +func HandleGet(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) == 0 { + _, err = client.ClientError([]byte("GET Command needs at least 1 ':' pair")) + } else { + for _, key := range args { + parts := bytes.Split(key, []byte(":")) + if len(parts) < 2 { + _, err = client.ClientError([]byte("GET argument must be in the format ':'")) + } else { + if bytes.Equal(bytes.ToLower(parts[0]), []byte("experiment")) { + ids := [][]byte{parts[1]} + multi := false + if bytes.Equal(parts[1], []byte("*")) { + multi = true + ids, err = client.GetAllExperimentIds() + if err != nil { + continue + } + } else if bytes.Equal(parts[1], []byte("active")) { + multi = true + ids, err = client.GetActiveExperimentIds() + if err != nil { + continue + } + } + + experiments := make([]Experiment, 0) + for _, id := range ids { + experimentId, err := strconv.ParseUint(string(id), 10, 64) + if err != nil { + continue + } + + experiment, err := client.GetExperiment(experimentId, false) + if err != nil { + continue + } + experiments = append(experiments, experiment) + } + + if len(experiments) == 0 { + continue + } + + var data []byte + if len(experiments) == 1 && multi == false { + data, err = json.Marshal(experiments[0]) + } else { + data, err = json.Marshal(experiments) + } + + if err != nil { + continue + } + + _, err = client.SendValue(key, data) + if err != nil { + continue + } + + } else { + experimentId, err := strconv.ParseUint(string(parts[0]), 10, 64) + if err != nil { + continue + } + experiment, err := client.GetExperiment(experimentId, true) + if err != nil { + continue + } + + bucket, err := client.GetUserBucket(experiment, parts[1], true) + _, err = client.SendValue(key, []byte(bucket.Value)) + if err != nil { + break + } + } + } + } + _, err = client.SendEnd() + } + + return err +} + +func HandleIncr(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) < 1 { + _, err = client.ClientError([]byte("INCR command needs at least 1 argument [delta]")) + } else { + parts := bytes.Split(args[0], []byte(":")) + stored := false + if len(parts) < 2 { + _, err = client.ClientError([]byte("INCR key must be in the format ':'")) + } else { + experimentId, err := strconv.ParseUint(string(parts[0]), 10, 64) + if err == nil { + err = client.ConvertUser(experimentId, parts[1]) + stored = err == nil + } + + } + if stored { + _, err = client.SendData([]byte("1")) + } else if stored == false { + _, err = client.SendData([]byte("0")) + } + } + + return err + +} + +func HandleAdd(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) < 4 { + _, err = client.ClientError([]byte("ADD Command needs at least 4 arguments, [noreply]")) + } else { + key, rawBytes := args[0], args[3] + noreply := (len(args) >= 5 && bytes.Equal(bytes.ToLower(args[4]), []byte("noreply"))) + + numBytes, err := strconv.ParseUint(string(rawBytes), 10, 64) + stored := false + if err == nil { + data, err := client.ReadLine() + if err != nil || uint64(len(data)) != numBytes { + _, err = client.ClientError([]byte("Value length does not match number of bytes send")) + } else { + experimentId, err := strconv.ParseUint(string(key), 10, 64) + if err != nil || experimentId <= 0 { + _, err = client.ClientError([]byte("ADD key must be a positive integer")) + } + + err = client.AddNewExperiment(data) + stored = err == nil + } + } + + if stored && noreply == false { + _, err = client.SendStored() + } else if stored == false && noreply == false { + _, err = client.SendNotStored() + } + + } + + return err +} + +func HandleSet(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) < 4 { + _, err = client.ClientError([]byte("SET Command needs at least 4 arguments, [noreply]")) + } else { + key, rawBytes := args[0], args[3] + noreply := (len(args) >= 5 && bytes.Equal(bytes.ToLower(args[4]), []byte("noreply"))) + + numBytes, err := strconv.ParseUint(string(rawBytes), 10, 64) + stored := false + if err == nil { + data, err := client.ReadLine() + if err != nil || uint64(len(data)) != numBytes { + _, err = client.ClientError([]byte("Value length does not match number of bytes send")) + } else { + experimentId, err := strconv.ParseUint(string(key), 10, 64) + if err != nil || experimentId <= 0 { + _, err = client.ClientError([]byte("SET key must be a positive integer")) + } + + err = client.UpdateExperiment(experimentId, data) + stored = err == nil + } + } + + if stored && noreply == false { + _, err = client.SendStored() + } else if stored == false && noreply == false { + _, err = client.SendNotStored() + } + + } + + return err +} + +func HandleStats(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) == 0 { + args, err = client.GetActiveExperimentIds() + } + + for _, rawId := range args { + id, err := strconv.ParseUint(string(rawId), 10, 64) + if err != nil { + continue + } + err = client.GetExperimentStats(id) + if err != nil { + break + } + } + _, err = client.SendEnd() + + return err +} + +func HandleDelete(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) < 1 { + _, err = client.ClientError([]byte("DELETE command takes 1 argument ")) + } else { + + id, err := strconv.ParseUint(string(args[0]), 10, 64) + if err == nil { + err = client.DeactivateExperiment(id) + if err == nil { + _, err = client.SendDeleted() + } else { + _, err = client.SendNotFound() + } + } else { + _, err = client.SendNotFound() + } + } + + return err +} + +func HandleTouch(args [][]byte, client ClientConnection) (err error) { + client.ObtainRedisConnection() + defer client.ReleaseRedisConnection() + + if len(args) < 2 { + _, err = client.ClientError([]byte("TOUCH command takes at least 2 arguments [noreply]")) + } else { + id, err := strconv.ParseUint(string(args[0]), 10, 64) + noreply := (len(args) >= 3 && bytes.Equal(bytes.ToLower(args[2]), []byte("noreply"))) + + if err == nil { + err = client.ActivateExperiment(id) + if err == nil && noreply == false { + _, err = client.SendTouched() + } else if noreply == false { + _, err = client.SendNotFound() + } + } else if noreply == false { + _, err = client.SendNotFound() + } + } + + return err +}