
add client over raw redis connection

Branch: pull/1/head
Brett Langdon committed 12 years ago · commit 39e15aa0c6
4 changed files with 102 additions and 48 deletions:

  1. qw/client.py      +88  -0
  2. qw/manager.py      +5  -20
  3. qw/queue_name.py   +0  -3
  4. qw/worker.py       +9  -25

qw/client.py  +88 -0  (new file)

@@ -0,0 +1,88 @@
import redis
import uuid


class Client(redis.StrictRedis):
    ALL_MANAGERS = "all:managers"
    ALL_JOBS = "all:jobs"
    MANAGER_WORKERS = "%s:workers"
    MANAGER_JOBS = "%s:jobs"
    WORKER_JOBS = "%s:jobs"
    JOB_KEY = "job:%s"

    def __init__(self, host="localhost", port=6379, db=0):
        super(Client, self).__init__(host=host, port=port, db=db)

    def register_manager(self, name):
        self.sadd(self.ALL_MANAGERS, name)

    def deregister_manager(self, name):
        self.srem(self.ALL_MANAGERS, name)

    def register_worker(self, manager, name):
        self.sadd(self.MANAGER_WORKERS % (manager, ), name)

    def deregister_worker(self, manager, name):
        self.srem(self.MANAGER_WORKERS % (manager, ), name)

    def queue_job(self, job_data, manager=None, worker=None):
        # str() the uuid so the id we return matches what redis stores
        job_id = str(uuid.uuid4())
        self.hmset(self.JOB_KEY % (job_id, ), job_data)
        if manager is not None:
            self.lpush(self.MANAGER_JOBS % (manager, ), job_id)
        elif worker is not None:
            self.lpush(self.WORKER_JOBS % (worker, ), job_id)
        else:
            self.lpush(self.ALL_JOBS, job_id)
        return job_id

    def fetch_next_job(self, manager, worker, timeout=10):
        # try to fetch in this order:
        #   1) any jobs already assigned to, but not finished by, the worker
        #   2) any jobs assigned specifically to the manager
        #   3) a job from the pool of all jobs
        job_id = (
            self.lpop(self.WORKER_JOBS % (worker, )) or
            self.brpoplpush(self.MANAGER_JOBS % (manager, ), self.WORKER_JOBS % (worker, ), timeout=timeout) or
            self.brpoplpush(self.ALL_JOBS, self.WORKER_JOBS % (worker, ), timeout=timeout)
        )
        job_data = None
        if job_id is not None:
            job_data = self.hgetall(self.JOB_KEY % (job_id, ))
        return (job_id, job_data)

    def finish_job(self, job_id, worker_name):
        self.delete(self.JOB_KEY % (job_id, ))
        self.lrem(self.WORKER_JOBS % (worker_name, ), 1, job_id)

    def get_all_managers(self):
        # managers are registered with SADD, so read the set back with
        # SMEMBERS (LRANGE would raise a WRONGTYPE error here)
        return self.smembers(self.ALL_MANAGERS)

    def get_manager_workers(self, manager_name):
        # likewise a set, not a list
        return self.smembers(self.MANAGER_WORKERS % (manager_name, ))

    def get_worker_pending_jobs(self, worker_name):
        for job_id in self.lrange(self.WORKER_JOBS % (worker_name, ), 0, -1):
            yield job_id

    def get_manager_queued_jobs(self, manager_name):
        for job_id in self.lrange(self.MANAGER_JOBS % (manager_name, ), 0, -1):
            yield job_id

    def get_all_queued_jobs(self):
        for job_id in self.lrange(self.ALL_JOBS, 0, -1):
            yield (None, job_id)
        for manager in self.get_all_managers():
            for job_id in self.get_manager_queued_jobs(manager):
                yield (manager, job_id)

    def get_all_pending_jobs(self):
        for manager in self.get_all_managers():
            for worker in self.get_manager_workers(manager):
                for job_id in self.get_worker_pending_jobs(worker):
                    yield (worker, job_id)
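
For orientation, here is a minimal sketch of driving the new Client end to end. The connection settings, the "manager-1"/"worker-1" names, and the job payload are illustrative assumptions, not part of the commit; note the payload must be a flat dict, since it is stored with HMSET:

    from qw.client import Client

    client = Client(host="localhost", port=6379, db=0)

    # push a job onto the global pool ("all:jobs")
    job_id = client.queue_job({"task": "resize", "path": "/tmp/in.png"})

    # a worker claims it: first its own backlog, then its manager's queue,
    # then the global pool (blocking up to `timeout` seconds)
    job_id, job_data = client.fetch_next_job("manager-1", "worker-1", timeout=1)

    if job_id is not None:
        # ... do the actual work here ...
        client.finish_job(job_id, "worker-1")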

qw/manager.py  +5 -20

@@ -2,20 +2,19 @@ import logging
 import multiprocessing
 import socket
 
-import redis
-
-from qw import queue_name
+from qw.client import Client
 from qw.exception import AlreadyStartedException, NotStartedException
 from qw.worker import Worker
 
 
 class Manager(object):
-    __slots__ = ["workers", "connection", "num_workers", "log", "name"]
+    __slots__ = ["workers", "client", "num_workers", "log", "name"]
 
     def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None):
         self.workers = []
         self.num_workers = num_workers or multiprocessing.cpu_count()
-        self.connection = redis.StrictRedis(host=host, port=port, db=db)
+        self.client = Client(host=host, port=port, db=db)
         self.log = logging.getLogger("qw.manager")
         self.name = name or socket.gethostname()
@@ -30,7 +29,7 @@ class Manager(object):
             worker.start()
             self.workers.append(worker)
         self.log.info("registering %s under %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
-        self.connection.sadd(queue_name.ALL_MANAGERS, self.name)
+        self.client.register_manager(self.name)
 
     def join(self):
         self.log.debug("joining workers", extra={"process_name": self.name})
@@ -50,7 +49,7 @@ class Manager(object):
             raise NotStartedException("Workers Do Not Exist")
 
         self.log.info("deregistering %s from %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
-        self.connection.srem(queue_name.ALL_MANAGERS, self.name)
+        self.client.deregister_manager(self.name)
 
         self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name})
         # trigger them all to shutdown
@@ -59,17 +58,3 @@ class Manager(object):
         self.join()
         self.log.info("stopped", extra={"process_name": self.name})
-
-    def get_worker_pending_jobs(self):
-        if not self.workers:
-            raise NotStartedException("Workers Do Not Exist")
-
-        worker_names = self.connection.smembers("%s:workers" % (self.name, ))
-        for worker_name in worker_names:
-            self.log.debug("fetching %s's pending jobs", worker_name, extra={"process_name": self.name})
-            for job in self.connection.lrange("%s:jobs" % (worker_name, ), 0, -1):
-                yield (worker_name, job)
-
-    def get_all_queued_jobs(self):
-        return self.connection.lrange(queue_name.ALL_JOBS, 0, -1)
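
The two inspection helpers removed above now live on the Client (get_all_queued_jobs and get_all_pending_jobs), so callers no longer need a Manager with live workers just to inspect the queues; one visible difference is that Client.get_all_queued_jobs yields (manager, job_id) tuples rather than a plain list of ids. (The surviving log.info calls still reference queue_name.ALL_MANAGERS even though the queue_name import is removed in this commit, which looks like an oversight.) A rough sketch of the replacement call sites, with the printouts purely illustrative:

    from qw.client import Client

    client = Client()

    # formerly Manager.get_all_queued_jobs()
    for manager, job_id in client.get_all_queued_jobs():
        print "queued: %s (manager queue: %s)" % (job_id, manager)

    # formerly Manager.get_worker_pending_jobs(); this walks the workers
    # of every registered manager, not just one
    for worker, job_id in client.get_all_pending_jobs():
        print "pending: %s (worker: %s)" % (job_id, worker)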

qw/queue_name.py  +0 -3  (file deleted)

@@ -1,3 +0,0 @@
-ALL_MANAGERS = "all:managers"
-ALL_JOBS = "all:jobs"
-JOB_PREFIX = "job:"

qw/worker.py  +9 -25

@@ -5,17 +5,15 @@ import traceback
 import socket
 import os
 
-from qw import queue_name
-
 
 class Worker(multiprocessing.Process):
     __slots__ = [
-        "connection", "exit", "log", "timeout", "manager_name"
+        "client", "exit", "log", "timeout", "manager_name"
     ]
 
-    def __init__(self, redis_connection, manager_name=None, timeout=10):
+    def __init__(self, client, manager_name=None, timeout=10):
         super(Worker, self).__init__()
-        self.connection = redis_connection
+        self.client = client
         self.manager_name = manager_name or socket.gethostname()
         self.exit = multiprocessing.Event()
         self.timeout = timeout
@@ -34,36 +32,23 @@ class Worker(multiprocessing.Process):
             "registering worker %s under '%s:workers'" % (self.name, self.manager_name),
             extra={"process_name": self.name}
         )
-        # reigtser this process under the parent manager_name
-        self.connection.sadd("%s:workers" % (self.manager_name, ), self.name)
+        self.client.register_worker(self.manager_name, self.name)
 
     def _deregister(self):
         self.log.info(
             "deregistering worker %s from '%s:workers'" % (self.name, self.manager_name),
             extra={"process_name": self.name}
         )
-        self.connection.srem("%s:workers" % (self.manager_name, ), self.name)
+        self.client.deregister_worker(self.manager_name, self.name)
 
     def _run(self):
-        # try to fetch a previously unfinished job
-        # otherwise try to fetch from the main job pool
-        self.log.debug(
-            "polling for jobs from '%s' and '%s'" % (self.job_queue, queue_name.ALL_JOBS),
-            extra={"process_name": self.name}
-        )
-        job_id = (
-            self.connection.lpop(self.job_queue) or
-            self.connection.brpoplpush(queue_name.ALL_JOBS, self.job_queue, timeout=self.timeout)
-        )
-        if not job_id:
-            self.log.debug(
-                "no jobs found from '%s' or '%s'" % (self.job_queue, queue_name.ALL_JOBS),
-                extra={"process_name": self.name}
-            )
+        self.log.debug("polling for jobs", extra={"process_name": self.name})
+        job_id, job_data = self.client.fetch_next_job(self.manager_name, self.name, timeout=self.timeout)
+        if not job_id or not job_data:
             return
-        job_data = self.connection.hgetall("job:%s" % (job_id, ))
         self.log.debug(
             "processing job id (%s) data (%r)" % (job_id, job_data), extra={"process_name": self.name}
         )
@@ -71,8 +56,7 @@ class Worker(multiprocessing.Process):
         self._process(job_id, job_data)
 
         self.log.debug("removing job id (%s)" % (job_id), extra={"process_name": self.name})
-        self.connection.delete("job:%s" % (job_id, ))
-        self.connection.lrem(self.job_queue, 1, job_id)
+        self.client.finish_job(job_id, self.name)
 
     def _process(self, job_id, job_data):
         print "%s - %s" % (job_id, job_data)

