qw (QueueWorker) - python library for processing a redis list as a work queue
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

60 lines
2.2 KiB

import logging
import multiprocessing
import socket
from qw.client import Client
from qw.exception import AlreadyStartedException, NotStartedException
from qw.worker import Worker
class Manager(object):
__slots__ = ["workers", "client", "num_workers", "log", "name", "target"]
def __init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None):
self.workers = []
self.num_workers = num_workers or multiprocessing.cpu_count()
self.client = Client(host=host, port=port, db=db)
self.log = logging.getLogger("qw.manager")
self.name = name or socket.gethostname()
self.target = target
def start(self):
self.log.info("starting", extra={"process_name": self.name})
if self.workers:
raise AlreadyStartedException("Workers Already Started")
self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name})
for _ in xrange(self.num_workers):
worker = Worker(self.client, self.target, manager_name=self.name)
worker.start()
self.workers.append(worker)
self.log.info("registering %s", self.name, extra={"process_name": self.name})
self.client.register_manager(self.name)
def join(self):
self.log.debug("joining workers", extra={"process_name": self.name})
if not self.workers:
raise NotStartedException("Workers Do Not Exist")
# wait for them all to stop
for worker in self.workers:
worker.join()
# make sure to clear out workers list
self.workers = []
def stop(self):
self.log.info("stopping", extra={"process_name": self.name})
if not self.workers:
raise NotStartedException("Workers Do Not Exist")
self.log.info("deregistering %s", self.name, extra={"process_name": self.name})
self.client.deregister_manager(self.name)
self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name})
# trigger them all to shutdown
for worker in self.workers:
worker.shutdown()
self.join()
self.log.info("stopped", extra={"process_name": self.name})