Laravel 中使用 swoole 项目实战开发案例二 (后端主动分场景给界面推送消息)

栏目: PHP · 发布时间: 5年前

内容简介:最近使用 swoole 做了项目,里面设计推送信息给界面前端,和无登陆用户的状态监控,以下是本人从中获取的一点心得,有改进的地方请留言评论。我们假设有一个需求,我在后端点击按钮1,首页弹出“后端触发了按钮1”。后端点了按钮2,列表页弹出“后端触发了按钮2”。做到根据不同场景推送到不同页面。客户端浏览器打开或者刷新界面,在swoole服务会生成一个进程句柄

最近使用 swoole 做了项目,里面设计推送信息给界面前端,和无登陆用户的状态监控,以下是本人从中获取的一点心得,有改进的地方请留言评论。

需求分析

我们假设有一个需求,我在后端点击按钮1,首页弹出“后端触发了按钮1”。后端点了按钮2,列表页弹出“后端触发了按钮2”。做到根据不同场景推送到不同页面。

代码思路

  • Swoole fd

客户端浏览器打开或者刷新界面,在swoole服务会生成一个进程句柄 fd ,每次浏览器页面有打开链接websocket的js代码,便会生成,每次刷新的时候,会关闭之前打开的 fd ,重新生成一个新的,关闭界面的时候会生成一个新的。swoole的 fd 生成规则是从1开始递增。

  • Redis Hash存储 fd

我们建立一个key为 swoole:fds redis哈希类型数据, fd 为hash的字段,每个字段的值我们存储前端websocket请求的url参数信息(根据业务复杂度自己灵活变通,我在项目中会在url带上sessionId)。每次链接打开swoole服务的时候我们存储其信息,每次关闭页面时候我们清除其字段。在 redis 存储如下

Laravel 中使用 swoole 项目实战开发案例二 (后端主动分场景给界面推送消息)

  • 触发分场景推送

    在界面上当进行了触发操作的时候,通过后台curl请求swoole http服务,swoole http服务根据你向我传递的参数分发给对应的逻辑处理。如curl请求 127.0.0.1:9502page=back&func=pushHomeLogic&token=123456 我们可以根据传入的 func 参数,在后台分发给对应逻辑处理。如分发给 pushHomeLogic 方法。在其里面实现自己的逻辑。为防止过多的 if else 以及 foreach 操作 ,我们采用的是闭包, call_user_func 等方法实现如下

    public function onRequest($request,$response)
       {
           if ($this->checkAccess("", $request)) {
               $param = $request->get;
               // 分发处理请求逻辑
               if (isset($param['func'])) {
                   if (method_exists($this,$param['func'])) {
                       call_user_func([$this,$param['func']],$request);
                   }
               }
           }
       }// 往首页推送逻辑处理
       public function pushHomeLogic($request)
       {
           $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
               if ($aContent && $aContent['page'] == "home") {
                   $aRes['message'] = "后端按了按钮1";
                   $aRes['code'] = "200";
                   $oSwoole::$server->push($fd,xss_json($aRes));
               }
           };
           $this->eachFdLogic($callback);
       }

完整代码

swool脚本代码逻辑

<?php

namespace App\Console\Commands;

use Closure;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class SwooleDemo extends Command
{
    // 命令名称
    protected $signature = 'swoole:demo';
    // 命令说明
    protected $description = '这是关于swoole websocket的一个测试demo';
    // swoole websocket服务
    private static $server = null;

    public function __construct()
    {
        parent::__construct();
    }

    // 入口
    public function handle()
    {
        $this->redis = Redis::connection('websocket');
        $server = self::getWebSocketServer();
        $server->on('open',[$this,'onOpen']);
        $server->on('message', [$this, 'onMessage']);
        $server->on('close', [$this, 'onClose']);
        $server->on('request', [$this, 'onRequest']);
        $this->line("swoole服务启动成功 ...");
        $server->start();
    }

    // 获取服务
    public static function getWebSocketServer()
    {
        if (!(self::$server instanceof \swoole_websocket_server)) {
            self::setWebSocketServer();
        }
        return self::$server;
    }
    // 服务处始设置
    protected static function setWebSocketServer():void
    {
        self::$server  = new \swoole_websocket_server("0.0.0.0", 9502);
        self::$server->set([
            'worker_num' => 1,
            'heartbeat_check_interval' => 60,    // 60秒检测一次
            'heartbeat_idle_time' => 121,        // 121秒没活动的
        ]);
    }

    // 打开swoole websocket服务回调代码
    public function onOpen($server, $request)
    {
        if ($this->checkAccess($server, $request)) {
            self::$server->push($request->fd,xss_json(["code"=>200,"message"=>"打开swoole服务成功"]));
        }
    }
    // 给swoole websocket 发送消息回调代码
    public function onMessage($server, $frame)
    {

    }
    // http请求swoole websocket 回调代码
    public function onRequest($request,$response)
    {
        if ($this->checkAccess("", $request)) {
            $param = $request->get;
            // 分发处理请求逻辑
            if (isset($param['func'])) {
                if (method_exists($this,$param['func'])) {
                    call_user_func([$this,$param['func']],$request);
                }
            }
        }
    }

    // websocket 关闭回调代码
    public function onClose($serv,$fd)
    {
        $this->redis->hdel('swoole:fds', $fd);
        $this->line("客户端 {$fd} 关闭");
    }


    // 校验客户端连接的合法性,无效的连接不允许连接
    public function checkAccess($server, $request):bool
    {
        $bRes = true;
        if (!isset($request->get) || !isset($request->get['token'])) {
            self::$server->close($request->fd);
            $this->line("接口验证字段不全");
            $bRes = false;
        } else if ($request->get['token'] != 123456) {
            $this->line("接口验证错误");
            $bRes = false;
        }
        $this->storeUrlParamToRedis($request);
        return $bRes;
    }

    // 将每个界面打开websocket的url 存储起来
    public function storeUrlParamToRedis($request):void
    {
        // 存储请求url带的信息
        $sContent = json_encode(
            [
                'page' => $request->get['page'],
                'fd' => $request->fd,
            ], true);
        $this->redis->hset("swoole:fds", $request->fd, $sContent);
    }

    /**
     * @param $request
     * @see 循环逻辑处理
     */
    public function eachFdLogic(Closure $callback = null)
    {
        foreach (self::$server->connections as $fd) {
            if (self::$server->isEstablished($fd)) {
                $aContent = json_decode($this->redis->hget("swoole:fds",$fd),true);
                $callback($aContent,$fd,$this);
            } else {
                $this->redis->hdel("swoole:fds",$fd);
            }
        }
    }
    // 往首页推送逻辑处理
    public function pushHomeLogic($request)
    {
        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
            if ($aContent && $aContent['page'] == "home") {
                $aRes['message'] = "后端按了按钮1";
                $aRes['code'] = "200";
                $oSwoole::$server->push($fd,xss_json($aRes));
            }
        };
        $this->eachFdLogic($callback);
    }
    // 往列表页推送逻辑处理
    public function pushListLogic($request)
    {
        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
            if ($aContent && $aContent['page'] == "list") {
                $aRes['message'] = "后端按了按钮2";
                $aRes['code'] = "200";
                $oSwoole::$server->push($fd,xss_json($aRes));
            }
        };
        $this->eachFdLogic($callback);
    }

    // 启动websocket服务
    public function start()
    {
        self::$server->start();
    }
}

控制器代码

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Redis;
class TestController extends Controller
{
    // 首页
    public function home()
    {
        return view("home");
    }
    // 列表
    public function list()
    {
        return view("list");
    }
    // 后端控制
    public function back()
    {
        if (request()->method() == 'POST') {
           $this->curl_get($this->getUrl());
           return json_encode(['code'=>200,"message"=>"成功"]);
        } else {
            return view("back");
        }

    }
    // 获取要请求swoole websocet服务地址
    public function getUrl():string
    {
        // 域名 端口 请求swoole服务的方法
        $sBase = request()->server('HTTP_HOST');
        $iPort = 9502;
        $sFunc = request()->post('func');
        $sPage = "back";
        return $sBase.":".$iPort."?func=".$sFunc."&token=123456&page=".$sPage;
    }
    // curl 推送
    public function curl_get(string $url):string
    {
        $ch_curl = curl_init();
        curl_setopt ($ch_curl, CURLOPT_TIMEOUT_MS, 3000);
        curl_setopt($ch_curl, CURLOPT_SSL_VERIFYPEER, 0);
        curl_setopt ($ch_curl, CURLOPT_HEADER,false);
        curl_setopt($ch_curl, CURLOPT_HTTPGET, 1);
        curl_setopt($ch_curl, CURLOPT_RETURNTRANSFER,true);
        curl_setopt ($ch_curl, CURLOPT_URL,$url);
        $str  = curl_exec($ch_curl);
        curl_close($ch_curl);
        return $str;
    }
}

页面js代码

  • 后端控制页

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>后端界面</title>
    <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">
</head>
<body>
<button class="push" data-func="pushHomeLogic">按钮1</button>
<button class="push" data-func="pushListLogic">按钮2</button>
</body>
<script src="{{ asset("/vendor/tw/global/jQuery/jquery-2.2.3.min.js")}} "></script>
<script>
    $(function () {
        $(".push").on('click',function(){
            var func = $(this).attr('data-func').trim();
            ajaxGet(func)
        })
        function ajaxGet(func) {
            url = "{{route('back')}}";
            token = "{{csrf_token()}}";
            $.ajax({
                url: url,
                type: 'post',
                dataType: "json",
                data:{func:func,_token:token},
                error: function (data) {
                    alert("服务器繁忙, 请联系管理员!");
                    return;
                },
                success: function (result) {

                },
            })
        }

    })
</script>
</html>
  • 首页

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">
<title>swoole首页</title>
<meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">

</head>

<body>

<h1>这是首页</h1>

</body>

<script>

var ws;//websocket实例
var lockReconnect = false;//避免重复连接
var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=home&token=123456';

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
    };
    ws.onerror = function () {
        reconnect(wsUrl);
    };
    ws.onopen = function () {
        //心跳检测重置
        heartCheck.reset().start();
    };
    ws.onmessage = function (event) {
        //如果获取到消息,心跳检测重置
        //拿到任何消息都说明当前连接是正常的
        var data = JSON.parse(event.data);
        if (data.code == 200) {
            console.log(data.message)
        }
        heartCheck.reset().start();
    }
}
createWebSocket(wsUrl);
/**
 * 创建链接
 * @param url
 */
function createWebSocket(url) {
    try {
        ws = new WebSocket(url);
        initEventHandle();
    } catch (e) {
        reconnect(url);
    }
}
function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    setTimeout(function () {
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}
//心跳检测
var heartCheck = {
    timeout: 60000,//60秒
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            ws.send("heartbeat");
            self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout);
        }, this.timeout);
    },
    header:function(url) {
        window.location.href=url
    }

}

</script>

</html>

* ####    列表页面

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">
<title>swoole列表页</title>
<meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">

</head>

<body>

<h1>swoole列表页</h1>

</body>

<script>

var ws;//websocket实例
var lockReconnect = false;//避免重复连接
var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=list&token=123456';

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
    };
    ws.onerror = function () {
        reconnect(wsUrl);
    };
    ws.onopen = function () {
        //心跳检测重置
        heartCheck.reset().start();
    };
    ws.onmessage = function (event) {
        //如果获取到消息,心跳检测重置
        //拿到任何消息都说明当前连接是正常的
        var data = JSON.parse(event.data);
        if (data.code == 200) {
            console.log(data.message)
        }
        heartCheck.reset().start();
    }
}
createWebSocket(wsUrl);
/**
 * 创建链接
 * @param url
 */
function createWebSocket(url) {
    try {
        ws = new WebSocket(url);
        initEventHandle();
    } catch (e) {
        reconnect(url);
    }
}
function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    setTimeout(function () {
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}
//心跳检测
var heartCheck = {
    timeout: 60000,//60秒
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            ws.send("heartbeat");
            self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout);
        }, this.timeout);
    },
    header:function(url) {
        window.location.href=url
    }

}

</script>

</html>

### 界面效果
#### 后台控制点击按钮1
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/E3wTNweNZ7.png!large)
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/WxCJP5EpfF.png!large)
#### 后端界面点击按钮2
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/rmKYU8Z3Wj.png!large)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

时间的朋友2018

时间的朋友2018

罗振宇 / 中信出版集团 / 2019-1

2018年,有点不一样。 从年头到现在,各种信息扑面而来。不管你怎么研判这些信息的深意,有一点是有共识的:2018,我们站在了一个时代的门槛上,陌生,崭新。就像一个少年长大了,有些艰困必须承当,有些道路只能独行。 用经济学家的话说,2018年,我们面对的是一次巨大的“不确定性”。 所谓“不确定性”,就是无法用过去的经验判断未来事情发生的概率。所以,此时轻言乐观、悲观,都没有什么意......一起来看看 《时间的朋友2018》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具