From 113d1995277e53b32c0c4c6fa9c1d8fb8cc29fd0 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Sat, 20 Jul 2013 18:41:33 -0400 Subject: [PATCH] some major refactoring --- src/common.c | 21 ++++ src/handlers.c | 119 +++++++++++++++++++++ src/main.c | 275 +++---------------------------------------------- src/proxy.c | 58 +++++++++++ src/queue.c | 47 +++++++++ src/util.c | 46 +++++++++ 6 files changed, 303 insertions(+), 263 deletions(-) create mode 100644 src/common.c create mode 100644 src/handlers.c create mode 100644 src/proxy.c create mode 100644 src/queue.c create mode 100644 src/util.c diff --git a/src/common.c b/src/common.c new file mode 100644 index 0000000..c7f60b4 --- /dev/null +++ b/src/common.c @@ -0,0 +1,21 @@ +#ifndef FAST_CACHE_COMMON +#define FAST_CACHE_COMMON +#include "queue.c" + +static sig_atomic_t misses = 0; +static sig_atomic_t hits = 0; +static sig_atomic_t connections = 0; + +static KCDB* db; +static int server; +static queue requests; + +static const char* VERSION = "0.0.1"; +const char* STATS_FORMAT = + "STAT connections %d\r\n" + "STAT hits %d\r\n" + "STAT misses %d\r\n" + "STAT hit_ratio %2.4f\r\n" + "%s" + "END\r\n"; +#endif diff --git a/src/handlers.c b/src/handlers.c new file mode 100644 index 0000000..143477f --- /dev/null +++ b/src/handlers.c @@ -0,0 +1,119 @@ +#ifndef FAST_CACHE_HANDLERS +#define FAST_CACHE_HANDLERS +#include + +#include "common.c" +#include "queue.c" + +void handle_stats(KCLIST* tokens, FILE* client){ + char out[1024]; + float hit_ratio = 0; + if(hits){ + hit_ratio = (float)hits / (float)(hits + misses); + } + char* status = kcdbstatus(db); + KCLIST* stats = kclistnew(); + tokenize(stats, status, "\n"); + char status_buf[1024]; + strcpy(status_buf, ""); + int stat_count = kclistcount(stats); + int i; + for(i = 0; i < stat_count; ++i){ + size_t part_size; + const char* part = kclistget(stats, i, &part_size); + KCLIST* parts = kclistnew(); + tokenize(parts, (char*)part, "\t"); + char buf[128]; + if(kclistcount(parts) == 2){ + sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size)); + } + kclistdel(parts); + strcat(status_buf, buf); + } + sprintf(out, STATS_FORMAT, connections, hits, misses, hit_ratio, status_buf); + fputs(out, client); +} + +void handle_version(KCLIST* tokens, FILE* client){ + char out[1024]; + sprintf(out, "VERSION %s\r\n", VERSION); + fputs(out, client); +} + +void handle_flush(KCLIST* tokens, FILE* client){ + if(kcdbclear(db)){ + fputs("OK\r\n", client); + } +} + +void handle_delete(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + if(kcdbremove(db, key, strlen(key))){ + fputs("DELETED\r\n", client); + } + } +} + +void handle_get(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + char out[1024]; + char* result_buffer; + size_t result_size; + result_buffer = kcdbget(db, key, strlen(key), &result_size); + if(result_buffer){ + if(strcmp(result_buffer, "0") == 0){ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + } else{ + ++hits; + sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer); + } + kcfree(result_buffer); + } else{ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + kcdbset(db, key, strlen(key), "0", 1); + queue_add(&requests, key); + } + fputs(out, client); + } else{ + fputs("INVALID GET COMMAND: GET \r\n", client); + return; + } +} + +int handle_command(char* buffer, FILE* client){ + int status = 0; + char command[1024]; + KCLIST* tokens = kclistnew(); + tokenize(tokens, buffer, " "); + list_shift(tokens, command); + if(command){ + if(strcmp(command, "get") == 0){ + handle_get(tokens, client); + } else if(strcmp(command, "stats") == 0){ + handle_stats(tokens, client); + } else if(strcmp(command, "flush_all") == 0){ + handle_flush(tokens, client); + } else if(strcmp(command, "delete") == 0){ + handle_delete(tokens, client); + } else if(strcmp(command, "version") == 0){ + handle_version(tokens, client); + } else if(strcmp(command, "quit") == 0){ + status = -1; + } else{ + char out[1024]; + sprintf(out, "UNKNOWN COMMAND: %s\r\n", command); + fputs(out, client); + } + } + + kclistdel(tokens); + + return status; +} +#endif diff --git a/src/main.c b/src/main.c index 86b4ed3..dc0eea0 100644 --- a/src/main.c +++ b/src/main.c @@ -1,266 +1,15 @@ -#include #include #include #include #include #include +#include "common.c" +#include "handlers.c" +#include "proxy.c" +#include "queue.c" +#include "util.c" -static const char* VERSION = "0.0.1"; -static sig_atomic_t misses = 0; -static sig_atomic_t hits = 0; -static sig_atomic_t connections = 0; - -static KCDB* db; -static int sd; - -const char* STATS_FORMAT = - "STAT connections %d\r\n" - "STAT hits %d\r\n" - "STAT misses %d\r\n" - "STAT hit_ratio %2.4f\r\n" - "%s" - "END\r\n"; - - -struct curl_result { - char* data; - size_t size; -}; - -typedef struct { - KCLIST* list; - pthread_mutex_t mutex; - pthread_cond_t cond; - int size; -} ProxyQueue; -static ProxyQueue requests; - -void list_shift(KCLIST* list, char* next){ - size_t size; - const char* results = kclistget(list, 0, &size); - strcpy(next, results); - next[size] = 0; - kclistshift(list); -} - -void queue_del(ProxyQueue* q){ - kclistdel(*(&q->list)); - pthread_mutex_destroy(&q->mutex); - pthread_cond_destroy(&q->cond); -} - -void queue_init(ProxyQueue* q){ - *(&q->list) = kclistnew(); - *(&q->size) = 0; - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cond, NULL); -} - -void queue_add(ProxyQueue* q, char* value){ - pthread_mutex_lock(&q->mutex); - kclistpush(*(&q->list), value, strlen(value) + 1); - *(&q->size) += 1; - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); -} - -void queue_get(ProxyQueue* q, char* value){ - pthread_mutex_lock(&q->mutex); - while(*(&q->size) == 0){ - pthread_cond_wait(&q->cond, &q->mutex); - } - list_shift(*(&q->list), value); - *(&q->size) -= 1; - pthread_mutex_unlock(&q->mutex); -} - -void lower(char* word){ - int length = strlen(word); - int i; - for(i = 0; i < length; ++i){ - word[i] = tolower(word[i]); - } -} - -void tokenize(KCLIST* tokens, char* base, char* delimiter){ - char* remainder; - char* token; - char* ptr = base; - while(token = strtok_r(ptr, delimiter, &remainder)){ - int last; - int len = strlen(token); - for(last = len - 1; last >= 0; --last){ - if(token[last] == '\n' || token[last] == '\r'){ - token[last] = 0; - break; - } - } - lower(token); - char new_token[1024]; - strcpy(new_token, token); - new_token[strlen(token)] = 0; - kclistpush(tokens, new_token, strlen(new_token) + 1); - ptr = remainder; - } - free(token); -} - -size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* result){ - size_t realsize = size * nmemb; - - result->data= realloc(result->data, result->size + realsize + 1); - if(result->data == NULL){ - fprintf(stderr, "Out of memory receiving curl results\n"); - return 0; - } - memcpy(&(result->data[result->size]), ptr, realsize); - result->size += realsize; - result->data[result->size] = 0; - return realsize; -} - -void* call_proxy(void* arg){ - char next[1024]; - while(1){ - queue_get(&requests, next); - char* url = (char*)malloc(1024 * sizeof(char)); - sprintf(url, "http://127.0.0.1:8000/%s", next); - CURL* curl = curl_easy_init(); - if(!curl){ - fprintf(stderr, "Could not initialize curl\n"); - return; - } - - struct curl_result* result = malloc(sizeof(struct curl_result)); - result->data = malloc(sizeof(char)); - result->data[0] = 0; - result->size = 0; - CURLcode res; - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); - res = curl_easy_perform(curl); - kcdbset(db, next, strlen(next), result->data, strlen(result->data)); - - curl_easy_cleanup(curl); - free(url); - free(result->data); - free(result); - } -} - -void handle_stats(KCLIST* tokens, FILE* client){ - char out[1024]; - float hit_ratio = 0; - if(hits){ - hit_ratio = (float)hits / (float)(hits + misses); - } - char* status = kcdbstatus(db); - KCLIST* stats = kclistnew(); - tokenize(stats, status, "\n"); - char status_buf[1024]; - strcpy(status_buf, ""); - int stat_count = kclistcount(stats); - int i; - for(i = 0; i < stat_count; ++i){ - size_t part_size; - const char* part = kclistget(stats, i, &part_size); - KCLIST* parts = kclistnew(); - tokenize(parts, (char*)part, "\t"); - char buf[128]; - if(kclistcount(parts) == 2){ - sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size)); - } - kclistdel(parts); - strcat(status_buf, buf); - } - sprintf(out, STATS_FORMAT, connections, hits, misses, hit_ratio, status_buf); - fputs(out, client); -} - -void handle_version(KCLIST* tokens, FILE* client){ - char out[1024]; - sprintf(out, "VERSION %s\r\n", VERSION); - fputs(out, client); -} - -void handle_flush(KCLIST* tokens, FILE* client){ - if(kcdbclear(db)){ - fputs("OK\r\n", client); - } -} - -void handle_delete(KCLIST* tokens, FILE* client){ - if(kclistcount(tokens)){ - char key[128]; - list_shift(tokens, key); - if(kcdbremove(db, key, strlen(key))){ - fputs("DELETED\r\n", client); - } - } -} - -void handle_get(KCLIST* tokens, FILE* client){ - if(kclistcount(tokens)){ - char key[128]; - list_shift(tokens, key); - char out[1024]; - char* result_buffer; - size_t result_size; - result_buffer = kcdbget(db, key, strlen(key), &result_size); - if(result_buffer){ - if(strcmp(result_buffer, "0") == 0){ - ++misses; - sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); - } else{ - ++hits; - sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer); - } - kcfree(result_buffer); - } else{ - ++misses; - sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); - kcdbset(db, key, strlen(key), "0", 1); - queue_add(&requests, key); - } - fputs(out, client); - } else{ - fputs("INVALID GET COMMAND: GET \r\n", client); - return; - } -} - -int handle_command(char* buffer, FILE* client){ - int status = 0; - char command[1024]; - KCLIST* tokens = kclistnew(); - tokenize(tokens, buffer, " "); - list_shift(tokens, command); - if(command){ - if(strcmp(command, "get") == 0){ - handle_get(tokens, client); - } else if(strcmp(command, "stats") == 0){ - handle_stats(tokens, client); - } else if(strcmp(command, "flush_all") == 0){ - handle_flush(tokens, client); - } else if(strcmp(command, "delete") == 0){ - handle_delete(tokens, client); - } else if(strcmp(command, "version") == 0){ - handle_version(tokens, client); - } else if(strcmp(command, "quit") == 0){ - status = -1; - } else{ - char out[1024]; - sprintf(out, "UNKNOWN COMMAND: %s\r\n", command); - fputs(out, client); - } - } - - kclistdel(tokens); - - return status; -} void* worker(void* arg){ FILE* fp = (FILE*) arg; @@ -289,9 +38,9 @@ void* worker(void* arg){ } void on_signal(){ - if(sd){ + if(server){ printf("Closing socket\r\n"); - close(sd); + close(server); } if(&requests != NULL){ @@ -328,8 +77,8 @@ int main(){ struct sockaddr_in addr, client_addr; int addr_len; - sd = socket(PF_INET, SOCK_STREAM, 0); - if(!sd){ + server = socket(PF_INET, SOCK_STREAM, 0); + if(!server){ fprintf(stderr, "Could not create socket\r\n"); return -1; } @@ -338,12 +87,12 @@ int main(){ addr.sin_port = htons(7000); addr.sin_addr.s_addr = htonl(INADDR_ANY); - if(bind(sd, (struct sockaddr*)&addr, sizeof(addr)) < 0){ + if(bind(server, (struct sockaddr*)&addr, sizeof(addr)) < 0){ fprintf(stderr, "Address already in use\r\n"); return -1; } - if(listen(sd, 24) < 0){ + if(listen(server, 24) < 0){ fprintf(stderr, "Address already in use\r\n"); return -1; } else{ @@ -353,7 +102,7 @@ int main(){ FILE *fp; while(1){ addr_len = sizeof(client_addr); - client = accept(sd, 0, 0); + client = accept(server, 0, 0); fp = fdopen(client, "r+"); pthread_create(&child, 0, worker, fp); pthread_detach(child); diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000..2a11c36 --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,58 @@ +#ifndef FAST_CACHE_PROXY +#define FAST_CACHE_PROXY +#include + +#include "common.c" +#include "queue.c" + +struct curl_result { + char* data; + size_t size; +}; + +size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* result){ + size_t realsize = size * nmemb; + + result->data= realloc(result->data, result->size + realsize + 1); + if(result->data == NULL){ + fprintf(stderr, "Out of memory receiving curl results\n"); + return 0; + } + memcpy(&(result->data[result->size]), ptr, realsize); + result->size += realsize; + result->data[result->size] = 0; + return realsize; +} + +void* call_proxy(void* arg){ + char next[1024]; + while(1){ + queue_get(&requests, next); + char* url = (char*)malloc(1024 * sizeof(char)); + sprintf(url, "http://127.0.0.1:8000/%s", next); + CURL* curl = curl_easy_init(); + if(!curl){ + fprintf(stderr, "Could not initialize curl\n"); + return; + } + + struct curl_result* result = malloc(sizeof(struct curl_result)); + result->data = malloc(sizeof(char)); + result->data[0] = 0; + result->size = 0; + CURLcode res; + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); + res = curl_easy_perform(curl); + kcdbset(db, next, strlen(next), result->data, strlen(result->data)); + + curl_easy_cleanup(curl); + free(url); + free(result->data); + free(result); + } +} + + +#endif diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 0000000..b797675 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,47 @@ +#ifndef FAST_CACHE_QUEUE +#define FAST_CACHE_QUEUE + +#include +#include + +#include "util.c" + +typedef struct { + KCLIST* list; + pthread_mutex_t mutex; + pthread_cond_t cond; + int size; +} queue; + +void queue_del(queue* q){ + kclistdel(*(&q->list)); + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond); +} + +void queue_init(queue* q){ + *(&q->list) = kclistnew(); + *(&q->size) = 0; + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); +} + +void queue_add(queue* q, char* value){ + pthread_mutex_lock(&q->mutex); + kclistpush(*(&q->list), value, strlen(value) + 1); + *(&q->size) += 1; + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +void queue_get(queue* q, char* value){ + pthread_mutex_lock(&q->mutex); + while(*(&q->size) == 0){ + pthread_cond_wait(&q->cond, &q->mutex); + } + list_shift(*(&q->list), value); + *(&q->size) -= 1; + pthread_mutex_unlock(&q->mutex); +} + +#endif diff --git a/src/util.c b/src/util.c new file mode 100644 index 0000000..c3f86ba --- /dev/null +++ b/src/util.c @@ -0,0 +1,46 @@ +#ifndef FAST_CACHE_UTIL +#define FAST_CACHE_UTIL + +#include + +void list_shift(KCLIST* list, char* next){ + size_t size; + const char* results = kclistget(list, 0, &size); + strcpy(next, results); + next[size] = 0; + kclistshift(list); +} + + +void lower(char* word){ + int length = strlen(word); + int i; + for(i = 0; i < length; ++i){ + word[i] = tolower(word[i]); + } +} + +void tokenize(KCLIST* tokens, char* base, char* delimiter){ + char* remainder; + char* token; + char* ptr = base; + while(token = strtok_r(ptr, delimiter, &remainder)){ + int last; + int len = strlen(token); + for(last = len - 1; last >= 0; --last){ + if(token[last] == '\n' || token[last] == '\r'){ + token[last] = 0; + break; + } + } + lower(token); + char new_token[1024]; + strcpy(new_token, token); + new_token[strlen(token)] = 0; + kclistpush(tokens, new_token, strlen(new_token) + 1); + ptr = remainder; + } + free(token); +} + +#endif