Using Consul for gRPC service discovery

Category: Backend · Published: 5 years ago


Using Consul for gRPC service discovery and health checking


Service discovery and load balancing

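When the server side is deployed as a cluster, a client that calls it needs service discovery and load balancing. There are usually two approaches:

  • Put a proxy between client and server, and let the proxy do the load balancing.
  • Register the services in a central registry. The client queries the registry for the information on every service node and then applies its own load-balancing strategy.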

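The most common form of the first approach is nginx load-balancing an HTTP service. The client does not talk to the servers directly. It hands its requests to nginx, which forwards them to the backend services.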

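Advantage of this approach:

  • Neither the client nor the server needs any changes. The client cannot see the server cluster and calls it as if it were a single node.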

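Drawbacks of this approach:

  • Every request has to pass through the proxy, so the proxy easily becomes a performance bottleneck.
  • The proxy must never fail. Once it goes down, the service becomes unreachable.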

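For the second approach, Dubbo's RPC is a good reference. All services register with ZooKeeper, the client subscribes to the server list from ZooKeeper, and the client decides for itself which server to send each request to. This avoids both drawbacks above:

  • Client and server talk to each other directly, so the servers can scale out horizontally without a proxy bottleneck.
  • The registry (ZooKeeper) is made highly available by its ZAB consensus protocol, so there is no need to worry about losing service information when a registry node goes down.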

The downside of this approach is that it is more complex to implement.
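Here is an in-memory sketch of client-side discovery that makes the second approach concrete. Servers register under a service name, and the client reads the list and chooses an instance itself, with no proxy in the request path. Registry and pick are hypothetical stand-ins: Registry plays the role of ZooKeeper or Consul, and the random choice stands in for a real balancing policy.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for a registry such as
// ZooKeeper or Consul: service name -> set of instance addresses.
type Registry struct {
	mu       sync.RWMutex
	services map[string]map[string]struct{}
}

func NewRegistry() *Registry {
	return &Registry{services: make(map[string]map[string]struct{})}
}

// Register adds one instance of a service.
func (r *Registry) Register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.services[name] == nil {
		r.services[name] = make(map[string]struct{})
	}
	r.services[name][addr] = struct{}{}
}

// Deregister removes one instance; deleting from a missing service is a no-op.
func (r *Registry) Deregister(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.services[name], addr)
}

// Lookup returns the known instances of a service in sorted order.
func (r *Registry) Lookup(name string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addrs := make([]string, 0, len(r.services[name]))
	for a := range r.services[name] {
		addrs = append(addrs, a)
	}
	sort.Strings(addrs)
	return addrs
}

// pick is the client-side load-balancing step (random choice, for illustration only).
func pick(addrs []string) (string, bool) {
	if len(addrs) == 0 {
		return "", false
	}
	return addrs[rand.Intn(len(addrs))], true
}

func main() {
	reg := NewRegistry()
	reg.Register("helloworld", "10.0.0.1:50051")
	reg.Register("helloworld", "10.0.0.2:50051")
	if addr, ok := pick(reg.Lookup("helloworld")); ok {
		fmt.Println("client dials", addr, "directly")
	}
}
```

The registry is only consulted to learn addresses. Each request then goes straight from client to server, which is why this model has no proxy bottleneck.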

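If you take the first approach for gRPC load balancing, the options include:

For the second approach, the registry middleware options include:

They all implement a consensus algorithm such as Raft, and any of them can serve as a registry. This article chooses Consul because service discovery is exactly what Consul is built for, and it comes with a ready-made API.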

Using Consul for service registration and discovery with gRPC in Go

gRPC's resolver

Both Dial() and DialContext() in grpc accept load-balancing options. Dial has been deprecated, so this article uses DialContext.

gRPC officially ships a dns_resolver for DNS-based load balancing. We will first look at an example of client code, then read the dns_resolver source, and finally write our own consul_resolver modeled on dns_resolver.

Example of DNS-based load balancing:

package main
import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/balancer/roundrobin"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
    "google.golang.org/grpc/resolver"
)
const (
    address     = "dns:///dns-record-name:443"
    defaultName = "world"
)
func main() {
    // The secret sauce
    resolver.SetDefaultScheme("dns")
    // Set up a connection to the server.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel() // release the timeout's resources when main returns

    conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)
    // Contact the servers in round-robin manner.
    for i := 0; i < 3; i++ {
        ctx := context.Background()
        r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
        if err != nil {
            log.Fatalf("could not greet: %v", err)
        }
        log.Printf("Greeting: %s", r.Message)
    }
}

DialContext is defined as follows:

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)

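The following line selects dns_resolver explicitly, but it can be left out. gRPC chooses the resolver from the scheme of target, DialContext's second argument. In the example the target is dns:///dns-record-name:443, so gRPC picks dns_resolver automatically. SetDefaultScheme only comes into play when the target carries no scheme or an unregistered one.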

resolver.SetDefaultScheme("dns")
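The sketch below shows how the scheme is obtained. It is modeled on the scheme://authority/endpoint split that grpc-go used at the time. parseDialTarget is a hypothetical helper, not gRPC's internal function. If the split fails, or the scheme has no registered resolver, grpc-go of that era fell back to the default scheme and treated the whole string as the endpoint.

```go
package main

import (
	"fmt"
	"strings"
)

// Target mirrors the three fields of resolver.Target.
type Target struct {
	Scheme    string
	Authority string
	Endpoint  string
}

// parseDialTarget is a hypothetical helper that splits "scheme://authority/endpoint".
// ok is false when the string lacks "://" or the "/" after the authority.
func parseDialTarget(target string) (Target, bool) {
	scheme, rest, found := strings.Cut(target, "://")
	if !found {
		return Target{}, false
	}
	authority, endpoint, found := strings.Cut(rest, "/")
	if !found {
		return Target{}, false
	}
	return Target{Scheme: scheme, Authority: authority, Endpoint: endpoint}, true
}

func main() {
	for _, s := range []string{
		"dns:///dns-record-name:443",         // empty authority
		"consul://127.0.0.1:8500/helloworld", // authority = Consul agent address
		"localhost:50051",                    // no scheme: default scheme would apply
	} {
		t, ok := parseDialTarget(s)
		fmt.Printf("%-36q ok=%v %+v\n", s, ok, t)
	}
}
```

The consul:// target used later splits into Authority "127.0.0.1:8500" and Endpoint "helloworld". consul_resolver relies on exactly that split.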

The following option makes gRPC use round robin as its load-balancing policy:

grpc.WithBalancerName(roundrobin.Name)

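When grpc.DialContext is called, gRPC finds the matching resolver and obtains the list of service addresses. Then, on every call to the service, it picks one address according to the round-robin policy.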
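Round robin itself is simple. The sketch below is a deliberately reduced picker that assumes it only needs the plain address list a resolver delivers. roundRobin is a hypothetical type. gRPC's real round_robin balancer keeps one sub-connection per address and rotates only over the ones that are READY.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin is a hypothetical, simplified picker that cycles through the
// addresses most recently delivered by a resolver.
type roundRobin struct {
	mu    sync.Mutex
	addrs []string
	next  int
}

// UpdateAddrs replaces the address list, as a resolver callback would.
func (r *roundRobin) UpdateAddrs(addrs []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs = append([]string(nil), addrs...)
	r.next = 0
}

// Pick returns the next address in turn, or false if none are known.
func (r *roundRobin) Pick() (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.addrs) == 0 {
		return "", false
	}
	a := r.addrs[r.next]
	r.next = (r.next + 1) % len(r.addrs)
	return a, true
}

func main() {
	rr := &roundRobin{}
	rr.UpdateAddrs([]string{"10.0.0.1:50051", "10.0.0.2:50051"})
	for i := 0; i < 3; i++ {
		a, _ := rr.Pick()
		fmt.Println(a)
	}
}
```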

The gRPC Name Resolution document explains that custom resolvers can be implemented as plugins.

Let's first look at the source of resolver.go, located at $GOPATH/src/google.golang.org/grpc/resolver/resolver.go:

var m = make(map[string]Builder) // maps a scheme to its Builder

func Register(b Builder) { // registers a resolver; dns_resolver.go calls this from its init function, which just updates the map
    m[b.Scheme()] = b
}

type Resolver interface {
    ResolveNow(ResolveNowOption) // resolve immediately, i.e. re-query the service information
    Close()                      // close this Resolver
}

type Target struct { // the parsed target URI; see RFC 3986 for the URI format
    Scheme    string
    Authority string
    Endpoint  string
}

type Address struct { // address information of one service instance
    Addr       string // in host:port form
    Type       AddressType
    ServerName string
    Metadata   interface{}
}

type ClientConn interface { // two callbacks used to report updated service information
    NewAddress(addresses []Address)
    NewServiceConfig(serviceConfig string)
}

type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) // returns a Resolver
    Scheme() string                                                         // returns the scheme, e.g. "dns", "passthrough", "consul"
}

func Get(scheme string) Builder { // grpc.ClientConn calls this to get the Builder registered for a scheme
    if b, ok := m[scheme]; ok {
        return b
    }
    return nil
}

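Even with comments, the meaning is probably hard to grasp right away. I also had to read resolver.go several times alongside dns_resolver.go before it made sense. Roughly, once grpc.DialContext is called: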

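  • the target (e.g. dns:///dns-record-name:443) is parsed to obtain the scheme
  • resolver.Get is called with the scheme to obtain the matching Builder
  • Builder.Build is called, which:

    • parses the target
    • fetches the service address information
    • passes it to the caller through the two ClientConn callbacks, NewAddress and NewServiceConfig
    • returns a Resolver instance to the caller
  • the caller can then refresh the service information on demand through Resolver.ResolveNow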
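The flow above can be condensed into a stdlib-only sketch. ClientConn, Builder, staticBuilder, recordingConn and dial are hypothetical, trimmed-down analogues: there is no Target struct, no options and no Resolver return value. The sketch only shows how a scheme-keyed registry, Build and the NewAddress callback fit together.

```go
package main

import "fmt"

// Hypothetical, trimmed-down analogues of resolver.ClientConn and resolver.Builder.
type ClientConn interface {
	NewAddress(addrs []string)
}

type Builder interface {
	Build(endpoint string, cc ClientConn) error
	Scheme() string
}

// registry maps scheme -> Builder, like the map m in resolver.go.
var registry = make(map[string]Builder)

func Register(b Builder) { registry[b.Scheme()] = b }

// Get returns nil for an unregistered scheme.
func Get(scheme string) Builder { return registry[scheme] }

// staticBuilder stands in for dns/consul: it "resolves" to a fixed list.
type staticBuilder struct{ addrs []string }

func (s staticBuilder) Scheme() string { return "static" }

func (s staticBuilder) Build(endpoint string, cc ClientConn) error {
	// A real resolver would look up endpoint here and keep watching it.
	cc.NewAddress(s.addrs)
	return nil
}

// recordingConn plays the role of grpc.ClientConn receiving the callback.
type recordingConn struct{ got []string }

func (r *recordingConn) NewAddress(addrs []string) { r.got = addrs }

// dial mirrors the steps listed above: scheme -> Get -> Build -> callback.
func dial(scheme, endpoint string) (*recordingConn, error) {
	b := Get(scheme)
	if b == nil {
		return nil, fmt.Errorf("no resolver registered for scheme %q", scheme)
	}
	cc := &recordingConn{}
	if err := b.Build(endpoint, cc); err != nil {
		return nil, err
	}
	return cc, nil
}

// init registers the builder, as dns_resolver.go does.
func init() {
	Register(staticBuilder{addrs: []string{"127.0.0.1:50051"}})
}

func main() {
	cc, err := dial("static", "helloworld")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("resolved:", cc.got)
}
```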

With resolver.go understood, dns_resolver.go becomes much clearer:

// Registers a Builder in resolver's map.
// Go runs init automatically when the package is loaded; look up Go's init semantics if unfamiliar.
func init() { 
    resolver.Register(NewBuilder())
}

func NewBuilder() resolver.Builder {//创建一个resolver.Builder的实例
    return &dnsBuilder{minFreq: defaultFreq}
}

func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
    // parse the target to get the host and port
    host, port, err := parseTarget(target.Endpoint, defaultPort)
    if err != nil {
        return nil, err
    }

    // IP address.
    if net.ParseIP(host) != nil {
        host, _ = formatIP(host)
        addr := []resolver.Address{{Addr: host + ":" + port}}
        i := &ipResolver{
            cc: cc,
            ip: addr,
            rn: make(chan struct{}, 1),
            q:  make(chan struct{}),
        }
        cc.NewAddress(addr)
        go i.watcher()
        return i, nil
    }

    // DNS address (non-IP).
    ctx, cancel := context.WithCancel(context.Background())
    d := &dnsResolver{
        freq:                 b.minFreq,
        backoff:              backoff.Exponential{MaxDelay: b.minFreq},
        host:                 host,
        port:                 port,
        ctx:                  ctx,
        cancel:               cancel,
        cc:                   cc,
        t:                    time.NewTimer(0),
        rn:                   make(chan struct{}, 1),
        disableServiceConfig: opts.DisableServiceConfig,
    }

    if target.Authority == "" {
        d.resolver = defaultResolver
    } else {
        d.resolver, err = customAuthorityResolver(target.Authority)
        if err != nil {
            return nil, err
        }
    }

    d.wg.Add(1)
    go d.watcher() // run in a goroutine: watcher is an infinite loop that re-resolves whenever the timer fires or ResolveNow is called
    return d, nil
}

func (d *dnsResolver) watcher() {
    defer d.wg.Done()
    for {
        // This select has no default case, so it blocks until one case is ready:
        // the context is cancelled, the timer d.t fires, or the channel d.rn has data to read.
        select { 
        case <-d.ctx.Done():
            return
        case <-d.t.C:
        case <-d.rn:
        }
        result, sc := d.lookup()
        // Next lookup should happen within an interval defined by d.freq. It may be
        // more often due to exponential retry on empty address list.
        if len(result) == 0 {
            d.retryCount++
            d.t.Reset(d.backoff.Backoff(d.retryCount))
        } else {
            d.retryCount = 0
            d.t.Reset(d.freq)
        }
        // call the two resolver.ClientConn callbacks to pass the service information up
        d.cc.NewServiceConfig(sc)
        d.cc.NewAddress(result)
    }
}

// Writes to the channel without blocking to release the select in watcher, which then re-queries the service information
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
    select {
    case i.rn <- struct{}{}:
    default:
    }
}

Implementing consul_resolver

Now that we understand gRPC's resolver mechanism, let's implement consul_resolver, starting with the skeleton:

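// init registers the consul Builder with gRPC's resolver registry
func init() {
    resolver.Register(&consulBuilder{})
}

// any type that implements all methods of the resolver.Builder interface is a resolver.Builder
type consulBuilder struct {
}

// consulResolver is the resolver.Resolver returned by Build
type consulResolver struct {
}

func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
    // TODO parse target to get Consul's IP and port

    // TODO connect to Consul with its Go API, query the service nodes, and call the two resolver.ClientConn callbacks
    return &consulResolver{}, nil // placeholder until the TODOs are filled in
}

func (cb *consulBuilder) Scheme() string {
    return "consul"
}

// ResolveNow does nothing: we keep watching Consul (a publish/subscribe style relationship),
// so there is no need to refresh on a timer the way dns_resolver does
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}

// does nothing for now
func (cr *consulResolver) Close() {
}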

At this point, the main remaining question for consul_resolver.go is how to use Consul's Go API. Following this article is enough, and the resulting consul_resolver.go looks like this:

package consul

import (
    "errors"
    "fmt"
    "regexp"
    "sync"
    "time"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc/resolver"
)

const (
    // includes the leading colon, like the optional ":port" group captured by
    // regexConsul, so Build can simply concatenate host and port
    defaultPort = ":8500"
)

var (
    errMissingAddr = errors.New("consul resolver: missing address")

    errAddrMisMatch = errors.New("consul resolver: invalid uri")

    errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")

    // host[:port]/service-name, e.g. 127.0.0.1:8500/helloworld
    regexConsul = regexp.MustCompile("^([A-Za-z0-9.-]+)(:[0-9]{1,5})?/([A-Za-z0-9_-]+)$")
)

func Init() {
    fmt.Printf("calling consul init\n")
    resolver.Register(NewBuilder())
}

type consulBuilder struct {
}

type consulResolver struct {
    address              string
    wg                   sync.WaitGroup
    cc                   resolver.ClientConn
    name                 string
    disableServiceConfig bool
    lastIndex            uint64
}

func NewBuilder() resolver.Builder {
    return &consulBuilder{}
}

func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {

    fmt.Printf("calling consul build\n")
    fmt.Printf("target: %v\n", target)
    host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
    if err != nil {
        return nil, err
    }

    cr := &consulResolver{
        address:              fmt.Sprintf("%s%s", host, port),
        name:                 name,
        cc:                   cc,
        disableServiceConfig: opts.DisableServiceConfig,
        lastIndex:            0,
    }

    cr.wg.Add(1)
    go cr.watcher()
    return cr, nil

}

func (cr *consulResolver) watcher() {
    defer cr.wg.Done()
    fmt.Printf("calling consul watcher\n")
    config := api.DefaultConfig()
    config.Address = cr.address
    client, err := api.NewClient(config)
    if err != nil {
        fmt.Printf("error create consul client: %v\n", err)
        return
    }

    for {
        // Blocking query: returns once the healthy-instance list changes past lastIndex
        // or Consul's wait time elapses. The second argument is a tag filter; it matches
        // because the server registers the tag "helloworld".
        services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
        if err != nil {
            // metainfo is nil on error, so back off and retry instead of dereferencing it
            fmt.Printf("error retrieving instances from Consul: %v\n", err)
            time.Sleep(time.Second)
            continue
        }

        cr.lastIndex = metainfo.LastIndex
        var newAddrs []resolver.Address
        for _, service := range services {
            addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
            newAddrs = append(newAddrs, resolver.Address{Addr: addr})
        }
        fmt.Printf("adding service addrs\n")
        fmt.Printf("newAddrs: %v\n", newAddrs)
        // Consul supplies no gRPC service config (which would have to be JSON),
        // so only the address list is reported
        cr.cc.NewAddress(newAddrs)
    }

}

func (cb *consulBuilder) Scheme() string {
    return "consul"
}

func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}

func (cr *consulResolver) Close() {
}

func parseTarget(target string) (host, port, name string, err error) {

    fmt.Printf("target uri: %v\n", target)
    if target == "" {
        return "", "", "", errMissingAddr
    }

    if !regexConsul.MatchString(target) {
        return "", "", "", errAddrMisMatch
    }

    groups := regexConsul.FindStringSubmatch(target)
    host = groups[1]
    port = groups[2]
    name = groups[3]
    if port == "" {
        port = defaultPort
    }
    return host, port, name, nil
}

That completes service discovery on the gRPC client side.

Service registration with Consul

Service registration can use Consul's Go API directly. Again following the previous article, here is a thin wrapper, consul_register.go:

package consul

import (
    "fmt"
    "github.com/hashicorp/consul/api"
    "time"
)

type ConsulService struct {
    IP   string
    Port int
    Tag  []string
    Name string
}

func RegitserService(ca string, cs *ConsulService) {

    //register consul
    consulConfig := api.DefaultConfig()
    consulConfig.Address = ca
    client, err := api.NewClient(consulConfig)
    if err != nil {
        fmt.Printf("NewClient error\n%v", err)
        return
    }
    agent := client.Agent()
    interval := 10 * time.Second
    deregister := 1 * time.Minute

    reg := &api.AgentServiceRegistration{
        ID:      fmt.Sprintf("%v-%v-%v", cs.Name, cs.IP, cs.Port), // ID of this service instance (node)
        Name:    cs.Name,                                          // service name
        Tags:    cs.Tag,                                           // tags, may be empty
        Port:    cs.Port,                                          // service port
        Address: cs.IP,                                            // service IP
        Check: &api.AgentServiceCheck{ // health check
            Interval:                       interval.String(),                                // health-check interval
            GRPC:                           fmt.Sprintf("%v:%v/%v", cs.IP, cs.Port, cs.Name), // gRPC check target host:port/service; the service part is passed to Health.Check
            DeregisterCriticalServiceAfter: deregister.String(),                              // deregister after staying critical this long, effectively an expiry time
        },
    }

    fmt.Printf("registering to %v\n", ca)
    if err := agent.ServiceRegister(reg); err != nil {
        fmt.Printf("Service Register error\n%v", err)
        return
    }

}

Adapting gRPC's helloworld

Modify gRPC's helloworld demo to use Consul for service registration and discovery.

Server code:

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/health/grpc_health_v1"
    "log"
    "net"
    "server/internal/consul"
    pb "server/proto/helloworld"
)

const (
    port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.Name)
    return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func RegisterToConsul() {
    consul.RegitserService("127.0.0.1:8500", &consul.ConsulService{
        Name: "helloworld",
        Tag:  []string{"helloworld"},
        IP:   "127.0.0.1",
        Port: 50051,
    })
}

//health
type HealthImpl struct{}

// Check implements the health-check interface. Here it simply reports SERVING; a more elaborate strategy is possible, e.g. reporting based on server load.
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
    fmt.Print("health checking\n")
    return &grpc_health_v1.HealthCheckResponse{
        Status: grpc_health_v1.HealthCheckResponse_SERVING,
    }, nil
}

func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
    return nil
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
    RegisterToConsul()
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
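The comment in Check hints at smarter strategies. Consul's gRPC check marks the instance passing only when Check reports SERVING, and with passingOnly the resolver then stops handing out a NOT_SERVING instance. One option is therefore to keep a per-service status table and answer from it. The stdlib-only sketch below does this with a made-up load rule. healthRegistry and updateFromLoad are hypothetical. grpc-go also ships a ready-made status table in google.golang.org/grpc/health (health.NewServer with SetServingStatus) that could take HealthImpl's place.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Status values named after grpc_health_v1's serving states.
const (
	Serving    = "SERVING"
	NotServing = "NOT_SERVING"
)

var errUnknownService = errors.New("unknown service")

// healthRegistry is a hypothetical per-service status table that a Check
// handler could consult instead of always answering SERVING.
type healthRegistry struct {
	mu     sync.RWMutex
	status map[string]string
}

func newHealthRegistry() *healthRegistry {
	return &healthRegistry{status: make(map[string]string)}
}

func (h *healthRegistry) Set(service, status string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.status[service] = status
}

// Check returns the recorded status, or an error for an unknown service.
func (h *healthRegistry) Check(service string) (string, error) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	s, ok := h.status[service]
	if !ok {
		return "", errUnknownService
	}
	return s, nil
}

// updateFromLoad is an assumed policy: report NOT_SERVING once the number
// of in-flight requests reaches the limit, SERVING otherwise.
func (h *healthRegistry) updateFromLoad(service string, inFlight, limit int) {
	if inFlight >= limit {
		h.Set(service, NotServing)
	} else {
		h.Set(service, Serving)
	}
}

func main() {
	h := newHealthRegistry()
	h.updateFromLoad("helloworld", 10, 100)
	fmt.Println(h.Check("helloworld"))
	h.updateFromLoad("helloworld", 100, 100)
	fmt.Println(h.Check("helloworld"))
}
```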

Client code:

package main

import (
    "client/internal/consul"
    pb "client/proto/helloworld"
    "context"
    "google.golang.org/grpc"
    "log"
    "os"
    "time"
)

const (
    target      = "consul://127.0.0.1:8500/helloworld"
    defaultName = "world"
)

func main() {
    consul.Init()
    // Set up a connection to the server.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel() // release the dial timeout's resources when main returns
    conn, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // Contact the server and print out its response.
    name := defaultName
    if len(os.Args) > 1 {
        name = os.Args[1]
    }
    for {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
        cancel() // release the per-call timeout as soon as the RPC returns
        if err != nil {
            log.Fatalf("could not greet: %v", err)
        }
        log.Printf("Greeting: %s", r.Message)
        time.Sleep(time.Second * 2)
    }
}

Running it

Start Consul

consul agent -dev

Start the hello server

cd server
go run cmd/main.go

Start the hello client

cd client
go run cmd/main.go

Output:

//client
2019/03/07 17:22:04 Greeting: Hello world
2019/03/07 17:22:06 Greeting: Hello world

//server
2019/03/07 17:22:04 Received: world
2019/03/07 17:22:06 Received: world

Git repository of the complete project

How to use the project:

cd server
go mod tidy
go run cmd/main.go

cd client
go mod tidy
go run cmd/main.go

Sort out any firewall issues on your own.

References

