Browse Source

initial commit

pull/1/head
Brett Langdon 11 years ago
commit
b21ce924d8
9 changed files with 286 additions and 0 deletions
  1. +62
    -0
      bin/qw-manager
  2. +1
    -0
      qw/__init__.py
  3. +6
    -0
      qw/exception.py
  4. +13
    -0
      qw/logging_setup.py
  5. +75
    -0
      qw/manager.py
  6. +3
    -0
      qw/queue_name.py
  7. +100
    -0
      qw/worker.py
  8. +2
    -0
      requirements.txt
  9. +24
    -0
      setup.py

+ 62
- 0
bin/qw-manager View File

@ -0,0 +1,62 @@
#!/usr/bin/env python
import logging
import os
import signal

import docopt

from qw.manager import Manager
from qw import logging_setup
from qw import __version__

__doc__ = """Usage:
    qw-manager [--level=<log-level>] [--workers=<num-workers>] [--name=<name>] [--host=<host>] [--port=<port>] [--db=<db>]
    qw-manager (--help | --version)

Options:
    --help                      Show this help message
    --version                   Show version information
    -l --level=<log-level>      Set the log level (debug,info,warn,error) [default: info]
    -w --workers=<num-workers>  Set the number of workers to start, defaults to number of cpus
    -n --name=<name>            Set the manager name, defaults to hostname
    -h --host=<host>            Set the redis host to use [default: localhost]
    -p --port=<port>            Set the redis port to use [default: 6379]
    -d --db=<db>                Set the redis db number to use [default: 0]
"""

arguments = docopt.docopt(__doc__, version="qw-manager %s" % (__version__, ))

# apply the requested log level to both qw loggers
log_level = arguments["--level"].upper()
logging_setup.manager_logger.setLevel(log_level)
logging_setup.worker_logger.setLevel(log_level)

# BUG FIX: docopt yields option values as strings; coerce the numeric
# options so Manager gets real ints (--workers stays None when omitted).
num_workers = int(arguments["--workers"]) if arguments["--workers"] else None

manager = Manager(
    name=arguments["--name"], num_workers=num_workers,
    host=arguments["--host"], port=int(arguments["--port"]), db=int(arguments["--db"])
)
manager.start()


def stop_handler(signum, frame):
    # SIGTERM / SIGHUP: shut the manager and its workers down cleanly.
    manager.stop()


def info_handler(signum, frame):
    # SIGINT: dump current queue state instead of stopping.
    # BUG FIX: "print x" statements were Python 2-only syntax; the
    # single-argument call form below behaves identically on 2.x and 3.x.
    print("All Queued Jobs:")
    for job in manager.get_all_queued_jobs():
        print(job)
    print("Current Pending Jobs:")
    for worker, job in manager.get_worker_pending_jobs():
        print("%s: %s" % (worker, job))


signal.signal(signal.SIGTERM, stop_handler)
signal.signal(signal.SIGHUP, stop_handler)
signal.signal(signal.SIGINT, info_handler)

try:
    manager.join()
except KeyboardInterrupt:
    # NOTE(review): SIGINT is bound to info_handler above, so this branch
    # only covers an interrupt delivered before that binding took effect;
    # Ctrl+C normally prints queue info rather than stopping the manager.
    manager.stop()

+ 1
- 0
qw/__init__.py View File

@ -0,0 +1 @@
# Package version string; read by setup.py and shown by "qw-manager --version".
__version__ = "0.1.0"

+ 6
- 0
qw/exception.py View File

@ -0,0 +1,6 @@
class AlreadyStartedException(Exception):
    """Raised by Manager.start() when the worker pool is already running."""
    pass
class NotStartedException(Exception):
    """Raised by Manager methods that require started workers when none exist."""
    pass

+ 13
- 0
qw/logging_setup.py View File

@ -0,0 +1,13 @@
import logging

# Shared record layout for every qw logger.  Note that "process_name" is not
# a standard LogRecord field: every log call in this package must supply it
# via the ``extra={"process_name": ...}`` argument.
FORMAT = "%(asctime)s - %(name)s - %(process_name)s - %(levelname)s - %(message)s"

formatter = logging.Formatter(FORMAT)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)


def _attach(logger_name):
    # Fetch the named logger and wire it to the shared stream handler.
    logger = logging.getLogger(logger_name)
    logger.addHandler(stream_handler)
    return logger


manager_logger = _attach("qw.manager")
worker_logger = _attach("qw.worker")

+ 75
- 0
qw/manager.py View File

@ -0,0 +1,75 @@
import logging
import multiprocessing
import socket
import redis
from qw import queue_name
from qw.exception import AlreadyStartedException, NotStartedException
from qw.worker import Worker
class Manager(object):
    """Spawns and supervises a pool of Worker processes.

    While running, the manager registers its name in the shared redis set
    ``all:managers`` and exposes helpers to inspect queued and in-flight
    jobs.
    """

    __slots__ = ["workers", "connection", "num_workers", "log", "name"]

    def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None):
        """
        :param host: redis host to connect to
        :param port: redis port (int or numeric string)
        :param db: redis db number (int or numeric string)
        :param num_workers: number of workers to spawn, defaults to cpu count
        :param name: manager name, defaults to the machine hostname
        """
        self.workers = []
        # BUG FIX: the qw-manager CLI passes docopt string values straight
        # through; coerce numeric arguments so worker spawning and the
        # redis connection receive real ints.
        self.num_workers = int(num_workers) if num_workers else multiprocessing.cpu_count()
        self.connection = redis.StrictRedis(host=host, port=int(port), db=int(db))
        self.log = logging.getLogger("qw.manager")
        self.name = name or socket.gethostname()

    def start(self):
        """Start ``num_workers`` Worker processes and register this manager.

        :raises AlreadyStartedException: if workers are already running
        """
        self.log.info("starting", extra={"process_name": self.name})
        if self.workers:
            raise AlreadyStartedException("Workers Already Started")
        self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name})
        # BUG FIX: xrange is Python 2-only; range iterates identically here
        for _ in range(self.num_workers):
            worker = Worker(self.connection, manager_name=self.name)
            worker.start()
            self.workers.append(worker)
        self.log.info("registering %s under %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
        self.connection.sadd(queue_name.ALL_MANAGERS, self.name)

    def join(self):
        """Block until every worker process exits, then clear the pool.

        :raises NotStartedException: if no workers exist
        """
        self.log.debug("joining workers", extra={"process_name": self.name})
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")
        # wait for them all to stop
        for worker in self.workers:
            worker.join()
        # make sure to clear out workers list
        self.workers = []

    def stop(self):
        """Deregister this manager, signal all workers to shut down, and join them.

        :raises NotStartedException: if no workers exist
        """
        self.log.info("stopping", extra={"process_name": self.name})
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")
        self.log.info("deregistering %s from %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name})
        self.connection.srem(queue_name.ALL_MANAGERS, self.name)
        self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name})
        # trigger them all to shutdown
        for worker in self.workers:
            worker.shutdown()
        self.join()
        self.log.info("stopped", extra={"process_name": self.name})

    def get_worker_pending_jobs(self):
        """Yield ``(worker_name, job_id)`` for every job a worker has claimed.

        :raises NotStartedException: if no workers exist
        """
        if not self.workers:
            raise NotStartedException("Workers Do Not Exist")
        worker_names = self.connection.smembers("%s:workers" % (self.name, ))
        for worker_name in worker_names:
            self.log.debug("fetching %s's pending jobs", worker_name, extra={"process_name": self.name})
            for job in self.connection.lrange("%s:jobs" % (worker_name, ), 0, -1):
                yield (worker_name, job)

    def get_all_queued_jobs(self):
        """Return all job ids currently waiting in the shared job pool."""
        return self.connection.lrange(queue_name.ALL_JOBS, 0, -1)

+ 3
- 0
qw/queue_name.py View File

@ -0,0 +1,3 @@
# Redis key of the set holding the names of all running managers.
ALL_MANAGERS = "all:managers"
# Redis key of the list holding job ids waiting to be claimed by a worker.
ALL_JOBS = "all:jobs"
# Key prefix for per-job hashes ("job:<job_id>").
JOB_PREFIX = "job:"

+ 100
- 0
qw/worker.py View File

@ -0,0 +1,100 @@
import logging
import multiprocessing
import time
import traceback
import socket
import os
from qw import queue_name
class Worker(multiprocessing.Process):
    """A single queue worker process.

    Repeatedly pulls a job id from redis -- first retrying its own pending
    queue, then blocking on the shared job pool -- and processes that job's
    hash data until told to shut down.
    """

    # NOTE(review): __slots__ has no effect here because the
    # multiprocessing.Process base class still provides a __dict__;
    # kept purely as documentation of the instance attributes.
    __slots__ = [
        "connection", "exit", "log", "timeout", "manager_name"
    ]

    def __init__(self, redis_connection, manager_name=None, timeout=10):
        """
        :param redis_connection: redis client used for all queue operations
        :param manager_name: owning manager's name, defaults to hostname
        :param timeout: seconds to block waiting for a new job each poll
        """
        super(Worker, self).__init__()
        self.connection = redis_connection
        self.manager_name = manager_name or socket.gethostname()
        # event used to ask this process to leave its run loop
        self.exit = multiprocessing.Event()
        self.timeout = timeout
        self.log = logging.getLogger("qw.worker")

    @property
    def name(self):
        """Unique worker name: ``<manager_name>.<pid>``."""
        return "%s.%s" % (self.manager_name, os.getpid())

    @property
    def job_queue(self):
        """Redis key of this worker's personal in-flight job list."""
        return "%s:jobs" % (self.name, )

    def _register(self):
        """Add this worker's name to its manager's worker set in redis."""
        self.log.info(
            "registering worker %s under '%s:workers'" % (self.name, self.manager_name),
            extra={"process_name": self.name}
        )
        # register this process under the parent manager_name
        self.connection.sadd("%s:workers" % (self.manager_name, ), self.name)

    def _deregister(self):
        """Remove this worker's name from its manager's worker set in redis."""
        self.log.info(
            "deregistering worker %s from '%s:workers'" % (self.name, self.manager_name),
            extra={"process_name": self.name}
        )
        self.connection.srem("%s:workers" % (self.manager_name, ), self.name)

    def _run(self):
        """Fetch and process at most one job, blocking up to ``timeout`` seconds."""
        # try to fetch a previously unfinished job
        # otherwise try to fetch from the main job pool
        self.log.debug(
            "polling for jobs from '%s' and '%s'" % (self.job_queue, queue_name.ALL_JOBS),
            extra={"process_name": self.name}
        )
        job_id = (
            self.connection.lpop(self.job_queue) or
            self.connection.brpoplpush(queue_name.ALL_JOBS, self.job_queue, timeout=self.timeout)
        )
        if not job_id:
            self.log.debug(
                "no jobs found from '%s' or '%s'" % (self.job_queue, queue_name.ALL_JOBS),
                extra={"process_name": self.name}
            )
            return
        job_data = self.connection.hgetall("job:%s" % (job_id, ))
        self.log.debug(
            "processing job id (%s) data (%r)" % (job_id, job_data), extra={"process_name": self.name}
        )
        if job_data:
            self._process(job_id, job_data)
        # job done: drop its hash and remove it from the in-flight list
        self.log.debug("removing job id (%s)" % (job_id, ), extra={"process_name": self.name})
        self.connection.delete("job:%s" % (job_id, ))
        self.connection.lrem(self.job_queue, 1, job_id)

    def _process(self, job_id, job_data):
        """Placeholder job handler: prints the job then sleeps.

        BUG FIX: "print x" statements were Python 2-only syntax; the
        single-argument call form behaves identically on 2.x and 3.x.
        """
        print("%s - %s" % (job_id, job_data))
        time.sleep(20)
        print("done")

    def run(self):
        """Process entry point: register, poll for jobs until shutdown, deregister."""
        self.log.info("starting", extra={"process_name": self.name})
        self._register()
        while not self.exit.is_set():
            try:
                self._run()
            except KeyboardInterrupt:
                self.log.error("encountered a KeyboardInterrupt", extra={"process_name": self.name})
                break
            # BUG FIX: "except Exception, e" is Python 2-only syntax;
            # the "as" form works on Python 2.6+ and 3.x alike.
            except Exception as e:
                self.log.error("encountered an error (%r)" % (e, ), extra={"process_name": self.name})
                traceback.print_exc()
        self._deregister()
        self.log.info("stopping", extra={"process_name": self.name})

    def shutdown(self):
        """Signal the run loop to exit after its current iteration."""
        self.log.info("shutdown signal received", extra={"process_name": self.name})
        self.exit.set()

+ 2
- 0
requirements.txt View File

@ -0,0 +1,2 @@
redis==2.10.3
docopt==0.6.2

+ 24
- 0
setup.py View File

@ -0,0 +1,24 @@
#!/usr/bin/env python
# Package installer for qw.
# BUG FIX: setuptools exports "find_packages", not "findpackages" --
# the original import (and call below) raised ImportError/NameError.
from setuptools import setup, find_packages

from qw import __version__

setup(
    name="qw",
    version=__version__,
    description="Python Distributed Redis Queue Workers",
    author="Brett Langdon",
    author_email="brett@blangdon.com",
    url="https://github.com/brettlangdon/qw",
    packages=find_packages(),
    license="MIT",
    classifiers=[
        "Intended Audience :: Developers",
        "Programming Language :: Python",
        "Programming Language :: Python :: 2.6",
        "Programming Language :: Python :: 2.7",
        "License :: OSI Approved :: MIT License",
        "Topic :: Utilities",
    ]
)

Loading…
Cancel
Save