From 399e69ead4902461b4dfb6a6377fc295bc3fca0c Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 11 Sep 2013 15:18:47 -0400 Subject: [PATCH] make things plural and refactor pool into its own class --- docs/client.rst | 5 - docs/clients.rst | 5 + docs/index.rst | 3 +- docs/pools.rst | 5 + riakcached/{client.py => clients.py} | 197 +++++++++--------- riakcached/pools.py | 68 ++++++ .../{test_client.py => test_riakclient.py} | 2 +- 7 files changed, 177 insertions(+), 108 deletions(-) delete mode 100644 docs/client.rst create mode 100644 docs/clients.rst create mode 100644 docs/pools.rst rename riakcached/{client.py => clients.py} (52%) create mode 100644 riakcached/pools.py rename riakcached/tests/{test_client.py => test_riakclient.py} (99%) diff --git a/docs/client.rst b/docs/client.rst deleted file mode 100644 index f3c8d4d..0000000 --- a/docs/client.rst +++ /dev/null @@ -1,5 +0,0 @@ -riakcached.client -================= - -.. automodule:: riakcached.client - :members: diff --git a/docs/clients.rst b/docs/clients.rst new file mode 100644 index 0000000..d225bb8 --- /dev/null +++ b/docs/clients.rst @@ -0,0 +1,5 @@ +riakcached.clients +================= + +.. automodule:: riakcached.clients + :members: diff --git a/docs/index.rst b/docs/index.rst index 0059e0f..06a1445 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,8 +6,9 @@ Contents: .. toctree:: :maxdepth: 2 - client + clients exceptions + pools A Memcached like interface to the Riak HTTP API. diff --git a/docs/pools.rst b/docs/pools.rst new file mode 100644 index 0000000..1f19602 --- /dev/null +++ b/docs/pools.rst @@ -0,0 +1,5 @@ +riakcached.pools +================ + +.. automodule:: riakcached.pools + :members: diff --git a/riakcached/client.py b/riakcached/clients.py similarity index 52% rename from riakcached/client.py rename to riakcached/clients.py index b6f6f0d..9c50b55 100644 --- a/riakcached/client.py +++ b/riakcached/clients.py @@ -1,12 +1,11 @@ -__all__ = ["RiakClient"] +__all__ = ["RiakClient", "ThreadedRiakClient"] import json import Queue import threading -import urllib3 - from riakcached import exceptions +from riakcached.pools import Urllib3Pool class RiakClient(object): @@ -15,18 +14,21 @@ class RiakClient(object): __slots__ = [ "_serializers", "_deserializers", - "_pool", - "_timeout", + "base_url", "bucket", - "url", + "pool", ] - def __init__(self, bucket, url="http://127.0.0.1:8098", timeout=2, auto_connect=True): + def __init__(self, bucket, pool=None): """ """ + if pool is None: + self.pool = Urllib3Pool() + else: + self.pool = pool + self.bucket = bucket - self.url = url.strip("/") - self._timeout = timeout + self.base_url = pool.url self._serializers = { "application/json": json.dumps, } @@ -34,9 +36,6 @@ class RiakClient(object): "application/json": json.loads, } - if auto_connect: - self._connect() - def add_serializer(self, content_type, serializer): """ """ @@ -61,176 +60,142 @@ class RiakClient(object): deserializer = self._deserializers.get(content_type, str) return deserializer(data) - def close(self): - """ - """ - if self._pool: - self._pool.close() - def get(self, key, counter=False): """ """ - url = "%s/buckets/%s/keys/%s" % (self.url, self.bucket, key) + url = "%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key) if counter: - url = "%s/buckets/%s/counters/%s" % (self.url, self.bucket, key) - response = self._request(method="GET", url=url) - if response.status == 400: - raise exceptions.RiakcachedBadRequest(response.data) - elif response.status == 503: - raise exceptions.RiakcachedServiceUnavailable(response.data) - - if response.status not in (200, 300, 304): + url = "%s/buckets/%s/counters/%s" % (self.base_url, self.bucket, key) + status, data, headers = self.pool.request(method="GET", url=url) + if status == 400: + raise exceptions.RiakcachedBadRequest(data) + elif status == 503: + raise exceptions.RiakcachedServiceUnavailable(data) + + if status not in (200, 300, 304): return None - return self.deserialize(response.data, response.getheader("content-type")) + return self.deserialize(data, headers.get("content-type", "text/plain")) def get_many(self, keys): """ """ - def worker(key, results): - results.put((key, self.get(key))) - - args = [[key] for key in keys] - results = self._many(worker, args) - results = dict((key, value) for key, value in results.iteritems() if value is not None) - return results or None + results = dict((key, self.get(key)) for key in keys) + return dict((key, value) for key, value in results.iteritems() if value is not None) def set(self, key, value, content_type="text/plain"): """ """ value = self.serialize(value, content_type) - response = self._request( + status, data, _ = self.pool.request( method="POST", - url="%s/buckets/%s/keys/%s" % (self.url, self.bucket, key), + url="%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key), body=value, headers={ "Content-Type": content_type, }, ) - if response.status == 400: - raise exceptions.RiakcachedBadRequest(response.data) - elif response.status == 412: - raise exceptions.RiakcachedPreconditionFailed(response.data) - return response.status in (200, 201, 204, 300) + if status == 400: + raise exceptions.RiakcachedBadRequest(data) + elif status == 412: + raise exceptions.RiakcachedPreconditionFailed(data) + return status in (200, 201, 204, 300) def set_many(self, values): """ """ - def worker(key, value, results): - results.put((key, self.set(key, value))) - - args = [list(data) for data in values.items()] - return self._many(worker, args) + return dict((key, self.set(key, value)) for key, value in values.iteritems()) def delete(self, key): """ """ - response = self._request( + status, data, _ = self.pool.request( method="DELETE", - url="%s/buckets/%s/keys/%s" % (self.url, self.bucket, key), + url="%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key), ) - if response.status == 400: - raise exceptions.RiakcachedBadRequest(response.data) - return response.status in (204, 404) + if status == 400: + raise exceptions.RiakcachedBadRequest(data) + return status in (204, 404) def delete_many(self, keys): """ """ - def worker(key, results): - results.put((key, self.delete(key))) - - args = [[key] for key in keys] - - return self._many(worker, args) + return dict((key, self.delete(key)) for key in keys) def stats(self): """ """ - response = self._request( + status, data, _ = self.pool.request( method="GET", - url="%s/stats" % self.url, + url="%s/stats" % self.base_url, ) - if response.status == 200: - return self.deserialize(response.data, "application/json") + if status == 200: + return self.deserialize(data, "application/json") return None def props(self): """ """ - response = self._request( + status, data, _ = self.pool.request( method="GET", - url="%s/buckets/%s/props" % (self.url, self.bucket), + url="%s/buckets/%s/props" % (self.base_url, self.bucket), ) - if response.status == 200: - return json.loads(response.data) + if status == 200: + return json.loads(data) return None def set_props(self, props): """ """ - response = self._request( + status, _, _ = self.pool.request( method="PUT", - url="%s/buckets/%s/props" % (self.url, self.bucket), + url="%s/buckets/%s/props" % (self.base_url, self.bucket), body=self.serialize(props, "application/json"), headers={ "Content-Type": "application/json", } ) - return response.status == 200 + return status == 200 def keys(self): """ """ - response = self._request( + status, data, _ = self.pool.request( method="GET", - url="%s/buckets/%s/keys?keys=true" % (self.url, self.bucket), + url="%s/buckets/%s/keys?keys=true" % (self.base_url, self.bucket), ) - if response.status == 200: - return self.deserialize(response.data, "application/json") + if status == 200: + return self.deserialize(data, "application/json") return None def ping(self): """ """ - response = self._request( + status, _, _ = self.pool.request( method="GET", - url="%s/ping" % self.url, + url="%s/ping" % self.base_url, ) - return response.status == 200 + return status == 200 def incr(self, key, value=1): """ """ - response = self._request( + status, data, _ = self.pool.request( method="POST", - url="%s/buckets/%s/counters/%s" % (self.url, self.bucket, key), + url="%s/buckets/%s/counters/%s" % (self.base_url, self.bucket, key), body=str(value), ) - if response.status == 409: - raise exceptions.RiakcachedConflict(response.data) - elif response.status == 400: - raise exceptions.RiakcachedBadRequest(response.data) - return response.status in (200, 201, 204, 300) - - def _connect(self): - self._pool = urllib3.connection_from_url(self.url) - - def _request(self, method, url, body=None, headers=None): - try: - return self._pool.urlopen( - method=method, - url=url, - body=body, - headers=headers, - timeout=self._timeout, - redirect=False, - ) - except urllib3.exceptions.TimeoutError, e: - raise exceptions.RiakcachedTimeout(e.message) - except urllib3.exceptions.HTTPError, e: - raise exceptions.RiakcachedConnectionError(e.message) + if status == 409: + raise exceptions.RiakcachedConflict(data) + elif status == 400: + raise exceptions.RiakcachedBadRequest(data) + return status in (200, 201, 204, 300) + +class ThreadedRiakClient(RiakClient): + """ + """ def _many(self, target, args_list): workers = [] worker_results = Queue.Queue() @@ -249,3 +214,33 @@ class RiakClient(object): key, value = worker_results.get() results[key] = value return results + + def delete_many(self, keys): + """ + """ + def worker(key, results): + results.put((key, self.delete(key))) + + args = [[key] for key in keys] + + return self._many(worker, args) + + def set_many(self, values): + """ + """ + def worker(key, value, results): + results.put((key, self.set(key, value))) + + args = [list(data) for data in values.items()] + return self._many(worker, args) + + def get_many(self, keys): + """ + """ + def worker(key, results): + results.put((key, self.get(key))) + + args = [[key] for key in keys] + results = self._many(worker, args) + results = dict((key, value) for key, value in results.iteritems() if value is not None) + return results or None diff --git a/riakcached/pools.py b/riakcached/pools.py new file mode 100644 index 0000000..83827af --- /dev/null +++ b/riakcached/pools.py @@ -0,0 +1,68 @@ +import urllib3 + +from riakcached import exceptions + + +class Pool(object): + """ + """ + __slots__ = ["timeout", "url"] + + def __init__(self, base_url="http://127.0.0.1:8098", timeout=2, auto_connect=True): + """ + """ + self.url = base_url + self.timeout = timeout + if auto_connect: + self.connect() + + def connect(self): + """ + """ + raise NotImplementedError("You must not use %s directly" % self.__class__.__name__) + + def close(self): + """ + """ + raise NotImplementedError("You must not use %s directly" % self.__class__.__name__) + + def request(self, method, url, body=None, headers=None): + """ + """ + raise NotImplementedError("You must not use %s directly" % self.__class__.__name__) + + +class Urllib3Pool(Pool): + """ + """ + __slots__ = ["pool"] + + def connect(self): + """ + """ + self.pool = urllib3.connection_from_url(self.url) + + def close(self): + """ + """ + if self.pool: + self.pool.close() + + def request(self, method, url, body=None, headers=None): + """ + """ + try: + response = self.pool.urlopen( + method=method, + url=url, + body=body, + headers=headers, + timeout=self.timeout, + redirect=False, + ) + return response.status, response.data, response.getheaders() + except urllib3.exceptions.TimeoutError, e: + raise exceptions.RiakcachedTimeout(e.message) + except urllib3.exceptions.HTTPError, e: + raise exceptions.RiakcachedConnectionError(e.message) + return None, None, None diff --git a/riakcached/tests/test_client.py b/riakcached/tests/test_riakclient.py similarity index 99% rename from riakcached/tests/test_client.py rename to riakcached/tests/test_riakclient.py index 8f86cfc..9bb4f40 100644 --- a/riakcached/tests/test_client.py +++ b/riakcached/tests/test_riakclient.py @@ -5,7 +5,7 @@ import urllib3.exceptions from riakcached import exceptions -from riakcached.client import RiakClient +from riakcached.clients import RiakClient from riakcached.tests.utils import InlineClass