内容简介:gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。
gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。
gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。
负载均衡方案
RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。
由于RPC服务之间是无状态的,可以水平增加机器,扩展其服务能力。但是,如何利用多个节点呢?
通常做法,可以在RPC前面加上loadbalance(LB),LB 后面挂上对应的服务节点。应用层直接访问LB,RPC节点隐藏在LB后面,对应用层不可见。 这种好处比较明显, 只要把LB的地址以及端口提供给应用层即可, 应用层不用关心LB算法,大大降低了应用调用的复杂性。这种方式在web等短连接应用是比较好的解决方案,因为在重新进行LB连接的时候,可以重新选择后端的服务。
但是,对于长链接的服务来说,这就有很大的问题。
比如,一个长链接应用A 连接到LB上, LB 随机转发到后端的RPC服务B上,由于A是长链接,后面的所有请求都会转到B上,那么就不能起到负载均衡的作用了。你可能想到,最好跟RPC服务保持长链接,不用每次调用都进行连接,释放连接。 gRPC没有提供这种负载均衡的组件, 但是暴露了负载均衡的接口,只要extends NameResolverProvider
类,实现接口方法,就能很方便的实现负载均衡模块。
关于gRPC 负载均衡的基础介绍,请参考 juejin.im/post/5cd6e6…
下面介绍如何通过zookeeper实现负载均衡的NameResolver
ZkNameResolverProvider实现
public class ZkNameResolverProvider extends NameResolverProvider { @Override protected boolean isAvailable() { return true; } @Override protected int priority() { return 5; } @Nullable @Override public NameResolver newNameResolver(URI targetUri, Attributes params) { return new ZkNameResolver(targetUri); } @Override public String getDefaultScheme() { return "zk"; } } 复制代码
ZkNameResolver实现
public class ZkNameResolver extends NameResolver implements Watcher { private URI zkUri; private ZooKeeper zoo; private Listener listener; private final int ZK_CONN_TIMEOUT = 3000; private final String ZK_PATH = "/grpc_server_list"; ZkNameResolver(URI zkUri) { this.zkUri = zkUri; } @Override public String getServiceAuthority() { return zkUri.getAuthority(); } @Override public void start(Listener listener) { this.listener = listener; final CountDownLatch latch = new CountDownLatch(1); String zkAddr = zkUri.getHost() + ":" + zkUri.getPort(); System.out.printf("connect to zookeeper server %s", zkAddr); try { this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); } catch (IOException e) { System.out.printf(e); System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage()); System.exit(1); } try { latch.await(); System.out.printf("connect to zookeeper succeed"); } catch (InterruptedException e) { System.out.printf(e); System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage()); System.exit(1); } try { Stat stat = zoo.exists(ZK_PATH, true); if (stat == null) { System.out.printf("%s not exists", ZK_PATH); } else { System.out.printf("%s exists", ZK_PATH); } } catch (KeeperException | InterruptedException e) { System.out.printf(e); } try { List<String> children = zoo.getChildren(ZK_PATH, this); addServersToListener(children); } catch (KeeperException | InterruptedException e) { System.out.printf(e); System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage()); System.exit(1); } } // 把zookeeper ZK_PATH的子节点作为rpc的节点地址,注册到gRPC负载均衡服务中 private void addServersToListener(List<String> servers) { System.out.printf("rpc servers:%s", servers); ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>(); for (String server : servers) { List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>(); String[] address = server.split(":"); socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1]))); addressGroups.add(new EquivalentAddressGroup(socketAddresses)); } if (addressGroups.size() > 0) { listener.onAddresses(addressGroups, Attributes.EMPTY); } else { System.out.printf("No servers find, keep looking"); } } @Override public void shutdown() { try { zoo.close(); } catch (InterruptedException e) { System.out.printf(e); } } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.None) { System.out.printf("Zookeeper connection expired"); } else { try { List<String> children = zoo.getChildren(ZK_PATH, false); addServersToListener(children); zoo.getChildren(ZK_PATH, true); } catch (Exception e) { System.out.printf(e); } } } } 复制代码
/grpc_server_list/rpc_host:50010 listener.onAddresses
channel 创建
this.channel = ManagedChannelBuilder // 配置zk地址 .forTarget("zk://zkhost:2181") // 配置NameResolverProvider实现类 .nameResolverFactory(new ZkNameResolverProvider()) .enableRetry() .maxRetryAttempts(5) .keepAliveTime(5, TimeUnit.MINUTES) .keepAliveWithoutCalls(true) .keepAliveTimeout(10, TimeUnit.MINUTES) .idleTimeout(24, TimeUnit.HOURS) // 轮询策略 .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) .usePlaintext() .build(); 复制代码
-
forTarget("zk://zkhost:2181")
配置zookeeper链接地址 -
nameResolverFactory(new ZkNameResolverProvider())
配置NameResolverProvider
实现类,让gRPC通过ZkNameResolverProvider
查找可用的服务节点地址 -
调用rpc服务,会根据配置的
loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
策略进行轮询调用对应的后端服务
总结
gRPC 提供了非常灵活的的负载均衡接口,通过实现接口, 可以很方便的实现负载均衡。 通过自定义的负载均衡机制,可以保证调用方与每个rpc保持长链接,大大提高了rpc的网络开销,同时轮询到每个rpc服务上,扩展了rpc的响应能力。
通过zookeeper可以watch机制,监听特定path( /grpc_server_list
)子节点的增加或者删除,动态实现服务的注册于下线,大大提高了后端服务水平扩展的便捷性。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。