Browse Source

ferrite is now written in go and not c

master
Brett Langdon 13 years ago
parent
commit
80f350998a
17 changed files with 281 additions and 720 deletions
  1. +0
    -32
      Makefile
  2. +0
    -85
      README.md
  3. +157
    -0
      commands/commands.go
  4. +13
    -0
      common/common.go
  5. +82
    -0
      main.go
  6. +29
    -0
      proxy/proxy.go
  7. +0
    -12
      src/common.h
  8. +0
    -164
      src/handlers.c
  9. +0
    -15
      src/handlers.h
  10. +0
    -224
      src/main.c
  11. +0
    -0
      src/proxy.
  12. +0
    -58
      src/proxy.c
  13. +0
    -16
      src/proxy.h
  14. +0
    -40
      src/queue.c
  15. +0
    -23
      src/queue.h
  16. +0
    -42
      src/util.c
  17. +0
    -9
      src/util.h

+ 0
- 32
Makefile View File

@ -1,32 +0,0 @@
INSTALL=/usr/bin/install
CFLAGS=$(shell curl-config --cflags) -Wall $(EXTRA_CFLAGS)
LFLAGS=$(shell curl-config --libs) -lpthread -lkyotocabinet
CC=gcc
PREFIX=/usr/local
BINDIR = $(PREFIX)/bin
default: ferrite
src/%.o: src/%.c src/%.h
$(CC) -c $(CFLAGS) -o $@ $<
ferrite: ./src/util.o ./src/queue.o ./src/proxy.o ./src/handlers.o ./src/main.o
$(CC) $(CFLAGS) -o $@ $^ $(LFLAGS)
debug:
make -f Makefile EXTRA_CFLAGS="-g -O0"
run:
./ferrite
.PHONY: clean debug release
release: clean
make -f Makefile EXTRA_CFLAGS="-O3"
install:
$(INSTALL) ./ferrite $(BINDIR)
clean:
rm -f ./src/*.o
rm -f ./ferrite

+ 0
- 85
README.md View File

@ -3,91 +3,6 @@ ferrite
A very fast kyoto cabinet powered memcached interface API proxy caching server.
This is not your normal caching proxy server. For starters it doesn't speak HTTP (other than to the backend),
you interact with it via a subset of the memcached commands. The main purpose is you give it a url like:
`http://api.my-domain.com/api/` and then you make memcache get calls like `get/users` or `user/12` or any other
GET only api end point. The result content does not matter at all (which means you could use this to cache HTTP get calls).
The other _very_ important thing about `ferrite` is that when it gets a cache miss it does now wait around for a response
from the proxy server, it will send back an empty string to the client making the request and then queue up the request to
the proxy server. This means that although you will get back an empty string when you get a cache miss your response times
from this proxy server will be consistent (which can be very important for high performance applications).
## Name
The name `ferrite` comes from: [Ferrite Magnet](http://en.wikipedia.org/wiki/Ferrite_\(magnet\))
## Install
### Requirements
* [Kyoto Cabinet](http://fallabs.com/kyotocabinet/pkg/) - built and tested with version `1.2.76`
* [libcurl](http://curl.haxx.se/libcurl/) - built and tested with version `7.21.4`
```bash
git clone git://github.com/brettlangdon/ferrite.git
cd ./ferrite
make release
make install
```
## Using
### Usage
```bash
$ ferrite --help
Usage: ferrite [-h|--help] [-p|--port <NUM>] [-w|--workers <NUM>] [-u|--url <STRING>] [-c|--cache <STRING>] [-e|--expire <NUM>]
-p, --port - which port number to bind too [default: 7000]
-w, --workers - how many background workers to spawn [default: 10]
-u, --url - which url to proxy requests to [default: http://127.0.0.1:8000]
-c, --cache - kyoto cabinet cache to use [default: "*"]
-e, --exp - the expiration time in seconds from when a record is cached, 0 to disable [default:3600]
-h, --help - display this message
```
### Memcache Client
Just use your favorite memcache client
```python
import pymemcache.client
mc = pymemcache.client.Client([("127.0.0.1", 7000)])
users = mc.get("/all/users")
```
### Telnet
```bash
telnet 127.0.0.1 7000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
stats
STAT connections 1
STAT requests 0
STAT hits 0
STAT misses 0
STAT hit_ratio 0.0000
STAT backlog 0
STAT bnum 1048583
STAT capcnt -1
STAT capsiz -1
STAT chksum 255
STAT count 0
STAT fmtver 5
STAT librev 13
STAT libver 16
STAT opts 0
STAT path *
STAT realtype 32
STAT recovered 0
STAT reorganized 0
STAT size 8390432
STAT type 32
END
get all/users
VALUE all/users 0 0
END
get all/users
VALUE all/users 0 2
{}
END
```
## License
```
The MIT License (MIT)


+ 157
- 0
commands/commands.go View File

@ -0,0 +1,157 @@
package commands
import (
"bitbucket.org/ww/cabinet"
"bytes"
"fmt"
"github.com/brettlangdon/ferrite/common"
"github.com/brettlangdon/ferrite/proxy"
"net/url"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
)
func ParseCommand(request string) (string, []string) {
request = strings.ToLower(request)
tokens := strings.Split(strings.Trim(request, " \r\n"), " ")
var command string
if len(tokens) > 0 {
command = tokens[0]
}
return command, tokens
}
func HandleCommand(command string, tokens []string, db *cabinet.KCDB) string {
var result string
switch command {
case "get":
result = HandleGet(tokens[1:], db)
break
case "stats":
result = HandleStats(db)
break
case "flush_all":
result = HandleFlushAll(db)
break
case "delete":
result = HandleDelete(tokens[1:], db)
break
case "version":
result = HandleVersion()
break
default:
result = "ERROR\r\n"
break
}
return result
}
func HandleGet(tokens []string, db *cabinet.KCDB) string {
var out string
if len(tokens) >= 1 {
key := tokens[0]
if strings.HasPrefix(key, "http%3a%2f%2f") || strings.HasPrefix(key, "https%3a%2f%2f") {
key, _ = url.QueryUnescape(key)
}
result, err := db.Get([]byte(key))
if err != nil {
atomic.AddInt32(&common.MISSES, 1)
err := db.Set([]byte(key), []byte("0"))
if err == nil {
go proxy.CallProxy(key, db)
} else {
fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error())
}
out = "END\r\n"
} else if bytes.Equal(result, []byte("0")) {
atomic.AddInt32(&common.MISSES, 1)
out = "END\r\n"
} else {
parts := strings.SplitN(string(result), ":", 2)
var expire int64 = 0
if len(parts) == 2 {
expire, err = strconv.ParseInt(parts[0], 10, 32)
if err != nil {
expire = 0
fmt.Fprintf(os.Stderr, "Error Parsing Expiration \"%s\": %s\r\n", parts[0], err.Error())
}
result = []byte(parts[1])
}
now := int64(time.Now().Unix())
if expire > 0 && expire < now {
err := db.Set([]byte(key), []byte("0"))
if err == nil {
go proxy.CallProxy(key, db)
} else {
fmt.Fprintf(os.Stderr, "Error Setting Key \"%s\" Value \"0\": $s\r\n", key, err.Error())
}
atomic.AddInt32(&common.MISSES, 1)
out = "END\r\n"
} else {
atomic.AddInt32(&common.HITS, 1)
out = fmt.Sprintf("VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, len(result), result)
}
}
} else {
out = "ERROR\r\n"
}
return out
}
func HandleStats(db *cabinet.KCDB) string {
hits := atomic.LoadInt32(&common.HITS)
misses := atomic.LoadInt32(&common.MISSES)
connections := atomic.LoadInt32(&common.CONNECTIONS)
var hit_ratio float32 = 0
if hits > 0 {
hit_ratio = float32(hits) / float32(hits+misses)
}
db_stats := ""
status, err := db.Status()
if err == nil {
lines := strings.Split(status, "\n")
for _, line := range lines {
parts := strings.SplitN(strings.Trim(line, "\r\n"), "\t", 2)
if len(parts) == 2 {
db_stats += fmt.Sprintf("STAT %s %s\r\n", parts[0], parts[1])
}
}
} else {
fmt.Fprintf(os.Stderr, "Error Retrieving DB Status: %s\r\n", err.Error())
}
return fmt.Sprintf("STAT hits %d\r\n"+
"STAT misses %d\r\n"+
"STAT hit_ratio %1.4f\r\n"+
"STAT connections %d\r\n"+
"%s"+
"END\r\n",
hits, misses, hit_ratio, connections, db_stats,
)
}
func HandleFlushAll(db *cabinet.KCDB) string {
db.Clear()
return "OK\r\n"
}
func HandleDelete(tokens []string, db *cabinet.KCDB) string {
if len(tokens) <= 0 {
return "ERROR\r\n"
}
key := tokens[0]
err := db.Remove([]byte(key))
if err != nil {
return "NOT_FOUND\r\n"
}
return "DELETED\r\n"
}
func HandleVersion() string {
return fmt.Sprintf("VERSION %s\r\n", common.VERSION)
}

+ 13
- 0
common/common.go View File

@ -0,0 +1,13 @@
package common
import (
"flag"
)
var HITS int32 = 0
var MISSES int32 = 0
var CONNECTIONS int32 = 0
var TTL *int = flag.Int("ttl", 3600, "the TTL in seconds for a newly cached object")
var BIND *string = flag.String("bind", "0.0.0.0:7000", "the [<address>]:<port> to bind to")
var CACHE *string = flag.String("cache", "*", "the kyoto cabinet cache path")
var VERSION string = "0.0.1"

+ 82
- 0
main.go View File

@ -0,0 +1,82 @@
package main
import (
"bitbucket.org/ww/cabinet"
"flag"
"fmt"
"github.com/brettlangdon/ferrite/commands"
"github.com/brettlangdon/ferrite/common"
"net"
"os"
"os/signal"
"sync/atomic"
)
func handler(client net.Conn, db *cabinet.KCDB) {
atomic.AddInt32(&common.CONNECTIONS, 1)
defer func() {
client.Close()
atomic.AddInt32(&common.CONNECTIONS, -1)
}()
buffer := make([]byte, 1024)
for {
n, err := client.Read(buffer)
if err != nil {
break
}
if n <= 0 {
continue
}
command, tokens := commands.ParseCommand(string(buffer[:n]))
if len(command) == 0 {
continue
}
if command == "quit" || command == "exit" {
break
}
response := commands.HandleCommand(command, tokens, db)
_, err = client.Write([]byte(response))
}
}
func main() {
flag.Parse()
fmt.Printf("Using TTL: %d (sec)\r\n", *common.TTL)
fmt.Printf("Opening Cache: %s\r\n", *common.CACHE)
db := cabinet.New()
err := db.Open(*common.CACHE, cabinet.KCOWRITER|cabinet.KCOCREATE)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not open cache \"%s\": %s\r\n", *common.CACHE, err.Error())
os.Exit(-1)
}
listener, err := net.Listen("tcp", *common.BIND)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not bind to \"%s\": %s\r\n", *common.BIND, err.Error())
os.Exit(-1)
}
fmt.Printf("Listening at %s\t\n", *common.BIND)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for _ = range c {
fmt.Println("")
fmt.Printf("Shutting down ferrite\r\n")
db.Close()
db.Del()
os.Exit(0)
}
}()
for {
client, err := listener.Accept()
if err == nil {
go handler(client, db)
} else {
fmt.Fprintf(os.Stderr, "Error Accepting Client Connection: %s\r\n", err.Error())
}
}
}

+ 29
- 0
proxy/proxy.go View File

@ -0,0 +1,29 @@
package proxy
import (
"bitbucket.org/ww/cabinet"
"fmt"
"github.com/brettlangdon/ferrite/common"
"io/ioutil"
"net/http"
"os"
"time"
)
func CallProxy(url string, db *cabinet.KCDB) {
response, err := http.Get(url)
if err == nil {
defer response.Body.Close()
result, err := ioutil.ReadAll(response.Body)
if err == nil {
now := int32(time.Now().Unix())
ttl := int32(*common.TTL)
result = []byte(fmt.Sprintf("%d:%s", now+ttl, result))
db.Set([]byte(url), result)
} else {
fmt.Fprintf(os.Stderr, "Error Reading Response for \"%s\": %s\r\n", url, err.Error())
}
} else {
fmt.Fprintf(os.Stderr, "Error Retrieving Proxy Results for \"%s\": %s\r\n", url, err.Error())
}
}

+ 0
- 12
src/common.h View File

@ -1,12 +0,0 @@
#ifndef FERRITE_COMMON
#define FERRITE_COMMON
#include "queue.h"
extern sig_atomic_t misses;
extern sig_atomic_t hits;
extern sig_atomic_t connections;
extern int record_expire_time;
KCDB* db;
int server;
QUEUE requests;
#endif

+ 0
- 164
src/handlers.c View File

@ -1,164 +0,0 @@
#include "handlers.h"
const char* VERSION = "0.0.1";
const char* STATS_FORMAT =
"STAT connections %d\r\n"
"STAT requests %d\r\n"
"STAT hits %d\r\n"
"STAT misses %d\r\n"
"STAT hit_ratio %2.4f\r\n"
"STAT backlog %d\r\n"
"%s"
"END\r\n";
const char* HELP_TEXT =
"COMMANDS:\r\n"
" STATS - show statistics\r\n"
" GET <KEY> - get <KEY> from the cache or empty when not present\r\n"
" VERSION - show the current application version"
" FLUSH_ALL - clear the cache\r\n"
" HELP - show this message\r\n"
" QUIT - disconnect from the server\r\n";
void handle_stats(KCLIST* tokens, FILE* client){
char out[1024];
int total = hits + misses;
float hit_ratio = 0;
if(hits){
hit_ratio = (float)hits / (float)(hits + misses);
}
int size = queue_size(&requests);
char* status = kcdbstatus(db);
KCLIST* stats = kclistnew();
tokenize(stats, status, "\n");
char status_buf[1024];
strcpy(status_buf, "");
int stat_count = kclistcount(stats);
int i;
for(i = 0; i < stat_count; ++i){
size_t part_size;
const char* part = kclistget(stats, i, &part_size);
KCLIST* parts = kclistnew();
tokenize(parts, (char*)part, "\t");
char buf[128];
if(kclistcount(parts) == 2){
sprintf(buf, "STAT %s %s\r\n", kclistget(parts, 0, &part_size), kclistget(parts, 1, &part_size));
}
kclistdel(parts);
strcat(status_buf, buf);
}
kclistdel(stats);
sprintf(out, STATS_FORMAT, connections, total, hits, misses, hit_ratio, size, status_buf);
fputs(out, client);
}
void handle_version(KCLIST* tokens, FILE* client){
char out[1024];
sprintf(out, "VERSION %s\r\n", VERSION);
fputs(out, client);
}
void handle_delete(KCLIST* tokens, FILE* client){
if(kclistcount(tokens)){
char* key;
list_shift(tokens, &key);
if(key != NULL){
if(kcdbremove(db, key, strlen(key))){
fputs("DELETED\r\n", client);
return;
}
}
}
fputs("NOT_FOUND\r\n", client);
}
void handle_flush(KCLIST* tokens, FILE* client){
kcdbclear(db);
fputs("OK\r\n", client);
}
void handle_get(KCLIST* tokens, FILE* client){
if(kclistcount(tokens)){
char* key;
list_shift(tokens, &key);
if(key == NULL){
return;
}
char out[1024];
char* value = "";
char* result_buffer;
size_t result_size;
result_buffer = kcdbget(db, key, strlen(key), &result_size);
if(result_buffer){
if(strcmp(result_buffer, "0") != 0){
value = result_buffer + 11;
char expiration[16];
strncpy(expiration, result_buffer, 10);
int exp = atoi(expiration);
int now = (int)time(NULL);
if(exp > 0 && exp < now){
value = "";
}
} else {
value = "0";
}
kcfree(result_buffer);
}
if(strcmp(value, "0") == 0){
++misses;
sprintf(out, "END\r\n");
} else if(strlen(value)){
++hits;
sprintf(out, "VALUE %s 0 %d\r\n%s\r\nEND\r\n", key, (int)strlen(value), value);
} else{
++misses;
sprintf(out, "END\r\n");
char value[16];
sprintf(value, "%10d:0", 0);
kcdbset(db, key, strlen(key), value, strlen(value));
queue_add(&requests, key);
}
fputs(out, client);
free(key);
} else{
fputs("ERROR: GET <KEY>\r\n", client);
}
}
void handle_help(KCLIST* tokens, FILE* client){
fputs(HELP_TEXT, client);
}
int handle_command(char* buffer, FILE* client){
int status = 0;
KCLIST* tokens = kclistnew();
tokenize(tokens, buffer, " ");
char* command;
list_shift(tokens, &command);
if(command != NULL){
if(strcmp(command, "get") == 0){
handle_get(tokens, client);
} else if(strcmp(command, "stats") == 0){
handle_stats(tokens, client);
} else if(strcmp(command, "delete") == 0){
handle_delete(tokens, client);
} else if(strcmp(command, "flush_all") == 0){
handle_flush(tokens, client);
} else if(strcmp(command, "version") == 0){
handle_version(tokens, client);
} else if(strcmp(command, "help") == 0){
handle_help(tokens, client);
} else if(strcmp(command, "quit") == 0){
status = -1;
} else{
char out[1024];
sprintf(out, "ERROR: UNKNOWN COMMAND %s\r\n", command);
fputs(out, client);
}
free(command);
}
kclistdel(tokens);
return status;
}

+ 0
- 15
src/handlers.h View File

@ -1,15 +0,0 @@
#ifndef FERRITE_HANDLERS
#define FERRITE_HANDLERS
#include <kclangc.h>
#include <time.h>
#include "common.h"
#include "queue.h"
void handle_stats(KCLIST* tokens, FILE* client);
void handle_version(KCLIST* tokens, FILE* client);
void handle_flush(KCLIST* tokens, FILE* client);
void handle_delete(KCLIST* tokens, FILE* client);
void handle_get(KCLIST* tokens, FILE* client);
void handle_help(KCLIST* tokens, FILE* client);
int handle_command(char* buffer, FILE* client);
#endif

+ 0
- 224
src/main.c View File

@ -1,224 +0,0 @@
#include <kclangc.h>
#include <getopt.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <sys/socket.h>
#include <unistd.h>
#include "common.h"
#include "handlers.h"
#include "proxy.h"
#include "queue.h"
#include "util.h"
sig_atomic_t misses = 0;
sig_atomic_t hits = 0;
sig_atomic_t connections = 0;
int record_expire_time = 3600;
static struct option long_options[] = {
{"port", 1, 0, 0},
{"url", 1, 0, 0},
{"workers", 1, 0, 0},
{"cache", 1, 0, 0},
{"expire", 1, 0, 0},
{"help", 0, 0, 0},
{NULL, 0, NULL, 0}
};
void usage(){
const char* usage_str =
"Usage: ferrite [-h|--help] [-p|--port <NUM>] [-w|--workers <NUM>] [-u|--url <STRING>]\r\n"
" [-c|--cache <STRING>] [-e|--expire <NUM>]\r\n"
"\t-p, --port\t- which port number to bind too [default: 7000]\r\n"
"\t-w, --workers\t- how many background workers to spawn [default: 10]\r\n"
"\t-u, --url\t- which url to proxy requests to [default: http://127.0.0.1:8000]\r\n"
"\t-c, --cache\t- kyoto cabinet cache to use [default: \"*\"]\r\n"
"\t-e, --expire\t- the expiration time in seconds from when a record is cached, 0 to disable [default:3600]\r\n"
"\t-h, --help\t- display this message\r\n";
printf("%s", usage_str);
}
void* worker(void* arg){
FILE* client = (FILE*) arg;
char buffer[128];
++connections;
int status;
while(fgets(buffer, sizeof(buffer), client)){
int last = strlen(buffer) - 1;
for(; last > 0; --last){
if(buffer[last] == '\r' || buffer[last] == '\n'){
buffer[last] = 0;
}
}
if(strlen(buffer)){
status = handle_command(buffer, client);
if(status == -1){
break;
}
}
}
--connections;
fclose(client);
return 0;
}
void on_signal(){
printf("Shutting down ferrite\r\n");
if(server){
printf("Closing socket\r\n");
shutdown(server, 2);
close(server);
}
if(&requests != NULL){
printf("Deleting Request Queue\r\n");
queue_del(&requests);
}
if(db){
printf("Closing Cache\r\n");
kcdbclose(db);
}
exit(0);
}
int main(int argc, char* argv[]){
int i, addr_len, yes;
int port = 7000;
int pool_size = 10;
pthread_t pool[pool_size];
char* cache_file = "*";
char* base_url = "http://127.0.0.1:8000/";
struct sockaddr_in addr, client_addr;
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
int c;
int option_index = 0;
while((c = getopt_long(argc, argv, "hp:u:w:c:e:", long_options, &option_index)) != -1){
if(c == 0){
switch(option_index){
case 0:
c = 'p';
break;
case 1:
c = 'u';
break;
case 2:
c = 'w';
break;
case 3:
c = 'c';
break;
case 4:
c = 'e';
break;
case 5:
c = 'h';
break;
default:
c = '?';
break;
}
}
switch(c){
case 'p':
port = atoi(optarg);
if(port <= 0){
fprintf(stderr, "Invalid Port Number: %s\r\n", optarg);
exit(1);
}
break;
case 'u':
base_url = optarg;
break;
case 'w':
pool_size = atoi(optarg);
if(pool_size <= 0){
fprintf(stderr, "Invalid Worker Count: %s\r\n", optarg);
exit(1);
}
break;
case 'c':
cache_file = optarg;
break;
case 'e':
record_expire_time = atoi(optarg);
if(record_expire_time <= 0){
fprintf(stderr, "Invalid Expiration Time: %s\r\n", optarg);
exit(1);
}
break;
case 'h':
usage();
exit(0);
case '?':
default:
fprintf(stderr, "Unknown Option: %c\r\n", c);
usage();
exit(-1);
break;
}
}
queue_init(&requests);
printf("Using Expire Time of %d sec\r\n", record_expire_time);
printf("Opening Cache File: \"%s\"\r\n", cache_file);
db = kcdbnew();
if(!kcdbopen(db, cache_file, KCOWRITER | KCOCREATE)){
fprintf(stderr, "Error opening cache: %s\n", kcecodename(kcdbecode(db)));
return -1;
}
printf("Starting %d worker threads\r\n", pool_size);
for(i = 0; i < pool_size; ++i){
pthread_create(&(pool[i]), NULL, call_proxy, (void*)base_url);
}
printf("Connected to %s\r\n", base_url);
server = socket(PF_INET, SOCK_STREAM, 0);
if(!server){
fprintf(stderr, "Could not create socket\r\n");
return -1;
}
port = 7000;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
yes = 1;
setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
if(bind(server, (struct sockaddr*)&addr, sizeof(addr)) < 0){
perror("Error binding socket");
return -1;
}
if(listen(server, 24) < 0){
perror("Error binding socket");
return -1;
} else{
printf("Listening on 0.0.0.0:%d\r\n", port);
int client;
pthread_t child;
FILE *fp;
while(1){
addr_len = sizeof(client_addr);
client = accept(server, 0, 0);
fp = fdopen(client, "r+");
pthread_create(&child, 0, worker, fp);
pthread_detach(child);
}
}
return 0;
}

+ 0
- 0
src/proxy. View File


+ 0
- 58
src/proxy.c View File

@ -1,58 +0,0 @@
#include "proxy.h"
size_t curl_write(char* ptr, size_t size, size_t nmemb, CURL_RESULT* result){
size_t realsize = size * nmemb;
result->data= realloc(result->data, result->size + realsize + 1);
if(result->data == NULL){
fprintf(stderr, "Out of memory receiving curl results\n");
return 0;
}
memcpy(&(result->data[result->size]), ptr, realsize);
result->size += realsize;
result->data[result->size] = 0;
return realsize;
}
void* call_proxy(void* arg){
char* base_url = (char*)arg;
CURL* curl = curl_easy_init();
if(!curl){
fprintf(stderr, "Could not initialize curl\n");
return NULL;
}
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write);
while(1){
char* next;
queue_get(&requests, &next);
int url_size = strlen(base_url) + strlen(next) + 1;
char* url = (char*)malloc(url_size * sizeof(char));
sprintf(url, "%s%s", base_url, next);
CURL_RESULT* result = malloc(sizeof(CURL_RESULT));
result->data = malloc(sizeof(char));
result->data[0] = 0;
result->size = 0;
CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, result);
res = curl_easy_perform(curl);
char value[1024];
int exp = 0;
if(record_expire_time > 0){
exp = (int)time(NULL) + record_expire_time;
}
sprintf(value, "%10d:%s", exp, result->data);
kcdbset(db, next, strlen(next), value, strlen(value));
if(url != NULL){
free(url);
}
if(next != NULL){
free(next);
}
free(result->data);
free(result);
}
}

+ 0
- 16
src/proxy.h View File

@ -1,16 +0,0 @@
#ifndef FERRITE_PROXY
#define FERRITE_PROXY
#include <curl/curl.h>
#include <time.h>
#include "common.h"
#include "queue.h"
typedef struct {
char* data;
size_t size;
} CURL_RESULT;
size_t curl_write(char* ptr, size_t size, size_t nmemb, CURL_RESULT* result);
void* call_proxy(void* arg);
#endif

+ 0
- 40
src/queue.c View File

@ -1,40 +0,0 @@
#include "queue.h"
void queue_del(PTR_QUEUE q){
kclistdel(*(&q->list));
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->cond);
}
void queue_init(PTR_QUEUE q){
*(&q->list) = kclistnew();
*(&q->size) = 0;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
}
void queue_add(PTR_QUEUE q, char* value){
pthread_mutex_lock(&q->mutex);
kclistpush(*(&q->list), value, strlen(value) + 1);
*(&q->size) += 1;
pthread_mutex_unlock(&q->mutex);
pthread_cond_signal(&q->cond);
}
void queue_get(PTR_QUEUE q, char** value){
pthread_mutex_lock(&q->mutex);
while(*(&q->size) == 0){
pthread_cond_wait(&q->cond, &q->mutex);
}
list_shift(*(&q->list), value);
*(&q->size) -= 1;
pthread_mutex_unlock(&q->mutex);
}
int queue_size(PTR_QUEUE q){
int size = 0;
pthread_mutex_lock(&q->mutex);
size = *(&q->size);
pthread_mutex_unlock(&q->mutex);
return size;
}

+ 0
- 23
src/queue.h View File

@ -1,23 +0,0 @@
#ifndef FERRITE_QUEUE
#define FERRITE_QUEUE
#include <kclangc.h>
#include <pthread.h>
#include "util.h"
typedef struct {
KCLIST* list;
pthread_mutex_t mutex;
pthread_cond_t cond;
int size;
} QUEUE;
typedef QUEUE* PTR_QUEUE;
void queue_del(PTR_QUEUE q);
void queue_init(PTR_QUEUE q);
void queue_add(PTR_QUEUE q, char* value);
void queue_get(PTR_QUEUE q, char** value);
int queue_size(PTR_QUEUE q);
#endif

+ 0
- 42
src/util.c View File

@ -1,42 +0,0 @@
#include "util.h"
void list_shift(KCLIST* list, char** next){
size_t size;
const char* results = kclistget(list, 0, &size);
*next = malloc((size * sizeof(char)) + 1);
strcpy(*next, results);
(*next)[size] = 0;
kclistshift(list);
}
void lower(char* word){
int length = strlen(word);
int i;
for(i = 0; i < length; ++i){
word[i] = tolower(word[i]);
}
}
void tokenize(KCLIST* tokens, char* base, char* delimiter){
char* remainder;
char* token;
char* ptr = base;
while((token = strtok_r(ptr, delimiter, &remainder))){
int last;
int len = strlen(token);
for(last = len - 1; last >= 0; --last){
if(token[last] == '\n' || token[last] == '\r'){
token[last] = 0;
break;
}
}
lower(token);
char new_token[1024];
strcpy(new_token, token);
new_token[strlen(token)] = 0;
kclistpush(tokens, new_token, strlen(new_token) + 1);
ptr = remainder;
}
free(token);
}

+ 0
- 9
src/util.h View File

@ -1,9 +0,0 @@
#ifndef FERRITE_UTIL
#define FERRITE_UTIL
#include <kclangc.h>
void list_shift(KCLIST* list, char** next);
void lower(char* word);
void tokenize(KCLIST* tokens, char* base, char* delimiter);
#endif

Loading…
Cancel
Save