laravel异步任务调用

栏目: 编程语言 · PHP · 发布时间: 5年前

内容简介:队列允许你将一个耗时的任务进行延迟处理,让应用程序对页面的请求有更快的响应。版本:laravel版本 5.4,队列使用Redis

队列

队列允许你将一个耗时的任务进行延迟处理,让应用程序对页面的请求有更快的响应。

laravel异步队列调用

laravel 队列文档

版本:laravel版本 5.4,队列使用Redis

配置文件config/queue.php

队列执行过程

php artisan queue:work

vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

daemon的方式

vendor/laravel/framework/src/Illuminate/Queue/Worker.php

public function daemon($connectionName, $queue, WorkerOptions $options)
{
    $this->listenForSignals();

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options)) {
            $this->pauseWorker($options, $lastRestart);

            continue;
        }

        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        $this->registerTimeoutHandler($job, $options);

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {
            $this->sleep($options->sleep);
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $this->stopIfNecessary($options, $lastRestart);
    }
}

getNextJob,从 redis 取出队列任务,,获取任务后runJob,没有任务就sleep

protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));

        $this->stopWorkerIfLostConnection($e);
    }
}
public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        // First we will raise the before job event and determine if the job has already ran
        // over the its maximum attempt limit, which could primarily happen if the job is
        // continually timing out and not actually throwing any exceptions from itself.
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        // Here we will fire off the job and let it process. We will catch any exceptions so
        // they can be reported to the developers logs, etc. Once the job is finished the
        // proper events will be fired to let any listeners know this job has finished.
        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    } catch (Throwable $e) {
        $this->handleJobException(
            $connectionName, $job, $options, new FatalThrowableError($e)
        );
    }
}

vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php

public function fire()
{
    $payload = $this->payload();

    list($class, $method) = JobName::parse($payload['job']);

    with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}

获取任务的类和方法

vendor/laravel/framework/src/Illuminate/Queue/Jobs/JobName.php

protected function resolve($class)
{
    return $this->container->make($class);
}

vendor/laravel/framework/src/Illuminate/Container/Container.php

public function make($abstract)
{
    return $this->resolve($abstract);
}
protected function resolve($abstract, $parameters = [])
{
    $abstract = $this->getAlias($abstract);

    $needsContextualBuild = ! empty($parameters) || ! is_null(
        $this->getContextualConcrete($abstract)
    );

    // If an instance of the type is currently being managed as a singleton we'll
    // just return an existing instance instead of instantiating new instances
    // so the developer can keep using the same objects instance every time.
    if (isset($this->instances[$abstract]) && ! $needsContextualBuild) {
        return $this->instances[$abstract];
    }

    $this->with[] = $parameters;

    $concrete = $this->getConcrete($abstract);

    // We're ready to instantiate an instance of the concrete type registered for
    // the binding. This will instantiate the types, as well as resolve any of
    // its "nested" dependencies recursively until all have gotten resolved.
    if ($this->isBuildable($concrete, $abstract)) {
        $object = $this->build($concrete);
    } else {
        $object = $this->make($concrete);
    }

    // If we defined any extenders for this type, we'll need to spin through them
    // and apply them to the object being built. This allows for the extension
    // of services, such as changing configuration or decorating the object.
    foreach ($this->getExtenders($abstract) as $extender) {
        $object = $extender($object, $this);
    }

    // If the requested type is registered as a singleton we'll want to cache off
    // the instances in "memory" so we can return it later without creating an
    // entirely new instance of an object on each subsequent request for it.
    if ($this->isShared($abstract) && ! $needsContextualBuild) {
        $this->instances[$abstract] = $object;
    }

    $this->fireResolvingCallbacks($abstract, $object);

    // Before returning, we will also set the resolved flag to "true" and pop off
    // the parameter overrides for this build. After those two things are done
    // we will be ready to return back the fully constructed class instance.
    $this->resolved[$abstract] = true;

    array_pop($this->with);

    return $object;
}

跟踪worker队列执行

mac使用dtruss,linux使用strace,跟踪程序执行时进程系统调用(system call)和所接收的信号

sudo dtruss php artisan queue:work redis --queue=default --sleep=3 --timeout=180
strace php artisan queue:work redis --queue=default --sleep=3 --timeout=180
Processing: App\Jobs\CacheJob
../laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php
../laravel/framework/src/Illuminate/Queue/Jobs/Job.php
../laravel/framework/src/Illuminate/Contracts/Queue/Job.php
../laravel/framework/src/Illuminate/Queue/Events/JobProcessing.php
../laravel/framework/src/Illuminate/Queue/Jobs/JobName.php
../laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php
../laravel/framework/src/Illuminate/Contracts/Bus/Dispatcher.php
../laravel/framework/src/Illuminate/Bus/Dispatcher.php
../laravel/framework/src/Illuminate/Contracts/Bus/QueueingDispatcher.php
../laravel/framework/src/Illuminate/Pipeline/Pipeline.php
../laravel/framework/src/Illuminate/Contracts/Pipeline/Pipeline.php
../../app/Jobs/CacheJob.php

执行一个队列任务后可以看到调用的文件,内容较多,只截取一部分

redis 查询

查询队列任务数

  • queue:default // 存储未处理任务
  • queue:default:delayed // 存储延迟任务
  • queue:default:reserved // 存储待处理任务
redis-cli -h 127.0.0.1 -p 6379 -a `awk '/^requirepass/ {print $2}' /etc/redis.conf` -n 1 llen queues:default

参考资料


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Analytics 2.0

Web Analytics 2.0

Avinash Kaushik / Sybex / 2009-10-26 / USD 39.99

The bestselling book Web Analytics: An Hour A Day was the first book in the analytics space to move beyond clickstream analysis. Web Analytics 2.0 will significantly evolve the approaches from the fir......一起来看看 《Web Analytics 2.0》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码

MD5 加密
MD5 加密

MD5 加密工具