gRPC 基于zookeeper实现负载均衡

栏目: 编程工具 · 发布时间: 6年前

内容简介:gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。

gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。

gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。

负载均衡方案

RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。

由于RPC服务之间是无状态的,可以水平增加机器,扩展其服务能力。但是,如何利用多个节点呢?

通常做法,可以在RPC前面加上loadbalance(LB),LB 后面挂上对应的服务节点。应用层直接访问LB,RPC节点隐藏在LB后面,对应用层不可见。 这种好处比较明显, 只要把LB的地址以及端口提供给应用层即可, 应用层不用关心LB算法,大大降低了应用调用的复杂性。这种方式在web等短连接应用是比较好的解决方案,因为在重新进行LB连接的时候,可以重新选择后端的服务。

但是,对于长链接的服务来说,这就有很大的问题。 比如,一个长链接应用A 连接到LB上, LB 随机转发到后端的RPC服务B上,由于A是长链接,后面的所有请求都会转到B上,那么就不能起到负载均衡的作用了。你可能想到,最好跟RPC服务保持长链接,不用每次调用都进行连接,释放连接。 gRPC没有提供这种负载均衡的组件, 但是暴露了负载均衡的接口,只要extends NameResolverProvider 类,实现接口方法,就能很方便的实现负载均衡模块。

关于gRPC 负载均衡的基础介绍,请参考 juejin.im/post/5cd6e6…

下面介绍如何通过zookeeper实现负载均衡的NameResolver

ZkNameResolverProvider实现

public class ZkNameResolverProvider extends NameResolverProvider {
    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return new ZkNameResolver(targetUri);
    }

    @Override
    public String getDefaultScheme() {
        return "zk";
    }
}
复制代码

ZkNameResolver实现

public class ZkNameResolver extends NameResolver implements Watcher {
    private URI zkUri;
    private ZooKeeper zoo;
    private Listener listener;
    private final int ZK_CONN_TIMEOUT = 3000;
    private final String ZK_PATH = "/grpc_server_list";

    ZkNameResolver(URI zkUri) {
        this.zkUri = zkUri;
    }

    @Override
    public String getServiceAuthority() {
        return zkUri.getAuthority();
    }

    @Override
    public void start(Listener listener) {
        this.listener = listener;
        final CountDownLatch latch = new CountDownLatch(1);
        String zkAddr = zkUri.getHost() + ":" + zkUri.getPort();
        System.out.printf("connect to zookeeper server %s", zkAddr);
        try {
            this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
        } catch (IOException e) {
            System.out.printf(e);
            System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            latch.await();
            System.out.printf("connect to zookeeper succeed");
        } catch (InterruptedException e) {
            System.out.printf(e);
            System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            Stat stat = zoo.exists(ZK_PATH, true);
            if (stat == null) {
                System.out.printf("%s not exists", ZK_PATH);
            } else {
                System.out.printf("%s exists", ZK_PATH);
            }
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
        }

        try {
            List<String> children = zoo.getChildren(ZK_PATH, this);
            addServersToListener(children);
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
            System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage());
            System.exit(1);
        }
    }
    // 把zookeeper ZK_PATH的子节点作为rpc的节点地址,注册到gRPC负载均衡服务中
    private void addServersToListener(List<String> servers) {
        System.out.printf("rpc servers:%s", servers);
        ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
        for (String server : servers) {
            List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
            String[] address = server.split(":");
            socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
            addressGroups.add(new EquivalentAddressGroup(socketAddresses));
        }
        if (addressGroups.size() > 0) {
            listener.onAddresses(addressGroups, Attributes.EMPTY);
        } else {
            System.out.printf("No servers find, keep looking");
        }
    }

    @Override
    public void shutdown() {
        try {
            zoo.close();
        } catch (InterruptedException e) {
            System.out.printf(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            System.out.printf("Zookeeper connection expired");
        } else {
            try {
                List<String> children = zoo.getChildren(ZK_PATH, false);
                addServersToListener(children);
                zoo.getChildren(ZK_PATH, true);
            } catch (Exception e) {
                System.out.printf(e);
            }
        }
    }
}
复制代码
/grpc_server_list/rpc_host:50010
listener.onAddresses

channel 创建

this.channel = ManagedChannelBuilder
// 配置zk地址
.forTarget("zk://zkhost:2181")
// 配置NameResolverProvider实现类
.nameResolverFactory(new ZkNameResolverProvider())
.enableRetry()
.maxRetryAttempts(5)
.keepAliveTime(5, TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10, TimeUnit.MINUTES)
.idleTimeout(24, TimeUnit.HOURS)
// 轮询策略
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext()
.build();
复制代码
  • forTarget("zk://zkhost:2181") 配置zookeeper链接地址
  • nameResolverFactory(new ZkNameResolverProvider()) 配置 NameResolverProvider 实现类,让gRPC通过 ZkNameResolverProvider 查找可用的服务节点地址
  • 调用rpc服务,会根据配置的 loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) 策略进行轮询调用对应的后端服务

总结

gRPC 提供了非常灵活的的负载均衡接口,通过实现接口, 可以很方便的实现负载均衡。 通过自定义的负载均衡机制,可以保证调用方与每个rpc保持长链接,大大提高了rpc的网络开销,同时轮询到每个rpc服务上,扩展了rpc的响应能力。 通过zookeeper可以watch机制,监听特定path( /grpc_server_list )子节点的增加或者删除,动态实现服务的注册于下线,大大提高了后端服务水平扩展的便捷性。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Understanding Machine Learning

Understanding Machine Learning

Shai Shalev-Shwartz、Shai Ben-David / Cambridge University Press / 2014 / USD 48.51

Machine learning is one of the fastest growing areas of computer science, with far-reaching applications. The aim of this textbook is to introduce machine learning, and the algorithmic paradigms it of......一起来看看 《Understanding Machine Learning》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具