内容简介:使用 Netty 构建 Rpc 中间件(一)
Rpc中间件是目前互联网企业用的最多的中间件,是实现分布式系统最基础的中间件系统。在国内,用的最多的就是Dubbo以及Thrift。在国外,包括grpc,以及Finagle。Rpc的原理大同小异,都是利用TCP/IP协议将要本地要调用的类,方法,参数按照某种协议传输到远程主机,远程主机执行完毕以后再返回到本地主机。
当然其真正的实现非常复杂,涉及到IO,网络,多线程以及整个框架的架构设计。那么接下来的几篇文章(包括这篇文章)就来实现一个简单的基本的RPC框架,NIO框架使用的是Netty,原因你懂得。
定义一个简单的服务
假设有一个叫做IDemoService的简单服务。这个服务放在了contract包中间:
public interface IDemoService { public int sum(int a,int b); }
本地机器只有接口,实现是在远端实现的。
那么在本地调用的时候肯定是通过代理走网络发送到远程主机。如果使用静态代理,那么每个接口都必须实现一个代理类,所以一般来说没有哪个RPC框架使用静态代理,都是使用动态代理:
public class JDKDynamicService<T> implements InvocationHandler { private Class<T> clazz; private RpcClient client = new RpcClient("127.0.0.1", 6666); public void setClass(Class<T> clazz) { this.clazz = clazz; } @SuppressWarnings("unchecked") public T get() { return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[] { this.clazz }, this); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { return client.sendCommand(clazz, method, args); } }
使用动态代理,调用被代理类的每一个方法都会调用invoke方法。在invoke方法内部,调用RpcClient来传输协议和返回结果。
构建一个简单的传输协议
本地主机要调用远程接口,肯定要告诉远程主机调用哪个类的哪个接口,参数是什么。这里就简单的定一个传输的协议类:
public class MethodInvoker implements Serializable { private static final long serialVersionUID = 6644047311439601478L; private Class clazz; private String method; private Object[] args; public MethodInvoker(String method, Object[] args, Class clazz) { super(); this.method = method; this.args = args; this.clazz = clazz; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public Class getClazz() { return clazz; } public void setClazz(Class clazz) { this.clazz = clazz; } }
这个类就不具体分析了,原因你懂得,继承Serializable接口是为了使用 JAVA 自带的序列化协议。
使用RPCClient实际发送数据
public class RpcClient { private String host; private int port; public RpcClient(String host, int port) { super(); this.host = host; this.port = port; } public Object sendCommand(Class clazz, Method method, Object[] args) { MethodInvoker invoker = new MethodInvoker(method.getName(), args, clazz); final ClientHandler clientHandler = new ClientHandler(invoker); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(clientHandler); } }); // Start the client. ChannelFuture f = b.connect(new InetSocketAddress(host, port)).sync(); // Wait until the connection is closed. 当一个任务完成的时候会继续执行。 f.channel().closeFuture().sync(); return clientHandler.getResponse(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } return null; } }
ClientHandler 用来向服务端发送数据:
public class ClientHandler extends ChannelInboundHandlerAdapter { private Object response; private MethodInvoker methodInvoker; public ClientHandler(MethodInvoker methodInvoker) { this.methodInvoker = methodInvoker; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(this.methodInvoker); // 发送到下一个Handler。input处理完,就输出,然后output。 } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead:"+msg); response = msg; ctx.close(); } public Object getResponse() { return response; } }
值得注意的是,被传输对象的编码和解码使用了Netty自带的编码与解码器。此外,一定要调用ctx.close()方法来关闭这个链接。
server端业务的实现
假设server端实现了IDemoService,并且使用Spring来管理bean对象:
public class DemoServiceImpl implements IDemoService { public int sum(int a, int b) { return a+b; } }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="demoService" class="com.slowlizard.rpc.server.business.DemoServiceImpl"></bean> </beans>
server端接受传过来的数据
public class ServerHandler extends ChannelInboundHandlerAdapter { private static ApplicationContext springApplicationContext; static { springApplicationContext = new ClassPathXmlApplicationContext("context.xml"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server come here:channelRead"); MethodInvoker methodInvoker = (MethodInvoker) msg; Object service = springApplicationContext.getBean(methodInvoker.getClazz()); Method[] methods = service.getClass().getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodInvoker.getMethod())) { Object result = method.invoke(service, methodInvoker.getArgs()); ctx.writeAndFlush(result); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server come here:channelActive"); } }
启动server
public class Server { private static int port = 6666; public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup) .channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) ); arg0.pipeline().addLast(new ObjectEncoder()); arg0.pipeline().addLast(new ServerHandler()); } } ).option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true); // 保持长连接状态 // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync();// 子线程开始监听 } catch (Exception e) { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
测试
public class App { public static void main(String[] args) { JDKDynamicService<IDemoService> proxy = new JDKDynamicService<IDemoService>(); proxy.setClass(IDemoService.class); IDemoService service = proxy.get(); System.out.println("result" + service.sum(1, 2)); } }
很快,我们就是实现了一个简单的RPC远程调用。但是它只是一个原理性的示范,离真正的RPC框架还非常远。
首先,它的性能怎么样?
RpcClient每次发送协议到服务端,都会建立一个新的连接,能否优化它?
Server端能更快的查找到Bean并更快执行吗?
客户端能否与Spring集成?
Server端 Netty传输能否与Spring分离?
在下一章,我们将着手优化这个“框架”,来解决目前遇到的问题。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【02-中间件】构建go web框架
- 使用Netty构建Rpc中间件(一)
- 蚂蚁金服启动分布式中间件开源计划:构建云原生架构
- 小米 Go 开发实践:用 Go 构建高性能数据库中间件
- 消息中间件面试题:消息中间件的高可用
- Django中间件
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ES6标准入门(第3版)
阮一峰 / 电子工业出版社 / 2017-9 / 99.00
ES6是下一代JavaScript语言标准的统称,每年6月发布一次修订版,迄今为止已经发布了3个版本,分别是ES2015、ES2016、ES2017。本书根据ES2017标准,详尽介绍了所有新增的语法,对基本概念、设计目的和用法进行了清晰的讲解,给出了大量简单易懂的示例。本书为中级难度,适合那些已经对JavaScript语言有一定了解的读者,可以作为学习这门语言最新进展的工具书,也可以作为参考手册......一起来看看 《ES6标准入门(第3版)》 这本书的介绍吧!