从零实现一个RPC框架系列文章(二):11个类实现简单RPC

栏目: Java · 发布时间: 8年前

内容简介:从零实现一个RPC框架系列文章(二):11个类实现简单RPC

https://github.com/wephone/MeiZhuoRPC/tree/1.0

在上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题

读懂本篇需要的基本知识 若尚未清晰请自行了解后再阅读本文

  • java动态代理
  • netty框架的基本使用
  • spring的基本配置

最终项目的使用如下

/**
 *调用端代码及spring配置
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"})
public class Client {

    @Test
    public void start(){
        Service service= (Service) RPC.call(Service.class);
        System.out.println("测试Integer,Double类型传参与返回String对象:"+service.stringMethodIntegerArgsTest(233,666.66));
        //输出string233666.66
    }

}

/**
 *Service抽象及其实现
 *调用与实现端共同依赖Service
 */
public interface Service {
    String stringMethodIntegerArgsTest(Integer a,Double b);
}
/**
 * ServiceImpl实现端对接口的具体实现
*/
public class ServiceImpl implements Service {
    @Override
    public String stringMethodIntegerArgsTest(Integer a, Double b) {
        return "String"+a+b;
    }
}

1.0版本分3个包

  • Client 调用端
  • Server 实现端
  • Core 核心方法

首先看这句代码

调用端只需如此调用 定义接口 传入接口类类型 后面调用的接口内的方法 全部是由实现端实现

Service service= (Service) RPC.call(Service.class);

这句的作用其实就是生成调用端的动态代理

/**
     * 暴露调用端使用的静态方法 为抽象接口生成动态代理对象
     * TODO 考虑后面优化不在使用时仍需强转
     * @param cls 抽象接口的类类型
     * @return 接口生成的动态代理对象
     */
    public static Object call(Class cls){
        RPCProxyHandler handler=new RPCProxyHandler();
        Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler);
        return proxyObj;
    }

RPCProxyHandler为动态代理的方法被调用后的回调方法 每个方法被调用时都会执行这个invoke

/**
     * 代理抽象接口调用的方法
     * 发送方法信息给服务端 加锁等待服务端返回
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RPCRequest request=new RPCRequest();
        request.setRequestID(buildRequestID(method.getName()));
        request.setClassName(method.getDeclaringClass().getName());//返回表示声明由此 Method 对象表示的方法的类或接口的Class对象
        request.setMethodName(method.getName());
//        request.setParameterTypes(method.getParameterTypes());//返回形参类型
        request.setParameters(args);//输入的实参
        RPCRequestNet.requestLockMap.put(request.getRequestID(),request);
        RPCRequestNet.connect().send(request);
        //调用用结束后移除对应的condition映射关系
        RPCRequestNet.requestLockMap.remove(request.getRequestID());
        return request.getResult();//目标方法的返回结果
    }

也就是收集对应调用的接口的信息 然后send给实现端 那么这个requestLockMap又是作何作用的呢

  • 由于我们的 网络调用都是异步
  • 但是 RPC调用都要做到同步 等待这个远程调用方法完全返回后再继续执行
  • 所以将每个请求的 request对象作为对象锁 每个请求发送后加锁 等到网络异步调用返回后再释放所
  • 生成每个请求的ID 这里我用随机数加时间戳
  • 将请求ID和请求对象维护在静态全局的一个map 中 实现端通过ID来对应是哪个请求
  • 异步调用返回后 通过ID notify唤醒对应请求对象的线程 netty异步返回的调用 释放对象锁
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String responseJson= (String) msg;
        RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);
        synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) {
            //唤醒在该对象锁上wait的线程
            RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID());
            request.setResult(response.getResult());
            request.notifyAll();
        }
    }

接下来是RPCRequestNet.connect().send(request);方法 connect方法其实是单例模式返回RPCRequestNet实例 RPCRequestNet构造方法是使用netty对实现端进行TCP链接 send方法如下

try {
            //判断连接是否已完成 只在连接启动时会产生阻塞
            if (RPCRequestHandler.channelCtx==null){
                connectlock.lock();
                //挂起等待连接成功
                System.out.println("正在等待连接实现端");
                connectCondition.await();
                connectlock.unlock();
            }
            //编解码对象为json 发送请求
            String requestJson= null;
            try {
                requestJson = RPC.requestEncode(request);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
            RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);
            System.out.println("调用"+request.getRequestID()+"已发送");
            //挂起等待实现端处理完毕返回 TODO 后续配置超时时间
            synchronized (request) {
                //放弃对象锁 并阻塞等待notify
                request.wait();
            }
            System.out.println("调用"+request.getRequestID()+"接收完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

condition和lock同样是为了同步等待异步IO返回用的 send方法基本是编解码json后发送给实现端

调用端基本实现综上所述 代理 发送 同步锁

下面是服务端的使用和实现

/**
 *实现端代码及spring配置
 */
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"})
 public class Server {
 
     @Test
     public void start(){
         //启动spring后才可启动 防止容器尚未加载完毕
         RPC.start();
     }
 }

出了配置spring之外 实现端就一句 RPC.start() 其实就是启动netty服务器 服务端的处理客户端信息回调如下

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        String requestJson= (String) msg;
        System.out.println("receive request:"+requestJson);
        RPCRequest request= RPC.requestDeocde(requestJson);
        Object result=InvokeServiceUtil.invoke(request);
        //netty的write方法并没有直接写入通道(为避免多次唤醒多路复用选择器)
        //而是把待发送的消息放到缓冲数组中,flush方法再全部写到通道中
//        ctx.write(resp);
        //记得加分隔符 不然客户端一直不会处理
        RPCResponse response=new RPCResponse();
        response.setRequestID(request.getRequestID());
        response.setResult(result);
        String respStr=RPC.responseEncode(response);
        ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());
        ctx.writeAndFlush(responseBuf);
    }

主要是编解码json 反射对应的方法 我们看看反射的 工具

/**
     * 反射调用相应实现类并结果
     * @param request
     * @return
     */
    public static Object invoke(RPCRequest request){
        Object result=null;//内部变量必须赋值 全局变量才不用
        //实现类名
        String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName());
        try {
            Class implClass=Class.forName(implClassName);
            Object[] parameters=request.getParameters();
            int parameterNums=request.getParameters().length;
            Class[] parameterTypes=new Class[parameterNums];
            for (int i = 0; i <parameterNums ; i++) {
                parameterTypes[i]=parameters[i].getClass();
            }
            Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);
            Object implObj=implClass.newInstance();
            result=method.invoke(implObj,parameters);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return result;
    }

解析Parameters getClass获取他们的类类型 反射调用对应的方法

这里需要注意一个点

  • 本文最初采用Gson处理json 但 gson默认会把int类型转为double类型 例如2变为2.0 不适用本场景 我也不想去专门适配
  • 所以换用了jackson
  • 常见json处理框架 反序列化为对象时 int,long等基本类型都会变成他们的包装类Integer Long
  • 所以 本例程中 远程调度接口方法的形参不可以使用int等基本类型
  • 否则method.invoke(implObj,parameters);会找不到对应的方法报错
  • 因为parameters已经是包装类了 而method还是int这些基本类 所以找不到对应方法

最后是借助spring配置基础配置 我写了两个类 ServerConfig ClientConfig 作为调用端和服务端的配置 只需 在spring中配置这两个bean 并启动IOC容器即可

调用端

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.client.ClientConfig">
        <property name="host" value="127.0.0.1"></property>
        <property name="port" value="9999"></property>
    </bean>
</beans>

实现端

<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
     <bean class="org.meizhuo.rpc.server.ServerConfig">
         <property name="port" value="9999"></property>
         <property name="serverImplMap">
             <map>
                 <!--配置对应的抽象接口及其实现-->
                 <entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry>
             </map>
         </property>
     </bean>
 </beans>

最后有个小问题

我们的框架是作为一个依赖包引入的 我们不可能在我们的框架中读取对应的spring xml 这样完全是去了框架的灵活性 那我们怎么在 运行过程中获得我们所处于的IOC容器 已获得我们的正确配置信息呢 答案是spring提供的 ApplicationContextAware 接口

/**
 * Created by wephone on 17-12-26.
 */
public class ClientConfig implements ApplicationContextAware {

    private String host;
    private int port;
    //调用超时时间
    private long overtime;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public long getOvertime() {
        return overtime;
    }

    public void setOvertime(long overtime) {
        this.overtime = overtime;
    }

    /**
     * 加载Spring配置文件时,如果Spring配置文件中所定义的Bean类
     * 如果该类实现了ApplicationContextAware接口
     * 那么在加载Spring配置文件时,会自动调用ApplicationContextAware接口中的
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RPC.clientContext=applicationContext;
    }
}

这样我们在RPC类内部就维护了一个静态IOC容器的context 只需如此获取配置 RPC.getServerConfig().getPort()

public static ServerConfig getServerConfig(){
        return serverContext.getBean(ServerConfig.class);
    }

就这样 这个RPC框架的核心部分 已经讲述完毕了

本例程仅为1.0版本 后续博客中 会加入异常处理 zookeeper支持 负载均衡策略等博客:zookeeper支持 欢迎持续关注 欢迎star 提issue


以上所述就是小编给大家介绍的《从零实现一个RPC框架系列文章(二):11个类实现简单RPC》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Build Your Own Web Site the Right Way Using HTML & CSS

Build Your Own Web Site the Right Way Using HTML & CSS

Ian Lloyd / SitePoint / 2006-05-02 / USD 29.95

Build Your Own Website The Right Way Using HTML & CSS teaches web development from scratch, without assuming any previous knowledge of HTML, CSS or web development techniques. This book introduces you......一起来看看 《Build Your Own Web Site the Right Way Using HTML & CSS》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具