基于Netty的高性能JAVA的RPC框架

栏目: 编程工具 · 发布时间: 6年前

内容简介:前言今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。

前言

今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。

这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。

RPC题目如下

一个简单的RPC框架

RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。

题目要求

1.要成为框架:对于框架的使用者,隐藏RPC实现。

2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。

3.支持异步调用,提供future、callback的能力。

4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。

5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。

6.提供RPC上下文,客户端可以透传数据给服务端。

7.提供Hook,让开发人员进行RPC层面的AOP。

注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。

衡量标准

满足所有要求。 性能测试。

测试时会运行rpc-use-demo中的测试用例,测试的demo包由测试 工具 做好。

参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcConsumer,并覆写所有的public方法。

参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcProvider,并覆写所有的public方法。

参赛者依赖公共maven中心库上的三方包,即可看到一个示例的demo,按照对应的包名,在自己的工程中建立对应的类(包名、类名一致)。

三方库里的代码起到提示的作用,可以作为参考,不要在最终的pom中依赖。

所以最终参赛者需要打出一个rpc-api的jar包,供测试工程调用。 (注意,参考完rpc-api的示例后,请从pom依赖中将其删除,避免依赖冲突)

测试Demo工程请参考Taocode SVN上的代码。

RPC的实现

题目中推荐的网络框架使用Netty4来实现,这个RPC框架中需要实现的有

  1. RPC客户端
  2. RPC服务端

RPC客户端的实现

RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数

RpcConsumerImpl的实现

......

package com.alibaba.middleware.race.rpc.api.impl;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.middleware.race.rpc.aop.ConsumerHook;

import com.alibaba.middleware.race.rpc.api.RpcConsumer;

import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.netty.RpcConnection;

import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;

import com.alibaba.middleware.race.rpc.tool.Tool;

public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {

private static AtomicLong callTimes = new AtomicLong(0L);
private RpcConnection connection;
private List<RpcConnection> connection_list;
private Map<String,ResponseCallbackListener> asyncMethods;
private Class<?> interfaceClass;

private String version;

private int timeout;

private ConsumerHook hook;

public Class<?> getInterfaceClass() {
    return interfaceClass;
}
public String getVersion() {
    return version;
}
public int getTimeout() {
    this.connection.setTimeOut(timeout);
    return timeout;
}
public ConsumerHook getHook() {
    return hook;
}
RpcConnection select()
{
    //Random rd=new Random(System.currentTimeMillis());
    int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
    if(d==0)
        return connection;
    else
    {
        return connection_list.get(d-1);
    }
}
public RpcConsumerImpl()
{
    //String ip=System.getProperty("SIP");
    String ip="127.0.0.1";
    this.asyncMethods=new HashMap<String,ResponseCallbackListener>();
    this.connection=new RpcNettyConnection(ip,8888);
    this.connection.connect();
    connection_list=new ArrayList<RpcConnection>();
    int num=Runtime.getRuntime().availableProcessors()/3 -2;
    for (int i = 0; i < num; i++) {
        connection_list.add(new RpcNettyConnection(ip, 8888));
    }
    for (RpcConnection conn:connection_list) 
    {
        conn.connect();
    }

}
public void destroy() throws Throwable {
    if (null != connection) {
        connection.close();
    }
}

@SuppressWarnings("unchecked")
public <T> T proxy(Class<T> interfaceClass) throws Throwable {
    if (!interfaceClass.isInterface()) {
        throw new IllegalArgumentException(interfaceClass.getName()
                + " is not an interface");
    }
    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, this);
}
@Override
public RpcConsumer interfaceClass(Class<?> interfaceClass) {
    // TODO Auto-generated method stub
    this.interfaceClass=interfaceClass;
    return this;
}

@Override
public RpcConsumer version(String version) {
    // TODO Auto-generated method stub
    this.version=version;
    return this;
}

@Override
public RpcConsumer clientTimeout(int clientTimeout) {
    // TODO Auto-generated method stub
    this.timeout=clientTimeout;
    return this;
}

@Override
public RpcConsumer hook(ConsumerHook hook) {
    // TODO Auto-generated method stub
    this.hook=hook;
    return this;
}

@Override
public Object instance() {
    // TODO Auto-generated method stub
    try {
        return proxy(this.interfaceClass);
    }
    catch (Throwable e) 
    {
        e.printStackTrace();
    }
    return null;
}

@Override
public void asynCall(String methodName) {
    // TODO Auto-generated method stub
     asynCall(methodName, null);
}

@Override
public <T extends ResponseCallbackListener> void asynCall(
        String methodName, T callbackListener) {

    this.asyncMethods.put(methodName, callbackListener);
    this.connection.setAsyncMethod(asyncMethods);

    for (RpcConnection conn:connection_list) 
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public void cancelAsyn(String methodName) {
    // TODO Auto-generated method stub
    this.asyncMethods.remove(methodName);
    this.connection.setAsyncMethod(asyncMethods);
    for (RpcConnection conn:connection_list) 
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
    // TODO Auto-generated method stub
    List<String> parameterTypes = new LinkedList<String>();
    for (Class<?> parameterType : method.getParameterTypes()) {
        parameterTypes.add(parameterType.getName());
    }
    RpcRequest request = new RpcRequest();
    request.setRequestId(UUID.randomUUID().toString());
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);
    if(hook!=null)
        hook.before(request);
    RpcResponse response = null;
    try
    {
        request.setContext(RpcContext.props);
        response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
        if(hook!=null)
            hook.after(request);

        if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
        {

            Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
            throw e.getCause();
        }
    }
    catch (Throwable t)
    {   
        //t.printStackTrace();
        //throw new RuntimeException(t);
        throw t;
    }
    finally
    {

// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)

// {

// cancelAsyn(request.getMethodName());

// }

}

if(response==null)

{

return null;

}

else if (response.getErrorMsg() != null)

{

throw response.getErrorMsg();

}

else

{

return response.getAppResponse();

}

}

}

RpcConsumer consumer;

consumer = (RpcConsumer) getConsumerImplClass().newInstance();

consumer.someMethod();123

因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。

服务器实现

RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。

package com.alibaba.middleware.race.rpc.api.impl;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;

import java.util.HashMap;

import java.util.Map;

import net.sf.cglib.reflect.FastClass;

import net.sf.cglib.reflect.FastMethod;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;

import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;

import com.alibaba.middleware.race.rpc.tool.ReflectionCache;

import com.alibaba.middleware.race.rpc.tool.Tool;

/**

  • 处理服务器收到的RPC请求并返回结果
  • @author sei.zz
  • */

    public class RpcRequestHandler extends ChannelInboundHandlerAdapter {

    //对应每个请求ID和端口好 对应一个RpcContext的Map;

    private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>();

    //服务端接口-实现类的映射表

    private final Map<String, Object> handlerMap;

    KryoSerialization kryo=new KryoSerialization();

    public RpcRequestHandler(Map<String, Object> handlerMap) {

    this.handlerMap = handlerMap;

    br/>}

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override

    br/>System.out.println("active");

    }

    @Override

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    // TODO Auto-generated method stub

    System.out.println("disconnected");

    }

    //更新RpcContext的类容

    private void UpdateRpcContext(String host,Map<String,Object> map)

    {

    if(ThreadLocalMap.containsKey(host))

    {

    Map<String,Object> local=ThreadLocalMap.get(host);

    local.putAll(map);//把客户端的加进来

    ThreadLocalMap.put(host, local);//放回去

    for(Map.Entry<String, Object> entry:map.entrySet()){ //更新变量

    RpcContext.addProp(entry.getKey(), entry.getValue());

    }

    }

    else

    {

    ThreadLocalMap.put(host, map);

    //把对应线程的Context更新

    for(Map.Entry<String, Object> entry:map.entrySet()){

    RpcContext.addProp(entry.getKey(), entry.getValue());

    }

    }

    }

    //用来缓存住需要序列化的结果

    private static Object cacheName=null;

    private static Object cacheVaule=null;

    @Override

    public void channelRead(

    ChannelHandlerContext ctx, Object msg) throws Exception {

    RpcRequest request=(RpcRequest)msg;

    String host=ctx.channel().remoteAddress().toString();

    //更新上下文

    UpdateRpcContext(host,request.getContext());

    //TODO 获取接口名 函数名 参数 找到实现类 反射实现

    RpcResponse response = new RpcResponse();

    response.setRequestId(request.getRequestId());

    try

    {

    Object result = handle(request);

    if(cacheName!=null&&cacheName.equals(result))

    {

    response.setAppResponse(cacheVaule);

    }

    else

    {

    response.setAppResponse(ByteObjConverter.ObjectToByte(result));

    cacheName=result;

    cacheVaule=ByteObjConverter.ObjectToByte(result);

    }

    }

    catch (Throwable t)

    {

    //response.setErrorMsg(t);

    response.setExption(Tool.serialize(t));

    response.setClazz(t.getClass());

    }

    ctx.writeAndFlush(response);

    }

    /**

    • 运行调用的函数返回结果
    • @param request
    • @return
    • @throws Throwable

      */

      private static RpcRequest methodCacheName=null;

      private static Object methodCacheValue=null;

      private Object handle(RpcRequest request) throws Throwable

      {

      String className = request.getClassName();

      Object classimpl = handlerMap.get(className);//通过类名找到实现的类

      Class<?> clazz = classimpl.getClass();

      String methodName = request.getMethodName();

      Class<?>[] parameterTypes = request.getParameterTypes();

      Object[] parameters = request.getParameters();

// Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);

// method.setAccessible(true);

//System.out.println(className+":"+methodName+":"+parameters.length);
     if(methodCacheName!=null&&methodCacheName.equals(request))
     {
         return methodCacheValue;
     }
     else
     {
         try 
         {
             methodCacheName=request;
             if(methodMap.containsKey(methodName))
             {
                 methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             else
             {
                 FastClass serviceFastClass = FastClass.create(clazz);
                 FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                 methodMap.put(methodName, serviceFastMethod);
                 methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             //return method.invoke(classimpl, parameters);
         }
         catch (Throwable e) 
         {
             throw e.getCause();
         }
     }
 }
  private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>();
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
  {
      //ctx.close();
      //cause.printStackTrace();
      ctx.close();
  }
}

handel函数通过 Java 的反射机制,找到要调用的接口类然后调用对应函数然后执行,然后返回结果到客户端,本次RPC调用结束。

RPC主要的实现类在我的github上可以看见,我的这套RPC框架虽说不上完美,但是性能还是挺好的在服务器上测试时TPC有9w+。

主要的优化就是使用Neety4这个框架以及对数据包的处理,数据序列化与反序列化的速度

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 854393687

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Visual Thinking

Visual Thinking

Colin Ware / Morgan Kaufmann / 2008-4-18 / USD 49.95

Increasingly, designers need to present information in ways that aid their audiences thinking process. Fortunately, results from the relatively new science of human visual perception provide valuable ......一起来看看 《Visual Thinking》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换