jmhobbs

Delayed Queues for RQ

I really like RQ. It has a sensible API and it's built for Python, which is more than I can say for Resque. pyres is fine, but it's not the same.

My one beef with RQ is that I can't delay a job for an arbitrary amount of time. You queue it up and it runs. To get around that, I built a simple delayed queue system for RQ, using the same Redis backend.

My delayed queues use Redis sorted sets to store and select jobs. We add each job with its earliest run timestamp as the score, then a cron job or daemon pulls out the jobs that are ready and pushes them over to RQ. Simple enough!

Here's the really relevant code; everything else is trimmings.

Delaying Jobs

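    # Method of the delayed-queue class; needs `import pickle` and `import uuid` at module level.
    def delay(self, queue, job, seconds, *args, **kwargs):
        '''Delay a queue job by a number of seconds.'''
        payload = pickle.dumps({'job': job, 'queue': queue, 'args': args, 'kwargs': kwargs, 'id': uuid.uuid1().hex})
        # redis-py 3.0+ expects a {member: score} mapping; older versions took positional arguments.
        self.redis.zadd('queue:delayed', {payload: self._now() + seconds})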

Waking Jobs

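    def enqueue_delayed_jobs(self, now=None):
        '''Enqueue and clear out ready delayed jobs.'''
        # Compare against None so an explicit timestamp of 0 is respected.
        if now is None:
            now = self._now()

        # Every member whose score (earliest run time) is <= now is ready.
        jobs = self.redis.zrangebyscore('queue:delayed', 0, now)

        for pickled_job in jobs:
            job = pickle.loads(pickled_job)
            # Hand the job to RQ (needs `from rq import Queue`), then drop it from the delayed set.
            Queue(job['queue'], connection=self.redis).enqueue(job['job'], *job['args'], **job['kwargs'])
            self.redis.zrem('queue:delayed', pickled_job)

        return len(jobs)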

You can run this at a granularity as low as one second the way it is written, and I'm sure you could go tighter if you wanted. We run it at a minute granularity, since our jobs are not highly time sensitive.
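
If you go the daemon route rather than cron, the polling loop can be tiny. Here is a minimal sketch, not the code from the gist: `run_waker`, `wake`, `sleep`, and `max_ticks` are names I made up for illustration, and `wake` is assumed to be a callable like the `enqueue_delayed_jobs` method above that returns how many jobs it moved.

```python
import time


def run_waker(wake, interval=60.0, sleep=time.sleep, max_ticks=None):
    """Call wake() once per `interval` seconds and total up the jobs moved.

    wake      -- callable returning the number of jobs it enqueued
                 (assumed to be something like enqueue_delayed_jobs)
    interval  -- polling granularity in seconds
    sleep     -- injectable for testing; defaults to time.sleep
    max_ticks -- hypothetical knob: stop after this many polls (None = run forever)
    """
    total = 0
    ticks = 0
    while max_ticks is None or ticks < max_ticks:
        total += wake()
        ticks += 1
        sleep(interval)
    return total
```

With `interval=60.0` this behaves like our once-a-minute cron job; dropping it to `1.0` gives the one-second granularity mentioned above.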

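And because individual Redis commands are atomic, you can run enqueue daemons/cron jobs on multiple machines, which is great for availability. One caveat: the read (ZRANGEBYSCORE) and the removal (ZREM) are separate commands, so two daemons that wake at the same moment can both see a job before either removes it, and enqueue it twice. If duplicates matter for your jobs, enqueue only when ZREM reports that it actually removed the member.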
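
For the multi-machine case, here is a sketch of one way to make sure each job is enqueued only once: treat ZREM's return value as the claim. ZREM returns how many members it removed, so only one caller gets a 1 for a given job. This is a sketch under assumptions, not the gist's code: `claim_and_enqueue` and the `enqueue` callback are hypothetical names, and `client` is assumed to be a redis-py style client exposing `zrangebyscore()` and `zrem()`.

```python
def claim_and_enqueue(client, enqueue, now, key='queue:delayed'):
    """Move ready members of the sorted set at `key`, each at most once.

    client  -- redis-py style client (assumed) with zrangebyscore() and zrem()
    enqueue -- hypothetical callback that receives the raw member and hands
               it to RQ (for example by unpickling it and calling Queue.enqueue)
    now     -- timestamp; members with a score <= now are considered ready
    Returns the number of jobs this caller actually moved.
    """
    moved = 0
    for member in client.zrangebyscore(key, 0, now):
        # ZREM returns the count of members removed. A result of 0 means
        # another daemon already claimed this job, so skip it.
        if client.zrem(key, member):
            enqueue(member)
            moved += 1
    return moved
```

The trade-off is that removing before enqueuing can drop a job if the process dies between the two steps, while enqueuing first can duplicate one. Pick whichever failure mode your jobs tolerate better.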

If you are interested, grab the code and examples here: https://gist.github.com/jmhobbs/5358101