gRPC 基于zookeeper实现负载均衡

栏目: 编程工具 · 发布时间: 5年前

内容简介:gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。

gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。

gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。

负载均衡方案

RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。

由于RPC服务之间是无状态的,可以水平增加机器,扩展其服务能力。但是,如何利用多个节点呢?

通常做法,可以在RPC前面加上loadbalance(LB),LB 后面挂上对应的服务节点。应用层直接访问LB,RPC节点隐藏在LB后面,对应用层不可见。 这种好处比较明显, 只要把LB的地址以及端口提供给应用层即可, 应用层不用关心LB算法,大大降低了应用调用的复杂性。这种方式在web等短连接应用是比较好的解决方案,因为在重新进行LB连接的时候,可以重新选择后端的服务。

但是,对于长链接的服务来说,这就有很大的问题。 比如,一个长链接应用A 连接到LB上, LB 随机转发到后端的RPC服务B上,由于A是长链接,后面的所有请求都会转到B上,那么就不能起到负载均衡的作用了。你可能想到,最好跟RPC服务保持长链接,不用每次调用都进行连接,释放连接。 gRPC没有提供这种负载均衡的组件, 但是暴露了负载均衡的接口,只要extends NameResolverProvider 类,实现接口方法,就能很方便的实现负载均衡模块。

关于gRPC 负载均衡的基础介绍,请参考 juejin.im/post/5cd6e6…

下面介绍如何通过zookeeper实现负载均衡的NameResolver

ZkNameResolverProvider实现

public class ZkNameResolverProvider extends NameResolverProvider {
    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return new ZkNameResolver(targetUri);
    }

    @Override
    public String getDefaultScheme() {
        return "zk";
    }
}
复制代码

ZkNameResolver实现

public class ZkNameResolver extends NameResolver implements Watcher {
    private URI zkUri;
    private ZooKeeper zoo;
    private Listener listener;
    private final int ZK_CONN_TIMEOUT = 3000;
    private final String ZK_PATH = "/grpc_server_list";

    ZkNameResolver(URI zkUri) {
        this.zkUri = zkUri;
    }

    @Override
    public String getServiceAuthority() {
        return zkUri.getAuthority();
    }

    @Override
    public void start(Listener listener) {
        this.listener = listener;
        final CountDownLatch latch = new CountDownLatch(1);
        String zkAddr = zkUri.getHost() + ":" + zkUri.getPort();
        System.out.printf("connect to zookeeper server %s", zkAddr);
        try {
            this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
        } catch (IOException e) {
            System.out.printf(e);
            System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            latch.await();
            System.out.printf("connect to zookeeper succeed");
        } catch (InterruptedException e) {
            System.out.printf(e);
            System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            Stat stat = zoo.exists(ZK_PATH, true);
            if (stat == null) {
                System.out.printf("%s not exists", ZK_PATH);
            } else {
                System.out.printf("%s exists", ZK_PATH);
            }
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
        }

        try {
            List<String> children = zoo.getChildren(ZK_PATH, this);
            addServersToListener(children);
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
            System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage());
            System.exit(1);
        }
    }
    // 把zookeeper ZK_PATH的子节点作为rpc的节点地址,注册到gRPC负载均衡服务中
    private void addServersToListener(List<String> servers) {
        System.out.printf("rpc servers:%s", servers);
        ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
        for (String server : servers) {
            List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
            String[] address = server.split(":");
            socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
            addressGroups.add(new EquivalentAddressGroup(socketAddresses));
        }
        if (addressGroups.size() > 0) {
            listener.onAddresses(addressGroups, Attributes.EMPTY);
        } else {
            System.out.printf("No servers find, keep looking");
        }
    }

    @Override
    public void shutdown() {
        try {
            zoo.close();
        } catch (InterruptedException e) {
            System.out.printf(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            System.out.printf("Zookeeper connection expired");
        } else {
            try {
                List<String> children = zoo.getChildren(ZK_PATH, false);
                addServersToListener(children);
                zoo.getChildren(ZK_PATH, true);
            } catch (Exception e) {
                System.out.printf(e);
            }
        }
    }
}
复制代码
/grpc_server_list/rpc_host:50010
listener.onAddresses

channel 创建

this.channel = ManagedChannelBuilder
// 配置zk地址
.forTarget("zk://zkhost:2181")
// 配置NameResolverProvider实现类
.nameResolverFactory(new ZkNameResolverProvider())
.enableRetry()
.maxRetryAttempts(5)
.keepAliveTime(5, TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10, TimeUnit.MINUTES)
.idleTimeout(24, TimeUnit.HOURS)
// 轮询策略
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext()
.build();
复制代码
  • forTarget("zk://zkhost:2181") 配置zookeeper链接地址
  • nameResolverFactory(new ZkNameResolverProvider()) 配置 NameResolverProvider 实现类,让gRPC通过 ZkNameResolverProvider 查找可用的服务节点地址
  • 调用rpc服务,会根据配置的 loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) 策略进行轮询调用对应的后端服务

总结

gRPC 提供了非常灵活的的负载均衡接口,通过实现接口, 可以很方便的实现负载均衡。 通过自定义的负载均衡机制,可以保证调用方与每个rpc保持长链接,大大提高了rpc的网络开销,同时轮询到每个rpc服务上,扩展了rpc的响应能力。 通过zookeeper可以watch机制,监听特定path( /grpc_server_list )子节点的增加或者删除,动态实现服务的注册于下线,大大提高了后端服务水平扩展的便捷性。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

必然

必然

凯文·凯利 (Kevin Kelly) / 周峰、董理、金阳 / 译言·东西文库/电子工业出版社 / 2016-1-1 / 58

《必然》的作者凯文·凯利,被称为“硅谷精神之父”和“世界互联网教父”。前两部《失控》和《科技想要什么》在中国出版后,引起巨大反响。书中凯文·凯利对十二种必然的科技力量加以详细的阐述,并描绘出未来三十年这些趋势如何形成合力指引我们前行的方向。 作者凯文·凯利基于过往从业经历和对未来趋势的敏锐观察对十二个关键词“形成”“知化”“流动”“屏读”“使用”“共享”“过滤”“重混”“互动”“追踪”“提问......一起来看看 《必然》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具