Browse Source

finish prototype of pthread version of fast-cache

master
Brett Langdon 13 years ago
parent
commit
e9c137ba7e
3 changed files with 262 additions and 435 deletions
  1. +2
    -2
      Makefile
  2. +260
    -160
      src/main.c
  3. +0
    -273
      src/new.c

+ 2
- 2
Makefile View File

@ -1,10 +1,10 @@
all:
mkdir -p out
gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl
gcc -I/usr/local/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl
debug:
mkdir -p out
gcc -I/usr/local/include -I~/Downloads/libuv-0.10.12/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -luv -lkyotocabinet -lcurl -g
gcc -I/usr/local/include ./src/main.c -o ./out/fast-cache -L/usr/local/lib -lkyotocabinet -lcurl -g
run:
./out/fast-cache


+ 260
- 160
src/main.c View File

@ -1,62 +1,104 @@
#include <curl/curl.h>
#include <kclangc.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <uv.h>
#include <sys/socket.h>
static uv_loop_t* loop = NULL;
static KCDB* db = NULL;
static sig_atomic_t cache_miss = 0;
static const char* VERSION = "0.0.1";
static sig_atomic_t misses = 0;
static sig_atomic_t hits = 0;
const char* VERSION = "0.0.1";
static KCDB* db;
static int sd;
const char* STATS_FORMAT =
"STAT hits %d\r\n"
"STAT misses %d\r\n"
"STAT hit_ratio %2.4f\r\n"
"%s"
"END\r\n";
struct curl_result {
char* data;
size_t size;
};
typedef struct {
KCLIST* list;
pthread_mutex_t mutex;
pthread_cond_t cond;
int size;
} ProxyQueue;
static ProxyQueue requests;
void lower(char* word){
int length = strlen(word);
int i;
for(i = 0; i < length; ++i){
word[i] = tolower(word[i]);
}
void list_shift(KCLIST* list, char* next){
size_t size;
const char* results = kclistget(list, 0, &size);
strcpy(next, results);
next[size] = 0;
kclistshift(list);
}
static uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = malloc(suggested_size);
buf.len = suggested_size;
return buf;
void queue_del(ProxyQueue* q){
kclistdel(*(&q->list));
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->cond);
}
void handle_write(uv_write_t *req, int status) {
if(status == -1){
fprintf(stderr, "Write error %s\n", uv_err_name(uv_last_error(loop)));
void queue_init(ProxyQueue* q){
*(&q->list) = kclistnew();
*(&q->size) = 0;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
}
void queue_add(ProxyQueue* q, char* value){
pthread_mutex_lock(&q->mutex);
kclistpush(*(&q->list), value, strlen(value) + 1);
*(&q->size) += 1;
pthread_mutex_unlock(&q->mutex);
pthread_cond_signal(&q->cond);
}
void queue_get(ProxyQueue* q, char* value){
pthread_mutex_lock(&q->mutex);
while(*(&q->size) == 0){
pthread_cond_wait(&q->cond, &q->mutex);
}
list_shift(*(&q->list), value);
*(&q->size) -= 1;
pthread_mutex_unlock(&q->mutex);
}
char* base = (char*)req->data;
free(base);
free(req);
void lower(char* word){
int length = strlen(word);
int i;
for(i = 0; i < length; ++i){
word[i] = tolower(word[i]);
}
}
void get_tokens(char** tokens, char* base){
size_t size = 0;
void tokenize(KCLIST* tokens, char* base, char* delimiter){
char* remainder;
char* token;
char* ptr = base;
while(token = strtok_r(ptr, " ", &remainder)){
while(token = strtok_r(ptr, delimiter, &remainder)){
int last;
for(last = 0; last < strlen(token); ++last){
int len = strlen(token);
for(last = len - 1; last >= 0; --last){
if(token[last] == '\n' || token[last] == '\r'){
token[last] = 0;
break;
}
}
lower(token);
tokens[size] = token;
++size;
char new_token[1024];
strcpy(new_token, token);
new_token[strlen(token)] = 0;
kclistpush(tokens, new_token, strlen(new_token) + 1);
ptr = remainder;
}
free(token);
@ -76,159 +118,186 @@ size_t curl_write(char* ptr, size_t size, size_t nmemb, struct curl_result* resu
return realsize;
}
void after_call_proxy(uv_work_t* req, int status){
free(req);
}
void* call_proxy(void* arg){
char next[1024];
while(1){
queue_get(&requests, next);
char* url = (char*)malloc(1024 * sizeof(char));
sprintf(url, "http://127.0.0.1:8000/%s", next);
CURL* curl = curl_easy_init();
if(!curl){
fprintf(stderr, "Could not initialize curl\n");
return;
}
void call_proxy(uv_work_t* req){
char* key = (char*)req->data;
char* url = (char*)malloc(1024 * sizeof(char));
sprintf(url, "http://127.0.0.1:8000/%s", key);
CURL* curl = curl_easy_init();
if(!curl){
fprintf(stderr, "Could not initialize curl\n");
return;
struct curl_result* result = malloc(sizeof(struct curl_result));
result->data = malloc(sizeof(char));
result->data[0] = 0;
result->size = 0;
CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, result);
res = curl_easy_perform(curl);
kcdbset(db, next, strlen(next), result->data, strlen(result->data));
curl_easy_cleanup(curl);
free(url);
free(result->data);
free(result);
}
}
struct curl_result* result = malloc(sizeof(struct curl_result));
result->data = malloc(sizeof(char));
result->data[0] = 0;
result->size = 0;
CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, result);
res = curl_easy_perform(curl);
kcdbset(db, key, strlen(key), result->data, strlen(result->data));
curl_easy_cleanup(curl);
free(url);
free(result->data);
free(result);
void handle_stats(KCLIST* tokens, FILE* client){
char out[1024];
float hit_ratio = 0;
if(hits){
hit_ratio = (float)hits / (float)(hits + misses);
}
char* status = kcdbstatus(db);
KCLIST* stats = kclistnew();
tokenize(stats, status, "\n");
char status_buf[1024];
strcpy(status_buf, "");
int stat_count = kclistcount(stats);
int i;
for(i = 0; i < stat_count; ++i){
size_t part_size;
const char* part = kclistget(stats, i, &part_size);
KCLIST* parts = kclistnew();
tokenize(parts, (char*)part, "\t");
char buf[128];
if(kclistcount(parts) == 2){
sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size));
}
kclistdel(parts);
strcat(status_buf, buf);
}
sprintf(out, STATS_FORMAT, hits, misses, hit_ratio, status_buf);
fputs(out, client);
}
void handle_version(char* buffer){
sprintf(buffer, "VERSION %s\r\n", VERSION);
void handle_version(KCLIST* tokens, FILE* client){
char out[1024];
sprintf(out, "VERSION %s\r\n", VERSION);
fputs(out, client);
}
void handle_flush_all(char* buffer){
void handle_flush(KCLIST* tokens, FILE* client){
if(kcdbclear(db)){
sprintf(buffer, "OK\r\n");
} else{
sprintf(buffer, "FAILED\r\n");
fputs("OK\r\n", client);
}
}
void handle_stats(char* buffer){
float ratio;
if(!hits && !cache_miss){
ratio = 0;
} else{
ratio = (float)hits / (hits + cache_miss);
void handle_delete(KCLIST* tokens, FILE* client){
if(kclistcount(tokens)){
char key[128];
list_shift(tokens, key);
if(kcdbremove(db, key, strlen(key))){
fputs("DELETED\r\n", client);
}
}
int records = kcdbcount(db);
char* format = "STAT cache_miss %d\r\nSTAT hits %d\r\nSTAT hit_ratio %2.2f\r\nSTAT records %d\r\nEND\r\n";
sprintf(buffer, format, cache_miss, hits, ratio, records);
}
void handle_get(char* buffer, char** tokens){
if(tokens[1]){
void handle_get(KCLIST* tokens, FILE* client){
if(kclistcount(tokens)){
char key[128];
list_shift(tokens, key);
char out[1024];
char* result_buffer;
char key[1024];
strcpy(key, tokens[1]);
size_t result_size, key_size;
key_size = strlen(key);
result_buffer = kcdbget(db, key, key_size, &result_size);
int result = !!result_buffer;
if(result == 0){
++cache_miss;
uv_work_t* req = malloc(sizeof(uv_work_t));
req->data = malloc(sizeof(key));
strcpy(req->data, key);
uv_queue_work(loop, req, call_proxy, after_call_proxy);
result_buffer = "";
kcdbset(db, key, strlen(key), "0", 1);
} else if(strcmp(result_buffer, "0") == 0){
++cache_miss;
result_buffer = "";
result = 0;
} else{
++hits;
}
sprintf(buffer, "VALUE %s 0 %lu\r\n%s\r\nEND\r\n", key, strlen(result_buffer), result_buffer);
if(result){
size_t result_size;
result_buffer = kcdbget(db, key, strlen(key), &result_size);
if(result_buffer){
if(strcmp(result_buffer, "0") == 0){
++misses;
sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key);
} else{
++hits;
sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer);
}
kcfree(result_buffer);
} else{
++misses;
sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key);
kcdbset(db, key, strlen(key), "0", 1);
queue_add(&requests, key);
}
fputs(out, client);
} else{
sprintf(buffer, "END\r\n");
fputs("INVALID GET COMMAND: GET <KEY>\r\n", client);
return;
}
}
void handle_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf){
if(nread == -1){
if (uv_last_error(loop).code != UV_EOF){
fprintf(stderr, "Read error %s\n", uv_err_name(uv_last_error(loop)));
int handle_command(char* buffer, FILE* client){
int status = 0;
char command[1024];
KCLIST* tokens = kclistnew();
tokenize(tokens, buffer, " ");
list_shift(tokens, command);
if(command){
if(strcmp(command, "get") == 0){
handle_get(tokens, client);
} else if(strcmp(command, "stats") == 0){
handle_stats(tokens, client);
} else if(strcmp(command, "flush_all") == 0){
handle_flush(tokens, client);
} else if(strcmp(command, "delete") == 0){
handle_delete(tokens, client);
} else if(strcmp(command, "version") == 0){
handle_version(tokens, client);
} else if(strcmp(command, "quit") == 0){
status = -1;
} else{
char out[1024];
sprintf(out, "UNKNOWN COMMAND: %s\r\n", command);
fputs(out, client);
}
uv_close((uv_handle_t*) client, NULL);
return;
} else if(nread <= 0){
return;
}
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
char* tokens[8];
get_tokens(tokens, buf.base);
char* buffer = (char*)malloc(1024 * sizeof(char));
if(strcmp(tokens[0], "get") == 0){
handle_get(buffer, tokens);
} else if(strcmp(tokens[0], "stats") == 0){
handle_stats(buffer);
} else if(strcmp(tokens[0], "flush_all") == 0){
handle_flush_all(buffer);
} else if(strcmp(tokens[0], "version") == 0){
handle_version(buffer);
} else if(strcmp(tokens[0], "quit") == 0){
uv_close((uv_handle_t*) client, NULL);
return;
} else{
sprintf(buffer, "Unknown Command: %s\r\n", tokens[0]);
}
buf.base = malloc((strlen(buffer) * sizeof(char)) + 1);
strcpy(buf.base, buffer);
buf.len = strlen(buf.base);
req->data = malloc((strlen(buffer) * sizeof(char)) + 1);
strcpy(req->data, buffer);
uv_write(req, client, &buf, 1, handle_write);
free(buffer);
free(*tokens);
*tokens = NULL;
kclistdel(tokens);
return status;
}
void on_new_connection(uv_stream_t* server, int status){
if(status == -1){
return;
}
void* worker(void* arg){
FILE* fp = (FILE*) arg;
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if(uv_accept(server, (uv_stream_t*) client) == 0){
uv_read_start((uv_stream_t*) client, alloc_buffer, handle_read);
} else {
uv_close((uv_handle_t*) client, NULL);
char buffer[100];
int status;
while(fgets(buffer, sizeof(buffer), fp)){
int last = strlen(buffer) - 1;
for(; last > 0; --last){
if(buffer[last] == '\r' || buffer[last] == '\n'){
buffer[last] = 0;
}
}
if(strlen(buffer)){
status = handle_command(buffer, fp);
if(status == -1){
break;
}
}
}
fclose(fp);
return 0;
}
void on_signal(){
if(loop){
printf("Stopping event loop...\n");
uv_stop(loop);
if(sd){
printf("Closing socket\r\n");
close(sd);
}
if(&requests != NULL){
printf("Deleting Request Queue\r\n");
queue_del(&requests);
}
if(db){
printf("Closing Cache...\n");
printf("Closing Cache\r\n");
kcdbclose(db);
}
exit(0);
@ -236,26 +305,57 @@ void on_signal(){
int main(){
signal(SIGINT, on_signal);
int pool_size = 5;
pthread_t pool[pool_size];
queue_init(&requests);
printf("Opening Cache...\n");
db = kcdbnew();
if(!kcdbopen(db, "*#bnum=1000000#capsiz=1g", KCOWRITER | KCOCREATE)){
fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db)));
return -1;
}
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
int i;
printf("Starting %d worker threads\r\n", pool_size);
for(i = 0; i < pool_size; ++i){
pthread_create(&(pool[i]), NULL, call_proxy, NULL);
}
struct sockaddr_in addr, client_addr;
int addr_len;
sd = socket(PF_INET, SOCK_STREAM, 0);
if(!sd){
fprintf(stderr, "Could not create socket\r\n");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(7000);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
uv_tcp_bind(&server, bind_addr);
int r = uv_listen((uv_stream_t*) &server, 128, on_new_connection);
if(r){
fprintf(stderr, "Listen error %s\n", uv_err_name(uv_last_error(loop)));
return 1;
if(bind(sd, (struct sockaddr*)&addr, sizeof(addr)) < 0){
fprintf(stderr, "Address already in use\r\n");
return -1;
}
if(listen(sd, 24) < 0){
fprintf(stderr, "Address already in use\r\n");
return -1;
} else{
printf("Listening on 0.0.0.0:7000\r\n");
int client;
pthread_t child;
FILE *fp;
while(1){
addr_len = sizeof(client_addr);
client = accept(sd, 0, 0);
fp = fdopen(client, "r+");
pthread_create(&child, 0, worker, fp);
pthread_detach(child);
}
}
printf("Listening at 0.0.0.0:7000\n");
return uv_run(loop, UV_RUN_DEFAULT);
return 0;
}

+ 0
- 273
src/new.c View File

@ -1,273 +0,0 @@
#include <curl/curl.h>
#include <kclangc.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <sys/socket.h>
static const char* VERSION = "0.0.1";
static sig_atomic_t misses = 0;
static sig_atomic_t hits = 0;
static KCDB* db;
static KCLIST* queue;
static int sd;
const char* STATS_FORMAT =
"STAT hits %d\r\n"
"STAT misses %d\r\n"
"STAT hit_ratio %d\r\n"
"STAT records %d\r\n"
"END\r\n";
typedef struct {
KCLIST* list;
pthread_mutex_t mutex;
pthread_cond_t cond;
int size;
} ProxyQueue;
static ProxyQueue requests;
void list_shift(KCLIST* list, char* next){
if(kclistcount(list)){
size_t size;
strcpy(next, kclistget(list, 0, &size));
kclistshift(list);
}
}
void queue_init(ProxyQueue q){
q.list = kclistnew();
q.size = 0;
pthread_mutex_init(&q.mutex, NULL);
pthread_cond_init(&q.cond, NULL);
}
void queue_add(ProxyQueue* q, char* value){
pthread_mutex_lock(&q->mutex);
//kclistpush(*(&q->list), value, strlen(value));
*(&q->size) += 1;
pthread_mutex_unlock(&q->mutex);
pthread_cond_signal(&q->cond);
}
void queue_get(ProxyQueue* q, char* value){
pthread_mutex_lock(&q->mutex);
while(*(&q->size) == 0){
pthread_cond_wait(&q->cond, &q->mutex);
}
//list_shift(*(&q->list), value);
*(&q->size) -= 1;
pthread_mutex_unlock(&q->mutex);
}
void lower(char* word){
int length = strlen(word);
int i;
for(i = 0; i < length; ++i){
word[i] = tolower(word[i]);
}
}
void tokenize(KCLIST* tokens, char* base){
char* remainder;
char* token;
char* ptr = base;
while(token = strtok_r(ptr, " ", &remainder)){
int last;
int len = strlen(token);
for(last = len - 1; last >= 0; --last){
if(token[last] == '\n' || token[last] == '\r'){
token[last] = 0;
break;
}
}
lower(token);
kclistpush(tokens, (const char*)token, strlen(token));
ptr = remainder;
}
free(token);
}
void* call_proxy(void* arg){
char next[1024];
printf("Waiting\r\n");
queue_get(&requests, next);
printf("Calling Proxy For %s\r\n", next);
}
void handle_stats(KCLIST* tokens, FILE* client){
char out[1024];
float hit_ratio = (float)hits / (float)(hits + misses);
int records = kcdbcount(db);
sprintf(out, STATS_FORMAT, hits, misses, hit_ratio, records);
fputs(out, client);
}
void handle_version(KCLIST* tokens, FILE* client){
char out[1024];
sprintf(out, "VERSION %s\r\n", VERSION);
fputs(out, client);
}
void handle_get(KCLIST* tokens, FILE* client){
char key[128];
list_shift(tokens, key);
if(key){
char out[1024];
char* result_buffer;
size_t result_size;
result_buffer = kcdbget(db, key, strlen(key), &result_size);
if(result_buffer){
if(strcmp(result_buffer, "0") == 0){
++misses;
sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key);
kcfree(result_buffer);
} else{
++hits;
sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(result_buffer), result_buffer);
}
kcfree(result_buffer);
} else{
++misses;
sprintf(out, "VALUE %s 0 2\r\n{}\r\nEND\r\n", key);
kcdbset(db, key, strlen(key), "0", 1);
queue_add(&requests, key);
}
fputs(out, client);
} else{
fputs("INVALID GET COMMAND: GET <KEY>\r\n", client);
return;
}
}
int handle_command(KCLIST* tokens, FILE* client){
int status = 0;
char command[128];
list_shift(tokens, command);
if(command){
if(strcmp(command, "get") == 0){
handle_get(tokens, client);
} else if(strcmp(command, "stats") == 0){
handle_stats(tokens, client);
} else if(strcmp(command, "version") == 0){
handle_version(tokens, client);
} else if(strcmp(command, "quit") == 0){
status = -1;
} else{
printf("Unknown Command: %s\r\n", command);
char out[1024];
sprintf(out, "UNKNOWN COMMAND: %s\r\n", command);
fputs(out, client);
}
} else{
printf("No Command\r\n");
}
return status;
}
void* worker(void* arg){
FILE* fp = (FILE*) arg;
char buffer[100];
KCLIST* tokens = kclistnew();
int status;
while(fgets(buffer, sizeof(buffer), fp)){
int last = strlen(buffer) - 1;
for(; last > 0; --last){
if(buffer[last] == '\r' || buffer[last] == '\n'){
buffer[last] = 0;
}
}
if(strlen(buffer)){
tokenize(tokens, buffer);
status = handle_command(tokens, fp);
if(status == -1){
break;
}
kclistclear(tokens);
}
}
kclistdel(tokens);
fclose(fp);
return 0;
}
void on_signal(){
if(sd){
printf("Closing socket\r\n");
close(sd);
}
if(queue){
kclistdel(queue);
}
if(db){
printf("Closing Cache\r\n");
kcdbclose(db);
}
exit(0);
}
int main(){
signal(SIGINT, on_signal);
int pool_size = 1;
pthread_t pool[pool_size];
queue_init(requests);
queue = kclistnew();
db = kcdbnew();
if(!kcdbopen(db, "*#bnum=1000000#capsiz=1g", KCOWRITER | KCOCREATE)){
fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db)));
return -1;
}
int i;
printf("Starting %d worker threads\r\n", pool_size);
for(i = 0; i < pool_size; ++i){
pthread_create(&(pool[i]), NULL, call_proxy, NULL);
}
struct sockaddr_in addr, client_addr;
int addr_len;
sd = socket(PF_INET, SOCK_STREAM, 0);
if(!sd){
fprintf(stderr, "Could not create socket\r\n");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(7000);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sd, (struct sockaddr*)&addr, sizeof(addr)) < 0){
fprintf(stderr, "Address already in use\r\n");
return -1;
}
if(listen(sd, 24) < 0){
fprintf(stderr, "Address already in use\r\n");
return -1;
} else{
printf("Listening on 0.0.0.0:7000\r\n");
int client;
pthread_t child;
FILE *fp;
while(1){
addr_len = sizeof(client_addr);
client = accept(sd, 0, 0);
fp = fdopen(client, "r+");
pthread_create(&child, 0, worker, fp);
pthread_detach(child);
}
}
return 0;
}

Loading…
Cancel
Save