【Laravel-海贼王系列】第十一章,Job&队列消费端实现

栏目: PHP · 发布时间: 5年前

内容简介:我们先从构造函数和片段一如果设置了参数
namespace Illuminate\Queue\Console;

use Illuminate\Queue\Worker;
use Illuminate\Support\Carbon;
use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class WorkCommand extends Command
{
  
    ...
    
    /**
     * @var \Illuminate\Queue\Worker
     */
    protected $worker;

    public function __construct(Worker $worker)
    {
        parent::__construct();
        $this->worker = $worker;

    }
    
    public function handle()
    {

        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $this->listenForEvents();

        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];

        $queue = $this->getQueue($connection);

        $this->runWorker(
            $connection, $queue
        );
    }

   ...
}

复制代码

我们先从构造函数和 handle() 方法开始分析,这是入口。

片段一

如果设置了参数 --force 来启动则跳过维护模式的判断,否则会进行判断是否处于维护模式。

if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }
复制代码

片段二

绑定事件任务处理过程和结果到事件,通过控制台实时输出结果,便于开发了解处理进度。

$this->listenForEvents();
 
 protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });

        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }
复制代码

片段三

通过配置文件中配置的驱动获取对应驱动的队列名,如果没有则返回 default

$connection = $this->argument('connection') ?: $this->laravel['config']['queue.default'];
 
 $queue = $this->getQueue($connection);
 
 protected function getQueue($connection)
    {
        return $this->option('queue') ?: $this->laravel['config']->get(
            "queue.connections.{$connection}.queue", 'default'
        );
    }
    
复制代码

片段四 传入连接驱动和队列名称到 runWorker 方法运行任务。

$this->runWorker(
            $connection, $queue
        );
复制代码

这里是启动的重点,我们传入的 $connection = 'redis' $queue = 'default' ,继续分析

protected function runWorker($connection, $queue)
{
    // 这里的 $this->laravel['cache'] 是 Illuminate\Cache\CacheManager 类的实例。
    //(是在 app.providers.Illuminate\Cache\CacheServiceProvider::class 注册的)
    // $this->laravel['cache']->driver() 返回 Illuminate\Cache\Repository 类的实例。

    // 框架通过 CacheManager 对很多存储管理进行了统一。
    // 可以通过修改 app.config.cache.default 和 `app.config.cache.stores 中的值来修改存储驱动。    
    
    $this->worker->setCache($this->laravel['cache']->driver()); // 将获取的驱动赋值给 workder 的 cache成员
    
    // 当 worker 对象拥有了cache对象之后便拥有了操作对应数据的能力 !

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}
复制代码

继续运行

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
复制代码

这里传入的参数分别是,可以看出都是对队列消费的一些基本设置。

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

当运行模式非 --once 的情况下就会以 daemon 的方式运行。我们看 \Illuminate\Queue\Worker 对象的 daemon 方法即可

守护进程模式

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
        
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            $this->stopIfNecessary($options, $lastRestart, $job);
        }
    }
复制代码

进程参数设定

先设置进程的一些管理参数

if ($this->supportsAsyncSignals()) { // extension_loaded('pcntl'); 是否支持 'pcntl' 拓展,支持多进程的拓展。
        $this->listenForSignals();
}

protected function listenForSignals()
    {
        pcntl_async_signals(true); // PHP7.1信号新特性 -- 开启异步信号处理

        // 安装信号处理器,后面可以传入相应的信号来终止或其他操作
        
        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;  // SIGTERM    终止进程      软件终止信号
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true; // SIGUSR2  终止进程 用户定义信号2
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false; // SIGCONT 忽略信号  继续执行一个停止的进程
        });
    }
复制代码

关于 pcntl 的用法可以参考PCNTL

信号可以参考对照表

接着看,从 cache 中获取上一次重启的时间戳

$lastRestart = $this->getTimestampOfLastQueueRestart();
复制代码

循环任务执行

判断是否终止运行

if (! $this->daemonShouldRun($options, $connectionName,$queue)) {
    
    // $opions 就是 调用artisan 传入的参数
    // $connectionName 我用了 redis 驱动,所有就是 'redis'
    // $queue 这里没有传入队列则是 'default'
    
    $this->pauseWorker($options, $lastRestart);
    continue;
}
复制代码

下面代码一共三个判断:

1.是否是关站模式并且非强制运行。

2.是否有外部传入的暂停信号

3.是否有绑定 Looping 事件执行并返回结果

如果符合条件则暂停或者发送终止信号。

主要功能是为了控制是否继续执行任务。

protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    {
        return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
            $this->paused ||
            $this->events->until(new Events\Looping($connectionName, $queue)) === false);
    }
复制代码

获取待运行的 Job

// $this->manager->connection($connectionName) 是 Illuminate\Queue\RedisQueue 对象
// $queue : 'default'

$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);
复制代码

继续看 getNextJob

protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
        // 异常处理主要是报告异常
        // 设置 '$this->shouldQuit = true;' 后续就会终止
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }
    }
复制代码

上面分析过了 $connectionRedisQueue 对象,所有展开 RedisQueuepop 方法,获取要执行的任务对象。

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }
        [$job, $reserved] = $nextJob;
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
复制代码

迁移延迟队列

pop 的过程中首先迁移延迟队列的相关数据

protected function migrate($queue)
    {
        // 这里是不是很熟悉了,上一章存储端分析的时候延迟
        // 队列就是用的这个key来存的
        
        // 将延迟的队列迁移到主队列
        $this->migrateExpiredJobs($queue.':delayed', $queue);
        
        // 将过期队列迁移到主队列
        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }
复制代码

继续看如何迁移到主队列的

public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 
            2,
            $from,
            $to,
            $this->currentTime()
        );
    }
    
public static function migrateExpiredJobs()
    {
        return <<<'LUA'
        if(next(val) ~= nil) then
            redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
            
            for i = 1, #val, 100 do
                redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
            end
        end
        
        return val
        LUA;
    }    
复制代码

最终通过 eval 命令使用 Lua 解释器执行脚本。 请看Redis Eval

真香,这仅仅是把延迟任务切回主队列,继续!

检索数据

从队列检索下一个 Job

if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return; // 没有数据就返回
        }
复制代码

展开检索代码

protected function retrieveNextJob($queue)
    {
        if (! is_null($this->blockFor)) { // 默认值是 null
            return $this->blockingPop($queue);
        }

        // 这段是直接通过  lua  从 redis lpop出对象,
        // 在lua中完成封装,执行逻辑和 blockingPop 相似
        return $this->getConnection()->eval(
            LuaScripts::pop(), 2, $queue, $queue.':reserved',
            $this->availableAt($this->retryAfter)
        );
    }
复制代码

我们主要看 blockingPop 的代码

protected function blockingPop($queue)
    {
        // 以阻塞的方式弹出队列的第一个元素
        $rawBody = $this->getConnection()->blpop($queue, $this->blockFor);
        
        // 解析获取的数据,同时再封装一个重试对象并写入有序集合。
        if (! empty($rawBody)) {
            $payload = json_decode($rawBody[1], true);

            $payload['attempts']++;

            $reserved = json_encode($payload);

            $this->getConnection()->zadd($queue.':reserved', [
                $reserved => $this->availableAt($this->retryAfter),
            ]);

            return [$rawBody[1], $reserved];
        }

        return [null, null];
    }
复制代码

检索完成之后回到 pop 中继续执行

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }
        
        // 到这里了!
        [$job, $reserved] = $nextJob;
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
复制代码

我们来看看 $nextJob 是什么

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

最后调用

return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
复制代码

看看 Illuminate\Queue\Jobs\RedisJob 的构造函数

public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
    {
        $this->job = $job;
        $this->redis = $redis;
        $this->queue = $queue;
        $this->reserved = $reserved;
        $this->container = $container;
        $this->connectionName = $connectionName;

        $this->decoded = $this->payload();
    }
复制代码

这应该是最后一层封装,最后要返回给最外层的任务对象。

运行 Job

回到 Worker 对象中

...
 
 $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );
 
 // 刚刚我们从 redis 中拿到了封装好的 $job 对象,继续执行
 
 // $job 就是 Illuminate\Queue\Jobs\RedisJob 对象
 
 // 是否支持 pcntl 拓展,异步模式传递信号
 if ($this->supportsAsyncSignals()) {
    // 设置超时信号处理
    $this->registerTimeoutHandler($job, $options);
 }
复制代码

继续注册超时信号控制

protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });

        pcntl_alarm(
            max($this->timeoutForJob($job, $options), 0)
        );
    }
复制代码

总算要到运行 Job 的部分了

if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        // 不存在 $job 则睡眠,最低睡眠1秒
        $this->sleep($options->sleep);
    }
复制代码

解析 runJob

到这一步我们已经拿到了所有的对象,接下来就是把 对象用起来!

protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            return $this->process($connectionName, $job, $options);
        } catch (Exception $e) {
            
            // 异常处理和上部分的一样,
            // 设定停止信号,在循环的结尾会检测信号
            // 因此我们不需要分析这段
            
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);
        }
    }
复制代码

展开

$this->process($connectionName, $job, $options);
复制代码

继续展开

public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
           
            // 触发任务执行前的绑定事件,从队列删除任务
            $this->raiseBeforeJobEvent($connectionName, $job);
          
            // 标记超过最大重试次数的任务
            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );

           
            $job->fire();

            // 触发任务执行后的绑定事件
            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }
复制代码

$job->fire()

$job => Illuminate\Queue\Jobs\RedisJob 继承了 Illuminate\Queue\Jobs\Job 所以调用了抽象父类的 fire() 方法

public function fire()
    {
        $payload = $this->payload();

        [$class, $method] = JobName::parse($payload['job']);

        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }
复制代码

我们看看 $payload 的结构实际就是 json_decode($job, true)

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

转换后的 [$class, $method] 分别是 Illuminate\Queue\CallQueuedHandlercall

最后就是从容器中解析出 Illuminate\Queue\CallQueuedHandler 对象并且调用 call 方法,展开方法

public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, unserialize($data['command'])
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }

        $this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        ); 

        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
        }

        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }
复制代码

先看看 $command 获取的是什么

protected function setJobInstanceIfNecessary(Job $job, $instance)
    {
        if (in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
            $instance->setJob($job);
        }

        return $instance;
    }
复制代码

打印 class_uses_recursive($instance)

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

接着就调用了 $instance->setJob($job) ;

这里的 $instance 就是对应我们自己编写的任务对象。

执行完之后最终 $command 返回的就是自己编写的类

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

RedisJob$command 传给 dispatchNow 方法 $this->dispatcherIlluminate\Bus\Dispatcher 对象

$this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        );
复制代码

最后的真像

public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command); // 划重点,要考!
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }
复制代码

其实费了那么大的力气,最后就是调用 $command->handle 回头看看 job 的定义

【Laravel-海贼王系列】第十一章,Job&队列消费端实现

就像烟火过后一样,消失于无形。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创造突破性产品

创造突破性产品

Jonathan Cagan、Craig M.Vogel / 机械工业出版社 / 2004-1 / 35.00元

在《创造突破性产品:从产品策略到项目定案的创新》中作者总结多年的研究成果,指明了与产品创新相关的一系列因素,并提供了一套全新的开发突破性产品的理论与方法,该书旨在帮助企业,技术和设计人员: 获得对用户的需求和市场新的趋势的准确洞察力; 认识可以创造新市场的产品机会缺口; 指导产品模糊前期的构造; 正确地运用定性和定量的研究方法; ......一起来看看 《创造突破性产品》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换