diff --git a/bin/qw-manager b/bin/qw-manager index f9cdf84..d4128ab 100755 --- a/bin/qw-manager +++ b/bin/qw-manager @@ -12,7 +12,7 @@ from qw import __version__ __doc__ = """Usage: - qw-manager [--level=] [--workers=] [--name=] [--host=] [--port=] [--db=] + qw-manager [--level=] [--workers=] [--name=] [--host=] [--port=] [--db=] qw-manager (--help | --version) Options: @@ -34,7 +34,7 @@ logging_setup.manager_logger.setLevel(log_level) logging_setup.worker_logger.setLevel(log_level) manager = Manager( - name=arguments["--name"], num_workers=arguments["--workers"], + arguments[""], name=arguments["--name"], num_workers=arguments["--workers"], host=arguments["--host"], port=arguments["--port"], db=arguments["--db"] ) diff --git a/qw/manager.py b/qw/manager.py index 37b8e60..b5c254a 100644 --- a/qw/manager.py +++ b/qw/manager.py @@ -8,14 +8,15 @@ from qw.worker import Worker class Manager(object): - __slots__ = ["workers", "client", "num_workers", "log", "name"] + __slots__ = ["workers", "client", "num_workers", "log", "name", "target"] - def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None): + def __init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None): self.workers = [] self.num_workers = num_workers or multiprocessing.cpu_count() self.client = Client(host=host, port=port, db=db) self.log = logging.getLogger("qw.manager") self.name = name or socket.gethostname() + self.target = target def start(self): self.log.info("starting", extra={"process_name": self.name}) @@ -24,7 +25,7 @@ class Manager(object): self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name}) for _ in xrange(self.num_workers): - worker = Worker(self.client, manager_name=self.name) + worker = Worker(self.client, self.target, manager_name=self.name) worker.start() self.workers.append(worker) self.log.info("registering %s", self.name, extra={"process_name": self.name}) diff --git a/qw/worker.py b/qw/worker.py index acf1e78..1ee08b6 100644 --- a/qw/worker.py +++ b/qw/worker.py @@ -1,23 +1,25 @@ import logging import multiprocessing -import time import traceback import socket import os +from qw.utils import dynamic_import + class Worker(multiprocessing.Process): __slots__ = [ - "client", "exit", "log", "timeout", "manager_name" + "client", "exit", "log", "timeout", "manager_name", "target" ] - def __init__(self, client, manager_name=None, timeout=10): + def __init__(self, client, target, manager_name=None, timeout=10): super(Worker, self).__init__() self.client = client self.manager_name = manager_name or socket.gethostname() self.exit = multiprocessing.Event() self.timeout = timeout self.log = logging.getLogger("qw.worker") + self.target = target @property def name(self): @@ -53,18 +55,15 @@ class Worker(multiprocessing.Process): "processing job id (%s) data (%r)" % (job_id, job_data), extra={"process_name": self.name} ) if job_data: - self._process(job_id, job_data) + self.target(job_id, job_data) self.log.debug("removing job id (%s)" % (job_id), extra={"process_name": self.name}) self.client.finish_job(job_id, self.name) - def _process(self, job_id, job_data): - print "%s - %s" % (job_id, job_data) - time.sleep(20) - print "done" - def run(self): self.log.info("starting", extra={"process_name": self.name}) + if isinstance(self.target, basestring): + self.target = dynamic_import(self.target) self._register() while not self.exit.is_set(): try: