内容简介:新建的这里就是执行一个这里会返回一个
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class TestJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public function __construct() { echo '开始构造Job'; } public function handle() { echo '开始处理Job'; } } 复制代码
新建的 TestJob
类,这个类实现了序列化模型,队列功能等等都是通过 trait
类来补充的。 这些特性我们通过使用来分解。
运行一个任务
dispatch(new TestJob()); 复制代码
这里就是执行一个 TestJob
的任务,接下去看看 dispatch()
这个方法
function dispatch($job) { if ($job instanceof Closure) { $job = new CallQueuedClosure(new SerializableClosure($job)); } return new PendingDispatch($job); } 复制代码
这里会返回一个 Illuminate\Foundation\Bus\PendingDispatch
对象
TestJob
这个对象里面通过 use Queueable
引入的几个成员属性。 目前为止我们看到只不过是实例化了一个对象,同时将 TestJob
传给 PendingDispatch
我们来解读 PendingDispatch
这个类
<?php namespace Illuminate\Foundation\Bus; use Illuminate\Contracts\Bus\Dispatcher; class PendingDispatch { protected $job; public function __construct($job) { $this->job = $job; // 接收传入的 job 对象 } public function onConnection($connection) { $this->job->onConnection($connection); // 设置任务指定连接 return $this; } public function onQueue($queue) { $this->job->onQueue($queue); // 设置任务队列名 return $this; } public function allOnConnection($connection) { $this->job->allOnConnection($connection); // 设置工作链所有需要的连接 return $this; } public function allOnQueue($queue) { $this->job->allOnQueue($queue); // 设置工作链的队列 return $this; } public function delay($delay) { $this->job->delay($delay); // 设置延迟时间 return $this; } public function chain($chain) { $this->job->chain($chain); // 设置工作链任务 return $this; } public function __destruct() { app(Dispatcher::class)->dispatch($this->job); // 通过析构函数来转发job } } 复制代码
分解完这个类,其实大部分都是设置参数的过程,也是通过这些参数来控制任务的执行状态,比如延迟,工作链模式运行等等。
重点在析构函数,当运行完 return new PendingDispatch($job);
之后对象如果没有被任何变量接收,那么对象的内存空间会被回收,从而触发析构函数执行,也是触发 job
继续执行的方式!
public function __destruct() { app(Dispatcher::class)->dispatch($this->job); // 通过析构函数来转发job } 复制代码
获取任务对应的解析器
app(Dispatcher::class)
传入的参数是 Illuminate\Bus\Dispatcher
, 这个契约对应的绑定类是通过配置文件 app.providers.Illuminate\Bus\BusServiceProvider::class
来加载的 关于 provider
的启动在第九章中有讲,我们直接看启动方法
public function register() { $this->app->singleton(Dispatcher::class, function ($app) { return new Dispatcher($app, function ($connection = null) use ($app) { return $app[QueueFactoryContract::class]->connection($connection); }); }); $this->app->alias( Dispatcher::class, DispatcherContract::class ); $this->app->alias( Dispatcher::class, QueueingDispatcherContract::class ); } 复制代码
app(Dispatcher::class)
的实质就是这个闭包的返回
function ($app) { return new Dispatcher($app, function ($connection = null) use ($app) { return $app[QueueFactoryContract::class]->connection($connection); }); } 复制代码
看看 Dispatcher
构造函数
public function __construct(Container $container, Closure $queueResolver = null) { $this->container = $container; $this->queueResolver = $queueResolver; $this->pipeline = new Pipeline($container); } 复制代码
接受两个参数,第一个是容器,第二个就是闭包所以 $this->queueResolver
就是
function ($connection = null) use ($app) { return $app[QueueFactoryContract::class]->connection($connection); } 复制代码
我管这个 $this->queueResolver
叫解析器,作用是接收一个 $connection
然后从容器中解析出队列的驱动并进行连接。
QueueFactoryContract::class
是通过 provider
加载的 位于 app.providers.Illuminate\Queue\QueueServiceProvider::class,
返回的对象是 Illuminate\Queue\QueueManager
由于 'default' => env('QUEUE_CONNECTION', 'sync'),
中配置的 redis
所以最后返回的对象是 Illuminate\Queue\RedisQueue
分发任务到队列
public function dispatch($command) { // $this->queueResolver 这个队列解析器是在构造的时候注入的 if ($this->queueResolver && $this->commandShouldBeQueued($command)) { return $this->dispatchToQueue($command); } return $this->dispatchNow($command); } 复制代码
上面的方法明确了任务是该通过队列还是同步执行。
这里我们看,传入的 $command
就是开始的 TestJob
对象。 还记得 Laravel
文档说的如果要通过队列实现需要实现一个指定的接口吗 implements ShouldQueue
,这段代码就是解释了原因。
protected function commandShouldBeQueued($command) { return $command instanceof ShouldQueue; } 复制代码
继续下去,通过上面的判断之后我们进入 dispatchToQueue($command)
这里
public function dispatchToQueue($command) { $connection = $command->connection ?? null; $queue = call_user_func($this->queueResolver, $connection); if (! $queue instanceof Queue) { throw new RuntimeException('Queue resolver did not return a Queue implementation.'); } if (method_exists($command, 'queue')) { return $command->queue($queue, $command); } return $this->pushCommandToQueue($queue, $command); } 复制代码
上面解析过了 $queue
就是 Illuminate\Queue\RedisQueue
这个对象
if (method_exists($command, 'queue')) { return $command->queue($queue, $command); } // 返回 false 复制代码
所有最后执行了 return $this->pushCommandToQueue($queue, $command);
protected function pushCommandToQueue($queue, $command) { if (isset($command->queue, $command->delay)) { return $queue->laterOn($command->queue, $command->delay, $command); } // 如果存在指定的队列和延迟,则推入指定队列+延迟 if (isset($command->queue)) { return $queue->pushOn($command->queue, $command); } // 如果存在指定的队列则push到指定的队列 if (isset($command->delay)) { return $queue->later($command->delay, $command); } // 只存在延迟设置,推入延迟 return $queue->push($command); // 默认 } 复制代码
构造数据
上面已经到了最终的调用,那么接下来的事情就是构造一个什么样格式的数据存入 redis
追踪 $queue->push($command)
这里的 $job 就是最开始传入的 TestJob 对象! public function push($job, $data = '', $queue = null) { return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); } 复制代码
构造 payload
protected function createPayload($job, $queue, $data = '') { $payload = json_encode($this->createPayloadArray($job, $queue, $data)); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidPayloadException( 'Unable to JSON encode payload. Error code: '.json_last_error() ); } return $payload; } 复制代码
// 这里的 createPayloadArray() 先调用 Illuminate\Queue\RedisQueue
对象的
protected function createPayloadArray($job, $queue, $data = '') { return array_merge(parent::createPayloadArray($job, $queue, $data), [ 'id' => $this->getRandomId(), 'attempts' => 0, ]); } 复制代码
// 追踪父类 Illuminate\Queue\Queue
方法
protected function createPayloadArray($job, $queue, $data = '') { return is_object($job) ? $this->createObjectPayload($job, $queue) : $this->createStringPayload($job, $queue, $data); } // $job 是对象的时候格式化方式 protected function createObjectPayload($job, $queue) { $payload = $this->withCreatePayloadHooks($queue, [ 'displayName' => $this->getDisplayName($job), 'job' => 'Illuminate\Queue\CallQueuedHandler@call', 'maxTries' => $job->tries ?? null, // 这是任务设置的重试次数 'timeout' => $job->timeout ?? null, // 这是超时时间 'timeoutAt' => $this->getJobExpiration($job), // 获取处理过期时间 'data' => [ 'commandName' => $job, 'command' => $job, ], ]); return array_merge($payload, [ 'data' => [ 'commandName' => get_class($job), 'command' => serialize(clone $job), // 序列化,这里的序列化会调用 // SerializesModels 特质类的__sleep()方法 // 在开头的时候所有的 Job 类都有use ], ]); } // $job 是字符串的时候格式化方式 protected function createStringPayload($job, $queue, $data) { return $this->withCreatePayloadHooks($queue, [ 'displayName' => is_string($job) ? explode('@', $job)[0] : null, 'job' => $job, 'maxTries' => null, 'timeout' => null, 'data' => $data, ]); } 复制代码
将获取的最后 json
字符串 rpush
到 redis
中。
public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush($this->getQueue($queue), $payload); return json_decode($payload, true)['id'] ?? null; } 复制代码至于延迟任务
return $queue->later($command->delay, $command);
, 逻辑基本上一样,只不过最后存入的队列是名不一样
小结
到这里位置关于任务和队列的应用写入端口已经完成,最终是把指定的格式存入配置的存储驱动中的过程。
以上所述就是小编给大家介绍的《【Laravel-海贼王系列】第十章,Job&队列存储端实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【Laravel-海贼王系列】第十一章,Job&队列消费端实现
- 【Laravel-海贼王系列】第八章, Provider 功能解析
- 【Laravel-海贼王系列】第十四章,Session 解析
- 【Laravel-海贼王系列】第十二章,Facade 模式解析
- 【Laravel-海贼王系列】第十三章,路由&控制器解析
- 【Laravel-海贼王系列】第十七章,Laravel 那些骚操作
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Shallows
Nicholas Carr / W. W. Norton & Company / 2010-6-15 / USD 26.95
"Is Google making us stupid?" When Nicholas Carr posed that question, in a celebrated Atlantic Monthly cover story, he tapped into a well of anxiety about how the Internet is changing us. He also crys......一起来看看 《The Shallows》 这本书的介绍吧!