grpc 中的 stream(服务中的长连接,订阅,上传大数据等)

栏目: IT技术 · 发布时间: 4年前

内容简介:前言:之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。stream的种类:

前言:

之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

stream的种类:

客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){}

服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){}

客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。

protobuf的定义:

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名

package pro;

//声明grpc服务

service Greeter {

/*

以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。

*/

rpc GetStream (StreamReqData) returns (stream StreamResData){}

rpc PutStream (stream StreamReqData) returns (StreamResData){}

rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

}

//stream请求结构

message StreamReqData {

string data = 1;

}

//stream返回结构

message StreamResData {

string data = 1;

}

我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。

服务端的实现:

package main

import (

"context"

"fmt"

"google.golang.org/grpc"

"grpc/pro"

"log"

"net"

"sync"

"time"

)

const PORT = ":50051"

type server struct {

}

//服务端 单向流

func (s *server)GetStream(req

pro.StreamReqData, res pro.Greeter_GetStreamServer) error{

i:= 0

for{

i++

res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})

time.Second)

if i >10 {

break

}

}

return nil

}

//客户端 单向流

func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error {

for {
    if tem, err := cliStr.Recv(); err == nil {
        log.Println(tem)
    } else {
        log.Println("break, err :", err)
        break
    }
}

return nil

}

//客户端服务端 双向流

func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error {

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
    for {
        data, _ := allStr.Recv()
        log.Println(data)
    }
    wg.Done()
}()

go func() {
    for {
        allStr.Send(&pro.StreamResData{Data:"ssss"})
        time.Sleep(time.Second)
    }
    wg.Done()
}()

wg.Wait()
return nil

}

func main(){

//监听端口

lis,err := net.Listen("tcp",PORT)

if err != nil{

return

}

//创建一个grpc 服务器

s := grpc.NewServer()

//注册事件

pro.RegisterGreeterServer(s,&server{})

//处理链接

s.Serve(lis)

}

知识点:

每个函数都对应着 完成了 protobuf 里面的 定义。

每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!

当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!

客户端调用:

package main

import (

"google.golang.org/grpc"

"grpc/pro"
"log"
"context"
"time"
_ "google.golang.org/grpc/balancer/grpclb"

)

const (

ADDRESS = "localhost:50051"

)

func main(){

//通过grpc 库 建立一个连接

conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())

if err != nil{

return

}

defer conn.Close()

//通过刚刚的连接 生成一个client对象。

c := pro.NewGreeterClient(conn)

//调用服务端推送流

reqstreamData := &pro.StreamReqData{Data:"aaa"}

res,_ := c.GetStream(context.Background(),reqstreamData)

for {

aa,err := res.Recv()

if err != nil {

log.Println(err)

break

}

log.Println(aa)

}

//客户端 推送 流

putRes, _ := c.PutStream(context.Background())

i := 1

for {

i++

putRes.Send(&pro.StreamReqData{Data:"ss"})

time.Sleep(time.Second)

if i > 10 {

break

}

}

//服务端 客户端 双向流

allStr,_ := c.AllStream(context.Background())

go func() {

for {

data,_ := allStr.Recv()

log.Println(data)

}

}()

go func() {
    for {
        allStr.Send(&pro.StreamReqData{Data:"ssss"})
        time.Sleep(time.Second)
    }
}()

select {
}

}

client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。

总结:

grpc 的 stream 和 go 的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。

欢迎关注我们的微信公众号,每天学习Go知识

grpc 中的 stream(服务中的长连接,订阅,上传大数据等)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

领域特定语言

领域特定语言

Martin Fowler / ThoughtWorks中国 / 机械工业出版社华章公司 / 2013-3 / 89.00元

本书是DSL领域的丰碑之作,由世界级软件开发大师和软件开发“教父”Martin Fowler历时多年写作而成,ThoughtWorks中国翻译。全面详尽地讲解了各种DSL及其构造方式,揭示了与编程语言无关的通用原则和模式,阐释了如何通过DSL有效提高开发人员的生产力以及增进与领域专家的有效沟通,能为开发人员选择和使用DSL提供有效的决策依据和指导方法。 全书共57章,分为六个部分:第一部分介......一起来看看 《领域特定语言》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具