Golang-长连接-状态推送

栏目: Go · 发布时间: 6年前

内容简介:前言:扫码登录功能自微信提出后,越来越多的被应用于各个web与app。这两天公司要做一个扫码登录功能,在leader的技术支持帮助下(基本都靠leader排坑),终于将服务搭建起来,并且支持上万并发。决定做扫码登录功能之后,在网上查看了很多的相关资料。对于扫码登录的实现方式有很多,淘宝用的是轮询,微信用长连接,QQ用轮询……。方式虽多,但目前看来大体分为两种,1:轮询,2:长连接。(两种方式各有利弊吧,我研究不深,优缺点就不赘述了)在和leader讨论之后选择了用长连接的方式。所以对长连接的实现方式调研了很

状态推送

前言:扫码登录功能自微信提出后,越来越多的被应用于各个web与app。这两天公司要做一个扫码登录功能,在leader的技术支持帮助下(基本都靠leader排坑),终于将服务搭建起来,并且支持上万并发。

长连接选择

决定做扫码登录功能之后,在网上查看了很多的相关资料。对于扫码登录的实现方式有很多,淘宝用的是轮询,微信用长连接,QQ用轮询……。方式虽多,但目前看来大体分为两种,1:轮询,2:长连接。(两种方式各有利弊吧,我研究不深,优缺点就不赘述了)

在和leader讨论之后选择了用长连接的方式。所以对长连接的实现方式调研了很多:

1.微信长连接:通过动态加载script的方式实现。

Golang-长连接-状态推送

这种方式好在没有跨域问题。

2.websocket长连接:在PC端与服务端搭起一条长连接后,服务端主动不断地向PC端推送状态。这应该是最完美的做法了。

3.我使用的长连接:PC端向服务端发送请求,服务端并不立即响应,而是hold住,等到用户扫码之后再响应这个请求,响应后连接断开。

Golang-长连接-状态推送

为什么不采用websocket呢?因为当时比较急、而对于websocket的使用比较陌生,所以没有使用。不过我现在这种做法在资源使用上比websocket低很多。

接口设计

(本来想把leader画的一副架构图放上来,但涉及到公司,不敢)

自己画的一副流程图

Golang-长连接-状态推送

稍微解释一下:

第一条连接:打开PC界面的时候向服务端发送请求并建立长连接(1)。当APP成功扫码后(2),响应这次请求(3)。

第二条连接类似。

分析得出我们的服务只需要两个接口即可

1.与PC建立长连接的接口

2.接收APP端数据并将数据发送给前端的接口

再细想可将这两个接口抽象为:

1.PC获取状态接口:get

2.APP设置状态接口:set

具体实现

GO 写的(不多哔哔)

长连接的根本原理:连接请求后,服务端利用channel阻塞住。等到channel中有value后,将value响应

Router

func Router(){
    http.HandleFunc("/status/get", Get)
    http.HandleFunc("/status/set", Set)
}

GET

每一条连接需要有一个KEY作标识,不然APP设置的状态不知道该发给那台PC。每一条连接即一个channel

var Status map[string](chan string) = make(map[string](chan string))

func Get(w http.ResponseWriter, r *http.Request){
    ...        //接收key的操作
    key = ...  //PC在请求接口时带着的key
    Status[key] = make(chan string)    //不需要缓冲区
    value := <-Status[key]
    ResponseJson(w, 0, "success", value)    //自己封的响应JSON方法
}

SET

APP扫码后可以得到二维码中的KEY,同时将想给PC发送的VALUE一起发送给服务端

func Set(w http.ResponseWriter, r *http.Request){
    ...        
    key = ...
    value = ...    //向PC传递的值
    Status[key] <- value
}

这就是实现的最基本原理。

接下来我们一点点实现其他的功能。

1.超时

从网上找了很多资料,大部分都说这种方式

srv := &http.Server{  
    ReadTimeout: 5 * time.Second,
    WriteTimeout: 10 * time.Second,
}
log.Println(srv.ListenAndServe())

这种方式确实是设置读超时与写超时。但(亲测)这种超时方式并不友善,假如现在 WriteTimeout 是10s,PC端请求过来之后,长连接建立。PC处于 pending 状态,并且服务端被 channel 阻塞住。10s之后,由于超时连接失效(并没有断,我也不了解其中原理)。PC并不知道连接断了,依然处于 pending 状态,服务端的这个 goroutine 依然被阻塞在这里。这个时候我调用set接口,第一次调用没用反应,但第二次调用PC端就能成功接收value。

Golang-长连接-状态推送

从图可以看出,我设置的 WriteTimeout 为10s,但这条长连接即使15s依然能收到成功响应。(ps:我调用了两次set接口,第一次没有反应)

研究后决定不使用这种方式设置超时,采用接口内部定时的方式实现超时返回

select {
    case <-`Timer`:
        utils.ResponseJson(w, -1, "timeout", nil)
    case value := <-statusChan:
        utils.ResponseJson(w, 0, "success", value)
    }

Timer 即为定时器。刚开始Timer是这样定义的

Timer := time.After(60 * time.Second)

60s后 Timer 会自动返回一个值,这时上面的通道就开了,响应timeout

但这样做有一个弊端,这个定时器一旦创建就必须等待60s,并且我没想到办法提前将定时器关了。如果这个长连接刚建立后5s就被响应,那么这个定时器就要多存在55s。这样对资源是一种浪费,并不合理。

这里选用了 context 作为定时器

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(Timeout)*time.Second)
defer cancel()
select {
    case <-ctx.Done():
        utils.ResponseJson(w, -1, "timeout", nil)
    case result := <-Status[key]:
        utils.ResponseJson(w, 0, "success", result)
}
ctx在初始化的时候就设置了超时时间 time.Duration(Timeout)*time.Second

超时之后 ctx.Done() 返回完成,起到定时作用。如果没有cancel()则会有一样的问题。原因如下

Golang-长连接-状态推送

具体参考如下: https://blog.csdn.net/liangzh...

context对比time包。提供了手动关闭定时器的方法 cancel()

只要get请求结束,都会去关闭定时器,这样可以避免资源浪费(一定程度避免内存泄漏)。

即使golang官方文档中,也推荐 defer cancel() 这样写

Golang-长连接-状态推送

官方文档也写到:即使ctx会在到期时关闭,但在任何场景手动调用cancel都是很好的做法。

这样超时功能就实现了

2.多机支持

服务如果只部署在一台机器上,万一机器跪了,那就全跪了。

所以我们的服务必须同时部署在多个机器上工作。即使其中一台挂了,也不影响服务使用。

这个图不会画,只能用leader的图了

Golang-长连接-状态推送

在项目初期讨论的时候leader给出了两种方案。1.如图使用 redis 做多机调度。2.使用zookeeper将消息发送给多机

因为现在是用redis做的,只讲述下redis的实现。(但依赖redis并不是很好,多机的负载均衡还要依赖其他工具。zookeeper能够解决这个问题,之后会将redis换成zookeeper)

首先我们要明确多机的难点在哪?

我们有两个接口,get、set。get是给前端建立长连接用的。set是后端设置状态用的。

假设有两台机器A、B。若前端的请求发送到A机器上,即A机器与前端连接,此时后端调用set接口,如果调用的是A机器的set接口,那是最好,长连接就能成功响应。但如果调用了B机器的set接口,B机器上又没有这条连接,那么这条连接就无法响应。

所以难点在于如何将同一个key的get、set分配到一台机器。

做法有很多:

有人给我提过一个意见:在做负载均衡的时候,就将连接分配到指定机器。刚开始我觉的很有道理,但细细想,如果这样做,在以后如果要加机器或减机器的时候会很麻烦。对横向的增减机器不友善。

最后我还是采用了leader给出的方案:用redis绑定key与机器的关系

即前端请求到一台机器上,以key做键,以机器IP做值放在redis里面。后端请求set接口时先用key去redis里面拿到机器IP,再将value发送到这台机器上。

此时就多了一个接口,用于机器内部相互调用

ChanSet

func Router(){
    http.HandleFunc("/status/get", Get)
    http.HandleFunc("/status/set", Set)
    http.HandleFunc("/channel/set", ChanSet)
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    key = ...
    value = ...
    Status[key] <- value
}

GET

func Get(w http.ResponseWriter, r *http.Request){
    ...        
    IP = getLocalIp()       //得到本机IP
    RedisSet(key, IP)       //以key做键,IP做值放入redis
    Status[key] <- value
    ...
}

SET

func Set(w http.ResponseWriter, r *http.Request){
    ...
    IP = RedisGet(key)    //用key去取对应机器的IP
    Post(IP, key, value) //将key与value都发送给这台机器
}

这里相当于用redis sentinel做多台机器的通信。哨兵会帮我们将数据同步到所有机器上

这样即可实现多机支持

3.跨域

刚部署到线上的时候,第一次尝试就跪了。查看错误...(Access-Control-Allow-Origin)...

因为前端是通过AJAX请求的长连接服务,所以存在跨域问题。

在服务端设置允许跨域

func Get(w http.ResponseWriter, r *http.Request){
    ...
    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Add("Access-Control-Allow-Headers", "Content-Type")
    ...
}

若是像微信的做法,动态的加载script方式,则没有跨域问题。

服务端直接允许跨域,可能会有安全问题,但我不是很了解,这里为了使用,就允许跨域了。

4.Map并发读写问题

跨域问题解决之后,线上可以正常使用了。紧接着请测试同学压测了一下。

预期单机并发10000以上,测试同学直接压了10000,服务挂了。

可能预期有点高,5000吧,于是压了5000,服务挂了。

1000呢,服务挂了。

100,服务挂了。

……

这下豁然开朗,不可能是机器问题,绝对是有BUG

看了下报错

Golang-长连接-状态推送

去看了下官方文档

Golang-长连接-状态推送

Map是不能并发的写操作,但可以并发的读。

原来对Map操作是这样写的

func Get(w http.ResponseWriter, r *http.Request){
    ...
    `Status[key] = make(chan string)`
    ...
    select {
    case <-ctx.Done():
        utils.ResponseJson(w, -1, "timeout", nil)
    case `result := <-Status[key]`:
        utils.ResponseJson(w, 0, "success", result)
    }
    ...
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    `Status[key] <- value`
    ...
}

Status[key] = make(chan string) 在Status(map)里面初始化一个通道,是map的写操作

result := <-Status[key] 从Status[key]通道中读取一个值,由于是通道,这个值取出来后,通道内就没有了,所以这一步也是对map的写操作

Status[key] <- value 向Status[key]内放入一个值,map的写操作

由于这三处操作的是一个map,所以要加同一把锁

var Mutex sync.Mutex
func Get(w http.ResponseWriter, r *http.Request){
    ...
    //这里是同组大佬教我的写法,通道之间的拷贝传递的是指针,即statusChan与Status[key]指向的是同一个通道
    statusChan := make(chan string)
    Mutex.Lock()
    Status[key] = statusChan
    Mutex.Unlock()
    
    //在连接结束后将这些资源都释放
    defer func(){
        Mutex.Lock()
        delete(Status, key)
        Mutex.Unlock()
        close(statusChan)
        RedisDel(key)
    }()
    
    select {
        case <-ctx.Done():
            utils.ResponseJson(w, -1, "timeout", nil)
        case result := <-statusChan:
            utils.ResponseJson(w, 0, "success", result)
    }
    ...
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    Mutex.Lock()
    Status[key] <- value
    Mutex.Unlock()
    ...
}

到现在,服务就可以正常使用了,并且支持上万并发。

5.Redis过期时间

服务正常使用之后,leader review代码,提出redis的数据为什么不设置过期时间,反而要自己手动删除。我一想,对啊。

于是设置了过期时间并且将 RedisDel(key) 删了。

设置完之后不出意外的服务跪了。

究其原因

我用一个key=1请求get,会在redis内存储一条数据记录(1 => Ip).如果我set了这条连接,按之前的逻辑会将redis里的这条数据删掉,而现在是等待它过期。若是在过期时间内,再次以这个key=1,调用set接口。set接口依然会从redis中拿到IP,Post数据到ChanSet接口。而ChanSet中 Status[key] <- value 由于 Status[key] 是关闭的,会阻塞在这里,阻塞不要紧,但之前这里加了锁,导致整个程序都阻塞在这里。

这里和leader讨论过,仍使用redis过期时间但需要修复这个Bug

func ChanSet(w http.ResponseWriter, r *http.Request){
    Mutex.Lock()
    ch := Status[key]
    Mutex.Unlock()

    if ch != nil {
        ch <- value
    }
}

不过这样有一个问题,就是同一个key,在过期时间内是无法多次使用的。不过这与业务要求并不冲突。

6.Linux文件最大句柄数

在给测试同学测试之前,自己也压测了一下。不过刚上来就疯狂报错,“%¥#@¥……%……%%..too many fail open...”

搜索结果是 linux 默认最大句柄数1024.

开了下自己的机器 ulimit -a 果然1024。修改(修改方法不多BB)


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Game Engine Architecture, Second Edition

Game Engine Architecture, Second Edition

Jason Gregory / A K Peters/CRC Press / 2014-8-15 / USD 69.95

A 2010 CHOICE outstanding academic title, this updated book covers the theory and practice of game engine software development. It explains practical concepts and techniques used by real game studios,......一起来看看 《Game Engine Architecture, Second Edition》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

html转js在线工具
html转js在线工具

html转js在线工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具