Django&Celery是怎么玩的?

栏目: Python · 发布时间: 5年前

内容简介:celery是python的一套异步任务处理框架,可以让我们聚焦在编写业务逻辑,而不是底层异步实现。celery为django框架提供了专用的集成方法,实施过程只需要根据官方手册即可搞定。下面,我将总结我实施Django+Celery过程中的经验和思路,帮助大家理清脉络,快速上手。

celery是 python 的一套异步任务处理框架,可以让我们聚焦在编写业务逻辑,而不是底层异步实现。

celery为django框架提供了专用的集成方法,实施过程只需要根据官方手册即可搞定。

下面,我将总结我实施Django+Celery过程中的经验和思路,帮助大家理清脉络,快速上手。

安装Celery

依赖的版本是:celery==4.3.0。

pip install celery

后续所有配置流程,均基于官方手册的指导: https://docs.celeryproject.org/en/latest/django/

创建celery.py

在settings.py同级目录下创建celery.py,它是celery worker(异步处理进程)的入口:

import os
from celery import Celery
 
# 该环境变量配置为该Django项目的settings模块,这样celery会进一步加载app.settings模块来获取其中的配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')
 
# 创建一个celery对象,随意起一个名字
app = Celery('push2')
# 令该对象从django的settings.py加载CELERY开头的配置项
app.config_from_object('django.conf:settings', namespace='CELERY') 
# 扫描所有django app下面的tasks.py文件,注册里面所有的异步任务(就是一些函数)
app.autodiscover_tasks()  

后续我们会用celery worker程序拉起这个入口文件,该文件初始化好一个叫做app的celery对象即可。

celery worker是celery提供的一个python程序,它的作用是消费rabbitmq中的任务,并执行这些任务对应的函数。

创建tasks.py

我们可以在项目下的任意app内,创建一个tasks.py文件,上述的autodiscover_tasks()就是遍历django中注册的所有app,并在app目录下找到tasks.py模块,然后遍历模块中的所有异步任务函数。

我们的异步任务逻辑就写在tasks.py中,例如我在demo应用中创建的tasks.py实现了一个异步任务add:

# -*- coding: utf-8 -*-
 
# Create your tasks here
from celery import shared_task
import time
 
 
@shared_task
def add(x, y):
    time.sleep(60)  # 模拟执行60秒
    return x + y

该任务接受2个参数,模拟执行了60秒,最后返回一个结果,当然这个结果通常来说是没用的,因为这是一个异步的任务(Celery提供了result扩展,可以把结果放在 redis 之类的存储里,前台可以来获取,这个不常用,大家自己学习)。

异步任务需要用shared_task装饰器包装,这样celery在扫描函数的时候才知道这是一个异步任务,而不是一个普通函数。

生成异步任务

当我们在tasks.py中实现了异步任务的处理逻辑后,我们就需要想一下该如何投递一个任务呢?

因为我们的add方法经过了shared_task包装,因此呢它不再是一个普通函数,它还提供了很多额外的属性。

# CELERY示例
def celery(request):
    # 提交异步任务
    demo_tasks.add.delay(5, 6)
    return json_response()

这是一个views.py中的前台接口,它调用了tasks.py中的add方法的delay方法。

我们知道python中一切皆是对象,所以add function也可以拥有delay属性,通过调用delay属性就可以投递一个异步任务到rabbitmq中。

注意,这里的5,6是add任务的传参,它会一起被打包到消息中,一起放入rabbitmq中。

配置celery

OK,我们会投递任务,也写好了任务逻辑,同时worker的入口程序也写好了。

接下来的问题是:任务序列化后该投递到哪里。

这就需要配置一下celery了,我们在celery worker的入口文件中,令celery app对象加载了django的settings.py中CELERY开头的配置项,因此我们就去编辑django的settings.py:

# Celery配置
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_ROUTES = {  # 任务路由, 不同类型任务可以申请不同的队列进行隔离
    '*': {  # 当前全部路由到一个队列
        'queue': 'my-common-queue',
    }
}
CELERY_TASK_ACKS_LATE = True    # 在任务执行完成后ACK
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False  # 任务失败不自动ACK
CELERY_BROKER_CONNECTION_MAX_RETRIES = 0 # worker无限重连rabbitmq
 
# Celery配置
CELERY_BROKER_URL = 'amqp://my_rabbitmq01:user@host/vhost'
CELERY_TASK_TIME_LIMIT = 300 # 任务最长运行时间
CELERY_TASK_SOFT_TIME_LIMIT = 290 # 在290秒的时候抛出异常
CELERY_WORKER_CONCURRENCY = 64 # worker可以同时执行的子进程数量
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 每个worker最多取256 * 1个任务,分配给256个子进程每人1个
CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # 处理10个任务退出1次

我用到了上述的配置项,通常我们也不需要更复杂的配置了。

关键在于:

  • rabbitmq地址是什么?
  • worker要启动多少个子进程并发处理任务,等等…
  • 要求worker必须在任务成功后才向rabbitmq做ack
  • 生产者投递的任务要进入哪个rabbitmq队列(上述例子是所有类型的任务都进入my-common-queue队列,具体配置方法自行学习)

启动worker

任务投递到rabbitmq后,我们就差启动worker了。

我们需要进入到django的项目根目录(包含manage.py的目录),然后启动celery worker:

celery worker -A'apps' -l INFO -Q'my-common-queue'

这里apps是包含celery.py以及settings.py的包名,这样celery会在apps包下加载celery.py文件作为入口。

-Q指定worker消费rabbitmq的哪个队列来获取任务,可以传多个。

我们可以想象celery的实现原理:生产者在调用add.delay的时候,会将异步函数的完整包名以及传参序列化下来,投递到rabbitmq;worker将项目根目录加入到sys.path中,在worker端取出消息后,可以再次import引入异步函数的模块,找到函数,然后传参执行该函数。

celery worker的架构默认是父子进程模式,父进程与rabbitmq通讯以及保持心跳,子进程执行单个任务,同时可以有很多子进程并发执行任务,任务可以执行很长的时间,只要不超过我们在settings.py中定义的超时时间就不会被杀死。

其他

遇到的另外一个问题,就是celery worker命令放到systemd中的时候,发现无法取到任务,很是奇怪。

官方给的解决方法是利用celery multi命令来拉起celery worker,直接按照官方的方法去做就好了,我就不贴示例了: http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

博主无私的分享着知识,你愿意送他一顿热腾腾的早餐吗?

Django&Celery是怎么玩的?

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Design systems

Design systems

Not all design systems are equally effective. Some can generate coherent user experiences, others produce confusing patchwork designs. Some inspire teams to contribute to them, others are neglected. S......一起来看看 《Design systems》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具