Here are some issues I’ve seen crop up several times in Django projects using Celery. They probably apply to other web frameworks and other task queues, I simply haven’t used others so much.
1. Enqueueing Data Rather Than References
If you duplicate data from your database in your task arguments, it can go stale in the queue before the task executes. I can’t describe this more completely than Celery’s documentation on task state. See the description and example there.
This is not so easy to do accidentally on Celery since version 4, which changed the default serializer from Pickle to JSON. (If you’re not sure which serializer you’re using, check your settings.)
But it is still possible to enqueue data rather than references. For example, imagine you enqueue an email with a one-minute delay, using the user’s email address in the task arguments rather than their ID. If the user changes their email address before the task runs, the email gets sent to the wrong address.
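As a minimal sketch of the reference-passing approach (send_welcome here is a hypothetical helper that does the actual sending):

    from celery import shared_task
    from django.contrib.auth import get_user_model

    @shared_task
    def send_welcome_email(user_id: int):
        # Look the user up at execution time, so we always use the current
        # email address rather than a stale copy stored in the queue.
        user = get_user_model().objects.get(pk=user_id)
        send_welcome(user.email)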
2. Enqueueing Tasks Within Database Transactions
Although you might think Celery is for executing tasks “later/eventually,” it can execute them quickly too!
If you enqueue a task from within a database transaction, it might execute before the database commits the transaction.
This can mean that the data your task needs to access, such as a new model instance, isn’t visible yet, and the task raises a DoesNotExist exception.
The solution is to enqueue your tasks only after the transaction commits, using Django’s transaction.on_commit. This is well described in both the Django documentation and the Celery documentation.
If you’re not using transactions in your views, you can also end up with tasks executing before all the data they expect is there. I recommend using transactions in your views - the easiest way is to use Django’s ATOMIC_REQUESTS setting.
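For example, in your settings module (the engine and other connection details are placeholders):

    DATABASES = {
        "default": {
            "ENGINE": "django.db.backends.postgresql",
            # ...connection settings...
            "ATOMIC_REQUESTS": True,  # wrap every request in a transaction
        }
    }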
3. Not Using Database Transactions in Tasks
Whilst Django makes it easy to use database transactions in your views with ATOMIC_REQUESTS, you’re a bit on your own for other code paths. This includes Celery tasks.
If you don’t wrap your tasks with transaction.atomic(), or use it inside your task body, you may have data integrity problems.
It’s worth auditing your tasks to find where you should use transaction.atomic(). You could even add a project-specific wrapper for Celery’s @shared_task that adds @atomic to your tasks.
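A minimal sketch of such a wrapper (the name atomic_shared_task is made up for illustration):

    from functools import wraps

    from celery import shared_task
    from django.db import transaction

    def atomic_shared_task(**task_kwargs):
        # Like @shared_task(...), but runs the task body inside a
        # database transaction.
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                with transaction.atomic():
                    return func(*args, **kwargs)
            return shared_task(**task_kwargs)(wrapper)
        return decorator

    @atomic_shared_task()
    def update_widget(widget_id: int):
        ...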
4. Default “Unfair” Task Distribution
By default, the Celery worker will send batches of tasks to its worker processes where they are re-queued in-memory. This is intended to increase throughput as worker processes don’t need to wait for tasks from your broker. But it does mean a long running task in a process holds up faster tasks queued behind it, even when other worker processes are free to run them.
Taylor Hughes’ Celery tips post has a great diagram demonstrating this visually in tip #2.
In my experience, this default behaviour has never been desirable. It’s very common for projects to have tasks with vastly different running times, leading to this blocking.
You can disable it by running celery worker with -O fair. The Celery documentation on “Prefork pool prefetch settings” has a better explanation.
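For example (the application name myproject is a placeholder):

    celery -A myproject worker -O fair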
5. Using a Long countdown or an eta in the Far Future
Celery provides the eta and countdown arguments to task enqueues. These let you schedule tasks for later execution.
Unfortunately the way these work is not built into brokers. These delayed tasks will end up at the head of your queue, in front of later non-delayed tasks. The Celery worker process fetches the delayed tasks and “puts them aside” in memory, then fetches the non-delayed tasks.
With many such tasks, the Celery worker process will use a lot of memory to hold them. After a restart, the worker also has to re-fetch all the delayed tasks at the head of the queue. I’ve seen a case where there were enough delayed tasks that a restarted worker took several minutes before it did any actual work!
If you need to delay tasks for more than a few minutes, you should avoid eta and countdown.
It’s better to add a field to your model instances with a time to be acted on, and use a periodic task to enqueue the actual execution.
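A sketch of that pattern, assuming a hypothetical Email model with send_after and sent_at fields, and a send_email task:

    from celery import shared_task
    from django.utils import timezone

    @shared_task
    def enqueue_due_emails():
        # Run every minute or so via Celery beat. Enqueues any emails whose
        # scheduled time has arrived, instead of relying on countdown/eta.
        due_ids = Email.objects.filter(
            send_after__lte=timezone.now(),
            sent_at__isnull=True,
        ).values_list("id", flat=True)
        for email_id in due_ids:
            send_email.delay(email_id)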
6. ACKS Behaviour
Celery’s default behaviour is to acknowledge tasks immediately, removing them from your broker’s queue. If they are interrupted, for example by a random server crash, Celery won’t retry the task.
This is good in the case that your task is not idempotent (that is, not repeatable without problems). But it’s not good for dealing with random errors, such as your database connection dropping. In this case, work goes missing, since Celery removed it from the queue before attempting it.
The opposite behaviour, “acks late,” acknowledges tasks only after successful completion. This is the encouraged behaviour of many other queuing systems, such as SQS.
Celery covers this in its documentation in the FAQ “Should I use retry or acks_late?”. It’s a nuanced issue, but I do think the default “acks early” behaviour tends to be counter-intuitive.
I recommend setting acks_late = True as the default in your Celery configuration and thinking through which mode is appropriate for each task. You can reconfigure it on a per-task basis by passing acks_late to the @shared_task decorator.
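A sketch of that configuration (the app name and the charge_credit_card task are placeholders):

    from celery import Celery, shared_task

    app = Celery("myproject")

    # Make late acknowledgement the default for all tasks.
    app.conf.task_acks_late = True

    # Opt individual tasks back out where early acknowledgement is safer,
    # e.g. a non-idempotent task you never want to risk running twice.
    @shared_task(acks_late=False)
    def charge_credit_card(payment_id: int):
        ...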
7. Not Retrying Missed Work
Tasks might crash for any number of reasons, many of which are out of your control. For example, if your database server crashes, Celery might fail to execute tasks, and raise a “connection failed” error.
The easiest way to fix this is with a second periodic “sweeper task” that scans and repeats/requeues missed work. The recent AWS article Avoiding insurmountable queue backlogs revealed they call such tasks “anti-entropy sweepers.” (That article is a fantastic read, filled with advice on working with queues at scale.)
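A sketch of such a sweeper, assuming a hypothetical Order model with created and processed_at fields, and a process_order task:

    from datetime import timedelta

    from celery import shared_task
    from django.utils import timezone

    @shared_task
    def sweep_unprocessed_orders():
        # Periodic "anti-entropy sweeper": re-enqueue work that should have
        # finished by now but hasn't, e.g. because a worker crashed mid-task.
        cutoff = timezone.now() - timedelta(minutes=30)
        stuck_ids = Order.objects.filter(
            processed_at__isnull=True,
            created__lt=cutoff,
        ).values_list("id", flat=True)
        for order_id in stuck_ids:
            process_order.delay(order_id)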
8. Changing Task Signatures in Backwards-Incompatible Ways
Imagine we had a task function like this:
    @shared_task
    def frobnicate(widget_id: int):
        ...
And we changed it to add a new required argument frazzle:
    @shared_task
    def frobnicate(widget_id: int, frazzle: bool):
        ...
When we deploy this change, any enqueued tasks from the first version of the code will fail with:
    TypeError: frobnicate() missing 1 required positional argument: 'frazzle'
You should treat task function signatures with the same consideration as database migrations.
If you’re adding a new argument, give it a default first - similar to adding database columns as nullable. After that version of the code is live, you can remove the default.
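For the example above, the intermediate, safe-to-deploy version would look like:

    @shared_task
    def frobnicate(widget_id: int, frazzle: bool = False):
        # Old enqueued calls without frazzle still work; new callers can pass it.
        ...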
Similarly if you’re removing an argument, give it a default first.
And if you’re removing a task, remove the call sites first, then the task itself.
I’m not aware of any tooling to help enforce this, though in my previous position at YPlan we had some homegrown Django system checks. These made sure we thought through every change we made to tasks and their arguments.
Fin
I hope this helps you enjoy Celery, or whatever task queue you use, more,
—Adam