使用 Netty 构建 Rpc 中间件(一)

栏目: Java · 发布时间: 8年前

内容简介:使用 Netty 构建 Rpc 中间件(一)

Rpc中间件是目前互联网企业用的最多的中间件,是实现分布式系统最基础的中间件系统。在国内,用的最多的就是Dubbo以及Thrift。在国外,包括grpc,以及Finagle。Rpc的原理大同小异,都是利用TCP/IP协议将要本地要调用的类,方法,参数按照某种协议传输到远程主机,远程主机执行完毕以后再返回到本地主机。

当然其真正的实现非常复杂,涉及到IO,网络,多线程以及整个框架的架构设计。那么接下来的几篇文章(包括这篇文章)就来实现一个简单的基本的RPC框架,NIO框架使用的是Netty,原因你懂得。

定义一个简单的服务

假设有一个叫做IDemoService的简单服务。这个服务放在了contract包中间:

public interface IDemoService 
{
    public   int  sum(int a,int  b);

}

本地机器只有接口,实现是在远端实现的。

那么在本地调用的时候肯定是通过代理走网络发送到远程主机。如果使用静态代理,那么每个接口都必须实现一个代理类,所以一般来说没有哪个RPC框架使用静态代理,都是使用动态代理:

public class JDKDynamicService<T> implements InvocationHandler {

    private Class<T> clazz;

    private RpcClient client = new RpcClient("127.0.0.1", 6666);

    public void setClass(Class<T> clazz) {
        this.clazz = clazz;

    }

    @SuppressWarnings("unchecked")
    public T get() {

        return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[] { this.clazz }, this);

    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return client.sendCommand(clazz, method, args);

    }

}

使用动态代理,调用被代理类的每一个方法都会调用invoke方法。在invoke方法内部,调用RpcClient来传输协议和返回结果。

构建一个简单的传输协议

本地主机要调用远程接口,肯定要告诉远程主机调用哪个类的哪个接口,参数是什么。这里就简单的定一个传输的协议类:

public class MethodInvoker implements Serializable {
    private static final long serialVersionUID = 6644047311439601478L;
    private Class clazz;
    private String method;
    private Object[] args;
    public MethodInvoker(String method, Object[] args, Class clazz) {
        super();
        this.method = method;
        this.args = args;
        this.clazz = clazz;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Class getClazz() {
        return clazz;
    }
    public void setClazz(Class clazz) {
        this.clazz = clazz;
    }

}

这个类就不具体分析了,原因你懂得,继承Serializable接口是为了使用 JAVA 自带的序列化协议。

使用RPCClient实际发送数据

public class RpcClient {
    private String host;

    private int port;

    public RpcClient(String host, int port) {
        super();
        this.host = host;
        this.port = port;
    }

    public Object sendCommand(Class clazz, Method method, Object[] args) {

        MethodInvoker invoker = new MethodInvoker(method.getName(), args, clazz);
        final ClientHandler clientHandler = new ClientHandler(invoker);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(clientHandler);

                }
            });

            // Start the client.
            ChannelFuture f = b.connect(new InetSocketAddress(host, port)).sync();
            // Wait until the connection is closed. 当一个任务完成的时候会继续执行。
        f.channel().closeFuture().sync();
            return clientHandler.getResponse();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
        return null;

    }

}

ClientHandler 用来向服务端发送数据:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private Object response;
    private MethodInvoker methodInvoker;

    public ClientHandler(MethodInvoker methodInvoker) {

        this.methodInvoker = methodInvoker;

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ctx.writeAndFlush(this.methodInvoker); // 发送到下一个Handler。input处理完,就输出,然后output。
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("channelRead:"+msg);
        response = msg;
        ctx.close();
    }

    public Object getResponse() {
        return response;
    }

}

值得注意的是,被传输对象的编码和解码使用了Netty自带的编码与解码器。此外,一定要调用ctx.close()方法来关闭这个链接。

server端业务的实现

假设server端实现了IDemoService,并且使用Spring来管理bean对象:

public class DemoServiceImpl implements  IDemoService
{

    public int sum(int a, int b) {
        return a+b;
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean  id="demoService"  class="com.slowlizard.rpc.server.business.DemoServiceImpl"></bean>

</beans>

server端接受传过来的数据

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static ApplicationContext springApplicationContext;
    static {
        springApplicationContext = new ClassPathXmlApplicationContext("context.xml");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server come here:channelRead");
        MethodInvoker methodInvoker = (MethodInvoker) msg;
         Object service = springApplicationContext.getBean(methodInvoker.getClazz());
        Method[] methods = service.getClass().getDeclaredMethods();
        for (Method method : methods) {
            if (method.getName().equals(methodInvoker.getMethod())) {
                Object result = method.invoke(service, methodInvoker.getArgs());
                ctx.writeAndFlush(result);
            }

        }

    }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("server come here:channelActive");
}
}

启动server

public class Server {
    private static int port = 6666;
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            arg0.pipeline().addLast(new ObjectDecoder(1024*1024,
                                    ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );
                            arg0.pipeline().addLast(new ObjectEncoder());
                            arg0.pipeline().addLast(new ServerHandler());
                        }
                    }

                    ).option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true); // 保持长连接状态
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();// 子线程开始监听
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

测试

public class App {
    public static void main(String[] args) {

        JDKDynamicService<IDemoService> proxy = new JDKDynamicService<IDemoService>();
        proxy.setClass(IDemoService.class);
        IDemoService service = proxy.get();
        System.out.println("result" + service.sum(1, 2));
    }
}

很快,我们就是实现了一个简单的RPC远程调用。但是它只是一个原理性的示范,离真正的RPC框架还非常远。

首先,它的性能怎么样?

RpcClient每次发送协议到服务端,都会建立一个新的连接,能否优化它?

Server端能更快的查找到Bean并更快执行吗?

客户端能否与Spring集成?

Server端 Netty传输能否与Spring分离?

在下一章,我们将着手优化这个“框架”,来解决目前遇到的问题。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Practical Django Projects, Second Edition

Practical Django Projects, Second Edition

James Bennett / Apress / 2009 / 44.99

Build a django content management system, blog, and social networking site with James Bennett as he introduces version 1.1 of the popular Django framework. You’ll work through the development of ea......一起来看看 《Practical Django Projects, Second Edition》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码