qw (QueueWorker) - python library for processing a redis list as a work queue

qw

qw (or QueueWorker) is used to run worker processes which listen on a redis list for jobs to process.

Setup

pip

pip install qw

git

git clone git://github.com/brettlangdon/qw.git
cd ./qw
python setup.py install

Design

Manager

The manager is simply a process manager. Its job is to start and stop worker sub-processes.

Worker

The workers are processes which sit and listen for jobs on a few queues and then process those jobs.

Target

The worker and manager take a target, which can be either a function or a string naming an importable function. The target is called with the job id and the job data.

def target(job_id, job_data):
    pass

manager = Manager(target)
# OR
manager = Manager('__main__.target')
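
How a string target might be turned into a function can be sketched as follows. This is only an illustration of the general technique using the standard library's importlib; `resolve_target` is a hypothetical helper and not necessarily how qw resolves its targets internally.

```python
import importlib


def resolve_target(target):
    """Return a callable for `target` (hypothetical helper, not part of qw).

    `target` may already be a callable, or a dotted path such as
    "package.module.func".
    """
    if callable(target):
        return target
    # Split "package.module.func" into a module path and an attribute name.
    module_name, _, func_name = target.rpartition(".")
    if not module_name:
        raise ValueError("expected a dotted path like 'module.func', got %r" % target)
    module = importlib.import_module(module_name)
    func = getattr(module, func_name)
    if not callable(func):
        raise TypeError("%r is not callable" % target)
    return func
```

A string target is handy when the function cannot be passed directly, for example when the target is named in a config file.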

Queues

A few different redis keys are used. The job queues are just redis lists, the manager and worker lists are sets, and each job's data is a hash.

  • "all:managers" - a set of all managers
  • "all:jobs" - a queue that all workers can pull jobs from, the values are just the job ids
  • "job:<job_id>" - a hash of the job data
  • "<manager>:workers" - a set of all workers belonging to a given manager
  • "<manager>:jobs" - a queue of jobs for a specific manager; workers try to pull from here before all:jobs. The values are just the job ids
  • "<worker>:jobs" - a queue of jobs for a specific worker, meant as an in-progress queue for that worker; workers pull jobs into this queue from either <manager>:jobs or all:jobs. The values are just the job ids
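
The pull order described above can be sketched without a redis server. The example below uses in-memory deques as stand-ins for redis lists; `claim_next_job` and the names "mgr" and "wrk" are made up for illustration, and which end of each list qw pushes to or pops from is an assumption. Against a real server, a command such as RPOPLPUSH can move an element between two lists atomically; whether qw uses it is not stated here.

```python
from collections import deque


def claim_next_job(queues, manager_name, worker_name):
    """Move the next job id into the worker's in-progress queue and return it.

    `queues` maps a queue name to a deque of job ids (a stand-in for
    redis lists). Returns None when no job is waiting.
    """
    in_progress = queues.setdefault("%s:jobs" % worker_name, deque())
    # The manager-specific queue is checked before the shared one.
    for source in ("%s:jobs" % manager_name, "all:jobs"):
        pending = queues.get(source)
        if pending:
            job_id = pending.popleft()
            in_progress.append(job_id)
            return job_id
    return None


queues = {"all:jobs": deque(["shared-1"]), "mgr:jobs": deque(["mine-1"])}
first = claim_next_job(queues, "mgr", "wrk")   # taken from "mgr:jobs"
second = claim_next_job(queues, "mgr", "wrk")  # falls back to "all:jobs"
```

Keeping claimed job ids in a per-worker queue means an unfinished job can still be found under "<worker>:jobs".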

Basic Usage

from qw.manager import Manager


def job_printer(job_id, job_data):
    print(job_id)
    print(job_data)


manager = Manager(job_printer)
manager.start()
manager.join()
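
For the workers above to have something to process, a job has to be placed on one of the queues. The following is a rough sketch of a producer based on the key layout in the Queues section. `enqueue_job` is a hypothetical helper, not part of qw, and the use of LPUSH and a flat hash for the job data are assumptions. `client` is assumed to be a redis-py client new enough to accept `hset(..., mapping=...)`.

```python
def enqueue_job(client, job_id, job_data, manager_name=None):
    """Store the job data as a hash and push the job id onto a job queue.

    Hypothetical helper: `client` is assumed to be a redis-py style
    client, and `job_data` a flat dict of field names to values.
    """
    # "job:<job_id>" holds the job data as a hash.
    client.hset("job:%s" % job_id, mapping=job_data)
    # Target one manager's queue if a name is given, else the shared queue.
    queue = "%s:jobs" % manager_name if manager_name else "all:jobs"
    client.lpush(queue, job_id)
    return queue


# Example usage against a local server (requires the redis package):
#   import redis
#   client = redis.StrictRedis(host="localhost", port=6379, db=0)
#   enqueue_job(client, "1", {"message": "hello"})
```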

API

Manager(object)

  • __init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None)
  • start(self)
  • stop(self)
  • join(self)

Worker(multiprocessing.Process)

  • __init__(self, client, target, manager_name=None, timeout=10)
  • run(self)
  • shutdown(self)