【Laravel-海贼王系列】第十一章,Job&队列消费端实现

栏目: PHP · 发布时间: 6年前

内容简介:我们先从构造函数和片段一如果设置了参数
namespace Illuminate\Queue\Console;

use Illuminate\Queue\Worker;
use Illuminate\Support\Carbon;
use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class WorkCommand extends Command
{
  
    ...
    
    /**
     * @var \Illuminate\Queue\Worker
     */
    protected $worker;

    public function __construct(Worker $worker)
    {
        parent::__construct();
        $this->worker = $worker;

    }
    
    public function handle()
    {

        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $this->listenForEvents();

        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];

        $queue = $this->getQueue($connection);

        $this->runWorker(
            $connection, $queue
        );
    }

   ...
}

复制代码

我们先从构造函数和 handle() 方法开始分析,这是入口。

片段一

如果设置了参数 --force 来启动则跳过维护模式的判断,否则会进行判断是否处于维护模式。

if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }
复制代码

片段二

绑定事件任务处理过程和结果到事件,通过控制台实时输出结果,便于开发了解处理进度。

$this->listenForEvents();
 
 protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });

        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }
复制代码

片段三

通过配置文件中配置的驱动获取对应驱动的队列名,如果没有则返回 default

$connection = $this->argument('connection') ?: $this->laravel['config']['queue.default'];
 
 $queue = $this->getQueue($connection);
 
 protected function getQueue($connection)
    {
        return $this->option('queue') ?: $this->laravel['config']->get(
            "queue.connections.{$connection}.queue", 'default'
        );
    }
    
复制代码

片段四 传入连接驱动和队列名称到 runWorker 方法运行任务。

$this->runWorker(
            $connection, $queue
        );
复制代码

这里是启动的重点,我们传入的 $connection = 'redis' $queue = 'default' ,继续分析

protected function runWorker($connection, $queue)
{
    // 这里的 $this->laravel['cache'] 是 Illuminate\Cache\CacheManager 类的实例。
    //(是在 app.providers.Illuminate\Cache\CacheServiceProvider::class 注册的)
    // $this->laravel['cache']->driver() 返回 Illuminate\Cache\Repository 类的实例。

    // 框架通过 CacheManager 对很多存储管理进行了统一。
    // 可以通过修改 app.config.cache.default 和 `app.config.cache.stores 中的值来修改存储驱动。    
    
    $this->worker->setCache($this->laravel['cache']->driver()); // 将获取的驱动赋值给 workder 的 cache成员
    
    // 当 worker 对象拥有了cache对象之后便拥有了操作对应数据的能力 !

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}
复制代码

继续运行

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
复制代码

这里传入的参数分别是,可以看出都是对队列消费的一些基本设置。

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

当运行模式非 --once 的情况下就会以 daemon 的方式运行。我们看 \Illuminate\Queue\Worker 对象的 daemon 方法即可

守护进程模式

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
        
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            $this->stopIfNecessary($options, $lastRestart, $job);
        }
    }
复制代码

进程参数设定

先设置进程的一些管理参数

if ($this->supportsAsyncSignals()) { // extension_loaded('pcntl'); 是否支持 'pcntl' 拓展,支持多进程的拓展。
        $this->listenForSignals();
}

protected function listenForSignals()
    {
        pcntl_async_signals(true); // PHP7.1信号新特性 -- 开启异步信号处理

        // 安装信号处理器,后面可以传入相应的信号来终止或其他操作
        
        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;  // SIGTERM    终止进程      软件终止信号
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true; // SIGUSR2  终止进程 用户定义信号2
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false; // SIGCONT 忽略信号  继续执行一个停止的进程
        });
    }
复制代码

关于 pcntl 的用法可以参考PCNTL

信号可以参考对照表

接着看,从 cache 中获取上一次重启的时间戳

$lastRestart = $this->getTimestampOfLastQueueRestart();
复制代码

循环任务执行

判断是否终止运行

if (! $this->daemonShouldRun($options, $connectionName,$queue)) {
    
    // $opions 就是 调用artisan 传入的参数
    // $connectionName 我用了 redis 驱动,所有就是 'redis'
    // $queue 这里没有传入队列则是 'default'
    
    $this->pauseWorker($options, $lastRestart);
    continue;
}
复制代码

下面代码一共三个判断:

1.是否是关站模式并且非强制运行。

2.是否有外部传入的暂停信号

3.是否有绑定 Looping 事件执行并返回结果

如果符合条件则暂停或者发送终止信号。

主要功能是为了控制是否继续执行任务。

protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    {
        return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
            $this->paused ||
            $this->events->until(new Events\Looping($connectionName, $queue)) === false);
    }
复制代码

获取待运行的 Job

// $this->manager->connection($connectionName) 是 Illuminate\Queue\RedisQueue 对象
// $queue : 'default'

$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);
复制代码

继续看 getNextJob

protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
        // 异常处理主要是报告异常
        // 设置 '$this->shouldQuit = true;' 后续就会终止
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }
    }
复制代码

上面分析过了 $connectionRedisQueue 对象,所有展开 RedisQueuepop 方法,获取要执行的任务对象。

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }
        [$job, $reserved] = $nextJob;
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
复制代码

迁移延迟队列

pop 的过程中首先迁移延迟队列的相关数据

protected function migrate($queue)
    {
        // 这里是不是很熟悉了,上一章存储端分析的时候延迟
        // 队列就是用的这个key来存的
        
        // 将延迟的队列迁移到主队列
        $this->migrateExpiredJobs($queue.':delayed', $queue);
        
        // 将过期队列迁移到主队列
        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }
复制代码

继续看如何迁移到主队列的

public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 
            2,
            $from,
            $to,
            $this->currentTime()
        );
    }
    
public static function migrateExpiredJobs()
    {
        return <<<'LUA'
        if(next(val) ~= nil) then
            redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
            
            for i = 1, #val, 100 do
                redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
            end
        end
        
        return val
        LUA;
    }    
复制代码

最终通过 eval 命令使用 Lua 解释器执行脚本。 请看Redis Eval

真香,这仅仅是把延迟任务切回主队列,继续!

检索数据

从队列检索下一个 Job

if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return; // 没有数据就返回
        }
复制代码

展开检索代码

protected function retrieveNextJob($queue)
    {
        if (! is_null($this->blockFor)) { // 默认值是 null
            return $this->blockingPop($queue);
        }

        // 这段是直接通过  lua  从 redis lpop出对象,
        // 在lua中完成封装,执行逻辑和 blockingPop 相似
        return $this->getConnection()->eval(
            LuaScripts::pop(), 2, $queue, $queue.':reserved',
            $this->availableAt($this->retryAfter)
        );
    }
复制代码

我们主要看 blockingPop 的代码

protected function blockingPop($queue)
    {
        // 以阻塞的方式弹出队列的第一个元素
        $rawBody = $this->getConnection()->blpop($queue, $this->blockFor);
        
        // 解析获取的数据,同时再封装一个重试对象并写入有序集合。
        if (! empty($rawBody)) {
            $payload = json_decode($rawBody[1], true);

            $payload['attempts']++;

            $reserved = json_encode($payload);

            $this->getConnection()->zadd($queue.':reserved', [
                $reserved => $this->availableAt($this->retryAfter),
            ]);

            return [$rawBody[1], $reserved];
        }

        return [null, null];
    }
复制代码

检索完成之后回到 pop 中继续执行

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }
        
        // 到这里了!
        [$job, $reserved] = $nextJob;
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
复制代码

我们来看看 $nextJob 是什么

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

最后调用

return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
复制代码

看看 Illuminate\Queue\Jobs\RedisJob 的构造函数

public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
    {
        $this->job = $job;
        $this->redis = $redis;
        $this->queue = $queue;
        $this->reserved = $reserved;
        $this->container = $container;
        $this->connectionName = $connectionName;

        $this->decoded = $this->payload();
    }
复制代码

这应该是最后一层封装,最后要返回给最外层的任务对象。

运行 Job

回到 Worker 对象中

...
 
 $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );
 
 // 刚刚我们从 redis 中拿到了封装好的 $job 对象,继续执行
 
 // $job 就是 Illuminate\Queue\Jobs\RedisJob 对象
 
 // 是否支持 pcntl 拓展,异步模式传递信号
 if ($this->supportsAsyncSignals()) {
    // 设置超时信号处理
    $this->registerTimeoutHandler($job, $options);
 }
复制代码

继续注册超时信号控制

protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });

        pcntl_alarm(
            max($this->timeoutForJob($job, $options), 0)
        );
    }
复制代码

总算要到运行 Job 的部分了

if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        // 不存在 $job 则睡眠,最低睡眠1秒
        $this->sleep($options->sleep);
    }
复制代码

解析 runJob

到这一步我们已经拿到了所有的对象,接下来就是把 对象用起来!

protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            return $this->process($connectionName, $job, $options);
        } catch (Exception $e) {
            
            // 异常处理和上部分的一样,
            // 设定停止信号,在循环的结尾会检测信号
            // 因此我们不需要分析这段
            
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);
        }
    }
复制代码

展开

$this->process($connectionName, $job, $options);
复制代码

继续展开

public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
           
            // 触发任务执行前的绑定事件,从队列删除任务
            $this->raiseBeforeJobEvent($connectionName, $job);
          
            // 标记超过最大重试次数的任务
            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );

           
            $job->fire();

            // 触发任务执行后的绑定事件
            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }
复制代码

$job->fire()

$job => Illuminate\Queue\Jobs\RedisJob 继承了 Illuminate\Queue\Jobs\Job 所以调用了抽象父类的 fire() 方法

public function fire()
    {
        $payload = $this->payload();

        [$class, $method] = JobName::parse($payload['job']);

        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }
复制代码

我们看看 $payload 的结构实际就是 json_decode($job, true)

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

转换后的 [$class, $method] 分别是 Illuminate\Queue\CallQueuedHandlercall

最后就是从容器中解析出 Illuminate\Queue\CallQueuedHandler 对象并且调用 call 方法,展开方法

public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, unserialize($data['command'])
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }

        $this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        ); 

        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
        }

        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }
复制代码

先看看 $command 获取的是什么

protected function setJobInstanceIfNecessary(Job $job, $instance)
    {
        if (in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
            $instance->setJob($job);
        }

        return $instance;
    }
复制代码

打印 class_uses_recursive($instance)

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

接着就调用了 $instance->setJob($job) ;

这里的 $instance 就是对应我们自己编写的任务对象。

执行完之后最终 $command 返回的就是自己编写的类

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

RedisJob$command 传给 dispatchNow 方法 $this->dispatcherIlluminate\Bus\Dispatcher 对象

$this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        );
复制代码

最后的真像

public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command); // 划重点,要考!
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }
复制代码

其实费了那么大的力气,最后就是调用 $command->handle 回头看看 job 的定义

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

就像烟火过后一样,消失于无形。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

New Dark Age

New Dark Age

James Bridle / Verso Books / 2018-7-17 / GBP 16.99

As the world around us increases in technological complexity, our understanding of it diminishes. Underlying this trend is a single idea: the belief that our existence is understandable through comput......一起来看看 《New Dark Age》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具