Browse Source

initial commit

master
Brett Langdon 11 years ago
commit
102391bee9
7 changed files with 736 additions and 0 deletions
  1. +2
    -0
      .gitignore
  2. +4
    -0
      README.md
  3. +68
    -0
      main.go
  4. +267
    -0
      server/connection.go
  5. +53
    -0
      server/experiment.go
  6. +7
    -0
      server/logger.go
  7. +335
    -0
      server/server.go

+ 2
- 0
.gitignore View File

@ -0,0 +1,2 @@
dump.rdb
golab

+ 4
- 0
README.md View File

@ -0,0 +1,4 @@
GoLab
=====
A/B Testing server.

+ 68
- 0
main.go View File

@ -0,0 +1,68 @@
package main
import (
"flag"
"github.com/Sirupsen/logrus"
"github.com/brettlangdon/golab/server"
"github.com/garyburd/redigo/redis"
"net"
"strings"
"time"
)
func newPool(host string, idle int) *redis.Pool {
return &redis.Pool{
MaxIdle: idle,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
server.Log.Debugln("Worker Connecting to Redis Host", host)
c, err := redis.Dial("tcp", host)
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
server.Log.Debugln("Worker PINGing Redis Host", host)
_, err := c.Do("PING")
return err
},
}
}
func main() {
var bind = flag.String("bind", ":11222", "Which [host]:port to bind to [default: ':11222']")
var level = flag.String("log", "INFO", "Set the log level [default: 'INFO']")
var rhost = flag.String("redis", ":6379", "Which redis [host]:port to connect to [default: ':6379']")
var rpool = flag.Int("pool", 3, "How many redis connections to have in the pool [default: 3]")
flag.Parse()
switch strings.ToLower(*level) {
case "debug":
server.Log.Level = logrus.DebugLevel
case "info":
server.Log.Level = logrus.InfoLevel
case "warn":
server.Log.Level = logrus.WarnLevel
case "error":
server.Log.Level = logrus.ErrorLevel
}
server.Log.Infoln("Starting Golab Server")
ln, err := net.Listen("tcp", *bind)
if err != nil {
return
}
server.Log.Debugln("Connecting to Redis Pool", *rhost, "with", *rpool, "Connections")
var pool *redis.Pool = newPool(*rhost, *rpool)
server.Log.Infoln("Accepting Connections on", *bind)
for {
conn, err := ln.Accept()
if err != nil {
continue
}
go server.HandleConnection(conn, pool)
}
}

+ 267
- 0
server/connection.go View File

@ -0,0 +1,267 @@
package server
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/garyburd/redigo/redis"
"math/rand"
"net"
"strconv"
)
type ClientConnection struct {
conn net.Conn
reader *bufio.Reader
pool *redis.Pool
redisConn redis.Conn
}
func (client *ClientConnection) ObtainRedisConnection() {
client.redisConn = client.pool.Get()
}
func (client *ClientConnection) ReleaseRedisConnection() {
client.redisConn.Close()
client.redisConn = nil
}
func (client *ClientConnection) ReadLine() (line []byte, err error) {
line, err = client.reader.ReadBytes('\n')
line = bytes.Trim(line, "\r\n ")
return line, err
}
func (client *ClientConnection) Error() (num int, err error) {
return client.conn.Write([]byte("ERROR\r\n"))
}
func (client *ClientConnection) ClientError(msg []byte) (num int, err error) {
response := append([]byte("CLIENT_ERROR "), msg...)
response = append(response, '\r', '\n')
return client.conn.Write(response)
}
func (client *ClientConnection) ServerError(msg []byte) (num int, err error) {
response := append([]byte("Server_ERROR "), msg...)
response = append(response, '\r', '\n')
return client.conn.Write(response)
}
func (client *ClientConnection) SendData(data []byte) (num int, err error) {
data = append(data, '\r', '\n')
return client.conn.Write(data)
}
func (client *ClientConnection) SendValue(key []byte, value []byte) (num int, err error) {
response := append([]byte("VALUE "), key...)
size := strconv.Itoa(len(value))
response = append(response, []byte(" 0 ")...)
response = append(response, []byte(size)...)
response = append(response, '\r', '\n')
response = append(response, value...)
response = append(response, '\r', '\n')
return client.conn.Write(response)
}
func (client *ClientConnection) SendStat(key []byte, value []byte) (num int, err error) {
response := append([]byte("STAT "), key...)
response = append(response, ' ')
response = bytes.Replace(response, []byte(":"), []byte("."), -1)
response = append(response, value...)
response = append(response, '\r', '\n')
return client.conn.Write(response)
}
func (client *ClientConnection) SendEnd() (num int, err error) {
return client.conn.Write([]byte("END\r\n"))
}
func (client *ClientConnection) SendStored() (num int, err error) {
return client.conn.Write([]byte("STORED\r\n"))
}
func (client *ClientConnection) SendNotStored() (num int, err error) {
return client.conn.Write([]byte("NOT_STORED\r\n"))
}
func (client *ClientConnection) SendDeleted() (num int, err error) {
return client.conn.Write([]byte("DELETED\r\n"))
}
func (client *ClientConnection) SendTouched() (num int, err error) {
return client.conn.Write([]byte("TOUCHED\r\n"))
}
func (client *ClientConnection) SendNotFound() (num int, err error) {
return client.conn.Write([]byte("NOT_FOUND\r\n"))
}
func (client *ClientConnection) GetExperiment(id uint64, active bool) (data Experiment, err error) {
dummy := Experiment{Id: id}
raw, err := client.redisConn.Do("GET", dummy.Key())
if err != nil {
fmt.Printf("%+v\r\n", err)
} else if raw != nil {
err = json.Unmarshal(raw.([]byte), &data)
if err == nil && data.Id == 0 {
data.Id = id
}
if err == nil && active {
raw, err := client.redisConn.Do("SISMEMBER", "active_experiments", id)
if err != nil || int(raw.(int64)) == 0 {
return data, errors.New("Experiment is not active")
}
}
} else {
return data, errors.New("No Experiment Found")
}
return data, err
}
func (client *ClientConnection) GetUserBucket(experiment Experiment, userId []byte, assign bool) (data Variant, err error) {
userKey := append([]byte("user:"), userId...)
raw, err := client.redisConn.Do("HGET", userKey, experiment.Key())
if err != nil {
fmt.Printf("%+v\r\n", err)
} else if raw != nil {
variantId, err := strconv.ParseUint(string(raw.([]byte)), 10, 64)
if err == nil {
for _, variant := range experiment.Variants {
if variant.Id == variantId {
data = variant
}
}
}
}
if assign && data.Id == 0 {
idx := rand.Intn(len(experiment.Variants))
data = experiment.Variants[idx]
_, err = client.redisConn.Do("HSET", userKey, experiment.Key(), data.Id)
_, err = client.redisConn.Do("SADD", experiment.BucketUniqueKey(data.Id), userId)
}
_, err = client.redisConn.Do("INCR", experiment.BucketImpressionsKey(data.Id))
return data, err
}
func (client *ClientConnection) ConvertUser(experimentId uint64, userId []byte) (err error) {
experiment, err := client.GetExperiment(experimentId, true)
if err != nil {
return err
}
bucket, err := client.GetUserBucket(experiment, userId, false)
if bucket.Id != 0 {
_, err = client.redisConn.Do("SADD", experiment.ConvertUniqueKey(bucket.Id), userId)
_, err = client.redisConn.Do("INCR", experiment.ConvertImpressionsKey(bucket.Id))
}
return err
}
func (client *ClientConnection) GetActiveExperimentIds() (data [][]byte, err error) {
raw, err := client.redisConn.Do("SMEMBERS", "active_experiments")
for _, id := range raw.([]interface{}) {
data = append(data, id.([]byte))
}
return data, err
}
func (client *ClientConnection) GetAllExperimentIds() (data [][]byte, err error) {
raw, err := client.redisConn.Do("KEYS", "experiment:*")
for _, key := range raw.([]interface{}) {
parts := bytes.Split(key.([]byte), []byte(":"))
if len(parts) != 2 {
continue
}
data = append(data, parts[1])
}
return data, err
}
func (client *ClientConnection) GetExperimentStats(experimentId uint64) (err error) {
experiment, err := client.GetExperiment(experimentId, false)
for _, variant := range experiment.Variants {
raw, _ := client.redisConn.Do("SCARD", experiment.BucketUniqueKey(variant.Id))
if raw == nil {
raw = int64(0)
}
client.SendStat(experiment.BucketUniqueKey(variant.Id), []byte(strconv.Itoa(int(raw.(int64)))))
raw, _ = client.redisConn.Do("GET", experiment.BucketImpressionsKey(variant.Id))
if raw == nil {
raw = []uint8{0}
}
client.SendStat(experiment.BucketImpressionsKey(variant.Id), raw.([]uint8))
raw, _ = client.redisConn.Do("SCARD", experiment.ConvertUniqueKey(variant.Id))
if raw == nil {
raw = int64(0)
}
client.SendStat(experiment.ConvertUniqueKey(variant.Id), []byte(strconv.Itoa(int(raw.(int64)))))
raw, _ = client.redisConn.Do("GET", experiment.ConvertImpressionsKey(variant.Id))
if raw == nil {
raw = []uint8{0}
}
client.SendStat(experiment.ConvertImpressionsKey(variant.Id), raw.([]uint8))
}
return err
}
func (client *ClientConnection) AddNewExperiment(data []byte) (err error) {
err = json.Unmarshal(data, new(interface{}))
if err != nil {
return err
}
raw, err := client.redisConn.Do("INCR", "experiments")
nextId := uint64(raw.(int64))
dummy := Experiment{Id: nextId}
_, err = client.redisConn.Do("SET", dummy.Key(), data)
return err
}
func (client *ClientConnection) UpdateExperiment(experimentId uint64, data []byte) (err error) {
err = json.Unmarshal(data, new(interface{}))
if err != nil {
return err
}
experiment, err := client.GetExperiment(experimentId, false)
if err != nil {
return err
}
_, err = client.redisConn.Do("SET", experiment.Key(), data)
return err
}
func (client *ClientConnection) DeactivateExperiment(experimentId uint64) (err error) {
_, err = client.redisConn.Do("SREM", "active_experiments", experimentId)
return err
}
func (client *ClientConnection) ActivateExperiment(experimentId uint64) (err error) {
_, err = client.redisConn.Do("SADD", "active_experiments", experimentId)
return err
}

+ 53
- 0
server/experiment.go View File

@ -0,0 +1,53 @@
package server
import (
"strconv"
)
type Experiment struct {
Id uint64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Hypothesis string `json:"hypothesis"`
Variants []Variant `json:"variants"`
}
func (exp *Experiment) Key() (key []byte) {
return append([]byte("experiment:"), []byte(strconv.FormatUint(exp.Id, 10))...)
}
func (exp *Experiment) BucketUniqueKey(id uint64) (key []byte) {
key = []byte("bucket-users:")
key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...)
key = append(key, []byte(":")...)
return append(key, []byte(strconv.FormatUint(id, 10))...)
}
func (exp *Experiment) BucketImpressionsKey(id uint64) (key []byte) {
key = []byte("bucket-impressions:")
key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...)
key = append(key, []byte(":")...)
return append(key, []byte(strconv.FormatUint(id, 10))...)
}
func (exp *Experiment) ConvertUniqueKey(id uint64) (key []byte) {
key = []byte("convert-users:")
key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...)
key = append(key, []byte(":")...)
return append(key, []byte(strconv.FormatUint(id, 10))...)
}
func (exp *Experiment) ConvertImpressionsKey(id uint64) (key []byte) {
key = []byte("convert-impressions:")
key = append(key, []byte(strconv.FormatUint(exp.Id, 10))...)
key = append(key, []byte(":")...)
return append(key, []byte(strconv.FormatUint(id, 10))...)
}
type Variant struct {
Id uint64 `json:"id"`
Name string `json:"name"`
Value string `json:"value"`
Control bool `json:"control"`
Weight int64 `json:"weight"`
}

+ 7
- 0
server/logger.go View File

@ -0,0 +1,7 @@
package server
import (
"github.com/Sirupsen/logrus"
)
var Log = logrus.New()

+ 335
- 0
server/server.go View File

@ -0,0 +1,335 @@
package server
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"github.com/garyburd/redigo/redis"
"net"
"strconv"
)
func HandleConnection(conn net.Conn, pool *redis.Pool) {
defer conn.Close()
client := ClientConnection{
conn: conn,
reader: bufio.NewReader(conn),
pool: pool,
}
Log.Debugln("New Client Connection =>", conn.RemoteAddr())
for {
line, err := client.ReadLine()
if err != nil {
break
}
Log.Debugln("New Message =>", string(line))
err = HandleLine(line, client)
if err != nil {
break
}
}
}
func HandleLine(line []byte, client ClientConnection) (err error) {
parts := bytes.Split(line, []byte(" "))
if len(parts[0]) > 0 {
command := bytes.ToLower(parts[0])
args := parts[1:]
switch string(command) {
case "get", "gets":
err = HandleGet(args, client)
case "set", "replace":
err = HandleSet(args, client)
case "add":
err = HandleAdd(args, client)
case "delete":
err = HandleDelete(args, client)
case "incr":
err = HandleIncr(args, client)
case "touch":
err = HandleTouch(args, client)
case "stats":
err = HandleStats(args, client)
case "quit":
err = errors.New("Quitting")
case "decr", "flush", "append", "prepend", "cas":
Log.Debugln("Command", string(command), "Not Implemented")
_, err = client.ClientError([]byte("Command Not Implemented"))
default:
Log.Debugln("Command", string(command), "Unknown")
_, err = client.ClientError([]byte("Unknown Command"))
}
}
return err
}
func HandleGet(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) == 0 {
_, err = client.ClientError([]byte("GET Command needs at least 1 '<experiment>:<user_id>' pair"))
} else {
for _, key := range args {
parts := bytes.Split(key, []byte(":"))
if len(parts) < 2 {
_, err = client.ClientError([]byte("GET argument must be in the format '<experiment>:<user_id>'"))
} else {
if bytes.Equal(bytes.ToLower(parts[0]), []byte("experiment")) {
ids := [][]byte{parts[1]}
multi := false
if bytes.Equal(parts[1], []byte("*")) {
multi = true
ids, err = client.GetAllExperimentIds()
if err != nil {
continue
}
} else if bytes.Equal(parts[1], []byte("active")) {
multi = true
ids, err = client.GetActiveExperimentIds()
if err != nil {
continue
}
}
experiments := make([]Experiment, 0)
for _, id := range ids {
experimentId, err := strconv.ParseUint(string(id), 10, 64)
if err != nil {
continue
}
experiment, err := client.GetExperiment(experimentId, false)
if err != nil {
continue
}
experiments = append(experiments, experiment)
}
if len(experiments) == 0 {
continue
}
var data []byte
if len(experiments) == 1 && multi == false {
data, err = json.Marshal(experiments[0])
} else {
data, err = json.Marshal(experiments)
}
if err != nil {
continue
}
_, err = client.SendValue(key, data)
if err != nil {
continue
}
} else {
experimentId, err := strconv.ParseUint(string(parts[0]), 10, 64)
if err != nil {
continue
}
experiment, err := client.GetExperiment(experimentId, true)
if err != nil {
continue
}
bucket, err := client.GetUserBucket(experiment, parts[1], true)
_, err = client.SendValue(key, []byte(bucket.Value))
if err != nil {
break
}
}
}
}
_, err = client.SendEnd()
}
return err
}
func HandleIncr(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) < 1 {
_, err = client.ClientError([]byte("INCR command needs at least 1 argument <key> [delta]"))
} else {
parts := bytes.Split(args[0], []byte(":"))
stored := false
if len(parts) < 2 {
_, err = client.ClientError([]byte("INCR key must be in the format '<experiment>:<user_id>'"))
} else {
experimentId, err := strconv.ParseUint(string(parts[0]), 10, 64)
if err == nil {
err = client.ConvertUser(experimentId, parts[1])
stored = err == nil
}
}
if stored {
_, err = client.SendData([]byte("1"))
} else if stored == false {
_, err = client.SendData([]byte("0"))
}
}
return err
}
func HandleAdd(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) < 4 {
_, err = client.ClientError([]byte("ADD Command needs at least 4 arguments, <key> <flags> <exptime> <bytes> [noreply]"))
} else {
key, rawBytes := args[0], args[3]
noreply := (len(args) >= 5 && bytes.Equal(bytes.ToLower(args[4]), []byte("noreply")))
numBytes, err := strconv.ParseUint(string(rawBytes), 10, 64)
stored := false
if err == nil {
data, err := client.ReadLine()
if err != nil || uint64(len(data)) != numBytes {
_, err = client.ClientError([]byte("Value length does not match number of bytes send"))
} else {
experimentId, err := strconv.ParseUint(string(key), 10, 64)
if err != nil || experimentId <= 0 {
_, err = client.ClientError([]byte("ADD key must be a positive integer"))
}
err = client.AddNewExperiment(data)
stored = err == nil
}
}
if stored && noreply == false {
_, err = client.SendStored()
} else if stored == false && noreply == false {
_, err = client.SendNotStored()
}
}
return err
}
func HandleSet(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) < 4 {
_, err = client.ClientError([]byte("SET Command needs at least 4 arguments, <key> <flags> <exptime> <bytes> [noreply]"))
} else {
key, rawBytes := args[0], args[3]
noreply := (len(args) >= 5 && bytes.Equal(bytes.ToLower(args[4]), []byte("noreply")))
numBytes, err := strconv.ParseUint(string(rawBytes), 10, 64)
stored := false
if err == nil {
data, err := client.ReadLine()
if err != nil || uint64(len(data)) != numBytes {
_, err = client.ClientError([]byte("Value length does not match number of bytes send"))
} else {
experimentId, err := strconv.ParseUint(string(key), 10, 64)
if err != nil || experimentId <= 0 {
_, err = client.ClientError([]byte("SET key must be a positive integer"))
}
err = client.UpdateExperiment(experimentId, data)
stored = err == nil
}
}
if stored && noreply == false {
_, err = client.SendStored()
} else if stored == false && noreply == false {
_, err = client.SendNotStored()
}
}
return err
}
func HandleStats(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) == 0 {
args, err = client.GetActiveExperimentIds()
}
for _, rawId := range args {
id, err := strconv.ParseUint(string(rawId), 10, 64)
if err != nil {
continue
}
err = client.GetExperimentStats(id)
if err != nil {
break
}
}
_, err = client.SendEnd()
return err
}
func HandleDelete(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) < 1 {
_, err = client.ClientError([]byte("DELETE command takes 1 argument <key>"))
} else {
id, err := strconv.ParseUint(string(args[0]), 10, 64)
if err == nil {
err = client.DeactivateExperiment(id)
if err == nil {
_, err = client.SendDeleted()
} else {
_, err = client.SendNotFound()
}
} else {
_, err = client.SendNotFound()
}
}
return err
}
func HandleTouch(args [][]byte, client ClientConnection) (err error) {
client.ObtainRedisConnection()
defer client.ReleaseRedisConnection()
if len(args) < 2 {
_, err = client.ClientError([]byte("TOUCH command takes at least 2 arguments <key> <exptime> [noreply]"))
} else {
id, err := strconv.ParseUint(string(args[0]), 10, 64)
noreply := (len(args) >= 3 && bytes.Equal(bytes.ToLower(args[2]), []byte("noreply")))
if err == nil {
err = client.ActivateExperiment(id)
if err == nil && noreply == false {
_, err = client.SendTouched()
} else if noreply == false {
_, err = client.SendNotFound()
}
} else if noreply == false {
_, err = client.SendNotFound()
}
}
return err
}

Loading…
Cancel
Save