内容简介:celery是python的一套异步任务处理框架,可以让我们聚焦在编写业务逻辑,而不是底层异步实现。celery为django框架提供了专用的集成方法,实施过程只需要根据官方手册即可搞定。下面,我将总结我实施Django+Celery过程中的经验和思路,帮助大家理清脉络,快速上手。
celery是 python 的一套异步任务处理框架,可以让我们聚焦在编写业务逻辑,而不是底层异步实现。
celery为django框架提供了专用的集成方法,实施过程只需要根据官方手册即可搞定。
下面,我将总结我实施Django+Celery过程中的经验和思路,帮助大家理清脉络,快速上手。
安装Celery
依赖的版本是:celery==4.3.0。
pip install celery
后续所有配置流程,均基于官方手册的指导: https://docs.celeryproject.org/en/latest/django/ 。
创建celery.py
在settings.py同级目录下创建celery.py,它是celery worker(异步处理进程)的入口:
import os from celery import Celery # 该环境变量配置为该Django项目的settings模块,这样celery会进一步加载app.settings模块来获取其中的配置 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings') # 创建一个celery对象,随意起一个名字 app = Celery('push2') # 令该对象从django的settings.py加载CELERY开头的配置项 app.config_from_object('django.conf:settings', namespace='CELERY') # 扫描所有django app下面的tasks.py文件,注册里面所有的异步任务(就是一些函数) app.autodiscover_tasks()
后续我们会用celery worker程序拉起这个入口文件,该文件初始化好一个叫做app的celery对象即可。
celery worker是celery提供的一个python程序,它的作用是消费rabbitmq中的任务,并执行这些任务对应的函数。
创建tasks.py
我们可以在项目下的任意app内,创建一个tasks.py文件,上述的autodiscover_tasks()就是遍历django中注册的所有app,并在app目录下找到tasks.py模块,然后遍历模块中的所有异步任务函数。
我们的异步任务逻辑就写在tasks.py中,例如我在demo应用中创建的tasks.py实现了一个异步任务add:
# -*- coding: utf-8 -*- # Create your tasks here from celery import shared_task import time @shared_task def add(x, y): time.sleep(60) # 模拟执行60秒 return x + y
该任务接受2个参数,模拟执行了60秒,最后返回一个结果,当然这个结果通常来说是没用的,因为这是一个异步的任务(Celery提供了result扩展,可以把结果放在 redis 之类的存储里,前台可以来获取,这个不常用,大家自己学习)。
异步任务需要用shared_task装饰器包装,这样celery在扫描函数的时候才知道这是一个异步任务,而不是一个普通函数。
生成异步任务
当我们在tasks.py中实现了异步任务的处理逻辑后,我们就需要想一下该如何投递一个任务呢?
因为我们的add方法经过了shared_task包装,因此呢它不再是一个普通函数,它还提供了很多额外的属性。
# CELERY示例 def celery(request): # 提交异步任务 demo_tasks.add.delay(5, 6) return json_response()
这是一个views.py中的前台接口,它调用了tasks.py中的add方法的delay方法。
我们知道python中一切皆是对象,所以add function也可以拥有delay属性,通过调用delay属性就可以投递一个异步任务到rabbitmq中。
注意,这里的5,6是add任务的传参,它会一起被打包到消息中,一起放入rabbitmq中。
配置celery
OK,我们会投递任务,也写好了任务逻辑,同时worker的入口程序也写好了。
接下来的问题是:任务序列化后该投递到哪里。
这就需要配置一下celery了,我们在celery worker的入口文件中,令celery app对象加载了django的settings.py中CELERY开头的配置项,因此我们就去编辑django的settings.py:
# Celery配置 CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TASK_ROUTES = { # 任务路由, 不同类型任务可以申请不同的队列进行隔离 '*': { # 当前全部路由到一个队列 'queue': 'my-common-queue', } } CELERY_TASK_ACKS_LATE = True # 在任务执行完成后ACK CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False # 任务失败不自动ACK CELERY_BROKER_CONNECTION_MAX_RETRIES = 0 # worker无限重连rabbitmq # Celery配置 CELERY_BROKER_URL = 'amqp://my_rabbitmq01:user@host/vhost' CELERY_TASK_TIME_LIMIT = 300 # 任务最长运行时间 CELERY_TASK_SOFT_TIME_LIMIT = 290 # 在290秒的时候抛出异常 CELERY_WORKER_CONCURRENCY = 64 # worker可以同时执行的子进程数量 CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 每个worker最多取256 * 1个任务,分配给256个子进程每人1个 CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # 处理10个任务退出1次
我用到了上述的配置项,通常我们也不需要更复杂的配置了。
关键在于:
- rabbitmq地址是什么?
- worker要启动多少个子进程并发处理任务,等等…
- 要求worker必须在任务成功后才向rabbitmq做ack
- 生产者投递的任务要进入哪个rabbitmq队列(上述例子是所有类型的任务都进入my-common-queue队列,具体配置方法自行学习)
启动worker
任务投递到rabbitmq后,我们就差启动worker了。
我们需要进入到django的项目根目录(包含manage.py的目录),然后启动celery worker:
celery worker -A'apps' -l INFO -Q'my-common-queue'
这里apps是包含celery.py以及settings.py的包名,这样celery会在apps包下加载celery.py文件作为入口。
-Q指定worker消费rabbitmq的哪个队列来获取任务,可以传多个。
我们可以想象celery的实现原理:生产者在调用add.delay的时候,会将异步函数的完整包名以及传参序列化下来,投递到rabbitmq;worker将项目根目录加入到sys.path中,在worker端取出消息后,可以再次import引入异步函数的模块,找到函数,然后传参执行该函数。
celery worker的架构默认是父子进程模式,父进程与rabbitmq通讯以及保持心跳,子进程执行单个任务,同时可以有很多子进程并发执行任务,任务可以执行很长的时间,只要不超过我们在settings.py中定义的超时时间就不会被杀死。
其他
遇到的另外一个问题,就是celery worker命令放到systemd中的时候,发现无法取到任务,很是奇怪。
官方给的解决方法是利用celery multi命令来拉起celery worker,直接按照官方的方法去做就好了,我就不贴示例了: http://docs.celeryproject.org/en/latest/userguide/daemonizing.html
博主无私的分享着知识,你愿意送他一顿热腾腾的早餐吗?
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
CSS 压缩/解压工具
在线压缩/解压 CSS 代码
XML、JSON 在线转换
在线XML、JSON转换工具