diff --git a/Makefile b/Makefile deleted file mode 100644 index 2ae2bbf..0000000 --- a/Makefile +++ /dev/null @@ -1,32 +0,0 @@ -INSTALL=/usr/bin/install -CFLAGS=$(shell curl-config --cflags) -Wall $(EXTRA_CFLAGS) -LFLAGS=$(shell curl-config --libs) -lpthread -lkyotocabinet -CC=gcc -PREFIX=/usr/local -BINDIR = $(PREFIX)/bin - -default: ferrite - -src/%.o: src/%.c src/%.h - $(CC) -c $(CFLAGS) -o $@ $< - -ferrite: ./src/util.o ./src/queue.o ./src/proxy.o ./src/handlers.o ./src/main.o - $(CC) $(CFLAGS) -o $@ $^ $(LFLAGS) - -debug: - make -f Makefile EXTRA_CFLAGS="-g -O0" - -run: - ./ferrite - -.PHONY: clean debug release - -release: clean - make -f Makefile EXTRA_CFLAGS="-O3" - -install: - $(INSTALL) ./ferrite $(BINDIR) - -clean: - rm -f ./src/*.o - rm -f ./ferrite diff --git a/README.md b/README.md index bd63549..7d44d25 100644 --- a/README.md +++ b/README.md @@ -3,91 +3,6 @@ ferrite A very fast kyoto cabinet powered memcached interface API proxy caching server. -This is not your normal caching proxy server. For starters it doesn't speak HTTP (other than to the backend), -you interact with it via a subset of the memcached commands. The main purpose is you give it a url like: -`http://api.my-domain.com/api/` and then you make memcache get calls like `get/users` or `user/12` or any other -GET only api end point. The result content does not matter at all (which means you could use this to cache HTTP get calls). - -The other _very_ important thing about `ferrite` is that when it gets a cache miss it does now wait around for a response -from the proxy server, it will send back an empty string to the client making the request and then queue up the request to -the proxy server. This means that although you will get back an empty string when you get a cache miss your response times -from this proxy server will be consistent (which can be very important for high performance applications). - -## Name -The name `ferrite` comes from: [Ferrite Magnet](http://en.wikipedia.org/wiki/Ferrite_\(magnet\)) - -## Install -### Requirements - * [Kyoto Cabinet](http://fallabs.com/kyotocabinet/pkg/) - built and tested with version `1.2.76` - * [libcurl](http://curl.haxx.se/libcurl/) - built and tested with version `7.21.4` - -```bash -git clone git://github.com/brettlangdon/ferrite.git -cd ./ferrite -make release -make install -``` - -## Using -### Usage -```bash -$ ferrite --help -Usage: ferrite [-h|--help] [-p|--port ] [-w|--workers ] [-u|--url ] [-c|--cache ] [-e|--expire ] - -p, --port - which port number to bind too [default: 7000] - -w, --workers - how many background workers to spawn [default: 10] - -u, --url - which url to proxy requests to [default: http://127.0.0.1:8000] - -c, --cache - kyoto cabinet cache to use [default: "*"] - -e, --exp - the expiration time in seconds from when a record is cached, 0 to disable [default:3600] - -h, --help - display this message -``` - -### Memcache Client -Just use your favorite memcache client -```python -import pymemcache.client -mc = pymemcache.client.Client([("127.0.0.1", 7000)]) -users = mc.get("/all/users") -``` - -### Telnet -```bash -telnet 127.0.0.1 7000 -Trying 127.0.0.1... -Connected to localhost. -Escape character is '^]'. -stats -STAT connections 1 -STAT requests 0 -STAT hits 0 -STAT misses 0 -STAT hit_ratio 0.0000 -STAT backlog 0 -STAT bnum 1048583 -STAT capcnt -1 -STAT capsiz -1 -STAT chksum 255 -STAT count 0 -STAT fmtver 5 -STAT librev 13 -STAT libver 16 -STAT opts 0 -STAT path * -STAT realtype 32 -STAT recovered 0 -STAT reorganized 0 -STAT size 8390432 -STAT type 32 -END -get all/users -VALUE all/users 0 0 - -END -get all/users -VALUE all/users 0 2 -{} -END -``` - ## License ``` The MIT License (MIT) diff --git a/commands/commands.go b/commands/commands.go new file mode 100644 index 0000000..a1371c4 --- /dev/null +++ b/commands/commands.go @@ -0,0 +1,157 @@ +package commands + +import ( + "bitbucket.org/ww/cabinet" + "bytes" + "fmt" + "github.com/brettlangdon/ferrite/common" + "github.com/brettlangdon/ferrite/proxy" + "net/url" + "os" + "strconv" + "strings" + "sync/atomic" + "time" +) + +func ParseCommand(request string) (string, []string) { + request = strings.ToLower(request) + tokens := strings.Split(strings.Trim(request, " \r\n"), " ") + var command string + if len(tokens) > 0 { + command = tokens[0] + } + return command, tokens +} + +func HandleCommand(command string, tokens []string, db *cabinet.KCDB) string { + var result string + switch command { + case "get": + result = HandleGet(tokens[1:], db) + break + case "stats": + result = HandleStats(db) + break + case "flush_all": + result = HandleFlushAll(db) + break + case "delete": + result = HandleDelete(tokens[1:], db) + break + case "version": + result = HandleVersion() + break + default: + result = "ERROR\r\n" + break + } + + return result +} + +func HandleGet(tokens []string, db *cabinet.KCDB) string { + var out string + if len(tokens) >= 1 { + key := tokens[0] + if strings.HasPrefix(key, "http%3a%2f%2f") || strings.HasPrefix(key, "https%3a%2f%2f") { + key, _ = url.QueryUnescape(key) + } + result, err := db.Get([]byte(key)) + if err != nil { + atomic.AddInt32(&common.MISSES, 1) + err := db.Set([]byte(key), []byte("0")) + if err == nil { + go proxy.CallProxy(key, db) + } else { + fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error()) + } + out = "END\r\n" + } else if bytes.Equal(result, []byte("0")) { + atomic.AddInt32(&common.MISSES, 1) + out = "END\r\n" + } else { + parts := strings.SplitN(string(result), ":", 2) + var expire int64 = 0 + if len(parts) == 2 { + expire, err = strconv.ParseInt(parts[0], 10, 32) + if err != nil { + expire = 0 + fmt.Fprintf(os.Stderr, "Error Parsing Expiration \"%s\": %s\r\n", parts[0], err.Error()) + } + result = []byte(parts[1]) + } + now := int64(time.Now().Unix()) + if expire > 0 && expire < now { + err := db.Set([]byte(key), []byte("0")) + if err == nil { + go proxy.CallProxy(key, db) + } else { + fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error()) + } + atomic.AddInt32(&common.MISSES, 1) + out = "END\r\n" + } else { + atomic.AddInt32(&common.HITS, 1) + out = fmt.Sprintf("VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, len(result), result) + } + } + } else { + out = "ERROR\r\n" + } + return out +} + +func HandleStats(db *cabinet.KCDB) string { + hits := atomic.LoadInt32(&common.HITS) + misses := atomic.LoadInt32(&common.MISSES) + connections := atomic.LoadInt32(&common.CONNECTIONS) + var hit_ratio float32 = 0 + if hits > 0 { + hit_ratio = float32(hits) / float32(hits+misses) + } + + db_stats := "" + status, err := db.Status() + if err == nil { + lines := strings.Split(status, "\n") + for _, line := range lines { + parts := strings.SplitN(strings.Trim(line, "\r\n"), "\t", 2) + if len(parts) == 2 { + db_stats += fmt.Sprintf("STAT %s %s\r\n", parts[0], parts[1]) + } + } + } else { + fmt.Fprintf(os.Stderr, "Error Retrieving DB Status: %s\r\n", err.Error()) + } + return fmt.Sprintf("STAT hits %d\r\n"+ + "STAT misses %d\r\n"+ + "STAT hit_ratio %1.4f\r\n"+ + "STAT connections %d\r\n"+ + "%s"+ + "END\r\n", + hits, misses, hit_ratio, connections, db_stats, + ) +} + +func HandleFlushAll(db *cabinet.KCDB) string { + db.Clear() + return "OK\r\n" +} + +func HandleDelete(tokens []string, db *cabinet.KCDB) string { + if len(tokens) <= 0 { + return "ERROR\r\n" + } + + key := tokens[0] + err := db.Remove([]byte(key)) + if err != nil { + return "NOT_FOUND\r\n" + } + return "DELETED\r\n" +} + +func HandleVersion() string { + return fmt.Sprintf("VERSION %s\r\n", common.VERSION) +} diff --git a/common/common.go b/common/common.go new file mode 100644 index 0000000..c1c265c --- /dev/null +++ b/common/common.go @@ -0,0 +1,13 @@ +package common + +import ( + "flag" +) + +var HITS int32 = 0 +var MISSES int32 = 0 +var CONNECTIONS int32 = 0 +var TTL *int = flag.Int("ttl", 3600, "the TTL in seconds for a newly cached object") +var BIND *string = flag.String("bind", "0.0.0.0:7000", "the [
]: to bind to") +var CACHE *string = flag.String("cache", "*", "the kyoto cabinet cache path") +var VERSION string = "0.0.1" diff --git a/main.go b/main.go new file mode 100644 index 0000000..ee9c21a --- /dev/null +++ b/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "bitbucket.org/ww/cabinet" + "flag" + "fmt" + "github.com/brettlangdon/ferrite/commands" + "github.com/brettlangdon/ferrite/common" + "net" + "os" + "os/signal" + "sync/atomic" +) + +func handler(client net.Conn, db *cabinet.KCDB) { + atomic.AddInt32(&common.CONNECTIONS, 1) + defer func() { + client.Close() + atomic.AddInt32(&common.CONNECTIONS, -1) + }() + buffer := make([]byte, 1024) + for { + n, err := client.Read(buffer) + if err != nil { + break + } + if n <= 0 { + continue + } + + command, tokens := commands.ParseCommand(string(buffer[:n])) + if len(command) == 0 { + continue + } + if command == "quit" || command == "exit" { + break + } + response := commands.HandleCommand(command, tokens, db) + _, err = client.Write([]byte(response)) + } +} + +func main() { + flag.Parse() + + fmt.Printf("Using TTL: %d (sec)\r\n", *common.TTL) + fmt.Printf("Opening Cache: %s\r\n", *common.CACHE) + db := cabinet.New() + err := db.Open(*common.CACHE, cabinet.KCOWRITER|cabinet.KCOCREATE) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not open cache \"%s\": %s\r\n", *common.CACHE, err.Error()) + os.Exit(-1) + } + + listener, err := net.Listen("tcp", *common.BIND) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not bind to \"%s\": %s\r\n", *common.BIND, err.Error()) + os.Exit(-1) + } + fmt.Printf("Listening at %s\t\n", *common.BIND) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + for _ = range c { + fmt.Println("") + fmt.Printf("Shutting down ferrite\r\n") + db.Close() + db.Del() + os.Exit(0) + } + }() + + for { + client, err := listener.Accept() + if err == nil { + go handler(client, db) + } else { + fmt.Fprintf(os.Stderr, "Error Accepting Client Connection: %s\r\n", err.Error()) + } + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..e00977b --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,29 @@ +package proxy + +import ( + "bitbucket.org/ww/cabinet" + "fmt" + "github.com/brettlangdon/ferrite/common" + "io/ioutil" + "net/http" + "os" + "time" +) + +func CallProxy(url string, db *cabinet.KCDB) { + response, err := http.Get(url) + if err == nil { + defer response.Body.Close() + result, err := ioutil.ReadAll(response.Body) + if err == nil { + now := int32(time.Now().Unix()) + ttl := int32(*common.TTL) + result = []byte(fmt.Sprintf("%d:%s", now+ttl, result)) + db.Set([]byte(url), result) + } else { + fmt.Fprintf(os.Stderr, "Error Reading Response for \"%s\": %s\r\n", url, err.Error()) + } + } else { + fmt.Fprintf(os.Stderr, "Error Retrieving Proxy Results for \"%s\": %s\r\n", url, err.Error()) + } +} diff --git a/src/common.h b/src/common.h deleted file mode 100644 index 88578b2..0000000 --- a/src/common.h +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef FERRITE_COMMON -#define FERRITE_COMMON -#include "queue.h" - -extern sig_atomic_t misses; -extern sig_atomic_t hits; -extern sig_atomic_t connections; -extern int record_expire_time; -KCDB* db; -int server; -QUEUE requests; -#endif diff --git a/src/handlers.c b/src/handlers.c deleted file mode 100644 index a62dba9..0000000 --- a/src/handlers.c +++ /dev/null @@ -1,164 +0,0 @@ -#include "handlers.h" - -const char* VERSION = "0.0.1"; -const char* STATS_FORMAT = - "STAT connections %d\r\n" - "STAT requests %d\r\n" - "STAT hits %d\r\n" - "STAT misses %d\r\n" - "STAT hit_ratio %2.4f\r\n" - "STAT backlog %d\r\n" - "%s" - "END\r\n"; -const char* HELP_TEXT = - "COMMANDS:\r\n" - " STATS - show statistics\r\n" - " GET - get from the cache or empty when not present\r\n" - " VERSION - show the current application version" - " FLUSH_ALL - clear the cache\r\n" - " HELP - show this message\r\n" - " QUIT - disconnect from the server\r\n"; - -void handle_stats(KCLIST* tokens, FILE* client){ - char out[1024]; - int total = hits + misses; - float hit_ratio = 0; - if(hits){ - hit_ratio = (float)hits / (float)(hits + misses); - } - int size = queue_size(&requests); - char* status = kcdbstatus(db); - KCLIST* stats = kclistnew(); - tokenize(stats, status, "\n"); - char status_buf[1024]; - strcpy(status_buf, ""); - int stat_count = kclistcount(stats); - int i; - for(i = 0; i < stat_count; ++i){ - size_t part_size; - const char* part = kclistget(stats, i, &part_size); - KCLIST* parts = kclistnew(); - tokenize(parts, (char*)part, "\t"); - char buf[128]; - if(kclistcount(parts) == 2){ - sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size)); - } - kclistdel(parts); - strcat(status_buf, buf); - } - kclistdel(stats); - sprintf(out, STATS_FORMAT, connections, total, hits, misses, hit_ratio, size, status_buf); - fputs(out, client); -} - -void handle_version(KCLIST* tokens, FILE* client){ - char out[1024]; - sprintf(out, "VERSION %s\r\n", VERSION); - fputs(out, client); -} - -void handle_delete(KCLIST* tokens, FILE* client){ - if(kclistcount(tokens)){ - char* key; - list_shift(tokens, &key); - if(key != NULL){ - if(kcdbremove(db, key, strlen(key))){ - fputs("DELETED\r\n", client); - return; - } - } - } - fputs("NOT_FOUND\r\n", client); -} - -void handle_flush(KCLIST* tokens, FILE* client){ - kcdbclear(db); - fputs("OK\r\n", client); -} - -void handle_get(KCLIST* tokens, FILE* client){ - if(kclistcount(tokens)){ - char* key; - list_shift(tokens, &key); - if(key == NULL){ - return; - } - char out[1024]; - char* value = ""; - char* result_buffer; - size_t result_size; - result_buffer = kcdbget(db, key, strlen(key), &result_size); - if(result_buffer){ - if(strcmp(result_buffer, "0") != 0){ - value = result_buffer + 11; - char expiration[16]; - strncpy(expiration, result_buffer, 10); - int exp = atoi(expiration); - int now = (int)time(NULL); - if(exp > 0 && exp < now){ - value = ""; - } - } else { - value = "0"; - } - kcfree(result_buffer); - } - - if(strcmp(value, "0") == 0){ - ++misses; - sprintf(out, "END\r\n"); - } else if(strlen(value)){ - ++hits; - sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(value), value); - } else{ - ++misses; - sprintf(out, "END\r\n"); - char value[16]; - sprintf(value, "%10d:0", 0); - kcdbset(db, key, strlen(key), value, strlen(value)); - queue_add(&requests, key); - } - fputs(out, client); - free(key); - } else{ - fputs("ERROR: GET \r\n", client); - } -} - -void handle_help(KCLIST* tokens, FILE* client){ - fputs(HELP_TEXT, client); -} - -int handle_command(char* buffer, FILE* client){ - int status = 0; - KCLIST* tokens = kclistnew(); - tokenize(tokens, buffer, " "); - char* command; - list_shift(tokens, &command); - if(command != NULL){ - if(strcmp(command, "get") == 0){ - handle_get(tokens, client); - } else if(strcmp(command, "stats") == 0){ - handle_stats(tokens, client); - } else if(strcmp(command, "delete") == 0){ - handle_delete(tokens, client); - } else if(strcmp(command, "flush_all") == 0){ - handle_flush(tokens, client); - } else if(strcmp(command, "version") == 0){ - handle_version(tokens, client); - } else if(strcmp(command, "help") == 0){ - handle_help(tokens, client); - } else if(strcmp(command, "quit") == 0){ - status = -1; - } else{ - char out[1024]; - sprintf(out, "ERROR: UNKNOWN COMMAND %s\r\n", command); - fputs(out, client); - } - free(command); - } - - kclistdel(tokens); - - return status; -} diff --git a/src/handlers.h b/src/handlers.h deleted file mode 100644 index 6f63fff..0000000 --- a/src/handlers.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef FERRITE_HANDLERS -#define FERRITE_HANDLERS -#include -#include -#include "common.h" -#include "queue.h" - -void handle_stats(KCLIST* tokens, FILE* client); -void handle_version(KCLIST* tokens, FILE* client); -void handle_flush(KCLIST* tokens, FILE* client); -void handle_delete(KCLIST* tokens, FILE* client); -void handle_get(KCLIST* tokens, FILE* client); -void handle_help(KCLIST* tokens, FILE* client); -int handle_command(char* buffer, FILE* client); -#endif diff --git a/src/main.c b/src/main.c deleted file mode 100644 index acf2f4b..0000000 --- a/src/main.c +++ /dev/null @@ -1,224 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#include "common.h" -#include "handlers.h" -#include "proxy.h" -#include "queue.h" -#include "util.h" - - -sig_atomic_t misses = 0; -sig_atomic_t hits = 0; -sig_atomic_t connections = 0; -int record_expire_time = 3600; - -static struct option long_options[] = { - {"port", 1, 0, 0}, - {"url", 1, 0, 0}, - {"workers", 1, 0, 0}, - {"cache", 1, 0, 0}, - {"expire", 1, 0, 0}, - {"help", 0, 0, 0}, - {NULL, 0, NULL, 0} -}; - -void usage(){ - const char* usage_str = - "Usage: ferrite [-h|--help] [-p|--port ] [-w|--workers ] [-u|--url ]\r\n" - " [-c|--cache ] [-e|--expire ]\r\n" - "\t-p, --port\t- which port number to bind too [default: 7000]\r\n" - "\t-w, --workers\t- how many background workers to spawn [default: 10]\r\n" - "\t-u, --url\t- which url to proxy requests to [default: http://127.0.0.1:8000]\r\n" - "\t-c, --cache\t- kyoto cabinet cache to use [default: \"*\"]\r\n" - "\t-e, --expire\t- the expiration time in seconds from when a record is cached, 0 to disable [default:3600]\r\n" - "\t-h, --help\t- display this message\r\n"; - printf("%s", usage_str); -} - -void* worker(void* arg){ - FILE* client = (FILE*) arg; - - char buffer[128]; - ++connections; - int status; - while(fgets(buffer, sizeof(buffer), client)){ - int last = strlen(buffer) - 1; - for(; last > 0; --last){ - if(buffer[last] == '\r' || buffer[last] == '\n'){ - buffer[last] = 0; - } - } - if(strlen(buffer)){ - status = handle_command(buffer, client); - if(status == -1){ - break; - } - } - } - - --connections; - fclose(client); - return 0; -} - -void on_signal(){ - printf("Shutting down ferrite\r\n"); - if(server){ - printf("Closing socket\r\n"); - shutdown(server, 2); - close(server); - } - - if(&requests != NULL){ - printf("Deleting Request Queue\r\n"); - queue_del(&requests); - } - - if(db){ - printf("Closing Cache\r\n"); - kcdbclose(db); - } - exit(0); -} - -int main(int argc, char* argv[]){ - int i, addr_len, yes; - int port = 7000; - int pool_size = 10; - pthread_t pool[pool_size]; - char* cache_file = "*"; - char* base_url = "http://127.0.0.1:8000/"; - struct sockaddr_in addr, client_addr; - - signal(SIGINT, on_signal); - signal(SIGTERM, on_signal); - - - int c; - int option_index = 0; - while((c = getopt_long(argc, argv, "hp:u:w:c:e:", long_options, &option_index)) != -1){ - if(c == 0){ - switch(option_index){ - case 0: - c = 'p'; - break; - case 1: - c = 'u'; - break; - case 2: - c = 'w'; - break; - case 3: - c = 'c'; - break; - case 4: - c = 'e'; - break; - case 5: - c = 'h'; - break; - default: - c = '?'; - break; - } - } - switch(c){ - case 'p': - port = atoi(optarg); - if(port <= 0){ - fprintf(stderr, "Invalid Port Number: %s\r\n", optarg); - exit(1); - } - break; - case 'u': - base_url = optarg; - break; - case 'w': - pool_size = atoi(optarg); - if(pool_size <= 0){ - fprintf(stderr, "Invalid Worker Count: %s\r\n", optarg); - exit(1); - } - break; - case 'c': - cache_file = optarg; - break; - case 'e': - record_expire_time = atoi(optarg); - if(record_expire_time <= 0){ - fprintf(stderr, "Invalid Expiration Time: %s\r\n", optarg); - exit(1); - } - break; - case 'h': - usage(); - exit(0); - case '?': - default: - fprintf(stderr, "Unknown Option: %c\r\n", c); - usage(); - exit(-1); - break; - } - } - - queue_init(&requests); - - printf("Using Expire Time of %d sec\r\n", record_expire_time); - printf("Opening Cache File: \"%s\"\r\n", cache_file); - db = kcdbnew(); - if(!kcdbopen(db, cache_file, KCOWRITER | KCOCREATE)){ - fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db))); - return -1; - } - - printf("Starting %d worker threads\r\n", pool_size); - for(i = 0; i < pool_size; ++i){ - pthread_create(&(pool[i]), NULL, call_proxy, (void*)base_url); - } - printf("Connected to %s\r\n", base_url); - - - server = socket(PF_INET, SOCK_STREAM, 0); - if(!server){ - fprintf(stderr, "Could not create socket\r\n"); - return -1; - } - - port = 7000; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - yes = 1; - setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); - if(bind(server, (struct sockaddr*)&addr, sizeof(addr)) < 0){ - perror("Error binding socket"); - return -1; - } - - if(listen(server, 24) < 0){ - perror("Error binding socket"); - return -1; - } else{ - printf("Listening on 0.0.0.0:%d\r\n", port); - int client; - pthread_t child; - FILE *fp; - while(1){ - addr_len = sizeof(client_addr); - client = accept(server, 0, 0); - fp = fdopen(client, "r+"); - pthread_create(&child, 0, worker, fp); - pthread_detach(child); - } - } - - return 0; -} diff --git a/src/proxy. b/src/proxy. deleted file mode 100644 index e69de29..0000000 diff --git a/src/proxy.c b/src/proxy.c deleted file mode 100644 index e8362fe..0000000 --- a/src/proxy.c +++ /dev/null @@ -1,58 +0,0 @@ -#include "proxy.h" - -size_t curl_write(char* ptr, size_t size, size_t nmemb, CURL_RESULT* result){ - size_t realsize = size * nmemb; - - result->data= realloc(result->data, result->size + realsize + 1); - if(result->data == NULL){ - fprintf(stderr, "Out of memory receiving curl results\n"); - return 0; - } - memcpy(&(result->data[result->size]), ptr, realsize); - result->size += realsize; - result->data[result->size] = 0; - return realsize; -} - -void* call_proxy(void* arg){ - char* base_url = (char*)arg; - CURL* curl = curl_easy_init(); - if(!curl){ - fprintf(stderr, "Could not initialize curl\n"); - return NULL; - } - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); - - while(1){ - char* next; - queue_get(&requests, &next); - int url_size = strlen(base_url) + strlen(next) + 1; - char* url = (char*)malloc(url_size * sizeof(char)); - sprintf(url, "%s%s", base_url, next); - - CURL_RESULT* result = malloc(sizeof(CURL_RESULT)); - result->data = malloc(sizeof(char)); - result->data[0] = 0; - result->size = 0; - CURLcode res; - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); - res = curl_easy_perform(curl); - char value[1024]; - int exp = 0; - if(record_expire_time > 0){ - exp = (int)time(NULL) + record_expire_time; - } - sprintf(value, "%10d:%s", exp, result->data); - kcdbset(db, next, strlen(next), value, strlen(value)); - - if(url != NULL){ - free(url); - } - if(next != NULL){ - free(next); - } - free(result->data); - free(result); - } -} diff --git a/src/proxy.h b/src/proxy.h deleted file mode 100644 index 93e255b..0000000 --- a/src/proxy.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef FERRITE_PROXY -#define FERRITE_PROXY - -#include -#include -#include "common.h" -#include "queue.h" - -typedef struct { - char* data; - size_t size; -} CURL_RESULT; - -size_t curl_write(char* ptr, size_t size, size_t nmemb, CURL_RESULT* result); -void* call_proxy(void* arg); -#endif diff --git a/src/queue.c b/src/queue.c deleted file mode 100644 index c05ad74..0000000 --- a/src/queue.c +++ /dev/null @@ -1,40 +0,0 @@ -#include "queue.h" - -void queue_del(PTR_QUEUE q){ - kclistdel(*(&q->list)); - pthread_mutex_destroy(&q->mutex); - pthread_cond_destroy(&q->cond); -} - -void queue_init(PTR_QUEUE q){ - *(&q->list) = kclistnew(); - *(&q->size) = 0; - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cond, NULL); -} - -void queue_add(PTR_QUEUE q, char* value){ - pthread_mutex_lock(&q->mutex); - kclistpush(*(&q->list), value, strlen(value) + 1); - *(&q->size) += 1; - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); -} - -void queue_get(PTR_QUEUE q, char** value){ - pthread_mutex_lock(&q->mutex); - while(*(&q->size) == 0){ - pthread_cond_wait(&q->cond, &q->mutex); - } - list_shift(*(&q->list), value); - *(&q->size) -= 1; - pthread_mutex_unlock(&q->mutex); -} - -int queue_size(PTR_QUEUE q){ - int size = 0; - pthread_mutex_lock(&q->mutex); - size = *(&q->size); - pthread_mutex_unlock(&q->mutex); - return size; -} diff --git a/src/queue.h b/src/queue.h deleted file mode 100644 index 52da6db..0000000 --- a/src/queue.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef FERRITE_QUEUE -#define FERRITE_QUEUE - -#include -#include -#include "util.h" - -typedef struct { - KCLIST* list; - pthread_mutex_t mutex; - pthread_cond_t cond; - int size; -} QUEUE; - -typedef QUEUE* PTR_QUEUE; - -void queue_del(PTR_QUEUE q); -void queue_init(PTR_QUEUE q); -void queue_add(PTR_QUEUE q, char* value); -void queue_get(PTR_QUEUE q, char** value); -int queue_size(PTR_QUEUE q); - -#endif diff --git a/src/util.c b/src/util.c deleted file mode 100644 index 7a1da9d..0000000 --- a/src/util.c +++ /dev/null @@ -1,42 +0,0 @@ -#include "util.h" - -void list_shift(KCLIST* list, char** next){ - size_t size; - const char* results = kclistget(list, 0, &size); - *next = malloc((size * sizeof(char)) + 1); - strcpy(*next, results); - (*next)[size] = 0; - kclistshift(list); -} - - -void lower(char* word){ - int length = strlen(word); - int i; - for(i = 0; i < length; ++i){ - word[i] = tolower(word[i]); - } -} - -void tokenize(KCLIST* tokens, char* base, char* delimiter){ - char* remainder; - char* token; - char* ptr = base; - while((token = strtok_r(ptr, delimiter, &remainder))){ - int last; - int len = strlen(token); - for(last = len - 1; last >= 0; --last){ - if(token[last] == '\n' || token[last] == '\r'){ - token[last] = 0; - break; - } - } - lower(token); - char new_token[1024]; - strcpy(new_token, token); - new_token[strlen(token)] = 0; - kclistpush(tokens, new_token, strlen(new_token) + 1); - ptr = remainder; - } - free(token); -} diff --git a/src/util.h b/src/util.h deleted file mode 100644 index a696350..0000000 --- a/src/util.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef FERRITE_UTIL -#define FERRITE_UTIL - -#include - -void list_shift(KCLIST* list, char** next); -void lower(char* word); -void tokenize(KCLIST* tokens, char* base, char* delimiter); -#endif