diff --git a/qw/client.py b/qw/client.py
new file mode 100644
index 0000000..18873af
--- /dev/null
+++ b/qw/client.py
@@ -0,0 +1,88 @@
+import redis
+
+import uuid
+
+
+class Client(redis.StrictRedis):
+    ALL_MANAGERS = "all:managers"
+    ALL_JOBS = "all:jobs"
+    MANAGER_WORKERS = "%s:workers"
+    MANAGER_JOBS = "%s:jobs"
+    WORKER_JOBS = "%s:jobs"
+    JOB_KEY = "job:%s"
+
+    def __init__(self, host="localhost", port=6379, db=0):
+        super(Client, self).__init__(host=host, port=port, db=db)
+
+    def register_manager(self, name):
+        self.sadd(self.ALL_MANAGERS, name)
+
+    def deregister_manager(self, name):
+        self.srem(self.ALL_MANAGERS, name)
+
+    def register_worker(self, manager, name):
+        self.sadd(self.MANAGER_WORKERS % (manager, ), name)
+
+    def deregister_worker(self, manager, name):
+        self.srem(self.MANAGER_WORKERS % (manager, ), name)
+
+    def queue_job(self, job_data, manager=None, worker=None):
+        job_id = str(uuid.uuid4())  # str() so the returned id matches the value stored in redis
+        self.hmset(self.JOB_KEY % (job_id, ), job_data)
+        if manager is not None:
+            self.lpush(self.MANAGER_JOBS % (manager, ), job_id)
+        elif worker is not None:
+            self.lpush(self.WORKER_JOBS % (worker, ), job_id)
+        else:
+            self.lpush(self.ALL_JOBS, job_id)
+
+        return job_id
+
+    def fetch_next_job(self, manager, worker, timeout=10):
+        # try to fetch in this order
+        # 1) any jobs already assigned but not finished by the worker
+        # 2) any jobs assigned specifically to the manager
+        # 3) try to grab a job from the pool of all jobs
+        job_id = (
+            self.lpop(self.WORKER_JOBS % (worker, )) or
+            self.brpoplpush(self.MANAGER_JOBS % (manager, ), self.WORKER_JOBS % (worker, ), timeout=timeout) or
+            self.brpoplpush(self.ALL_JOBS, self.WORKER_JOBS % (worker, ), timeout=timeout)
+        )
+
+        job_data = None
+        if job_id is not None:
+            job_data = self.hgetall(self.JOB_KEY % (job_id, ))
+
+        return (job_id, job_data)
+
+    def finish_job(self, job_id, worker_name):
+        self.delete(self.JOB_KEY % (job_id, ))
+        self.lrem(self.WORKER_JOBS % (worker_name, ), 1, job_id)
+
+    def get_all_managers(self):
+        return self.smembers(self.ALL_MANAGERS)  # stored via sadd, so it is a set, not a list
+
+    def get_manager_workers(self, manager_name):
+        return self.smembers(self.MANAGER_WORKERS % (manager_name, ))  # stored via sadd
+
+    def get_worker_pending_jobs(self, worker_name):
+        for job_id in self.lrange(self.WORKER_JOBS % (worker_name, ), 0, -1):
+            yield job_id
+
+    def get_manager_queued_jobs(self, manager_name):
+        for job_id in self.lrange(self.MANAGER_JOBS % (manager_name, ), 0, -1):
+            yield job_id
+
+    def get_all_queued_jobs(self):
+        for job_id in self.lrange(self.ALL_JOBS, 0, -1):
+            yield (None, job_id)
+
+        for manager in self.get_all_managers():
+            for job_id in self.get_manager_queued_jobs(manager):
+                yield (manager, job_id)
+
+    def get_all_pending_jobs(self):
+        for manager in self.get_all_managers():
+            for worker in self.get_manager_workers(manager):
+                for job_id in self.get_worker_pending_jobs(worker):
+                    yield (worker, job_id)
diff --git a/qw/manager.py b/qw/manager.py
index 9e6564b..80742b2 100644
--- a/qw/manager.py
+++ b/qw/manager.py
@@ -2,20 +2,18 @@
 import logging
 import multiprocessing
 import socket
-import redis
-
-from qw import queue_name
+from qw.client import Client
 from qw.exception import AlreadyStartedException, NotStartedException
 from qw.worker import Worker
 
 
 class Manager(object):
-    __slots__ = ["workers", "connection", "num_workers", "log", "name"]
+    __slots__ = ["workers", "client", "num_workers", "log", "name"]
 
     def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None):
         self.workers = []
         self.num_workers = num_workers or multiprocessing.cpu_count()
-        self.connection = redis.StrictRedis(host=host, port=port, db=db)
+        self.client = Client(host=host, port=port, db=db)
         self.log = logging.getLogger("qw.manager")
         self.name = name or socket.gethostname()
 
@@ -30,7 +28,7 @@ class Manager(object):
             worker.start()
             self.workers.append(worker)
-        self.log.info("registering %s under %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
-        self.connection.sadd(queue_name.ALL_MANAGERS, self.name)
+        self.log.info("registering %s under %s", self.name, self.client.ALL_MANAGERS, extra={"process_name": self.name})
+        self.client.register_manager(self.name)
 
     def join(self):
@@ -50,7 +48,7 @@ class Manager(object):
             raise NotStartedException("Workers Do Not Exist")
 
-        self.log.info("deregistering %s from %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
-        self.connection.srem(queue_name.ALL_MANAGERS, self.name)
+        self.log.info("deregistering %s from %s", self.name, self.client.ALL_MANAGERS, extra={"process_name": self.name})
+        self.client.deregister_manager(self.name)
 
         self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name})
@@ -59,17 +57,3 @@ class Manager(object):
             self.join()
 
         self.log.info("stopped", extra={"process_name": self.name})
-
-    def get_worker_pending_jobs(self):
-        if not self.workers:
-            raise NotStartedException("Workers Do Not Exist")
-
-        worker_names = self.connection.smembers("%s:workers" % (self.name, ))
-
-        for worker_name in worker_names:
-            self.log.debug("fetching %s's pending jobs", worker_name, extra={"process_name": self.name})
-            for job in self.connection.lrange("%s:jobs" % (worker_name, ), 0, -1):
-                yield (worker_name, job)
-
-    def get_all_queued_jobs(self):
-        return self.connection.lrange(queue_name.ALL_JOBS, 0, -1)
diff --git a/qw/queue_name.py b/qw/queue_name.py
deleted file mode 100644
index b285aef..0000000
--- a/qw/queue_name.py
+++ /dev/null
@@ -1,3 +0,0 @@
-ALL_MANAGERS = "all:managers"
-ALL_JOBS = "all:jobs"
-JOB_PREFIX = "job:"
diff --git a/qw/worker.py b/qw/worker.py
index 27fe0c5..acf1e78 100644
--- a/qw/worker.py
+++ b/qw/worker.py
@@ -5,17 +5,15 @@ import traceback
 import socket
 import os
 
-from qw import queue_name
-
 
 class Worker(multiprocessing.Process):
     __slots__ = [
-        "connection", "exit", "log", "timeout", "manager_name"
+        "client", "exit", "log", "timeout", "manager_name"
     ]
 
-    def __init__(self, redis_connection, manager_name=None, timeout=10):
+    def __init__(self, client, manager_name=None, timeout=10):
         super(Worker, self).__init__()
-        self.connection = redis_connection
+        self.client = client
         self.manager_name = manager_name or socket.gethostname()
         self.exit = multiprocessing.Event()
         self.timeout = timeout
@@ -34,36 +32,23 @@ class Worker(multiprocessing.Process):
             "registering worker %s under '%s:workers'" % (self.name, self.manager_name),
             extra={"process_name": self.name}
         )
-        # reigtser this process under the parent manager_name
-        self.connection.sadd("%s:workers" % (self.manager_name, ), self.name)
+        self.client.register_worker(self.manager_name, self.name)
 
     def _deregister(self):
         self.log.info(
             "deregistering worker %s from '%s:workers'" % (self.name, self.manager_name),
             extra={"process_name": self.name}
         )
-        self.connection.srem("%s:workers" % (self.manager_name, ), self.name)
+        self.client.deregister_worker(self.manager_name, self.name)
 
     def _run(self):
         # try to fetch a previously unfinished job
         # otherwise try to fetch from the main job pool
-        self.log.debug(
-            "polling for jobs from '%s' and '%s'" % (self.job_queue, queue_name.ALL_JOBS),
-            extra={"process_name": self.name}
-        )
-        job_id = (
-            self.connection.lpop(self.job_queue) or
-            self.connection.brpoplpush(queue_name.ALL_JOBS, self.job_queue, timeout=self.timeout)
-        )
-
-        if not job_id:
-            self.log.debug(
-                "no jobs found from '%s' or '%s'" % (self.job_queue, queue_name.ALL_JOBS),
-                extra={"process_name": self.name}
-            )
+        self.log.debug("polling for jobs", extra={"process_name": self.name})
+        job_id, job_data = self.client.fetch_next_job(self.manager_name, self.name, timeout=self.timeout)
+        if not job_id or not job_data:
             return
-        job_data = self.connection.hgetall("job:%s" % (job_id, ))
         self.log.debug(
             "processing job id (%s) data (%r)" % (job_id, job_data),
             extra={"process_name": self.name}
         )
@@ -71,8 +56,7 @@ class Worker(multiprocessing.Process):
             self._process(job_id, job_data)
 
         self.log.debug("removing job id (%s)" % (job_id), extra={"process_name": self.name})
-        self.connection.delete("job:%s" % (job_id, ))
-        self.connection.lrem(self.job_queue, 1, job_id)
+        self.client.finish_job(job_id, self.name)
 
     def _process(self, job_id, job_data):
         print "%s - %s" % (job_id, job_data)