Python RQ(Redis Queue)添加gevent支持

栏目: 数据库 · 发布时间: 6年前

内容简介:python-rq简单好用,但缺点是,默认的实现是使用fork的模式,关于这点可以看python-rq源码分析 。所以我们要对他进行改造,每次执行任务,我们就使用一个coroutine。gevent的文档中这样写道:因此,我们在最上面就开始进行 monkey patch。此外,我把queue定义在了

python-rq简单好用,但缺点是,默认的实现是使用fork的模式,关于这点可以看python-rq源码分析 。

所以我们要对他进行改造,每次执行任务,我们就使用一个coroutine。gevent的文档中这样写道:

Patching should be done as early as possible in the lifecycle of the program.

因此,我们在最上面就开始进行 monkey patch。此外,我把queue定义在了 jobs/queue.py 里。直接上代码:

# worker.py
import gevent.monkey
gevent.monkey.patch_all()  # noqa

import logging
from rq.worker import (
    Worker,
    WorkerStatus,
)
import redis

from config import config
from jobs import (
    money_q,
    message_q,
)


class GeventWorker(Worker):
    def execute_job(self, job, queue):
        self.set_state(WorkerStatus.BUSY)
        self.log.debug("gonna spawn a greenlet to execute job %s from queue", job, queue)
        gevent.spawn(self.perform_job, job, queue).join()
        self.log.debug("job %s from queue %s executed", job, queue)
        self.set_state(WorkerStatus.IDLE)


def gevent_worker(queues):
    worker = GeventWorker(
        queues=queues,
        connection=redis.StrictRedis.from_url(config.WORKER_BROKER)
    )
    worker.work()


if __name__ == "__main__":
    gevent_worker([money_q, message_q])
# queue.py
from rq import Queue
import redis

from config import config

__conn = redis.StrictRedis.from_url(config.WORKER_BROKER)

money_q = Queue("money", connection=__conn)
message_q = Queue("message", connection=__conn)

解释一下实现原理:

首先阅读 rq 默认的worker实现,就会发现,所有的worker都有 execute_job 这个方法,因此我们继承 Worker 并且 重写这个方法,在我们的实现里,新起一个coroutine来执行相关代码。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序的法理

程序的法理

孙笑侠 / 商务印书馆 / 2005-11 / 21.00元

《程序的法理》基于法律形式化的理念而展开,着眼于程序的法理分析,力图从中国法治的本土特点出发,发掘程序法理论在中国语境下对应的实际问题,是一部学术价值较高的法学著作。一起来看看 《程序的法理》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

html转js在线工具
html转js在线工具

html转js在线工具