Golang websocket结合一致性哈希算法构建高并发推送服务

栏目: Html5 · 发布时间: 6年前

内容简介:1 场景介绍web应用中,常有业务状态需要实时更新的场景。如一个较长的后台任务,从浏览器用户触发执行到执行完成可能需几十秒的时间,这时前端需隔几秒请求一次后台,查询任务执行进度。此种方式是长轮询的方式,是存在一定弊端的,增加了后台服务的负载,若并发操作量太大,后台压力会成倍激增。业界常采用http1.1的websocket扩展协议与浏览器建立长连接来实现实时业务状态更新。

2018年9月2日

1 场景介绍

web应用中,常有业务状态需要实时更新的场景。如一个较长的后台任务,从浏览器用户触发执行到执行完成可能需几十秒的时间,这时前端需隔几秒请求一次后台,查询任务执行进度。此种方式是长轮询的方式,是存在一定弊端的,增加了后台服务的负载,若并发操作量太大,后台压力会成倍激增。业界常采用http1.1的websocket扩展协议与浏览器建立长连接来实现实时业务状态更新。

2 实现方案

本文采用golang实现一个长连接服务,对外提供两个接口,一个是基于http的rest消息发送接口,一个是基于websocket的cient接入接口,如下图所示。

Golang websocket结合一致性哈希算法构建高并发推送服务

为使前端的接入更简单,从建立连接到用户关闭浏览器,中间前端无须发送消息来告知服务器client是否下线。我们将检测放在后台,后台采用定时心跳方式保持对client的监听,若心跳失败,则将该client剔除。如下图所示。

Golang websocket结合一致性哈希算法构建高并发推送服务

3 golang实现代码

comet服务内有两个模块,http server负责接收消息,comet server负责维护websocket client,每个client启用一个go routine对客户端保持心跳检测。

3.1 核心模块

package comet

import (
    "encoding/json"
    "log"
    "time"

    "golang.org/x/net/websocket"
)

type HttpServer struct {
    wsServer *WsServer
}

type WsServer struct {
    Clients map[string][]*Client
    AddCli  chan *Client
    DelCli  chan *Client
    Message chan *Message
}

type Client struct {
    UserId    string
    Timestamp int64
    conn      *websocket.Conn
    wsServer  *WsServer
}

type Message struct {
    UserId  string `json:"user_id"`
    Message string `json:"message"`
}

func NewWsServer() *WsServer {
    return &WsServer{
        make(map[string][]*Client),
        make(chan *Client),
        make(chan *Client),
        make(chan *Message, 1000),
    }
}

func NewHttpServer(wsServer *WsServer) *HttpServer {
    return &HttpServer{wsServer}
}

func (httpServer *HttpServer) SendMessage(userId, message string) {
    log.Printf("message reveived, user_id: %s, message: %s", userId, message)
    httpServer.wsServer.Message <- &Message{userId, message}
}

func (wsServer *WsServer) SendMessage(userId, message string) {
    clients := wsServer.Clients[userId]
    if len(clients) > 0 {
        for _, c := range clients {
            c.conn.Write([]byte(message))
        }
        log.Printf("message success sent to client, user_id: %s", userId)
    } else {
        log.Printf("client not found, user_id: %s", userId)
    }
}

func (wsServer *WsServer) addClient(c *Client) {
    clients := wsServer.Clients[c.UserId]
    wsServer.Clients[c.UserId] = append(clients, c)
    log.Printf("a client added, userId: %s, timestamp: %d", c.UserId, c.Timestamp)
}

func (wsServer *WsServer) delClient(c *Client) {
    clients := wsServer.Clients[c.UserId]
    if len(clients) > 0 {
        for i, client := range clients {
            if client.Timestamp == c.Timestamp {
                wsServer.Clients[c.UserId] = append(clients[:i], clients[i+1:]...)
                break
            }
        }
    }
    if 0 == len(clients) {
        delete(wsServer.Clients, c.UserId)
    }
    log.Printf("a client deleted, user_id: %s, timestamp: %d", c.UserId, c.Timestamp)
}

func (wsServer *WsServer) Start() {
    for {
        select {
        case msg := <-wsServer.Message:
            wsServer.SendMessage(msg.UserId, msg.Message)
        case c := <-wsServer.AddCli:
            wsServer.addClient(c)
        case c := <-wsServer.DelCli:
            wsServer.delClient(c)

        }
    }
}

func (c *Client) heartbeat() error {
    millis := time.Now().UnixNano() / 1000000
    heartbeat := struct {
        Heartbeat int64 `json:"heartbeat"`
    }{millis}
    bytes, _ := json.Marshal(heartbeat)
    _, err := c.conn.Write(bytes)
    return err
}

func (c *Client) Listen() {
    for {
        err := c.heartbeat()
        if nil != err {
            log.Printf("client heartbeat error, user_id: %v, timestamp: %d, err: %s", c.UserId, c.Timestamp, err)
            c.wsServer.DelCli <- c
            return
        }
        time.Sleep(time.Second * 5)
    }
}

4 一致性哈希包装

考虑到单服务的同时在线人数支持是有限的,所以在其上层用一致性哈希算法包装。这样同一user_id建立连接会打到同一台后台服务器,给此user_id发送消息也会打到同样的服务器。这样后台部署多个comet服务形成一个集群即可支撑高并发消息推送场景。如下图所示,最外层nginx挂接公网域名,对外提供基于wss的消息接收接口及基于http的消息发送接口。中间采用haproxy对user_id参数作一致性哈希转发,对同一user_id的操作会打到同一台comet server。底层扩展为多台comet server即可构建一个高并发的消息推送服务。

Golang websocket结合一致性哈希算法构建高并发推送服务

2018.09.02

大连

Golang , 算法

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Art of UNIX Programming

The Art of UNIX Programming

Eric S. Raymond / Addison-Wesley / 2003-10-3 / USD 54.99

Writing better software: 30 years of UNIX development wisdom In this book, five years in the making, the author encapsulates three decades of unwritten, hard-won software engineering wisdom. Raymond b......一起来看看 《The Art of UNIX Programming》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具