内容简介:说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 该键值对的失效时间为24小时链式任务就是异步或者定时执行的任务由多个子任务执行完成
-
1.1 Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具(它本身不是一个任务队列, 它是任务队列管理的工具, 它提供的接口可以帮助我们实现分布式任务队列)。
-
1.2 Celery专注于实时任务处理,支持任务调度(跟rabbitMQ可实现多种exchange。)
说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。
-
1.3 Celery 架构
-
消息中间件(message broker)(邮箱, 邮局): 本身不提供消息服务,可以和第三方消息中间件集成,常用的有 redis mongodb rabbitMQ
-
任务执行单元(worker)(寄件人): 是Celery提供的任务执行单元, worker并发的运行在分布式的系统节点中
-
任务执行结果存储(task result store)(收件人):用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等
-
2. 任务队列和消息队列
-
任务队列是一种在线或机器分发任务的机制
-
消息队列输入是工作的一个单元, 可以认为是一个任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
-
图解
2.简单示例
2.1 创建一个celery实例 创建tasks.py文件
import time from celery import Celery app = Celery('tasks', broker='redis:////127.0.0.1:6379/6', backend='redis:////127.0.0.1:6379/7') @app.task def add(x, y): time.sleep(10) return x + y 复制代码
ps: tasks为任务名称 设置reids为中间件
2.2 创建一个index.py文件调用并且检测任务、查看任务执行状态
#!/usr/bin/env python # -*- coding:utf-8 -*- from tasks import add, app from celery.result import AsyncResult import time # 立即告知celery去执行add任务,并传入两个参数 result = add.delay(4, 4) print(result.id) async = AsyncResult(id=result.id, app=app) time.sleep(3) if async.successful(): result = async.get() print(result, "执行成功") # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行') 复制代码
- ps 如果使用 redis 作为任务队列中间人,在redis中存在两个键 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名为 celery 的任务队列(Celery 默认),而 celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。
2.3 执行命令详解
- celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
-
A参数指定celery对象的位置,该app.celery_tasks.celery指的 是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,
-
Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;
-
l参数指定worker的日志级别;
-
执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 该键值对的失效时间为24小时
2.4 消息主体分析
- body : 是序列化后使用base64编码的信息,包括具体的任务参数,其中包括了需要执行的方法、参数和一些任务基本信息
- content-encoding : 序列化数据编码方式
- content-type : 任务数据的序列化方式,默认使用python内置的序列化模块pickle(ps: pickle模块支持的类型 所有python支持的原生类型:布尔值,整数,浮点数,复数,字符串,字节,None。 由任何原生类型组成的列表,元组,字典和集合。 函数,类,类的实例, 常用的方法:dumps,dump,loads,load)
{ "body": "gAJ9cQAoWAQAAAB0YXNrcQFYCQAAAHRhc2tzLmFkZHECWAIAAABpZHEDWCQAAABjNDMwMzZkMi03Yzc3LTQ0MDUtOTYwNC1iZDc3ZTcyNzNlN2FxBFgEAAAAYXJnc3EFSwRLBIZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==", "content-encoding": "binary", "content-type": "application/x-python-serialize", "headers": {}, "properties": { "reply_to": "caa78c3a-618a-31f0-84a9-b79db708af02", "correlation_id": "c43036d2-7c77-4405-9604-bd77e7273e7a", "delivery_mode": 2, "delivery_info": { "priority": 0, "exchange": "celery", "routing_key": "celery" }, "body_encoding": "base64", "delivery_tag": "e7e288b5-ecbb-4ec6-912c-f42eb92dbd72" } } 复制代码
2.5Celery配置
CELERY_DEFAULT_QUEUE:默认队列 BROKER_URL : 代理人的网址 CELERY_RESULT_BACKEND:结果存储地址 CELERY_TASK_SERIALIZER:任务序列化方式 CELERY_RESULT_SERIALIZER:任务执行结果序列化方式 CELERY_TASK_RESULT_EXPIRES:任务过期时间 CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表; 复制代码
2.6获取执行任务执行结果的方法
r = func.delay(...) r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False. r.wait() # 等待任务完成, 返回任务执行结果,很少使用; r.get(timeout=1) # 获取任务执行结果,可以设置等待时间 r.result # 任务执行结果. r.state # PENDING, START, SUCCESS,任务当前的状态 r.status # PENDING, START, SUCCESS,任务当前的状态 r.successful # 任务成功返回true r.traceback # 如果任务抛出了一个异常,你也可以获取原始的回溯信息 复制代码
2.7celery的装饰方法celery.task
- task()把任务(函数)装饰成异步
@celery.task() def func(): # do something pass 复制代码
- 可以重新定义任务的基类
class MyTask(celery.Task): # 任务失败时执行 def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) # 任务成功时执行 def on_success(self, retval, task_id, args, kwargs): pass # 任务重试时执行 def on_retry(self, exc, task_id, args, kwargs, einfo): pass 复制代码
参数
- task_id : 任务id
- einfo:执行失败时任务详情
- exc: 失败时的错误类型
- retval: 任务成功时返回的执行结果
2.8 一份完整的配置文件
# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名' # 指定结果的接受地址 CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db' # 指定任务序列化方式 CELERY_TASK_SERIALIZER = 'msgpack' # 指定结果序列化方式 CELERY_RESULT_SERIALIZER = 'msgpack' # 任务过期时间,celery任务执行结果的超时时间 CELERY_TASK_RESULT_EXPIRES = 60 * 20 # 指定任务接受的序列化类型. CELERY_ACCEPT_CONTENT = ["msgpack"] # 任务发送完成是否需要确认,这一项对性能有一点影响 CELERY_ACKS_LATE = True # 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据 CELERY_MESSAGE_COMPRESSION = 'zlib' # 规定完成任务的时间 CELERYD_TASK_TIME_LIMIT = 5 # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程 # celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目 CELERYD_CONCURRENCY = 4 # celery worker 每次去rabbitmq预取任务的数量 CELERYD_PREFETCH_MULTIPLIER = 4 # 每个worker执行了多少任务就会死掉,默认是无限的 CELERYD_MAX_TASKS_PER_CHILD = 40 # 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中 CELERY_DEFAULT_QUEUE = "default" # 设置详细的队列 CELERY_QUEUES = { "default": { # 这是上面指定的默认队列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 设置扇形交换机 "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", }, } 复制代码
2.8 Celery定时任务
- 指定定时任务并加入配置 重新启动worker
# config.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'ptask': { 'task': 'tasks.period_task', 'schedule': timedelta(seconds=5), }, } # 添加定时任务 @app.task(bind=True) def period_task(self): print 'period task done: {0}'.format(self.request.id) 复制代码
PS:时间如果涉及到datatime最好设置为UTC时间
- 启动定时任务进程
celery -A task beat 复制代码
2.9 链式任务
链式任务就是异步或者定时执行的任务由多个子任务执行完成
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info) fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)]) 复制代码
以上所述就是小编给大家介绍的《关于celery的介绍》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ASP.NET Core模块化前后端分离快速开发框架介绍之3、数据访问模块介绍
- 简编漫画介绍WebAssembly
- CGroup 介绍
- CGroup 介绍
- vue初步介绍
- Microbit MicroPython 介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。