diff --git a/Makefile b/Makefile index 3aecfc6..d91eb64 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ all: mkdir -p out - gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl + gcc -I/usr/local/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl debug: mkdir -p out - gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl -g + gcc -I/usr/local/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -lkyotocabinet -lcurl -g run: ./out/fast-cache diff --git a/src/main.c b/src/main.c index f29ed5c..dffb65b 100644 --- a/src/main.c +++ b/src/main.c @@ -1,62 +1,104 @@ #include #include +#include +#include #include -#include +#include -static uv_loop_t* loop = NULL; -static KCDB* db = NULL; -static sig_atomic_t cache_miss = 0; + +static const char* VERSION = "0.0.1"; +static sig_atomic_t misses = 0; static sig_atomic_t hits = 0; -const char* VERSION = "0.0.1"; +static KCDB* db; +static int sd; + +const char* STATS_FORMAT = + "STAT hits %d\r\n" + "STAT misses %d\r\n" + "STAT hit_ratio %2.4f\r\n" + "%s" + "END\r\n"; + struct curl_result { char* data; size_t size; }; +typedef struct { + KCLIST* list; + pthread_mutex_t mutex; + pthread_cond_t cond; + int size; +} ProxyQueue; +static ProxyQueue requests; -void lower(char* word){ - int length = strlen(word); - int i; - for(i = 0; i < length; ++i){ - word[i] = tolower(word[i]); - } +void list_shift(KCLIST* list, char* next){ + size_t size; + const char* results = kclistget(list, 0, &size); + strcpy(next, results); + next[size] = 0; + kclistshift(list); } -static uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) { - uv_buf_t buf; - buf.base = malloc(suggested_size); - buf.len = suggested_size; - return buf; +void queue_del(ProxyQueue* q){ + kclistdel(*(&q->list)); + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond); } -void handle_write(uv_write_t *req, int status) { - if(status == -1){ - fprintf(stderr, "Write error %s\n", uv_err_name(uv_last_error(loop))); +void queue_init(ProxyQueue* q){ + *(&q->list) = kclistnew(); + *(&q->size) = 0; + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); +} + +void queue_add(ProxyQueue* q, char* value){ + pthread_mutex_lock(&q->mutex); + kclistpush(*(&q->list), value, strlen(value) + 1); + *(&q->size) += 1; + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +void queue_get(ProxyQueue* q, char* value){ + pthread_mutex_lock(&q->mutex); + while(*(&q->size) == 0){ + pthread_cond_wait(&q->cond, &q->mutex); } + list_shift(*(&q->list), value); + *(&q->size) -= 1; + pthread_mutex_unlock(&q->mutex); +} - char* base = (char*)req->data; - free(base); - free(req); +void lower(char* word){ + int length = strlen(word); + int i; + for(i = 0; i < length; ++i){ + word[i] = tolower(word[i]); + } } -void get_tokens(char** tokens, char* base){ - size_t size = 0; +void tokenize(KCLIST* tokens, char* base, char* delimiter){ char* remainder; char* token; char* ptr = base; - while(token = strtok_r(ptr, " ", &remainder)){ + while(token = strtok_r(ptr, delimiter, &remainder)){ int last; - for(last = 0; last < strlen(token); ++last){ + int len = strlen(token); + for(last = len - 1; last >= 0; --last){ if(token[last] == '\n' || token[last] == '\r'){ token[last] = 0; break; } } lower(token); - tokens[size] = token; - ++size; + char new_token[1024]; + strcpy(new_token, token); + new_token[strlen(token)] = 0; + kclistpush(tokens, new_token, strlen(new_token) + 1); ptr = remainder; } free(token); @@ -76,159 +118,186 @@ size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* resu return realsize; } -void after_call_proxy(uv_work_t* req, int status){ - free(req); -} +void* call_proxy(void* arg){ + char next[1024]; + while(1){ + queue_get(&requests, next); + char* url = (char*)malloc(1024 * sizeof(char)); + sprintf(url, "http://127.0.0.1:8000/%s", next); + CURL* curl = curl_easy_init(); + if(!curl){ + fprintf(stderr, "Could not initialize curl\n"); + return; + } -void call_proxy(uv_work_t* req){ - char* key = (char*)req->data; - char* url = (char*)malloc(1024 * sizeof(char)); - sprintf(url, "http://127.0.0.1:8000/%s", key); - CURL* curl = curl_easy_init(); - if(!curl){ - fprintf(stderr, "Could not initialize curl\n"); - return; + struct curl_result* result = malloc(sizeof(struct curl_result)); + result->data = malloc(sizeof(char)); + result->data[0] = 0; + result->size = 0; + CURLcode res; + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); + res = curl_easy_perform(curl); + kcdbset(db, next, strlen(next), result->data, strlen(result->data)); + + curl_easy_cleanup(curl); + free(url); + free(result->data); + free(result); } +} - struct curl_result* result = malloc(sizeof(struct curl_result)); - result->data = malloc(sizeof(char)); - result->data[0] = 0; - result->size = 0; - CURLcode res; - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, result); - res = curl_easy_perform(curl); - kcdbset(db, key, strlen(key), result->data, strlen(result->data)); - - curl_easy_cleanup(curl); - free(url); - free(result->data); - free(result); +void handle_stats(KCLIST* tokens, FILE* client){ + char out[1024]; + float hit_ratio = 0; + if(hits){ + hit_ratio = (float)hits / (float)(hits + misses); + } + char* status = kcdbstatus(db); + KCLIST* stats = kclistnew(); + tokenize(stats, status, "\n"); + char status_buf[1024]; + strcpy(status_buf, ""); + int stat_count = kclistcount(stats); + int i; + for(i = 0; i < stat_count; ++i){ + size_t part_size; + const char* part = kclistget(stats, i, &part_size); + KCLIST* parts = kclistnew(); + tokenize(parts, (char*)part, "\t"); + char buf[128]; + if(kclistcount(parts) == 2){ + sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size)); + } + kclistdel(parts); + strcat(status_buf, buf); + } + sprintf(out, STATS_FORMAT, hits, misses, hit_ratio, status_buf); + fputs(out, client); } -void handle_version(char* buffer){ - sprintf(buffer, "VERSION %s\r\n", VERSION); +void handle_version(KCLIST* tokens, FILE* client){ + char out[1024]; + sprintf(out, "VERSION %s\r\n", VERSION); + fputs(out, client); } -void handle_flush_all(char* buffer){ +void handle_flush(KCLIST* tokens, FILE* client){ if(kcdbclear(db)){ - sprintf(buffer, "OK\r\n"); - } else{ - sprintf(buffer, "FAILED\r\n"); + fputs("OK\r\n", client); } } -void handle_stats(char* buffer){ - float ratio; - if(!hits && !cache_miss){ - ratio = 0; - } else{ - ratio = (float)hits / (hits + cache_miss); +void handle_delete(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + if(kcdbremove(db, key, strlen(key))){ + fputs("DELETED\r\n", client); + } } - int records = kcdbcount(db); - char* format = "STAT cache_miss %d\r\nSTAT hits %d\r\nSTAT hit_ratio %2.2f\r\nSTAT records %d\r\nEND\r\n"; - sprintf(buffer, format, cache_miss, hits, ratio, records); } -void handle_get(char* buffer, char** tokens){ - if(tokens[1]){ +void handle_get(KCLIST* tokens, FILE* client){ + if(kclistcount(tokens)){ + char key[128]; + list_shift(tokens, key); + char out[1024]; char* result_buffer; - char key[1024]; - strcpy(key, tokens[1]); - size_t result_size, key_size; - key_size = strlen(key); - result_buffer = kcdbget(db, key, key_size, &result_size); - - int result = !!result_buffer; - if(result == 0){ - ++cache_miss; - uv_work_t* req = malloc(sizeof(uv_work_t)); - req->data = malloc(sizeof(key)); - strcpy(req->data, key); - uv_queue_work(loop, req, call_proxy, after_call_proxy); - result_buffer = ""; - kcdbset(db, key, strlen(key), "0", 1); - } else if(strcmp(result_buffer, "0") == 0){ - ++cache_miss; - result_buffer = ""; - result = 0; - } else{ - ++hits; - } - - sprintf(buffer, "VALUE %s 0 %lu\r\n%s\r\nEND\r\n", key, strlen(result_buffer), result_buffer); - if(result){ + size_t result_size; + result_buffer = kcdbget(db, key, strlen(key), &result_size); + if(result_buffer){ + if(strcmp(result_buffer, "0") == 0){ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + } else{ + ++hits; + sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer); + } kcfree(result_buffer); + } else{ + ++misses; + sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); + kcdbset(db, key, strlen(key), "0", 1); + queue_add(&requests, key); } + fputs(out, client); } else{ - sprintf(buffer, "END\r\n"); + fputs("INVALID GET COMMAND: GET \r\n", client); + return; } } -void handle_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf){ - if(nread == -1){ - if (uv_last_error(loop).code != UV_EOF){ - fprintf(stderr, "Read error %s\n", uv_err_name(uv_last_error(loop))); +int handle_command(char* buffer, FILE* client){ + int status = 0; + char command[1024]; + KCLIST* tokens = kclistnew(); + tokenize(tokens, buffer, " "); + list_shift(tokens, command); + if(command){ + if(strcmp(command, "get") == 0){ + handle_get(tokens, client); + } else if(strcmp(command, "stats") == 0){ + handle_stats(tokens, client); + } else if(strcmp(command, "flush_all") == 0){ + handle_flush(tokens, client); + } else if(strcmp(command, "delete") == 0){ + handle_delete(tokens, client); + } else if(strcmp(command, "version") == 0){ + handle_version(tokens, client); + } else if(strcmp(command, "quit") == 0){ + status = -1; + } else{ + char out[1024]; + sprintf(out, "UNKNOWN COMMAND: %s\r\n", command); + fputs(out, client); } - uv_close((uv_handle_t*) client, NULL); - return; - } else if(nread <= 0){ - return; } - uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); - - char* tokens[8]; - get_tokens(tokens, buf.base); - - char* buffer = (char*)malloc(1024 * sizeof(char)); - if(strcmp(tokens[0], "get") == 0){ - handle_get(buffer, tokens); - } else if(strcmp(tokens[0], "stats") == 0){ - handle_stats(buffer); - } else if(strcmp(tokens[0], "flush_all") == 0){ - handle_flush_all(buffer); - } else if(strcmp(tokens[0], "version") == 0){ - handle_version(buffer); - } else if(strcmp(tokens[0], "quit") == 0){ - uv_close((uv_handle_t*) client, NULL); - return; - } else{ - sprintf(buffer, "Unknown Command: %s\r\n", tokens[0]); - } - buf.base = malloc((strlen(buffer) * sizeof(char)) + 1); - strcpy(buf.base, buffer); - buf.len = strlen(buf.base); - req->data = malloc((strlen(buffer) * sizeof(char)) + 1); - strcpy(req->data, buffer); - uv_write(req, client, &buf, 1, handle_write); - free(buffer); - free(*tokens); - *tokens = NULL; + kclistdel(tokens); + + return status; } -void on_new_connection(uv_stream_t* server, int status){ - if(status == -1){ - return; - } +void* worker(void* arg){ + FILE* fp = (FILE*) arg; - uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); - uv_tcp_init(loop, client); - if(uv_accept(server, (uv_stream_t*) client) == 0){ - uv_read_start((uv_stream_t*) client, alloc_buffer, handle_read); - } else { - uv_close((uv_handle_t*) client, NULL); + char buffer[100]; + + int status; + while(fgets(buffer, sizeof(buffer), fp)){ + int last = strlen(buffer) - 1; + for(; last > 0; --last){ + if(buffer[last] == '\r' || buffer[last] == '\n'){ + buffer[last] = 0; + } + } + if(strlen(buffer)){ + status = handle_command(buffer, fp); + if(status == -1){ + break; + } + } } + + fclose(fp); + return 0; } void on_signal(){ - if(loop){ - printf("Stopping event loop...\n"); - uv_stop(loop); + if(sd){ + printf("Closing socket\r\n"); + close(sd); + } + + if(&requests != NULL){ + printf("Deleting Request Queue\r\n"); + queue_del(&requests); } + if(db){ - printf("Closing Cache...\n"); + printf("Closing Cache\r\n"); kcdbclose(db); } exit(0); @@ -236,26 +305,57 @@ void on_signal(){ int main(){ signal(SIGINT, on_signal); + int pool_size = 5; + pthread_t pool[pool_size]; + + queue_init(&requests); - printf("Opening Cache...\n"); db = kcdbnew(); if(!kcdbopen(db, "*#bnum=1000000#capsiz=1g", KCOWRITER | KCOCREATE)){ fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db))); return -1; } - loop = uv_default_loop(); - uv_tcp_t server; - uv_tcp_init(loop, &server); + int i; + printf("Starting %d worker threads\r\n", pool_size); + for(i = 0; i < pool_size; ++i){ + pthread_create(&(pool[i]), NULL, call_proxy, NULL); + } + + struct sockaddr_in addr, client_addr; + int addr_len; + + sd = socket(PF_INET, SOCK_STREAM, 0); + if(!sd){ + fprintf(stderr, "Could not create socket\r\n"); + return -1; + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(7000); + addr.sin_addr.s_addr = htonl(INADDR_ANY); - struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); - uv_tcp_bind(&server, bind_addr); - int r = uv_listen((uv_stream_t*) &server, 128, on_new_connection); - if(r){ - fprintf(stderr, "Listen error %s\n", uv_err_name(uv_last_error(loop))); - return 1; + if(bind(sd, (struct sockaddr*)&addr, sizeof(addr)) < 0){ + fprintf(stderr, "Address already in use\r\n"); + return -1; + } + + if(listen(sd, 24) < 0){ + fprintf(stderr, "Address already in use\r\n"); + return -1; + } else{ + printf("Listening on 0.0.0.0:7000\r\n"); + int client; + pthread_t child; + FILE *fp; + while(1){ + addr_len = sizeof(client_addr); + client = accept(sd, 0, 0); + fp = fdopen(client, "r+"); + pthread_create(&child, 0, worker, fp); + pthread_detach(child); + } } - printf("Listening at 0.0.0.0:7000\n"); - return uv_run(loop, UV_RUN_DEFAULT); + return 0; } diff --git a/src/new.c b/src/new.c deleted file mode 100644 index f54e23a..0000000 --- a/src/new.c +++ /dev/null @@ -1,273 +0,0 @@ -#include -#include -#include -#include -#include -#include - - -static const char* VERSION = "0.0.1"; -static sig_atomic_t misses = 0; -static sig_atomic_t hits = 0; - -static KCDB* db; -static KCLIST* queue; -static int sd; - -const char* STATS_FORMAT = - "STAT hits %d\r\n" - "STAT misses %d\r\n" - "STAT hit_ratio %d\r\n" - "STAT records %d\r\n" - "END\r\n"; - -typedef struct { - KCLIST* list; - pthread_mutex_t mutex; - pthread_cond_t cond; - int size; -} ProxyQueue; -static ProxyQueue requests; - -void list_shift(KCLIST* list, char* next){ - if(kclistcount(list)){ - size_t size; - strcpy(next, kclistget(list, 0, &size)); - kclistshift(list); - } -} - -void queue_init(ProxyQueue q){ - q.list = kclistnew(); - q.size = 0; - pthread_mutex_init(&q.mutex, NULL); - pthread_cond_init(&q.cond, NULL); -} - -void queue_add(ProxyQueue* q, char* value){ - pthread_mutex_lock(&q->mutex); - //kclistpush(*(&q->list), value, strlen(value)); - *(&q->size) += 1; - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); -} - -void queue_get(ProxyQueue* q, char* value){ - pthread_mutex_lock(&q->mutex); - while(*(&q->size) == 0){ - pthread_cond_wait(&q->cond, &q->mutex); - } - //list_shift(*(&q->list), value); - *(&q->size) -= 1; - pthread_mutex_unlock(&q->mutex); -} - -void lower(char* word){ - int length = strlen(word); - int i; - for(i = 0; i < length; ++i){ - word[i] = tolower(word[i]); - } -} - -void tokenize(KCLIST* tokens, char* base){ - char* remainder; - char* token; - char* ptr = base; - while(token = strtok_r(ptr, " ", &remainder)){ - int last; - int len = strlen(token); - for(last = len - 1; last >= 0; --last){ - if(token[last] == '\n' || token[last] == '\r'){ - token[last] = 0; - break; - } - } - lower(token); - kclistpush(tokens, (const char*)token, strlen(token)); - ptr = remainder; - } - free(token); -} - -void* call_proxy(void* arg){ - char next[1024]; - printf("Waiting\r\n"); - queue_get(&requests, next); - printf("Calling Proxy For %s\r\n", next); -} - -void handle_stats(KCLIST* tokens, FILE* client){ - char out[1024]; - float hit_ratio = (float)hits / (float)(hits + misses); - int records = kcdbcount(db); - sprintf(out, STATS_FORMAT, hits, misses, hit_ratio, records); - fputs(out, client); -} - -void handle_version(KCLIST* tokens, FILE* client){ - char out[1024]; - sprintf(out, "VERSION %s\r\n", VERSION); - fputs(out, client); -} - -void handle_get(KCLIST* tokens, FILE* client){ - char key[128]; - list_shift(tokens, key); - if(key){ - char out[1024]; - char* result_buffer; - size_t result_size; - result_buffer = kcdbget(db, key, strlen(key), &result_size); - if(result_buffer){ - if(strcmp(result_buffer, "0") == 0){ - ++misses; - sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); - kcfree(result_buffer); - } else{ - ++hits; - sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer); - } - kcfree(result_buffer); - } else{ - ++misses; - sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key); - kcdbset(db, key, strlen(key), "0", 1); - queue_add(&requests, key); - } - fputs(out, client); - } else{ - fputs("INVALID GET COMMAND: GET \r\n", client); - return; - } -} - -int handle_command(KCLIST* tokens, FILE* client){ - int status = 0; - char command[128]; - list_shift(tokens, command); - if(command){ - if(strcmp(command, "get") == 0){ - handle_get(tokens, client); - } else if(strcmp(command, "stats") == 0){ - handle_stats(tokens, client); - } else if(strcmp(command, "version") == 0){ - handle_version(tokens, client); - } else if(strcmp(command, "quit") == 0){ - status = -1; - } else{ - printf("Unknown Command: %s\r\n", command); - char out[1024]; - sprintf(out, "UNKNOWN COMMAND: %s\r\n", command); - fputs(out, client); - } - } else{ - printf("No Command\r\n"); - } - - return status; -} - -void* worker(void* arg){ - FILE* fp = (FILE*) arg; - - char buffer[100]; - - KCLIST* tokens = kclistnew(); - int status; - while(fgets(buffer, sizeof(buffer), fp)){ - int last = strlen(buffer) - 1; - for(; last > 0; --last){ - if(buffer[last] == '\r' || buffer[last] == '\n'){ - buffer[last] = 0; - } - } - if(strlen(buffer)){ - tokenize(tokens, buffer); - status = handle_command(tokens, fp); - if(status == -1){ - break; - } - kclistclear(tokens); - } - } - - kclistdel(tokens); - fclose(fp); - return 0; -} - -void on_signal(){ - if(sd){ - printf("Closing socket\r\n"); - close(sd); - } - - if(queue){ - kclistdel(queue); - } - - if(db){ - printf("Closing Cache\r\n"); - kcdbclose(db); - } - exit(0); -} - -int main(){ - signal(SIGINT, on_signal); - int pool_size = 1; - pthread_t pool[pool_size]; - - queue_init(requests); - - queue = kclistnew(); - db = kcdbnew(); - if(!kcdbopen(db, "*#bnum=1000000#capsiz=1g", KCOWRITER | KCOCREATE)){ - fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db))); - return -1; - } - - int i; - printf("Starting %d worker threads\r\n", pool_size); - for(i = 0; i < pool_size; ++i){ - pthread_create(&(pool[i]), NULL, call_proxy, NULL); - } - - struct sockaddr_in addr, client_addr; - int addr_len; - - sd = socket(PF_INET, SOCK_STREAM, 0); - if(!sd){ - fprintf(stderr, "Could not create socket\r\n"); - return -1; - } - - addr.sin_family = AF_INET; - addr.sin_port = htons(7000); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if(bind(sd, (struct sockaddr*)&addr, sizeof(addr)) < 0){ - fprintf(stderr, "Address already in use\r\n"); - return -1; - } - - if(listen(sd, 24) < 0){ - fprintf(stderr, "Address already in use\r\n"); - return -1; - } else{ - printf("Listening on 0.0.0.0:7000\r\n"); - int client; - pthread_t child; - FILE *fp; - while(1){ - addr_len = sizeof(client_addr); - client = accept(sd, 0, 0); - fp = fdopen(client, "r+"); - pthread_create(&child, 0, worker, fp); - pthread_detach(child); - } - } - - return 0; -}