RPC基本原理以及如何用Netty来实现RPC

栏目: 服务器 · 发布时间: 6年前

内容简介:在微服务大行其道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列化、寻址、连接等等问题。。不过,有了RPC框架,我们就无需苦恼。RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。

在微服务大行其道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列化、寻址、连接等等问题。。不过,有了RPC框架,我们就无需苦恼。

一、什么是RPC?

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而 程序员 无需额外地为这个交互作用编程。

值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。

RPC基本原理以及如何用Netty来实现RPC

RPC框架有很多,比较知名的如阿里的Dubbo、google的gRPC、 Go 语言的rpcx、Apache的thrift。当然了,还有Spring Cloud,不过对于Spring Cloud来说,RPC只是它的一个功能模块。

复杂的先不讲,如果要实现一个基本功能、简单的RPC,要涉及哪些东西呢?

  • 动态代理
  • 反射
  • 序列化、反序列化
  • 网络通信
  • 编解码
  • 服务发现和注册
  • 心跳与链路检测
  • ......

下面我们一起通过代码来分析,怎么把这些技术点串到一起,实现我们自己的RPC。

二、环境准备

在开始之前,笔者先介绍一下所用到的软件环境。

SpringBoot、Netty、zookeeper、zkclient、fastjson

  • SpringBoot 项目的基础框架,方便打成JAR包,便于测试。
  • Netty 通信服务器
  • zookeeper 服务的发现与注册
  • zkclient zookeeper客户端
  • fastjson 序列化、反序列化

三、RPC生产者

1、服务接口API

整个RPC,我们分为生产者和消费者。首先它们有一个共同的服务接口API。在这里,我们搞一个操作用户信息的service接口。

public interface InfoUserService {
    List<InfoUser> insertInfoUser(InfoUser infoUser);
    InfoUser getInfoUserById(String id);
    void deleteInfoUserById(String id);
    String getNameById(String id);
    Map<String,InfoUser> getAllUser();
}
复制代码

2、服务类实现

作为生产者,它当然要有实现类,我们创建InfoUserServiceImpl实现类,并用注解把它标注为RPC的服务,然后注册到Spring的Bean容器中。在这里,我们把infoUserMap当做数据库,存储用户信息。

package com.viewscenes.netsupervisor.service.impl;

@RpcService
public class InfoUserServiceImpl implements InfoUserService {

    Logger logger = LoggerFactory.getLogger(this.getClass());
	//当做数据库,存储用户信息
    Map<String,InfoUser> infoUserMap = new HashMap<>();

    public List<InfoUser> insertInfoUser(InfoUser infoUser) {
        logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser));
        infoUserMap.put(infoUser.getId(),infoUser);
        return getInfoUserList();
    }
    public InfoUser getInfoUserById(String id) {
        InfoUser infoUser = infoUserMap.get(id);
        logger.info("查询用户ID:{}",id);
        return infoUser;
    }

    public List<InfoUser> getInfoUserList() {
        List<InfoUser> userList = new ArrayList<>();
        Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, InfoUser> next = iterator.next();
            userList.add(next.getValue());
        }
        logger.info("返回用户信息记录数:{}",userList.size());
        return userList;
    }
    public void deleteInfoUserById(String id) {
        logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
    }
    public String getNameById(String id){
        logger.info("根据ID查询用户名称:{}",id);
        return infoUserMap.get(id).getName();
    }
    public Map<String,InfoUser> getAllUser(){
        logger.info("查询所有用户信息{}",infoUserMap.keySet().size());
        return infoUserMap;
    }
}
复制代码

元注解定义如下:

package com.viewscenes.netsupervisor.annotation;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}
复制代码

3、请求信息和返回信息

所有的请求信息和返回信息,我们用两个JavaBean来表示。其中的重点是,返回信息要带有请求信息的ID。

package com.viewscenes.netsupervisor.entity;
public class Request {
	private String id;
	private String className;// 类名
	private String methodName;// 函数名称
	private Class<?>[] parameterTypes;// 参数类型
	private Object[] parameters;// 参数列表
	get/set ...
}
复制代码
package com.viewscenes.netsupervisor.entity;
public class Response {
	private String requestId;
	private int code;
	private String error_msg;
	private Object data;
	get/set ...
}
复制代码

4、Netty服务端

Netty作为高性能的NIO通信框架,在很多RPC框架中都有它的身影。我们也采用它当做通信服务器。说到这,我们先看个配置文件,重点有两个,zookeeper的注册地址和Netty通信服务器的地址。

TOMCAT端口
server.port=8001
#zookeeper注册地址
registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183
#RPC服务提供者地址
rpc.server.address=192.168.197.1:18868
复制代码

为了方便管理,我们把它也注册成Bean,同时实现ApplicationContextAware接口,把上面@RpcService注解的服务类捞出来,缓存起来,供消费者调用。同时,作为服务器,还要对客户端的链路进行心跳检测,超过60秒未读写数据,关闭此连接。

package com.viewscenes.netsupervisor.netty.server;
@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);

    private Map<String, Object> serviceMap = new HashMap<>();

    @Value("${rpc.server.address}")
    private String serverAddress;

    @Autowired
    ServiceRegistry registry;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
        for(Object serviceBean:beans.values()){
            Class<?> clazz = serviceBean.getClass();
            Class<?>[] interfaces = clazz.getInterfaces();
            for (Class<?> inter : interfaces){
                String interfaceName = inter.getName();
                logger.info("加载服务类: {}", interfaceName);
                serviceMap.put(interfaceName, serviceBean);
            }
        }
        logger.info("已加载全部服务接口:{}", serviceMap);
    }
    public void afterPropertiesSet() throws Exception {
        start();
    }
    public void start(){
        final NettyServerHandler handler = new NettyServerHandler(serviceMap);
        new Thread(() -> {
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup).
                        channel(NioServerSocketChannel.class).
                        option(ChannelOption.SO_BACKLOG,1024).
                        childOption(ChannelOption.SO_KEEPALIVE,true).
                        childOption(ChannelOption.TCP_NODELAY,true).
                        childHandler(new ChannelInitializer<SocketChannel>() {
                            //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
                            protected void initChannel(SocketChannel channel) throws Exception {
                                ChannelPipeline pipeline = channel.pipeline();
                                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                                pipeline.addLast(new JSONEncoder());
                                pipeline.addLast(new JSONDecoder());
                                pipeline.addLast(handler);
                            }
                        });
                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
                ChannelFuture cf = bootstrap.bind(host,port).sync();
                logger.info("RPC 服务器启动.监听端口:"+port);
                registry.register(serverAddress);
                //等待服务端监听端口关闭
                cf.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}
复制代码

上面的代码就把Netty服务器启动了,在处理器中的构造函数中,我们先把服务Bean的Map传进来,所有的处理要基于这个Map才能找到对应的实现类。在channelRead中,获取请求方法的信息,然后通过反射调用方法获取返回值。

package com.viewscenes.netsupervisor.netty.server;
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final Map<String, Object> serviceMap;

    public NettyServerHandler(Map<String, Object> serviceMap) {
        this.serviceMap = serviceMap;
    }
    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("客户端连接成功!"+ctx.channel().remoteAddress());
    }
    public void channelInactive(ChannelHandlerContext ctx)   {
        logger.info("客户端断开连接!{}",ctx.channel().remoteAddress());
        ctx.channel().close();
    }
    public void channelRead(ChannelHandlerContext ctx, Object msg)   {
        Request request = JSON.parseObject(msg.toString(),Request.class);

        if ("heartBeat".equals(request.getMethodName())) {
            logger.info("客户端心跳信息..."+ctx.channel().remoteAddress());
        }else{
            logger.info("RPC客户端请求接口:"+request.getClassName()+"   方法名:"+request.getMethodName());
            Response response = new Response();
            response.setRequestId(request.getId());
            try {
                Object result = this.handler(request);
                response.setData(result);
            } catch (Throwable e) {
                e.printStackTrace();
                response.setCode(1);
                response.setError_msg(e.toString());
                logger.error("RPC Server handle request error",e);
            }
            ctx.writeAndFlush(response);
        }
    }
    /**
     * 通过反射,执行本地方法
     * @param request
     * @return
     * @throws Throwable
     */
    private Object handler(Request request) throws Throwable{
        String className = request.getClassName();
        Object serviceBean = serviceMap.get(className);

        if (serviceBean!=null){
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
        }else{
            throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName());
        }
    }
    /**
     * 获取参数列表
     * @param parameterTypes
     * @param parameters
     * @return
     */
    private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
        if (parameters==null || parameters.length==0){
            return parameters;
        }else{
            Object[] new_parameters = new Object[parameters.length];
            for(int i=0;i<parameters.length;i++){
                new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
            }
            return new_parameters;
        }
    }
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                logger.info("客户端已超过60秒未读写数据,关闭连接.{}",ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }else{
            super.userEventTriggered(ctx,evt);
        }
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   {
        logger.info(cause.getMessage());
        ctx.close();
    }
}
复制代码

4、服务注册

我们启动了Netty通信服务器,并且把服务实现类加载到缓存,等待请求时调用。这一步,我们要进行服务注册。为了简单化处理,我们只注册通信服务器的监听地址即可。 在上面代码中,bind之后我们执行了 registry.register(serverAddress); 它的作用就是,将Netty监听的IP端口注册到zookeeper。

package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${registry.address}")
    private String registryAddress;
    private static final String ZK_REGISTRY_PATH = "/rpc";

    public void register(String data) {
        if (data != null) {
            ZkClient client = connectServer();
            if (client != null) {
                AddRootNode(client);
                createNode(client, data);
            }
        }
    }
	//连接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,20000,20000);
        return client;
    }
	//创建根目录/rpc
    private void AddRootNode(ZkClient client){
        boolean exists = client.exists(ZK_REGISTRY_PATH);
        if (!exists){
            client.createPersistent(ZK_REGISTRY_PATH);
            logger.info("创建zookeeper主节点 {}",ZK_REGISTRY_PATH);
        }
    }
	//在/rpc根目录下,创建临时顺序子节点
    private void createNode(ZkClient client, String data) {
        String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("创建zookeeper数据节点 ({} => {})", path, data);
    }
}
复制代码

有一点需要注意,子节点必须是临时节点。这样,生产者端停掉之后,才能通知到消费者,把此服务从服务列表中剔除。到此为止,生产者端已经完成。我们看一下它的启动日志:

加载服务类: com.viewscenes.netsupervisor.service.InfoUserService
已加载全部服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服务器启动.监听端口:18868
Starting ZkClient event thread.
Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session
Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
创建zookeeper主节点 /rpc
创建zookeeper数据节点 (/rpc/provider0000000000 => 192.168.197.1:28868)
复制代码

四、RPC消费者

首先,我们需要把生产者端的服务接口API,即InfoUserService。以相同的目录放到消费者端。路径不同,调用会找不到的哦。

1、代理

RPC的目标其中有一条,《程序员无需额外地为这个交互作用编程。》所以,我们在调用的时候,就像调用本地方法一样。就像下面这样:

@Controller
public class IndexController {	
	@Autowired
    InfoUserService userService;
	
	@RequestMapping("getById")
    @ResponseBody
    public InfoUser getById(String id){
        logger.info("根据ID查询用户信息:{}",id);
        return userService.getInfoUserById(id);
    }
}
复制代码

那么,问题来了。消费者端并没有此接口的实现,怎么调用到的呢?这里,首先就是代理。笔者这里用的是Spring的工厂Bean机制创建的代理对象,涉及的代码较多,就不在文章中体现了,如果有不懂的同学,请想象一下,MyBatis中的Mapper接口怎么被调用的。可以参考笔者文章: Mybatis源码分析(四)mapper接口方法是怎样被调用到的

总之,在调用userService方法的时候,会调用到代理对象的invoke方法。在这里,封装请求信息,然后调用Netty的客户端方法发送消息。然后根据方法返回值类型,转成相应的对象返回。

package com.viewscenes.netsupervisor.configurer.rpc;

@Component
public class RpcFactory<T> implements InvocationHandler {

    @Autowired
    NettyClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(IdUtil.getId());

        Object result = client.send(request);
        Class<?> returnType = method.getReturnType();

        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode()==1){
            throw new Exception(response.getError_msg());
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
            return response.getData();
        }else if (Collection.class.isAssignableFrom(returnType)){
            return JSONArray.parseArray(response.getData().toString(),Object.class);
        }else if(Map.class.isAssignableFrom(returnType)){
            return JSON.parseObject(response.getData().toString(),Map.class);
        }else{
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}
复制代码

2、服务发现

在生产者端,我们把服务IP端口都注册到zookeeper中,所以这里,我们要去拿到服务地址,然后通过Netty连接。重要的是,还要对根目录进行监听子节点变化,这样随着生产者的上线和下线,消费者端可以及时感知。

package com.viewscenes.netsupervisor.connection;

@Component
public class ServiceDiscovery {

    @Value("${registry.address}")
    private String registryAddress;
    @Autowired
    ConnectManage connectManage;

    // 服务地址列表
    private volatile List<String> addressList = new ArrayList<>();
    private static final String ZK_REGISTRY_PATH = "/rpc";
    private ZkClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostConstruct
    public void init(){
        client = connectServer();
        if (client != null) {
            watchNode(client);
        }
    }
	
	//连接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,30000,30000);
        return client;
    }
	//监听子节点数据变化
    private void watchNode(final ZkClient client) {
        List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
            logger.info("监听到子节点数据变化{}",JSONObject.toJSONString(nodes));
            addressList.clear();
            getNodeData(nodes);
            updateConnectedServer();
        });
        getNodeData(nodeList);
        logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList));
        updateConnectedServer();
    }
	//连接生产者端服务
    private void updateConnectedServer(){
        connectManage.updateConnectServer(addressList);
    }

    private void getNodeData(List<String> nodes){
        logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes));
        for(String node:nodes){
            String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
            addressList.add(address);
        }
    }
}
复制代码

其中, connectManage.updateConnectServer(addressList); 就是根据服务地址,去连接生产者端的Netty服务。然后创建一个Channel列表,在发送消息的时候,从中选取一个Channel和生产者端进行通信。

3、Netty客户端

Netty客户端有两个方法比较重要,一个是根据IP端口连接服务器,返回Channel,加入到连接管理器;一个是用Channel发送请求数据。同时,作为客户端,空闲的时候还要往服务端发送心跳信息。

package com.viewscenes.netsupervisor.netty.client;

@Component
public class NettyClient {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    private EventLoopGroup group = new NioEventLoopGroup(1);
    private Bootstrap bootstrap = new Bootstrap();
    @Autowired
    NettyClientHandler clientHandler;
    @Autowired
    ConnectManage connectManage;
   
    public Object send(Request request) throws InterruptedException{

        Channel channel = connectManage.chooseChannel();
        if (channel!=null && channel.isActive()) {
            SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
            Object result = queue.take();
            return JSONArray.toJSONString(result);
        }else{
            Response res = new Response();
            res.setCode(1);
            res.setError_msg("未正确连接到服务器.请检查相关配置信息!");
            return JSONArray.toJSONString(res);
        }
    }
    public Channel doConnect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        Channel channel = future.sync().channel();
        return channel;
    }
	....其他方法略
}
复制代码

我们必须重点关注send方法,它是在代理对象invoke方法调用到的。首先从连接器中轮询选择一个Channel,然后发送数据。但是,Netty是异步操作,我们还要转为同步,就是说要等待生产者端返回数据才往下执行。笔者在这里用的是同步队列SynchronousQueue,它的take方法会阻塞在这里,直到里面有数据可读。然后在处理器中,拿到返回信息写到队列中,take方法返回。

package com.viewscenes.netsupervisor.netty.client;
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    NettyClient client;
    @Autowired
    ConnectManage connectManage;
    Logger logger = LoggerFactory.getLogger(this.getClass());
    private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("已连接到RPC服务器.{}",ctx.channel().remoteAddress());
    }
    public void channelInactive(ChannelHandlerContext ctx)   {
        InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("与RPC服务器断开连接."+address);
        ctx.channel().close();
        connectManage.removeChannel(ctx.channel());
    }
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        Response response = JSON.parseObject(msg.toString(),Response.class);
        String requestId = response.getRequestId();
        SynchronousQueue<Object> queue = queueMap.get(requestId);
        queue.put(response);
        queueMap.remove(requestId);
    }
    public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        }else{
            super.userEventTriggered(ctx,evt);
        }
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        logger.info("RPC通信服务器发生异常.{}",cause);
        ctx.channel().close();
    }
}
复制代码

至此,消费者端也基本完成。同样的,我们先看一下启动日志:

Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session
Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子节点数据为:["provider0000000015"]
已发现服务列表...["192.168.100.74:18868"]
加入Channel到连接管理器./192.168.100.74:18868
已连接到RPC服务器./192.168.100.74:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)
复制代码

五、测试

我们以Controller里面的两个方法为例,先开启100个线程调用insertInfoUser方法,然后开启1000个线程调用查询方法getAllUser。

public class IndexController {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    InfoUserService userService;

    @RequestMapping("insert")
    @ResponseBody
    public List<InfoUser> getUserList() throws InterruptedException {
        long start = System.currentTimeMillis();
        int thread_count = 100;
        CountDownLatch countDownLatch = new CountDownLatch(thread_count);
        for (int i=0;i<thread_count;i++){
            new Thread(() -> {
                InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing");
                List<InfoUser> users = userService.insertInfoUser(infoUser);
                logger.info("返回用户信息记录:{}", JSON.toJSONString(users));
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("线程数:{},执行时间:{}",thread_count,(end-start));
        return null;
    }
	@RequestMapping("getAllUser")
    @ResponseBody
    public Map<String,InfoUser> getAllUser() throws InterruptedException {

        long start = System.currentTimeMillis();
        int thread_count = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(thread_count);
        for (int i=0;i<thread_count;i++){
            new Thread(() -> {
                Map<String, InfoUser> allUser = userService.getAllUser();
                logger.info("查询所有用户信息:{}",JSONObject.toJSONString(allUser));
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("线程数:{},执行时间:{}",thread_count,(end-start));

        return null;
    }
}
复制代码

结果如下:

RPC基本原理以及如何用Netty来实现RPC
RPC基本原理以及如何用Netty来实现RPC

六、总结

本文简单介绍了RPC的整个流程,如果你正在学习RPC的相关知识,可以根据文中的例子,自己实现一遍。相信写完之后,你会对RPC会有更深一些的认识。

生产者端流程:

  • 加载服务,并缓存
  • 启动通讯服务器(Netty)
  • 服务注册(把通讯地址放入zookeeper,也可以把加载到的服务也放进去)
  • 反射,本地调用

消费者端流程:

  • 代理服务接口
  • 服务发现(连接zookeeper,拿到服务地址列表)
  • 远程调用(轮询生产者服务列表,发送消息)

限于篇幅,本文代码并不完整,如有需要,访问: github.com/taoxun/simp… 或者添加笔者微信公众号:<清幽之地的博客>),获取完整项目。


以上所述就是小编给大家介绍的《RPC基本原理以及如何用Netty来实现RPC》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

暗网

暗网

杰米·巴特利特 / 刘丹丹 / 北京时代华文书局 / 2018-7 / 59.00

全面深入揭秘“黑暗版淘宝”暗网的幕后世界和操纵者 现实中所有的罪恶,在暗网中,都是明码标价的商品。 暗杀、色情、恋童癖、比特币犯罪、毒品交易…… TED演讲、谷歌特邀专家、英国智库网络专家杰米•巴特利特代表作! 1、 被大家戏称为“黑暗版淘宝”的暗网究竟是什么?微信猎奇 文不能告诉你的真相都在这里了! 2、 因章莹颖一案、Facebook信息泄露危机而被国人所知的暗网......一起来看看 《暗网》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具