内容简介:队列允许你将一个耗时的任务进行延迟处理,让应用程序对页面的请求有更快的响应。版本:laravel版本 5.4,队列使用Redis
队列
队列允许你将一个耗时的任务进行延迟处理,让应用程序对页面的请求有更快的响应。
laravel异步队列调用
版本:laravel版本 5.4,队列使用Redis
配置文件config/queue.php
队列执行过程
php artisan queue:work
vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php
protected function runWorker($connection, $queue) { $this->worker->setCache($this->laravel['cache']->driver()); return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( $connection, $queue, $this->gatherWorkerOptions() ); }
daemon的方式
vendor/laravel/framework/src/Illuminate/Queue/Worker.php
public function daemon($connectionName, $queue, WorkerOptions $options) { $this->listenForSignals(); $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { // Before reserving any jobs, we will make sure this queue is not paused and // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($options)) { $this->pauseWorker($options, $lastRestart); continue; } // First, we will attempt to get the next job off of the queue. We will also // register the timeout handler and reset the alarm for this job so it is // not stuck in a frozen state forever. Then, we can fire off this job. $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); $this->registerTimeoutHandler($job, $options); // If the daemon should run (not in maintenance mode, etc.), then we can run // fire off this job for processing. Otherwise, we will need to sleep the // worker so no more jobs are processed until they should be processed. if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. $this->stopIfNecessary($options, $lastRestart); } }
getNextJob,从 redis 取出队列任务,,获取任务后runJob,没有任务就sleep
protected function runJob($job, $connectionName, WorkerOptions $options) { try { return $this->process($connectionName, $job, $options); } catch (Exception $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); } catch (Throwable $e) { $this->exceptions->report($e = new FatalThrowableError($e)); $this->stopWorkerIfLostConnection($e); } }
public function process($connectionName, $job, WorkerOptions $options) { try { // First we will raise the before job event and determine if the job has already ran // over the its maximum attempt limit, which could primarily happen if the job is // continually timing out and not actually throwing any exceptions from itself. $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( $connectionName, $job, (int) $options->maxTries ); // Here we will fire off the job and let it process. We will catch any exceptions so // they can be reported to the developers logs, etc. Once the job is finished the // proper events will be fired to let any listeners know this job has finished. $job->fire(); $this->raiseAfterJobEvent($connectionName, $job); } catch (Exception $e) { $this->handleJobException($connectionName, $job, $options, $e); } catch (Throwable $e) { $this->handleJobException( $connectionName, $job, $options, new FatalThrowableError($e) ); } }
vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php
public function fire() { $payload = $this->payload(); list($class, $method) = JobName::parse($payload['job']); with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); }
获取任务的类和方法
vendor/laravel/framework/src/Illuminate/Queue/Jobs/JobName.php
protected function resolve($class) { return $this->container->make($class); }
vendor/laravel/framework/src/Illuminate/Container/Container.php
public function make($abstract) { return $this->resolve($abstract); }
protected function resolve($abstract, $parameters = []) { $abstract = $this->getAlias($abstract); $needsContextualBuild = ! empty($parameters) || ! is_null( $this->getContextualConcrete($abstract) ); // If an instance of the type is currently being managed as a singleton we'll // just return an existing instance instead of instantiating new instances // so the developer can keep using the same objects instance every time. if (isset($this->instances[$abstract]) && ! $needsContextualBuild) { return $this->instances[$abstract]; } $this->with[] = $parameters; $concrete = $this->getConcrete($abstract); // We're ready to instantiate an instance of the concrete type registered for // the binding. This will instantiate the types, as well as resolve any of // its "nested" dependencies recursively until all have gotten resolved. if ($this->isBuildable($concrete, $abstract)) { $object = $this->build($concrete); } else { $object = $this->make($concrete); } // If we defined any extenders for this type, we'll need to spin through them // and apply them to the object being built. This allows for the extension // of services, such as changing configuration or decorating the object. foreach ($this->getExtenders($abstract) as $extender) { $object = $extender($object, $this); } // If the requested type is registered as a singleton we'll want to cache off // the instances in "memory" so we can return it later without creating an // entirely new instance of an object on each subsequent request for it. if ($this->isShared($abstract) && ! $needsContextualBuild) { $this->instances[$abstract] = $object; } $this->fireResolvingCallbacks($abstract, $object); // Before returning, we will also set the resolved flag to "true" and pop off // the parameter overrides for this build. After those two things are done // we will be ready to return back the fully constructed class instance. $this->resolved[$abstract] = true; array_pop($this->with); return $object; }
跟踪worker队列执行
mac使用dtruss,linux使用strace,跟踪程序执行时进程系统调用(system call)和所接收的信号
sudo dtruss php artisan queue:work redis --queue=default --sleep=3 --timeout=180 strace php artisan queue:work redis --queue=default --sleep=3 --timeout=180
Processing: App\Jobs\CacheJob ../laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php ../laravel/framework/src/Illuminate/Queue/Jobs/Job.php ../laravel/framework/src/Illuminate/Contracts/Queue/Job.php ../laravel/framework/src/Illuminate/Queue/Events/JobProcessing.php ../laravel/framework/src/Illuminate/Queue/Jobs/JobName.php ../laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php ../laravel/framework/src/Illuminate/Contracts/Bus/Dispatcher.php ../laravel/framework/src/Illuminate/Bus/Dispatcher.php ../laravel/framework/src/Illuminate/Contracts/Bus/QueueingDispatcher.php ../laravel/framework/src/Illuminate/Pipeline/Pipeline.php ../laravel/framework/src/Illuminate/Contracts/Pipeline/Pipeline.php ../../app/Jobs/CacheJob.php
执行一个队列任务后可以看到调用的文件,内容较多,只截取一部分
redis 查询
查询队列任务数
- queue:default // 存储未处理任务
- queue:default:delayed // 存储延迟任务
- queue:default:reserved // 存储待处理任务
redis-cli -h 127.0.0.1 -p 6379 -a `awk '/^requirepass/ {print $2}' /etc/redis.conf` -n 1 llen queues:default
参考资料
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- SpringBoot | :异步开发之异步调用
- 改进异步封装:处理带返回值的异步调用
- Spring Boot 异步调用
- 异步调用,真的没有那么可怕!
- SpringBoot学习笔记(十七:异步调用)
- angular 用Observable实现异步调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。