Browse Source

add target to manager/workers

pull/1/head
Brett Langdon 11 years ago
parent
commit
6c05f45043
3 changed files with 14 additions and 14 deletions
  1. +2
    -2
      bin/qw-manager
  2. +4
    -3
      qw/manager.py
  3. +8
    -9
      qw/worker.py

+ 2
- 2
bin/qw-manager View File

@ -12,7 +12,7 @@ from qw import __version__
__doc__ = """Usage:
qw-manager [--level=<log-level>] [--workers=<num-workers>] [--name=<name>] [--host=<host>] [--port=<port>] [--db=<db>]
qw-manager [--level=<log-level>] [--workers=<num-workers>] [--name=<name>] [--host=<host>] [--port=<port>] [--db=<db>] <target>
qw-manager (--help | --version)
Options:
@ -34,7 +34,7 @@ logging_setup.manager_logger.setLevel(log_level)
logging_setup.worker_logger.setLevel(log_level)
manager = Manager(
name=arguments["--name"], num_workers=arguments["--workers"],
arguments["<target>"], name=arguments["--name"], num_workers=arguments["--workers"],
host=arguments["--host"], port=arguments["--port"], db=arguments["--db"]
)


+ 4
- 3
qw/manager.py View File

@ -8,14 +8,15 @@ from qw.worker import Worker
class Manager(object):
__slots__ = ["workers", "client", "num_workers", "log", "name"]
__slots__ = ["workers", "client", "num_workers", "log", "name", "target"]
def __init__(self, host="localhost", port=6379, db=0, num_workers=None, name=None):
def __init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None):
self.workers = []
self.num_workers = num_workers or multiprocessing.cpu_count()
self.client = Client(host=host, port=port, db=db)
self.log = logging.getLogger("qw.manager")
self.name = name or socket.gethostname()
self.target = target
def start(self):
self.log.info("starting", extra={"process_name": self.name})
@ -24,7 +25,7 @@ class Manager(object):
self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name})
for _ in xrange(self.num_workers):
worker = Worker(self.client, manager_name=self.name)
worker = Worker(self.client, self.target, manager_name=self.name)
worker.start()
self.workers.append(worker)
self.log.info("registering %s", self.name, extra={"process_name": self.name})


+ 8
- 9
qw/worker.py View File

@ -1,23 +1,25 @@
import logging
import multiprocessing
import time
import traceback
import socket
import os
from qw.utils import dynamic_import
class Worker(multiprocessing.Process):
__slots__ = [
"client", "exit", "log", "timeout", "manager_name"
"client", "exit", "log", "timeout", "manager_name", "target"
]
def __init__(self, client, manager_name=None, timeout=10):
def __init__(self, client, target, manager_name=None, timeout=10):
super(Worker, self).__init__()
self.client = client
self.manager_name = manager_name or socket.gethostname()
self.exit = multiprocessing.Event()
self.timeout = timeout
self.log = logging.getLogger("qw.worker")
self.target = target
@property
def name(self):
@ -53,18 +55,15 @@ class Worker(multiprocessing.Process):
"processing job id (%s) data (%r)" % (job_id, job_data), extra={"process_name": self.name}
)
if job_data:
self._process(job_id, job_data)
self.target(job_id, job_data)
self.log.debug("removing job id (%s)" % (job_id), extra={"process_name": self.name})
self.client.finish_job(job_id, self.name)
def _process(self, job_id, job_data):
print "%s - %s" % (job_id, job_data)
time.sleep(20)
print "done"
def run(self):
self.log.info("starting", extra={"process_name": self.name})
if isinstance(self.target, basestring):
self.target = dynamic_import(self.target)
self._register()
while not self.exit.is_set():
try:


Loading…
Cancel
Save