基于Netty的高性能JAVA的RPC框架

栏目: 编程工具 · 发布时间: 6年前

内容简介:前言今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。

前言

今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。

这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。

RPC题目如下

一个简单的RPC框架

RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。

题目要求

1.要成为框架:对于框架的使用者,隐藏RPC实现。

2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。

3.支持异步调用,提供future、callback的能力。

4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。

5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。

6.提供RPC上下文,客户端可以透传数据给服务端。

7.提供Hook,让开发人员进行RPC层面的AOP。

注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。

衡量标准

满足所有要求。 性能测试。

测试时会运行rpc-use-demo中的测试用例,测试的demo包由测试 工具 做好。

参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcConsumer,并覆写所有的public方法。

参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcProvider,并覆写所有的public方法。

参赛者依赖公共maven中心库上的三方包,即可看到一个示例的demo,按照对应的包名,在自己的工程中建立对应的类(包名、类名一致)。

三方库里的代码起到提示的作用,可以作为参考,不要在最终的pom中依赖。

所以最终参赛者需要打出一个rpc-api的jar包,供测试工程调用。 (注意,参考完rpc-api的示例后,请从pom依赖中将其删除,避免依赖冲突)

测试Demo工程请参考Taocode SVN上的代码。

RPC的实现

题目中推荐的网络框架使用Netty4来实现,这个RPC框架中需要实现的有

  1. RPC客户端
  2. RPC服务端

RPC客户端的实现

RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数

RpcConsumerImpl的实现

......

package com.alibaba.middleware.race.rpc.api.impl;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.middleware.race.rpc.aop.ConsumerHook;

import com.alibaba.middleware.race.rpc.api.RpcConsumer;

import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.netty.RpcConnection;

import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;

import com.alibaba.middleware.race.rpc.tool.Tool;

public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {

private static AtomicLong callTimes = new AtomicLong(0L);
private RpcConnection connection;
private List<RpcConnection> connection_list;
private Map<String,ResponseCallbackListener> asyncMethods;
private Class<?> interfaceClass;

private String version;

private int timeout;

private ConsumerHook hook;

public Class<?> getInterfaceClass() {
    return interfaceClass;
}
public String getVersion() {
    return version;
}
public int getTimeout() {
    this.connection.setTimeOut(timeout);
    return timeout;
}
public ConsumerHook getHook() {
    return hook;
}
RpcConnection select()
{
    //Random rd=new Random(System.currentTimeMillis());
    int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
    if(d==0)
        return connection;
    else
    {
        return connection_list.get(d-1);
    }
}
public RpcConsumerImpl()
{
    //String ip=System.getProperty("SIP");
    String ip="127.0.0.1";
    this.asyncMethods=new HashMap<String,ResponseCallbackListener>();
    this.connection=new RpcNettyConnection(ip,8888);
    this.connection.connect();
    connection_list=new ArrayList<RpcConnection>();
    int num=Runtime.getRuntime().availableProcessors()/3 -2;
    for (int i = 0; i < num; i++) {
        connection_list.add(new RpcNettyConnection(ip, 8888));
    }
    for (RpcConnection conn:connection_list) 
    {
        conn.connect();
    }

}
public void destroy() throws Throwable {
    if (null != connection) {
        connection.close();
    }
}

@SuppressWarnings("unchecked")
public <T> T proxy(Class<T> interfaceClass) throws Throwable {
    if (!interfaceClass.isInterface()) {
        throw new IllegalArgumentException(interfaceClass.getName()
                + " is not an interface");
    }
    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, this);
}
@Override
public RpcConsumer interfaceClass(Class<?> interfaceClass) {
    // TODO Auto-generated method stub
    this.interfaceClass=interfaceClass;
    return this;
}

@Override
public RpcConsumer version(String version) {
    // TODO Auto-generated method stub
    this.version=version;
    return this;
}

@Override
public RpcConsumer clientTimeout(int clientTimeout) {
    // TODO Auto-generated method stub
    this.timeout=clientTimeout;
    return this;
}

@Override
public RpcConsumer hook(ConsumerHook hook) {
    // TODO Auto-generated method stub
    this.hook=hook;
    return this;
}

@Override
public Object instance() {
    // TODO Auto-generated method stub
    try {
        return proxy(this.interfaceClass);
    }
    catch (Throwable e) 
    {
        e.printStackTrace();
    }
    return null;
}

@Override
public void asynCall(String methodName) {
    // TODO Auto-generated method stub
     asynCall(methodName, null);
}

@Override
public <T extends ResponseCallbackListener> void asynCall(
        String methodName, T callbackListener) {

    this.asyncMethods.put(methodName, callbackListener);
    this.connection.setAsyncMethod(asyncMethods);

    for (RpcConnection conn:connection_list) 
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public void cancelAsyn(String methodName) {
    // TODO Auto-generated method stub
    this.asyncMethods.remove(methodName);
    this.connection.setAsyncMethod(asyncMethods);
    for (RpcConnection conn:connection_list) 
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
    // TODO Auto-generated method stub
    List<String> parameterTypes = new LinkedList<String>();
    for (Class<?> parameterType : method.getParameterTypes()) {
        parameterTypes.add(parameterType.getName());
    }
    RpcRequest request = new RpcRequest();
    request.setRequestId(UUID.randomUUID().toString());
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);
    if(hook!=null)
        hook.before(request);
    RpcResponse response = null;
    try
    {
        request.setContext(RpcContext.props);
        response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
        if(hook!=null)
            hook.after(request);

        if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
        {

            Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
            throw e.getCause();
        }
    }
    catch (Throwable t)
    {   
        //t.printStackTrace();
        //throw new RuntimeException(t);
        throw t;
    }
    finally
    {

// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)

// {

// cancelAsyn(request.getMethodName());

// }

}

if(response==null)

{

return null;

}

else if (response.getErrorMsg() != null)

{

throw response.getErrorMsg();

}

else

{

return response.getAppResponse();

}

}

}

RpcConsumer consumer;

consumer = (RpcConsumer) getConsumerImplClass().newInstance();

consumer.someMethod();123

因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。

服务器实现

RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。

package com.alibaba.middleware.race.rpc.api.impl;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;

import java.util.HashMap;

import java.util.Map;

import net.sf.cglib.reflect.FastClass;

import net.sf.cglib.reflect.FastMethod;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;

import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;

import com.alibaba.middleware.race.rpc.tool.ReflectionCache;

import com.alibaba.middleware.race.rpc.tool.Tool;

/**

  • 处理服务器收到的RPC请求并返回结果
  • @author sei.zz
  • */

    public class RpcRequestHandler extends ChannelInboundHandlerAdapter {

    //对应每个请求ID和端口好 对应一个RpcContext的Map;

    private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>();

    //服务端接口-实现类的映射表

    private final Map<String, Object> handlerMap;

    KryoSerialization kryo=new KryoSerialization();

    public RpcRequestHandler(Map<String, Object> handlerMap) {

    this.handlerMap = handlerMap;

    br/>}

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override

    br/>System.out.println("active");

    }

    @Override

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    // TODO Auto-generated method stub

    System.out.println("disconnected");

    }

    //更新RpcContext的类容

    private void UpdateRpcContext(String host,Map<String,Object> map)

    {

    if(ThreadLocalMap.containsKey(host))

    {

    Map<String,Object> local=ThreadLocalMap.get(host);

    local.putAll(map);//把客户端的加进来

    ThreadLocalMap.put(host, local);//放回去

    for(Map.Entry<String, Object> entry:map.entrySet()){ //更新变量

    RpcContext.addProp(entry.getKey(), entry.getValue());

    }

    }

    else

    {

    ThreadLocalMap.put(host, map);

    //把对应线程的Context更新

    for(Map.Entry<String, Object> entry:map.entrySet()){

    RpcContext.addProp(entry.getKey(), entry.getValue());

    }

    }

    }

    //用来缓存住需要序列化的结果

    private static Object cacheName=null;

    private static Object cacheVaule=null;

    @Override

    public void channelRead(

    ChannelHandlerContext ctx, Object msg) throws Exception {

    RpcRequest request=(RpcRequest)msg;

    String host=ctx.channel().remoteAddress().toString();

    //更新上下文

    UpdateRpcContext(host,request.getContext());

    //TODO 获取接口名 函数名 参数 找到实现类 反射实现

    RpcResponse response = new RpcResponse();

    response.setRequestId(request.getRequestId());

    try

    {

    Object result = handle(request);

    if(cacheName!=null&&cacheName.equals(result))

    {

    response.setAppResponse(cacheVaule);

    }

    else

    {

    response.setAppResponse(ByteObjConverter.ObjectToByte(result));

    cacheName=result;

    cacheVaule=ByteObjConverter.ObjectToByte(result);

    }

    }

    catch (Throwable t)

    {

    //response.setErrorMsg(t);

    response.setExption(Tool.serialize(t));

    response.setClazz(t.getClass());

    }

    ctx.writeAndFlush(response);

    }

    /**

    • 运行调用的函数返回结果
    • @param request
    • @return
    • @throws Throwable

      */

      private static RpcRequest methodCacheName=null;

      private static Object methodCacheValue=null;

      private Object handle(RpcRequest request) throws Throwable

      {

      String className = request.getClassName();

      Object classimpl = handlerMap.get(className);//通过类名找到实现的类

      Class<?> clazz = classimpl.getClass();

      String methodName = request.getMethodName();

      Class<?>[] parameterTypes = request.getParameterTypes();

      Object[] parameters = request.getParameters();

// Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);

// method.setAccessible(true);

//System.out.println(className+":"+methodName+":"+parameters.length);
     if(methodCacheName!=null&&methodCacheName.equals(request))
     {
         return methodCacheValue;
     }
     else
     {
         try 
         {
             methodCacheName=request;
             if(methodMap.containsKey(methodName))
             {
                 methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             else
             {
                 FastClass serviceFastClass = FastClass.create(clazz);
                 FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                 methodMap.put(methodName, serviceFastMethod);
                 methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             //return method.invoke(classimpl, parameters);
         }
         catch (Throwable e) 
         {
             throw e.getCause();
         }
     }
 }
  private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>();
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
  {
      //ctx.close();
      //cause.printStackTrace();
      ctx.close();
  }
}

handel函数通过 Java 的反射机制,找到要调用的接口类然后调用对应函数然后执行,然后返回结果到客户端,本次RPC调用结束。

RPC主要的实现类在我的github上可以看见,我的这套RPC框架虽说不上完美,但是性能还是挺好的在服务器上测试时TPC有9w+。

主要的优化就是使用Neety4这个框架以及对数据包的处理,数据序列化与反序列化的速度

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 854393687

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

MySQL必知必会

MySQL必知必会

[英] Ben Forta / 刘晓霞、钟鸣 / 人民邮电出版社 / 2009-1 / 39.00元

《MySQL必知必会》MySQL是世界上最受欢迎的数据库管理系统之一。书中从介绍简单的数据检索开始,逐步深入一些复杂的内容,包括联结的使用、子查询、正则表达式和基于全文本的搜索、存储过程、游标、触发器、表约束,等等。通过重点突出的章节,条理清晰、系统而扼要地讲述了读者应该掌握的知识,使他们不经意间立刻功力大增。一起来看看 《MySQL必知必会》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具