队列系统
简介
{tip} Laravel 现在为你的 Redis 队列 提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 Horizon documentation 文档 了解更多信息。
Laravel 队列为不同的后台队列服务提供统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和相应的时间。
队列配置文件存放在 config/queue.php
文件中。每一种队列驱动的配置都可以在该文件中找到,包括数据库, Beanstalkd, Amazon SQS, Redis,以及同步(本地使用)驱动。其中还包含了一个 null
队列驱动用于那些放弃队列的任务。
连接 Vs. 队列
在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。在你的 config/queue.php
配置文件里,有一个 connections
配置选项。这个选项给 Amazon SQS,Beanstalk,或者 Redis 这样的后端服务定义了一个特有的连接。不管是哪一种,一个给定的连接可能会有多个 「队列」,而 「队列」 可以被认为是不同的栈或者大量的队列任务。
要注意的是,queue
配置文件中每个连接的配置示例中都包含一个 queue
属性。这是默认队列任务被发给指定连接的时候会被分发到这个队列中。换句话说,如果你分发任务的时候没有显式定义队列,那么它就会被放到连接配置中 queue
属性所定义的队列中:
// 这个任务将被分发到默认队列...
Job::dispatch();
// 这个任务将被发送到「emails」队列...
Job::dispatch()->onQueue('emails');
有些应用可能不需要把任务发到不同的队列,而只发到一个简单的队列中就行了。但是把任务推到不同的队列仍然是非常有用的,因为 Laravel 队列处理器允许你定义队列的优先级,所以你能给不同的队列划分不同的优先级或者区分不同任务的不同处理方式了。比如说,如果你把任务推到 high
队列中,你就能让队列处理器优先处理这些任务了:
php artisan queue:work --queue=high,default
驱动的必要设置
Database
为了使用 database
队列驱动,你需要一张数据表来存储任务。运行 queue:table
Artisan 命令来创建这张表的迁移文件。当迁移文件创建好后,你就可以使用 migrate
命令来进行迁移:
php artisan queue:table
php artisan migrate
Redis
为了使用 redis
队列驱动,你需要在 config/database.php
配置文件中配置 Redis 的数据库连接。
Redis 集群
如果你的 Redis 队列驱动使用了 Redis 集群,你的队列名必须包含一个 key hash tag 。这是为了确保所有的 Redis 键对于一个队列都被放在同一哈希中。
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
阻塞
当使用 Redis 队列时,你可以用 block_for
配置项来具体说明驱动应该在将任务重新放入 Redis 数据库以及处理器轮询之前阻塞多久。
基于你的队列加载来调整这个值比把新任务放入 Redis 数据库轮询要更有效率的多。例如,你可以将这个值设置为 5
来表明这个驱动应该在等待任务可用时阻塞5秒。
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
{note} 阻塞是一个实验性功能。如果在任务被取回的同时,Redis 服务或队列处理器崩溃的话,有很小的可能导致一个队列任务丢失。
其它队列驱动的依赖扩展包
在使用列表里的队列服务前,必须安装以下依赖扩展包:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
创建任务
生成任务类
在你的应用程序中,队列的任务类都默认放在 app/Jobs
目录下。如果这个目录不存在,那当你运行 make:job
Artisan 命令时目录就会被自动创建。你可以用以下的 Artisan 命令来生成一个新的队列任务:
php artisan make:job ProcessPodcast
生成的类实现了 Illuminate\Contracts\Queue\ShouldQueue
接口,这意味着这个任务将会被推送到队列中,而不是同步执行。
任务类结构
任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle
方法。我们来看一个示例的任务类。这个示例里,假设我们管理着一个播客发布服务,在发布之前需要处理上传播客文件:
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的任务实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 运行任务。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
}
注意,在这个例子中,我们在任务类的构造器中直接传递了一个 Eloquent 模型 。因为我们在任务类里引用了 SerializesModels
这个 trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果你的队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性会被序列化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。这整个过程对你的应用程序来说是完全透明的,这样可以避免在序列化完整的 Eloquent 模式实例时所带来的一些问题。
在队列处理任务时,会调用 handle
方法,而这里我们也可以通过 handle
方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。
{note} 像图片内容这种二进制数据,在放入队列任务之前必须使用
base64_encode
方法转换一下。否则,当这项任务放置到队列中时,可能无法正确序列化为 JSON。
分发任务
一旦你写完了你的任务类你就可以使用它自带的 dispatch
方法分发它。传递给 dispatch
方法的参数将会被传递给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客节目。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast);
}
}
延迟分发
如果你想延迟你的队列任务的执行,你可以在分发任务的时候使用 delay
方法。例如,让我们详细说明一个十分钟之后才会执行的任务:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客节目。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
{note} Amazon SQS 队列服务最大延迟 15 分钟的时间。
工作链
工作链允许你具体定义一个按序列执行的队列任务的列表。一旦序列中的任务失败了,剩余的工作将不会执行。要运行一个工作链,你可以对可分发的任务使用 withChain
方法:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
工作链连接&队列
如果你想定义用于工作链的默认连接和队列,你可以使用 allOnConnection
和 allOnQueue
方法。 这些方法指定了所需队列的连接和队列——除非队列任务被明确指定给了不同的连接/队列:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
自定义连接&队列
分发任务到指定队列
通过将任务分发到不同队列,你可以将你的队列任务「分类」,甚至指定给不同队列分配的任务数量。记住,这不是推送任务到你定义的队列配置文件的不同的连接里,而是一个单一的连接。要指定队列,在分发任务时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客节目。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
分发任务到指定连接
如果你在多队列连接中工作,你可以指定将任务分发到哪个连接。要指定连接,在分发任务时使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新播客节目。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
当然,你可以链式调用 onConnection
和 onQueue
方法来指定连接和队列。
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
指定最大任务尝试次数/超时值
最大尝试次数
在一个任务重指定最大尝试次数可以通过 Artisan 命令的 --tries
选项 指定:
php artisan queue:work --tries=3
你可能想通过任务类自身对最大任务尝试次数进行一个更颗粒化的处理。如果最大尝试次数是在任务类中定义的,它将优先于命令行中的值提供。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可以尝试的最大次数。
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
作为另外一个选择来定义任务在失败前会尝试多少次,你可以定义一个任务超时时间。这样的话,在给定的时间范围内,任务可以无限次尝试。要定义一个任务的超时时间,在你的任务类中新增一个 retryUntil
方法:
/**
* 定义任务超时时间
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
{tip} 你也可以在你的队列事件监听器中使用
retryUntil
方法。
超时
{note}
timeout
特性对于 PHP 7.1+ 和pcntl
PHP 扩展进行了优化.
同样的,任务执行最大秒数的数值可以通过 Artisan 命令行的 --timeout
选项指定。
php artisan queue:work --timeout=30
然而,你可能也想在任务类自身定义一个超时时间。如果在任务类中指定,优先级将会高于命令行:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 超时时间。
*
* @var int
*/
public $timeout = 120;
}
频率限制
{note} 这个特性要求你的应用可以使用 Redis 服务器.
如果你的应用使用了 Redis ,你可以通过时间或并发限制你的队列任务。当你的队列任务通过同样有速率限制的 API 使用时,这个特性将很有帮助。例如,使用 throttle
方法,你可以限制一个给定类型的任务每 60 秒只执行 10 次。如果没有获得锁,一般情况下你应该将任务放回队列以使其可以被稍后重试。
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 任务逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
{tip}在上述的例子里,
key
可以是任何你想要限制频率的任务类型的唯一识别字符串。例如,你也许想使构件基于任务类名的 key 或它操作的一系列 Eloquent 模型的 ID。
或者,你想具体说明一个任务可以同时执行的最大数量。在如下情况时这会很有用处:当一个队列中的任务正在修改资源时,一次只能被一个任务修改。例如,使用 funnel
方法,你可以限制一个给定类型的任务一次只能执行一个处理器。
Redis::funnel('key')->limit(1)->then(function () {
// 任务逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
{tip} 当使用频率限制时,任务执行成功的尝试的次数可能会难以确定。所以,将频率限制与 时间限制 组合是很有作用的。
错误处理
如果在任务执行的时候出现异常,任务会被自动释放到队列中以再次尝试。任务将会一直被释放直到达到应用允许的最大重试次数。最大重试的数值由 queue:work
Artisan 命令的 --tries
选项定义,或者在任务类中定义。更多执行队列处理器的信息可以 在以下找到 。
运行队列处理器
Laravel 包含了一个队列处理器以将推送到队列中的任务执行。你可以使用 queue:work
Artisan 命令运行处理器。 注意一旦 queue:work
命令开始执行,它会一直运行直到它被手动停止或终端被关闭。
php artisan queue:work
{tip} 要使
queue:work
进程一直在后台运行,你应该使用进程管理器比如 Supervisor 来确保队列处理器不会停止运行
记住,队列处理器是一个常驻的进程并且在内存中保存着已经启动的应用状态。因此,它们并不会在启动后注意到你代码的更改。所以,在你的重新部署过程中,请记得 重启你的队列处理器.
执行单一任务
--once
选项用于使队列处理器只处理队列中的单一任务。
php artisan queue:work --once
指定连接&队列
你也可以具体说明队列处理器应该使用哪个队列连接。 传递给 work
的连接名应该与你的 config/queue.php
配置文件中定义的连接之一相符。
php artisan queue:work redis
你甚至可以自定义你的队列处理器使其只执行连接中指定的队列。例如,如果你的所有邮件都由 redis
连接的 emails
队列处理,你可以使用如下的命令启动一个仅执行此队列的处理器:
php artisan queue:work redis --queue=emails
资源注意事项
后台驻留的队列处理器不会在执行完每个任务后「重启」框架。因此,你应该在每个任务完成后释放任何占用过大的资源。例如,如果你正在用 GD 库执行图像处理,你应该在完成后使用 imagedestroy
释放内存。
队列优先级
有时你可能想确定队列执行的优先顺序。例如在 config/queue.php
中你可以将 redis
连接的 queue
队列的优先级从 default
设置为 low
。然而, 偶尔你也想像如下方式将一个任务推送到 high
队列:
dispatch((new Job)->onQueue('high'));
要运行一个处理器来确认 low
队列中的任务在全部的 high
队列任务完成后才继续执行,你可以传递一个逗号分隔的队列名列表作为 work
命令的参数。
php artisan queue:work --queue=high,low
队列处理器&部署
因为队列处理器是常驻进程,他们在重启前不会应用你代码的更改。因此,部署使用队列处理器的应用最简单的方法是在部署进程中重启队列处理器。你可以平滑地重启所有队列处理器通过使用 queue:restart
方法:
php artisan queue:restart
这个命令将会引导所有的队列处理器在完成当前任务后平滑「中止」,这样不会有丢失的任务。由于在执行 queue:restart
后队列处理器将会中止,所以你应该运行一个进程管理器例如 Supervisor 来自动重启队列处理器。
{tip} 队列使用 缓存 存储重启信号,所以你应该确定在使用这个功能之前配置好缓存驱动。
任务过期&超时
任务过期
在你的 config/queue.php
配置文件中,每个队列连接都定义了一个 retry_after
选项。这个选项指定了队列连接在重试一个任务前应该等它执行多久。例如,如果 retry_after
的值设置为 90
,那么任务在执行了 90 秒后将会被放回队列而不是删除它。一般情况下,你应该将 retry_after
的值设置为你认为你的任务可能会执行需要最长时间的值。
{note} 只有在 Amazon SQS 中不存在
retry_after
这个值。 SQS将会以 AWS 控制台配置的 默认可见超时值 作为重试任务的依据。
处理器超时
queue:work
Artisan 命令包含一个 --timeout
选项。 --timeout
选项指定了 Laravel 的队列主进程在中止一个执行任务的子进程之前需要等到多久。有时一个子进程可能会因为各种原因「冻结」,比如一个外部的 HTTP 请求失去响应。 --timeout
选项会移除那些超过指定时间被冻结的进程。
php artisan queue:work --timeout=60
retry_after
配置项和 --timeout
命令行配置并不同,但将它们同时使用可以确保任务不会丢失并且任务只会成功执行一次。
{note}
--timeout
的值应该比你在retry_after
中配置的值至少短几秒。这会确保处理器永远会在一个任务被重试之前中止。如果你的--timeout
值比retry_after
的值长的话,你的任务可能会被执行两次。
队列进程睡眠时间
当任务在队列中可用时,处理器将会一直无间隔地处理任务。 然而, sleep
选项定义了如果没有新任务的时候处理器将会「睡眠」多长时间。在处理器睡眠时,它不会处理任何新任务 —— 任务将会在队列处理器再次启动后执行。
php artisan queue:work --sleep=3
Supervisor 配置
安装 Supervisor
Supervisor 是一个 Linux 下的进程管理器,它会在 queue:work
进程关闭后自动重启。要在 Ubuntu 下安装 Supervisor ,你可以使用以下命令:
sudo apt-get install supervisor
{tip} 如果自己配置 Supervisor 很难的话,你可以考虑使用 Laravel Forge ,它将会为你的 Laravel 项目自动安装配置 Supervisor
配置 Supervisor
一般 Supervisor 的配置文件存储在 /etc/supervisor/conf.d
目录。 在这个目录下,你可以创建任意数量的配置文件控制 Supervisor 对于进进程的管理。例如,让我们创建一个 laravel-worker.conf
文件开始管理 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
在这个示例中, numprocs
会告诉 Supervisor 运行 8 个 queue:work
进程并且管理它们,当它们关闭时会将其自动重启。当然,你应该将 command
选项中的 queue:work sqs
部分修改为你的队列连接。
启动 Supervisor
当配置文件创建好后,你可以用下面的命令更新 Supervisor 配置文件并且启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
要了解更多关于 Supervisor 的信息,查询 Supervisor 文档 。
处理失败的任务
有时你的队列任务会失败。别担心,凡事无完美! Laravel 包含了一个便捷的方式指定任务会被最大尝试的次数。在一个任务达到了它最大尝试次数后,它会被放入 failed_jobs
表。要创建 failed_jobs
表你可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
然后,运行你的 队列处理器 ,你应该在 queue:work
命令上使用 --tries
选项。如果你没有指定 --tries
的值,任务将会被无限次尝试。
php artisan queue:work redis --tries=3
清除失败的任务
你可以在你的任务类中定义一个 failed
方法,它可以允许你在一个任务失败后清除它。这是一个提醒你的用户或撤回任何任务所做出的修改的绝佳时机。导致任务失败的 Exception
会被传入到 failed
方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的任务实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 执行上传播客...
}
/**
* 执行失败的任务。
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 给用户发送失败的通知等等...
}
}
任务失败事件
如果你想注册一个在任务失败时调用的事件,你可以使用 Queue::failing
方法。这是通过 email 或 Stride 通知你的团队的绝佳时机。例如,我们可以从 Laravel 中的 AppServiceProvider
注册一个此事件的回调。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任意服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
重试失败的任务
想要查看所有被放入 failed_jobs
数据表中的任务,你可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令会列出任务 ID ,队列,以及失败的事件。任务 ID 可能会被用于重试失败的任务。例如,要重试一个任务 ID 为 5
的任务,使用如下命令:
php artisan queue:retry 5
要重试所有失败的任务,执行 queue:retry
命令,将 all
作为 ID 传入:
php artisan queue:retry all
如果你想删除一个失败的任务,使用 queue:forget
命令:
php artisan queue:forget 5
要清空所有失败的任务,使用 queue:flush
命令:
php artisan queue:flush
任务事件
通过在队列 facade 中使用 before
和 after
方法,你可以指定一个队列任务被执行前后的回调。这些回调是添加额外的日志或增加统计的绝好时机。通常,你应该在 服务提供者 中调用这些方法。例如,我们可以使用 Laravel 的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 引导启动任意应用服务
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 注册服务提供者
*
* @return void
*/
public function register()
{
//
}
}
在队列 facade 使用 looping
方法可以在处理器尝试获取任务之前执行回调。例如,你也许想用一个闭包来回滚之前失败的任务尚未关闭的事务。
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
点击查看所有 Laravel 中文文档 文章: https://www.codercto.com/courses/l/3.html