#!/usr/bin/env python
# bin/qw-manager
#
# Command line entry point: configures logging, starts a qw Manager (which
# forks its worker processes), and wires up signal handling so the process
# can be stopped (SIGTERM/SIGHUP) or asked for a status dump (SIGINT).

import signal

import docopt

from qw.manager import Manager
from qw import logging_setup
from qw import __version__


# NOTE: the <placeholder> names after each option are required by docopt so
# it knows the option expects an argument.
__doc__ = """Usage:
    qw-manager [--level=<level>] [--workers=<num>] [--name=<name>] [--host=<host>] [--port=<port>] [--db=<db>]
    qw-manager (--help | --version)

Options:
    --help                Show this help message
    --version             Show version information
    -l --level=<level>    Set the log level (debug,info,warn,error) [default: info]
    -w --workers=<num>    Set the number of workers to start, defaults to number of cpus
    -n --name=<name>      Set the manager name, defaults to hostname
    -h --host=<host>      Set the redis host to use [default: localhost]
    -p --port=<port>      Set the redis port to use [default: 6379]
    -d --db=<db>          Set the redis db number to use [default: 0]
"""

arguments = docopt.docopt(__doc__, version="qw-manager %s" % (__version__, ))

# configure log level on both loggers before any process starts
log_level = arguments["--level"].upper()
logging_setup.manager_logger.setLevel(log_level)
logging_setup.worker_logger.setLevel(log_level)

# docopt yields strings (or None when an option was omitted); coerce the
# numeric options before handing them to Manager/redis, which expect ints
num_workers = int(arguments["--workers"]) if arguments["--workers"] else None

manager = Manager(
    name=arguments["--name"], num_workers=num_workers,
    host=arguments["--host"], port=int(arguments["--port"]),
    db=int(arguments["--db"])
)

manager.start()


def stop_handler(signum, frame):
    # SIGTERM/SIGHUP: graceful shutdown of the manager and all workers
    manager.stop()


def info_handler(signum, frame):
    # SIGINT (ctrl-c): print queue/worker state instead of stopping
    print("All Queued Jobs:")
    for job in manager.get_all_queued_jobs():
        print(job)

    print("Current Pending Jobs:")
    for worker, job in manager.get_worker_pending_jobs():
        print("%s: %s" % (worker, job))


signal.signal(signal.SIGTERM, stop_handler)
signal.signal(signal.SIGHUP, stop_handler)
# SIGINT is repurposed for the status dump above, so ctrl-c will normally
# NOT raise KeyboardInterrupt here; the handler below is kept as a safety
# net in case the signal arrives before the handler is installed.
signal.signal(signal.SIGINT, info_handler)

try:
    manager.join()
except KeyboardInterrupt:
    manager.stop()
# --- qw/__init__.py ---

__version__ = "0.1.0"


# --- qw/exception.py ---

class AlreadyStartedException(Exception):
    """Raised when start() is called while workers are already running."""
    pass


class NotStartedException(Exception):
    """Raised when an operation requires running workers but none exist."""
    pass


# --- qw/logging_setup.py ---

import logging

# every log call in this package supplies a "process_name" via
# extra={"process_name": ...}; the format string depends on it
FORMAT = "%(asctime)s - %(name)s - %(process_name)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(FORMAT)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

manager_logger = logging.getLogger("qw.manager")
manager_logger.addHandler(stream_handler)

worker_logger = logging.getLogger("qw.worker")
worker_logger.addHandler(stream_handler)


# --- qw/manager.py ---

import logging
import multiprocessing
import socket

import redis

from qw import queue_name
from qw.exception import AlreadyStartedException, NotStartedException
from qw.worker import Worker


class Manager(object):
    """Owns a pool of Worker processes and registers itself in redis.

    The manager's name is added to the ``all:managers`` set while running;
    each worker registers itself under ``<name>:workers``.
    """

    __slots__ = ["workers", "connection", "num_workers", "log", "name"]

    def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None):
        """Create a manager (does not start any workers yet).

        :param host: redis host
        :param port: redis port
        :param db: redis database number
        :param num_workers: worker process count, defaults to cpu count
        :param name: manager name, defaults to the machine's hostname
        """
        self.workers = []
        # CLI callers (docopt) may pass the count as a string; coerce to int
        # so xrange/range in start() always receives an integer
        self.num_workers = int(num_workers) if num_workers else multiprocessing.cpu_count()
        self.connection = redis.StrictRedis(host=host, port=port, db=db)
        self.log = logging.getLogger("qw.manager")
        self.name = name or socket.gethostname()

    def start(self):
        """Fork the worker processes and register this manager in redis.

        :raises AlreadyStartedException: if workers are already running
        """
        self.log.info("starting", extra={"process_name": self.name})
        if self.workers:
            raise AlreadyStartedException("Workers Already Started")

        self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name})
        for _ in range(self.num_workers):
            worker = Worker(self.connection, manager_name=self.name)
            worker.start()
            self.workers.append(worker)
        self.log.info("registering %s under %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
        self.connection.sadd(queue_name.ALL_MANAGERS, self.name)

    def join(self):
        """Block until every worker process exits, then clear the pool.

        :raises NotStartedException: if no workers exist
        """
        self.log.debug("joining workers", extra={"process_name": self.name})
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")

        # wait for them all to stop
        for worker in self.workers:
            worker.join()

        # make sure to clear out workers list
        self.workers = []

    def stop(self):
        """Deregister from redis, signal every worker to shut down and join.

        :raises NotStartedException: if no workers exist
        """
        self.log.info("stopping", extra={"process_name": self.name})
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")

        self.log.info("deregistering %s from %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
        self.connection.srem(queue_name.ALL_MANAGERS, self.name)

        self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name})
        # trigger them all to shutdown
        for worker in self.workers:
            worker.shutdown()

        self.join()
        self.log.info("stopped", extra={"process_name": self.name})

    def get_worker_pending_jobs(self):
        """Yield ``(worker_name, job_id)`` for every in-flight job.

        Reads the worker set registered under ``<name>:workers`` and each
        worker's ``<worker_name>:jobs`` list.

        :raises NotStartedException: if no workers exist
        """
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")

        worker_names = self.connection.smembers("%s:workers" % (self.name, ))

        for worker_name in worker_names:
            self.log.debug("fetching %s's pending jobs", worker_name, extra={"process_name": self.name})
            for job in self.connection.lrange("%s:jobs" % (worker_name, ), 0, -1):
                yield (worker_name, job)

    def get_all_queued_jobs(self):
        """Return every job id currently waiting in the shared job queue."""
        return self.connection.lrange(queue_name.ALL_JOBS, 0, -1)


# --- qw/queue_name.py ---

# redis key names shared between Manager and Worker
ALL_MANAGERS = "all:managers"
ALL_JOBS = "all:jobs"
JOB_PREFIX = "job:"
# --- qw/worker.py ---

import logging
import multiprocessing
import time
import traceback
import socket
import os

from qw import queue_name


class Worker(multiprocessing.Process):
    """A single job-consuming process.

    Each worker polls its own ``<name>:jobs`` list first (to recover a job
    left behind by a previous crash) and otherwise blocks on the shared
    ``all:jobs`` queue, atomically moving the claimed job id into its own
    list via BRPOPLPUSH.
    """

    __slots__ = [
        "connection", "exit", "log", "timeout", "manager_name"
    ]

    def __init__(self, redis_connection, manager_name=None, timeout=10):
        """Create the worker process (does not start it).

        :param redis_connection: a StrictRedis connection shared with the manager
        :param manager_name: owning manager's name, defaults to the hostname
        :param timeout: BRPOPLPUSH blocking timeout in seconds
        """
        super(Worker, self).__init__()
        self.connection = redis_connection
        self.manager_name = manager_name or socket.gethostname()
        # multiprocessing.Event used as the shutdown flag across processes
        self.exit = multiprocessing.Event()
        self.timeout = timeout
        self.log = logging.getLogger("qw.worker")

    @property
    def name(self):
        # unique per worker process: "<manager_name>.<pid>"
        return "%s.%s" % (self.manager_name, os.getpid())

    @property
    def job_queue(self):
        # this worker's private in-flight job list
        return "%s:jobs" % (self.name, )

    def _register(self):
        """Add this worker's name to the manager's worker set in redis."""
        self.log.info(
            "registering worker %s under '%s:workers'" % (self.name, self.manager_name),
            extra={"process_name": self.name}
        )
        # register this process under the parent manager_name
        self.connection.sadd("%s:workers" % (self.manager_name, ), self.name)

    def _deregister(self):
        """Remove this worker's name from the manager's worker set."""
        self.log.info(
            "deregistering worker %s from '%s:workers'" % (self.name, self.manager_name),
            extra={"process_name": self.name}
        )
        self.connection.srem("%s:workers" % (self.manager_name, ), self.name)

    def _run(self):
        """Claim and process at most one job; returns after ``timeout`` if idle."""
        # try to fetch a previously unfinished job
        # otherwise try to fetch from the main job pool
        self.log.debug(
            "polling for jobs from '%s' and '%s'" % (self.job_queue, queue_name.ALL_JOBS),
            extra={"process_name": self.name}
        )
        job_id = (
            self.connection.lpop(self.job_queue) or
            self.connection.brpoplpush(queue_name.ALL_JOBS, self.job_queue, timeout=self.timeout)
        )

        if not job_id:
            self.log.debug(
                "no jobs found from '%s' or '%s'" % (self.job_queue, queue_name.ALL_JOBS),
                extra={"process_name": self.name}
            )
            return

        job_data = self.connection.hgetall("job:%s" % (job_id, ))
        self.log.debug(
            "processing job id (%s) data (%r)" % (job_id, job_data),
            extra={"process_name": self.name}
        )
        if job_data:
            self._process(job_id, job_data)

        # job finished: remove both the job hash and the claimed id
        self.log.debug("removing job id (%s)" % (job_id), extra={"process_name": self.name})
        self.connection.delete("job:%s" % (job_id, ))
        self.connection.lrem(self.job_queue, 1, job_id)

    def _process(self, job_id, job_data):
        # placeholder job handler; real deployments override this
        print("%s - %s" % (job_id, job_data))
        time.sleep(20)
        print("done")

    def run(self):
        """Process entry point: register, loop until shutdown, deregister."""
        self.log.info("starting", extra={"process_name": self.name})
        self._register()
        while not self.exit.is_set():
            try:
                self._run()
            except KeyboardInterrupt:
                self.log.error("encountered a KeyboardInterrupt", extra={"process_name": self.name})
                break
            except Exception as e:
                # keep the worker alive on job failures; log and continue
                self.log.error("encountered an error (%r)" % (e, ), extra={"process_name": self.name})
                traceback.print_exc()

        self._deregister()
        self.log.info("stopping", extra={"process_name": self.name})

    def shutdown(self):
        """Signal the worker loop to exit after its current iteration."""
        self.log.info("shutdown signal received", extra={"process_name": self.name})
        self.exit.set()


# --- requirements.txt ---
# redis==2.10.3
# docopt==0.6.2


# --- setup.py ---

#!/usr/bin/env python

# BUG FIX: setuptools exports find_packages, not findpackages; the original
# `from setuptools import setup, findpackages` raised an ImportError.
from setuptools import setup, find_packages

from qw import __version__

setup(
    name="qw",
    version=__version__,
    description="Python Distributed Redis Queue Workers",
    author="Brett Langdon",
    author_email="brett@blangdon.com",
    url="https://github.com/brettlangdon/qw",
    packages=find_packages(),
    license="MIT",
    classifiers=[
        "Intended Audience :: Developers",
        "Programming Language :: Python",
        "Programming Language :: Python :: 2.6",
        "Programming Language :: Python :: 2.7",
        "License :: OSI Approved :: MIT License",
        "Topic :: Utilities",
    ]
)