|
|
|
@ -2,7 +2,6 @@ import logging |
|
|
|
import multiprocessing |
|
|
|
import socket |
|
|
|
|
|
|
|
from qw import queue_name |
|
|
|
from qw.client import Client |
|
|
|
from qw.exception import AlreadyStartedException, NotStartedException |
|
|
|
from qw.worker import Worker |
|
|
|
@ -25,10 +24,10 @@ class Manager(object): |
|
|
|
|
|
|
|
self.log.info("starting %s workers", self.num_workers, extra={"process_name": self.name}) |
|
|
|
for _ in xrange(self.num_workers): |
|
|
|
worker = Worker(self.connection, manager_name=self.name) |
|
|
|
worker = Worker(self.client, manager_name=self.name) |
|
|
|
worker.start() |
|
|
|
self.workers.append(worker) |
|
|
|
self.log.info("registering %s under %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name}) |
|
|
|
self.log.info("registering %s", self.name, extra={"process_name": self.name}) |
|
|
|
self.client.register_manager(self.name) |
|
|
|
|
|
|
|
def join(self): |
|
|
|
@ -48,7 +47,7 @@ class Manager(object): |
|
|
|
if not self.workers: |
|
|
|
raise NotStartedException("Workers Do Not Exist") |
|
|
|
|
|
|
|
self.log.info("deregistering %s from %s", self.name, queue_name.ALL_MANAGERS, extra={"process_name": self.name}) |
|
|
|
self.log.info("deregistering %s", self.name, extra={"process_name": self.name}) |
|
|
|
self.client.deregister_manager(self.name) |
|
|
|
|
|
|
|
self.log.info("shutting down %s workers", len(self.workers), extra={"process_name": self.name}) |
|
|
|
|