diff --git a/Makefile b/Makefile index 3aecfc6..7feff2e 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,17 @@ -all: - mkdir -p out - gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl +CFLAGS=$(shell curl-config --cflags) -Wall $(EXTRA_CFLAGS) +LFLAGS=$(shell curl-config --libs) -lpthread -lkyotocabinet +CC=gcc + +default: fast-cache + +fast-cache: + $(CC) $(CFLAGS) ./src/main.c -o $@ $(LFLAGS) debug: - mkdir -p out - gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl -g + make -f Makefile EXTRA_CFLAGS="-g" run: - ./out/fast-cache + ./fast-cache clean: - rm -rf out + rm ./fast-cache diff --git a/src/common.c b/src/common.c new file mode 100644 index 0000000..c7f60b4 --- /dev/null +++ b/src/common.c @@ -0,0 +1,21 @@ +#ifndef FAST_CACHE_COMMON +#define FAST_CACHE_COMMON +#include "queue.c" + +static sig_atomic_t misses = 0; +static sig_atomic_t hits = 0; +static sig_atomic_t connections = 0; + +static KCDB* db; +static int server; +static queue requests; + +static const char* VERSION = "0.0.1"; +const char* STATS_FORMAT = + "STAT connections %d\r\n" + "STAT hits %d\r\n" + "STAT misses %d\r\n" + "STAT hit_ratio %2.4f\r\n" + "%s" + "END\r\n"; +#endif diff --git a/src/handlers.c b/src/handlers.c new file mode 100644 index 0000000..143477f --- /dev/null +++ b/src/handlers.c @@ -0,0 +1,119 @@ +#ifndef FAST_CACHE_HANDLERS +#define FAST_CACHE_HANDLERS +#include + +#include "common.c" +#include "queue.c" + +void handle_stats(KCLIST* tokens, FILE* client){ + char out[1024]; + float hit_ratio = 0; + if(hits){ + hit_ratio = (float)hits / (float)(hits + misses); + } + char* status = kcdbstatus(db); + KCLIST* stats = kclistnew(); + tokenize(stats, status, "\n"); + char status_buf[1024]; + strcpy(status_buf, ""); + int stat_count = kclistcount(stats); + int i; + for(i = 0; i < stat_count; ++i){ + size_t part_size; + const char* part = kclistget(stats, i, &part_size); + KCLIST* parts = kclistnew(); + tokenize(parts, (char*)part, "\t"); + char buf[128]; + if(kclistcount(parts) == 2){ + sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size)); + } + kclistdel(parts); + strcat(status_buf, buf); + } + sprintf(out, STATS_FORMAT, connections, hits, misses, hit_ratio, status_buf); + fputs(out, client); +} + +void handle_version(KCLIST* tokens, FILE* client){ + char out[1024]; + sprintf(out, "VERSION %s\r\n", VERSION); + fputs(out, client); +} + +void handle_flush(KCLIST* tokens, FILE* client){ + if(kcdbclear(db)){ + fputs("OK\r\n", client); + } +} + +void handle_delete(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + if(kcdbremove(db, key, strlen(key))){ + fputs("DELETED\r\n", client); + } + } +} + +void handle_get(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + char out[1024]; + char* result_buffer; + size_t result_size; + result_buffer = kcdbget(db, key, strlen(key), &result_size); + if(result_buffer){ + if(strcmp(result_buffer, "0") == 0){ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + } else{ + ++hits; + sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer); + } + kcfree(result_buffer); + } else{ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + kcdbset(db, key, strlen(key), "0", 1); + queue_add(&requests, key); + } + fputs(out, client); + } else{ + fputs("INVALID GET COMMAND: GET \r\n", client); + return; + } +} + +int handle_command(char* buffer, FILE* client){ + int status = 0; + char command[1024]; + KCLIST* tokens = kclistnew(); + tokenize(tokens, buffer, " "); + list_shift(tokens, command); + if(command){ + if(strcmp(command, "get") == 0){ + handle_get(tokens, client); + } else if(strcmp(command, "stats") == 0){ + handle_stats(tokens, client); + } else if(strcmp(command, "flush_all") == 0){ + handle_flush(tokens, client); + } else if(strcmp(command, "delete") == 0){ + handle_delete(tokens, client); + } else if(strcmp(command, "version") == 0){ + handle_version(tokens, client); + } else if(strcmp(command, "quit") == 0){ + status = -1; + } else{ + char out[1024]; + sprintf(out, "UNKNOWN COMMAND: %s\r\n", command); + fputs(out, client); + } + } + + kclistdel(tokens); + + return status; +} +#endif diff --git a/src/main.c b/src/main.c index f29ed5c..dc0eea0 100644 --- a/src/main.c +++ b/src/main.c @@ -1,234 +1,55 @@ -#include #include +#include +#include #include -#include - -static uv_loop_t* loop = NULL; -static KCDB* db = NULL; -static sig_atomic_t cache_miss = 0; -static sig_atomic_t hits = 0; - -const char* VERSION = "0.0.1"; - -struct curl_result { - char* data; - size_t size; -}; - - -void lower(char* word){ - int length = strlen(word); - int i; - for(i = 0; i < length; ++i){ - word[i] = tolower(word[i]); - } -} - -static uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) { - uv_buf_t buf; - buf.base = malloc(suggested_size); - buf.len = suggested_size; - return buf; -} - -void handle_write(uv_write_t *req, int status) { - if(status == -1){ - fprintf(stderr, "Write error %s\n", uv_err_name(uv_last_error(loop))); - } - - char* base = (char*)req->data; - free(base); - free(req); -} - -void get_tokens(char** tokens, char* base){ - size_t size = 0; - char* remainder; - char* token; - char* ptr = base; - while(token = strtok_r(ptr, " ", &remainder)){ - int last; - for(last = 0; last < strlen(token); ++last){ - if(token[last] == '\n' || token[last] == '\r'){ - token[last] = 0; - break; +#include + +#include "common.c" +#include "handlers.c" +#include "proxy.c" +#include "queue.c" +#include "util.c" + + +void* worker(void* arg){ + FILE* fp = (FILE*) arg; + + char buffer[100]; + ++connections; + int status; + while(fgets(buffer, sizeof(buffer), fp)){ + int last = strlen(buffer) - 1; + for(; last > 0; --last){ + if(buffer[last] == '\r' || buffer[last] == '\n'){ + buffer[last] = 0; } } - lower(token); - tokens[size] = token; - ++size; - ptr = remainder; - } - free(token); -} - -size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* result){ - size_t realsize = size * nmemb; - - result->data= realloc(result->data, result->size + realsize + 1); - if(result->data == NULL){ - fprintf(stderr, "Out of memory receiving curl results\n"); - return 0; - } - memcpy(&(result->data[result->size]), ptr, realsize); - result->size += realsize; - result->data[result->size] = 0; - return realsize; -} - -void after_call_proxy(uv_work_t* req, int status){ - free(req); -} - -void call_proxy(uv_work_t* req){ - char* key = (char*)req->data; - char* url = (char*)malloc(1024 * sizeof(char)); - sprintf(url, "http://127.0.0.1:8000/%s", key); - CURL* curl = curl_easy_init(); - if(!curl){ - fprintf(stderr, "Could not initialize curl\n"); - return; - } - - struct curl_result* result = malloc(sizeof(struct curl_result)); - result->data = malloc(sizeof(char)); - result->data[0] = 0; - result->size = 0; - CURLcode res; - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); - res = curl_easy_perform(curl); - kcdbset(db, key, strlen(key), result->data, strlen(result->data)); - - curl_easy_cleanup(curl); - free(url); - free(result->data); - free(result); -} - -void handle_version(char* buffer){ - sprintf(buffer, "VERSION %s\r\n", VERSION); -} - -void handle_flush_all(char* buffer){ - if(kcdbclear(db)){ - sprintf(buffer, "OK\r\n"); - } else{ - sprintf(buffer, "FAILED\r\n"); - } -} - -void handle_stats(char* buffer){ - float ratio; - if(!hits && !cache_miss){ - ratio = 0; - } else{ - ratio = (float)hits / (hits + cache_miss); - } - int records = kcdbcount(db); - char* format = "STAT cache_miss %d\r\nSTAT hits %d\r\nSTAT hit_ratio %2.2f\r\nSTAT records %d\r\nEND\r\n"; - sprintf(buffer, format, cache_miss, hits, ratio, records); -} - -void handle_get(char* buffer, char** tokens){ - if(tokens[1]){ - char* result_buffer; - char key[1024]; - strcpy(key, tokens[1]); - size_t result_size, key_size; - key_size = strlen(key); - result_buffer = kcdbget(db, key, key_size, &result_size); - - int result = !!result_buffer; - if(result == 0){ - ++cache_miss; - uv_work_t* req = malloc(sizeof(uv_work_t)); - req->data = malloc(sizeof(key)); - strcpy(req->data, key); - uv_queue_work(loop, req, call_proxy, after_call_proxy); - result_buffer = ""; - kcdbset(db, key, strlen(key), "0", 1); - } else if(strcmp(result_buffer, "0") == 0){ - ++cache_miss; - result_buffer = ""; - result = 0; - } else{ - ++hits; - } - - sprintf(buffer, "VALUE %s 0 %lu\r\n%s\r\nEND\r\n", key, strlen(result_buffer), result_buffer); - if(result){ - kcfree(result_buffer); - } - } else{ - sprintf(buffer, "END\r\n"); - } -} - -void handle_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf){ - if(nread == -1){ - if (uv_last_error(loop).code != UV_EOF){ - fprintf(stderr, "Read error %s\n", uv_err_name(uv_last_error(loop))); + if(strlen(buffer)){ + status = handle_command(buffer, fp); + if(status == -1){ + break; + } } - uv_close((uv_handle_t*) client, NULL); - return; - } else if(nread <= 0){ - return; } - uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); - - char* tokens[8]; - get_tokens(tokens, buf.base); - - char* buffer = (char*)malloc(1024 * sizeof(char)); - if(strcmp(tokens[0], "get") == 0){ - handle_get(buffer, tokens); - } else if(strcmp(tokens[0], "stats") == 0){ - handle_stats(buffer); - } else if(strcmp(tokens[0], "flush_all") == 0){ - handle_flush_all(buffer); - } else if(strcmp(tokens[0], "version") == 0){ - handle_version(buffer); - } else if(strcmp(tokens[0], "quit") == 0){ - uv_close((uv_handle_t*) client, NULL); - return; - } else{ - sprintf(buffer, "Unknown Command: %s\r\n", tokens[0]); - } - buf.base = malloc((strlen(buffer) * sizeof(char)) + 1); - strcpy(buf.base, buffer); - buf.len = strlen(buf.base); - req->data = malloc((strlen(buffer) * sizeof(char)) + 1); - strcpy(req->data, buffer); - uv_write(req, client, &buf, 1, handle_write); - free(buffer); - free(*tokens); - *tokens = NULL; + --connections; + fclose(fp); + return 0; } -void on_new_connection(uv_stream_t* server, int status){ - if(status == -1){ - return; +void on_signal(){ + if(server){ + printf("Closing socket\r\n"); + close(server); } - uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); - uv_tcp_init(loop, client); - if(uv_accept(server, (uv_stream_t*) client) == 0){ - uv_read_start((uv_stream_t*) client, alloc_buffer, handle_read); - } else { - uv_close((uv_handle_t*) client, NULL); + if(&requests != NULL){ + printf("Deleting Request Queue\r\n"); + queue_del(&requests); } -} -void on_signal(){ - if(loop){ - printf("Stopping event loop...\n"); - uv_stop(loop); - } if(db){ - printf("Closing Cache...\n"); + printf("Closing Cache\r\n"); kcdbclose(db); } exit(0); @@ -236,26 +57,57 @@ void on_signal(){ int main(){ signal(SIGINT, on_signal); + int pool_size = 5; + pthread_t pool[pool_size]; + + queue_init(&requests); - printf("Opening Cache...\n"); db = kcdbnew(); if(!kcdbopen(db, "*#bnum=1000000#capsiz=1g", KCOWRITER | KCOCREATE)){ fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db))); return -1; } - loop = uv_default_loop(); - uv_tcp_t server; - uv_tcp_init(loop, &server); + int i; + printf("Starting %d worker threads\r\n", pool_size); + for(i = 0; i < pool_size; ++i){ + pthread_create(&(pool[i]), NULL, call_proxy, NULL); + } + + struct sockaddr_in addr, client_addr; + int addr_len; + + server = socket(PF_INET, SOCK_STREAM, 0); + if(!server){ + fprintf(stderr, "Could not create socket\r\n"); + return -1; + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(7000); + addr.sin_addr.s_addr = htonl(INADDR_ANY); - struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); - uv_tcp_bind(&server, bind_addr); - int r = uv_listen((uv_stream_t*) &server, 128, on_new_connection); - if(r){ - fprintf(stderr, "Listen error %s\n", uv_err_name(uv_last_error(loop))); - return 1; + if(bind(server, (struct sockaddr*)&addr, sizeof(addr)) < 0){ + fprintf(stderr, "Address already in use\r\n"); + return -1; + } + + if(listen(server, 24) < 0){ + fprintf(stderr, "Address already in use\r\n"); + return -1; + } else{ + printf("Listening on 0.0.0.0:7000\r\n"); + int client; + pthread_t child; + FILE *fp; + while(1){ + addr_len = sizeof(client_addr); + client = accept(server, 0, 0); + fp = fdopen(client, "r+"); + pthread_create(&child, 0, worker, fp); + pthread_detach(child); + } } - printf("Listening at 0.0.0.0:7000\n"); - return uv_run(loop, UV_RUN_DEFAULT); + return 0; } diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000..2a11c36 --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,58 @@ +#ifndef FAST_CACHE_PROXY +#define FAST_CACHE_PROXY +#include + +#include "common.c" +#include "queue.c" + +struct curl_result { + char* data; + size_t size; +}; + +size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* result){ + size_t realsize = size * nmemb; + + result->data= realloc(result->data, result->size + realsize + 1); + if(result->data == NULL){ + fprintf(stderr, "Out of memory receiving curl results\n"); + return 0; + } + memcpy(&(result->data[result->size]), ptr, realsize); + result->size += realsize; + result->data[result->size] = 0; + return realsize; +} + +void* call_proxy(void* arg){ + char next[1024]; + while(1){ + queue_get(&requests, next); + char* url = (char*)malloc(1024 * sizeof(char)); + sprintf(url, "http://127.0.0.1:8000/%s", next); + CURL* curl = curl_easy_init(); + if(!curl){ + fprintf(stderr, "Could not initialize curl\n"); + return; + } + + struct curl_result* result = malloc(sizeof(struct curl_result)); + result->data = malloc(sizeof(char)); + result->data[0] = 0; + result->size = 0; + CURLcode res; + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); + res = curl_easy_perform(curl); + kcdbset(db, next, strlen(next), result->data, strlen(result->data)); + + curl_easy_cleanup(curl); + free(url); + free(result->data); + free(result); + } +} + + +#endif diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 0000000..b797675 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,47 @@ +#ifndef FAST_CACHE_QUEUE +#define FAST_CACHE_QUEUE + +#include +#include + +#include "util.c" + +typedef struct { + KCLIST* list; + pthread_mutex_t mutex; + pthread_cond_t cond; + int size; +} queue; + +void queue_del(queue* q){ + kclistdel(*(&q->list)); + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond); +} + +void queue_init(queue* q){ + *(&q->list) = kclistnew(); + *(&q->size) = 0; + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); +} + +void queue_add(queue* q, char* value){ + pthread_mutex_lock(&q->mutex); + kclistpush(*(&q->list), value, strlen(value) + 1); + *(&q->size) += 1; + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +void queue_get(queue* q, char* value){ + pthread_mutex_lock(&q->mutex); + while(*(&q->size) == 0){ + pthread_cond_wait(&q->cond, &q->mutex); + } + list_shift(*(&q->list), value); + *(&q->size) -= 1; + pthread_mutex_unlock(&q->mutex); +} + +#endif diff --git a/src/util.c b/src/util.c new file mode 100644 index 0000000..c3f86ba --- /dev/null +++ b/src/util.c @@ -0,0 +1,46 @@ +#ifndef FAST_CACHE_UTIL +#define FAST_CACHE_UTIL + +#include + +void list_shift(KCLIST* list, char* next){ + size_t size; + const char* results = kclistget(list, 0, &size); + strcpy(next, results); + next[size] = 0; + kclistshift(list); +} + + +void lower(char* word){ + int length = strlen(word); + int i; + for(i = 0; i < length; ++i){ + word[i] = tolower(word[i]); + } +} + +void tokenize(KCLIST* tokens, char* base, char* delimiter){ + char* remainder; + char* token; + char* ptr = base; + while(token = strtok_r(ptr, delimiter, &remainder)){ + int last; + int len = strlen(token); + for(last = len - 1; last >= 0; --last){ + if(token[last] == '\n' || token[last] == '\r'){ + token[last] = 0; + break; + } + } + lower(token); + char new_token[1024]; + strcpy(new_token, token); + new_token[strlen(token)] = 0; + kclistpush(tokens, new_token, strlen(new_token) + 1); + ptr = remainder; + } + free(token); +} + +#endif