From bb0fb87840e00c57edca25e92c1de0b99fe39864 Mon Sep 17 00:00:00 2001
From: brettlangdon
Date: Tue, 14 Oct 2014 09:12:57 -0400
Subject: [PATCH] add article 'python redis queue workers'
---
 .../about/python-redis-queue-workers/index.md | 89 +++++++++++++++++++
 1 file changed, 89 insertions(+)
 create mode 100644 contents/writing/about/python-redis-queue-workers/index.md

diff --git a/contents/writing/about/python-redis-queue-workers/index.md b/contents/writing/about/python-redis-queue-workers/index.md
new file mode 100644
index 0000000..0f79a49
--- /dev/null
+++ b/contents/writing/about/python-redis-queue-workers/index.md
@@ -0,0 +1,89 @@
---
title: Python Redis Queue Workers
author: Brett Langdon
date: 2014-10-14
template: article.jade
---

Learn an easy, distributed approach to processing jobs
from a Redis queue in Python.

---

Recently I started thinking about a new project: I want to write my own Continuous Integration (CI)
server. I know what you are thinking... "Why?!" Yes, I agree, there are plenty of good ones out
there already; I just want to do it. The first problem I ran into was how to run distributed workers
to process the incoming builds for the CI server. I wanted something that was easy to start up on
multiple machines and that needed minimal configuration to get going.

The design is relatively simple: there is a main queue that jobs are pulled from, and a second queue
that each worker moves a job onto while it is processing it. The main queue is the list of jobs that
still have to be processed, while the processing queue is the list of jobs that the workers are
currently working on. For this example we will be using [Redis lists](http://redis.io/commands#list),
since they support the short list of features we need.

### worker.py
Let's start with the worker process. The worker's job is simply to grab a job from the queue and process it.

```python
import redis


def process(job_id, job_data):
    print("Processing job id(%s) with data (%r)" % (job_id, job_data))


def main(client, processing_queue, all_queue):
    while True:
        # atomically move a job id from the main queue onto the processing queue,
        # blocking for up to 5 seconds while waiting for work
        job_id = client.brpoplpush(all_queue, processing_queue, timeout=5)
        if not job_id:
            continue
        # fetch the job data
        job_data = client.hgetall("job:%s" % (job_id, ))
        # process the job
        process(job_id, job_data)
        # cleanup the job information from redis
        client.delete("job:%s" % (job_id, ))
        client.lrem(processing_queue, 1, job_id)


if __name__ == "__main__":
    # decode_responses=True makes redis-py return strings rather than bytes
    client = redis.StrictRedis(decode_responses=True)
    try:
        main(client, "processing:jobs", "all:jobs")
    except KeyboardInterrupt:
        pass
```

The above script does the following:

1. Try to fetch a job id from the queue `all:jobs`, moving it onto `processing:jobs`
2. Fetch the job data from a [hash](http://redis.io/commands#hash) key with the name `job:<job_id>`
3. Process the job information
4. Remove the hash key `job:<job_id>`
5. Remove the job id from the queue `processing:jobs`

With this design we can always tell how many jobs are currently queued for processing by looking at
the list `all:jobs`, and we know exactly how many jobs are being processed by looking at the list
`processing:jobs`, which holds the job ids that all of the workers are currently working on.

Also, we are not tied down to running just one worker on one machine. With this design we can run
multiple worker processes on as many nodes as we want, as long as they all have access to the same
Redis server.
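So far we have only looked at the consuming side. For completeness, here is a minimal sketch of what a
producer could look like, assuming the same key layout the worker expects: write the `job:<job_id>` hash
first, then push the job id onto `all:jobs`. The `enqueue` helper, the use of a UUID as the job id and the
example job fields are all just arbitrary choices for this example, not a fixed API.

```python
import uuid

import redis


def enqueue(client, job_data):
    # give the job a unique id; a UUID is an arbitrary choice here
    job_id = str(uuid.uuid4())
    # write the job data first, so the hash already exists by the time
    # any worker is able to see the job id
    for field, value in job_data.items():
        client.hset("job:%s" % (job_id, ), field, value)
    # now make the job visible to the workers
    client.lpush("all:jobs", job_id)
    return job_id


if __name__ == "__main__":
    client = redis.StrictRedis(decode_responses=True)
    job_id = enqueue(client, {"repo": "git@github.com:brettlangdon/qw.git", "branch": "master"})
    print("queued job %s" % (job_id, ))
    # the two lists tell us how much work is waiting and how much is in flight
    print("queued: %d, processing: %d" % (client.llen("all:jobs"), client.llen("processing:jobs")))
```

The ordering matters here: `BRPOPLPUSH` pops from the tail of `all:jobs`, so pushing new ids onto the head
with `LPUSH` gives us first-in, first-out behavior across all of the workers.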
There are a few limitations, all rooted in Redis' [limits on lists](http://redis.io/topics/data-types),
but this should be good enough to get started.

There are a few other approaches that could be taken here as well. Instead of using a single processing
queue we could use a separate queue for each worker. That would let us see which jobs are currently being
processed by each individual worker, and it would also give us the opportunity to have each worker try to
fetch from its own worker-specific queue before looking at `all:jobs`. That way we could either assign
jobs to specific workers, or let a worker recover from a failed run by starting with the last job it was
working on before it died (a rough sketch of this variation is at the end of this post).

## qw
I have developed the library [qw](https://github.com/brettlangdon/qw) (QueueWorker), which implements a
pattern similar to the one described here, so if you are interested in playing around with this idea or
want to see a more developed implementation, please check out the project's
[GitHub page](https://github.com/brettlangdon/qw) for more information.
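To make the per-worker queue variation a little more concrete, here is a rough sketch of what the fetch
step could look like. This is not necessarily how qw implements it; the `worker:<name>:jobs` naming scheme
and the `fetch_job` helper are just placeholders for illustration.

```python
import socket

import redis


def fetch_job(client, worker_queue, all_queue, timeout=5):
    # anything still sitting in this worker's own queue is a job that was claimed
    # before a crash or restart, so finish that first; BRPOPLPUSH pushes onto the
    # head of the destination list, which leaves the oldest claimed job at the tail
    job_id = client.lindex(worker_queue, -1)
    if job_id is not None:
        return job_id
    # otherwise block waiting for new work and claim it into this worker's own queue
    return client.brpoplpush(all_queue, worker_queue, timeout=timeout)


if __name__ == "__main__":
    client = redis.StrictRedis(decode_responses=True)
    # one processing queue per worker, named after the host (an arbitrary choice)
    worker_queue = "worker:%s:jobs" % (socket.gethostname(), )
    job_id = fetch_job(client, worker_queue, "all:jobs")
    if job_id:
        print("working on job %s from %s" % (job_id, worker_queue))
```

The rest of the loop would be the same as in `worker.py` above, except the final `LREM` would remove the
job id from the worker's own queue instead of from the shared `processing:jobs` list.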