Mix PHP V2 实例:AliCloud 短信协程池异步发送守护程序

栏目: PHP · 发布时间: 5年前

内容简介:前些时间我们发布了本范例依然使用消息队列的方式接收短信发送任务,消息中间件使用:

前些时间我们发布了 Mix PHP V2 实例:协程池异步邮件发送守护程序 范例,这一次我们提供一个使用大厂 SDK 通过 Swoole Hook 协程化来并行执行短信发生任务,本文是一个代码简单、IO性能极其强大的范例。

请先升级到 mix-framework >= v2.0.5

本范例依然使用消息队列的方式接收短信发送任务,消息中间件使用:

  • redis

生产者

通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。

// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis connect failed.');
}
$redis->auth('');
$redis->select(0);
// 投递任务
for($i = 0; $i < 3; $i++){
    $data = [
        'phone'         => '***',
        'templateCode'  => 'SMS_***',
        'templateParam' => ['code' => 123456],
    ];
    $redis->lpush('queue:sms', serialize($data));
}

消费者

使用的是 ali 云的短信服务, 查看官方 PHP SDK 文档 ,使用的库为:

composer require alibabacloud/client

通过查看该库的 composer 依赖文件 ,我们得知该库基于 guzzlehttp 开发,因为 Mix PHP 提供了无需修改代码就可 Hook Guzzle 库可在协程中使用的工具 Mix PHP V2 生态:让 Guzzle 支持 Swoole 的 Hook 协程 ,所以能基本确定该库可在 Swoole 协程中使用。

首先我们安装 https://github.com/mix-php/guzzle-hookalibabacloud/client 可在协程中使用:

composer require mix/guzzle-hook

然后在项目的 composer.json 文件中增加 extra 配置项,如下:

"extra": {
    "include_files": [
      "vendor/mix/guzzle-hook/src/functions_include.php"
    ]
}

更新自动加载:

composer dump-autoload

下面我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的短信发送程序。

首先我们在配置 applications/console/config/main.php 中注册一个命令:

// 命令
'commands'         => [

        'smser' => [
            'Smser',
            'description' => "SMS send daemon demo.",
            'options'     => [
                [['d', 'daemon'], 'description' => 'Run in the background'],
            ],
        ],

],

注册的命令中指定的 Smser 命令类,接下来我们编写一个 SmserCommand 类:

applications/console/src/Commands/SmserCommand.php
<?php

namespace Console\Commands;

use Console\Libraries\SmserWorker;
use Mix\Concurrent\CoroutinePool\Dispatcher;
use Mix\Console\CommandLine\Flag;
use Mix\Core\Coroutine;
use Mix\Core\Coroutine\Channel;
use Mix\Core\Event;
use Mix\Helper\ProcessHelper;
use AlibabaCloud\Client\AlibabaCloud;

/**
 * Class SmserCommand
 * @package Daemon\Commands
 * @author liu,jian <coder.keda@gmail.com>
 */
class SmserCommand
{

    const ACCESS_KEY = '***';
    const ACCESS_SECRET = '***';

    /**
     * 退出
     * @var bool
     */
    public $quit = false;

    /**
     * 主函数
     */
    public function main()
    {
        // 守护处理
        $daemon = Flag::bool(['d', 'daemon'], false);
        if ($daemon) {
            ProcessHelper::daemon();
        }
        // 捕获信号
        ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
            $this->quit = true;
            ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
        });
        // 设置ali云全局参数
        AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient();
        // 手动关闭Swoole文件Hook,因为ali云依赖的uuid库有文件hook协程兼容问题,Swoole 4.4已经适配该问题
        Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE);
        // 协程池执行任务
        xgo(function () {
            $maxWorkers = 20;
            $maxQueue   = 20;
            $jobQueue   = new Channel($maxQueue);
            $dispatch   = new Dispatcher([
                'jobQueue'   => $jobQueue,
                'maxWorkers' => $maxWorkers,
            ]);
            $dispatch->start(SmserWorker::class);
            // 投放任务
            $redis = app()->redisPool->getConnection();
            while (true) {
                if ($this->quit) {
                    $dispatch->stop();
                    return;
                }
                try {
                    $data = $redis->brPop(['queue:sms'], 3);
                } catch (\Throwable $e) {
                    $dispatch->stop();
                    return;
                }
                if (!$data) {
                    continue;
                }
                $data = array_pop($data); // brPop命令最后一个键才是值
                $jobQueue->push($data);
            }
        });
        // 等待事件
        Event::wait();
    }

}

$data = $redis->brPop(['queue:sms'], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisorpm2工具 重启守护进程。

上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的发送代码逻辑就在 SmserWorker 类中:

applications/console/src/Libraries/SmserWorker.php
<?php

namespace Console\Libraries;

use Mix\Concurrent\CoroutinePool\AbstractWorker;
use Mix\Concurrent\CoroutinePool\WorkerInterface;

/**
 * Class SmserWorker
 * @package Daemon\Libraries
 * @author liu,jian <coder.keda@gmail.com>
 */
class SmserWorker extends AbstractWorker implements WorkerInterface
{

    /**
     * 邮件发送器
     * @var Smser
     */
    public $smser;

    /**
     * 初始化事件
     */
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 实例化一些需重用的对象
        $this->smser = new Smser();
    }

    /**
     * 处理
     * @param $data
     */
    public function handle($data)
    {
        // TODO: Implement handle() method.
        $data = unserialize($data);
        if (empty($data)) {
            return;
        }
        try {
            $result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']);
            app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)]));
        } catch (\Throwable $e) {
            app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()]));
        }
    }

}

由以上代码可见,Worker 在初始化时,新增了一个 Smser 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Smser 发送程序:

applications/console/src/Libraries/Smser.php
<?php

namespace Console\Libraries;

use AlibabaCloud\Client\AlibabaCloud;
use AlibabaCloud\Client\Exception\ClientException;
use AlibabaCloud\Client\Exception\ServerException;
use Mix\Core\Coroutine;

/**
 * Class Smser
 * @package Console\Libraries
 * @author liu,jian <coder.keda@gmail.com>
 */
class Smser
{

    /**
     * 配置信息
     */
    const SIGN_NAME = '***';

    /**
     * Smser constructor.
     */
    public function __construct()
    {
        // 开启协程钩子
        Coroutine::enableHook();
    }

    /**
     * 发送
     * @param $phone
     * @param $templateCode
     * @param $templateParam
     * @return array
     * @throws ClientException
     * @throws ServerException
     */
    public function send($phone, $templateCode, $templateParam)
    {
        $result = AlibabaCloud::rpc()
            ->product('Dysmsapi')
            // ->scheme('https') // https | http
            ->version('2017-05-25')
            ->action('SendSms')
            ->method('POST')
            ->options([
                'query' => [
                    'PhoneNumbers'  => $phone,
                    'SignName'      => static::SIGN_NAME,
                    'TemplateCode'  => $templateCode,
                    'TemplateParam' => json_encode($templateParam),
                ],
            ])
            ->request();
        return $result->toArray();
    }

}

以上就完成了全部的代码逻辑,现在我们开始测试,先启动消费者守护程序:

[root@localhost bin]# ./mix-console smser

将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):

[root@localhost bin]# php /tmp/push.php

消费者守护程序结果:

[root@localhost bin]# ./mix-console smser
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"}
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"}
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}

命令行终端打印了发送成功的日志,发送完成。


以上所述就是小编给大家介绍的《Mix PHP V2 实例:AliCloud 短信协程池异步发送守护程序》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复盘+:把经验转化为能力(第2版)

复盘+:把经验转化为能力(第2版)

邱昭良 / 机械工业出版社 / 39.00

随着环境日趋多变、不确定、复杂、模糊,无论是个人还是组织,都需要更快更有效地进行创新应变、提升能力。复盘作为一种从经验中学习的结构化方法,满足了快速学习的需求,也是有效进行知识萃取与共享的机制。在第1版基础上,《复盘+:把经验转化为能力》(第2版)做了六方面修订: ·提炼复盘的关键词,让大家更精准地理解复盘的精髓; ·基于实际操作经验,梳理、明确了复盘的"底层逻辑"; ·明确了复......一起来看看 《复盘+:把经验转化为能力(第2版)》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

SHA 加密
SHA 加密

SHA 加密工具