基于Netty和SpringBoot实现一个轻量级RPC框架-Client端请求响应同步化处理

栏目: IT技术 · 发布时间: 4年前

内容简介:前置文章:前一篇文章简单介绍了通过动态代理完成了需要的依赖如下:

前提

前置文章:

前一篇文章简单介绍了通过动态代理完成了 Client 端契约接口调用转换为发送 RPC 协议请求的功能。这篇文章主要解决一个遗留的技术难题:请求-响应同步化处理。

需要的依赖如下:

JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE

简单分析Netty请求-响应的处理流程

基于Netty和SpringBoot实现一个轻量级RPC框架-Client端请求响应同步化处理

图中已经忽略了编码解码器和其他入站出站处理器,不同颜色的线程代表完全不相同的线程,不同线程之间的处理逻辑是完全异步,也就是 Netty IO 线程( n-l-g-1 )接收到 Server 端的消息并且解析完成的时候,用户调用线程( u-t-1 )无法感知到解析完毕的消息包,那么这里要做的事情就是让用户调用线程( u-t-1 )获取到 Netty IO 线程( n-l-g-1 )接收并且解析完成的消息包。

这里可以用一个简单的例子来说明模拟 Client 端调用线程等待 Netty IO 线程的处理结果再同步返回的过程。

@Slf4j
public class NettyThreadSyncTest {

    @ToString
    private static class ResponseFuture {

        private final long beginTimestamp = System.currentTimeMillis();
        @Getter
        private final long timeoutMilliseconds;
        @Getter
        private final String requestId;
        @Setter
        @Getter
        private volatile boolean sendRequestSucceed = false;
        @Setter
        @Getter
        private volatile Throwable cause;
        @Getter
        private volatile Object response;

        private final CountDownLatch latch = new CountDownLatch(1);

        public ResponseFuture(String requestId, long timeoutMilliseconds) {
            this.requestId = requestId;
            this.timeoutMilliseconds = timeoutMilliseconds;
        }

        public boolean timeout() {
            return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
        }

        public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException {
            latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
            return response;
        }

        public void putResponse(Object response) throws InterruptedException {
            this.response = response;
            latch.countDown();
        }
    }

    static ExecutorService REQUEST_THREAD;
    static ExecutorService NETTY_IO_THREAD;
    static Callable<Object> REQUEST_TASK;
    static Runnable RESPONSE_TASK;

    static String processBusiness(String name) {
        return String.format("%s say hello!", name);
    }

    private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();

    @BeforeClass
    public static void beforeClass() throws Exception {
        String requestId = UUID.randomUUID().toString();
        String requestContent = "throwable";
        REQUEST_TASK = () -> {
            try {
                // 3秒没有得到响应认为超时
                ResponseFuture responseFuture = new ResponseFuture(requestId, 3000);
                RESPONSE_FUTURE_TABLE.put(requestId, responseFuture);
                // 这里忽略发送请求的操作,只打印日志和模拟耗时1秒
                Thread.sleep(1000);
                log.info("发送请求成功,请求ID:{},请求内容:{}", requestId, requestContent);
                // 更新标记属性
                responseFuture.setSendRequestSucceed(true);
                // 剩余2秒等待时间 - 这里只是粗略计算
                return responseFuture.waitResponse(3000 - 1000);
            } catch (Exception e) {
                log.info("发送请求失败,请求ID:{},请求内容:{}", requestId, requestContent);
                throw new RuntimeException(e);
            }
        };
        RESPONSE_TASK = () -> {
            String responseContent = processBusiness(requestContent);
            try {
                ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId);
                if (null != responseFuture) {
                    log.warn("处理响应成功,请求ID:{},响应内容:{}", requestId, responseContent);
                    responseFuture.putResponse(responseContent);
                } else {
                    log.warn("请求ID[{}]对应的ResponseFuture不存在,忽略处理", requestId);
                }
            } catch (Exception e) {
                log.info("处理响应失败,请求ID:{},响应内容:{}", requestId, responseContent);
                throw new RuntimeException(e);
            }
        };
        REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "REQUEST_THREAD");
            thread.setDaemon(true);
            return thread;
        });
        NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "NETTY_IO_THREAD");
            thread.setDaemon(true);
            return thread;
        });
    }

    @Test
    public void testProcessSync() throws Exception {
        log.info("异步提交请求处理任务......");
        Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK);
        // 模拟请求耗时
        Thread.sleep(1500);
        log.info("异步提交响应处理任务......");
        NETTY_IO_THREAD.execute(RESPONSE_TASK);
        // 这里可以设置超时
        log.info("同步获取请求结果:{}", future.get());
        Thread.sleep(Long.MAX_VALUE);
    }
}

执行 testProcessSync() 方法,控制台输出如下:

2020-01-18 13:17:07 [main] INFO  c.t.client.NettyThreadSyncTest - 异步提交请求处理任务......
2020-01-18 13:17:08 [REQUEST_THREAD] INFO  c.t.client.NettyThreadSyncTest - 发送请求成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,请求内容:throwable
2020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 异步提交响应处理任务......
2020-01-18 13:17:09 [NETTY_IO_THREAD] WARN  c.t.client.NettyThreadSyncTest - 处理响应成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,响应内容:throwable say hello!
2020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 同步获取请求结果:throwable say hello!

上面这个例子里面的线程同步处理主要参考主流的 Netty 框架客户端部分的实现逻辑: RocketMQ (具体是 NettyRemotingClient 类)以及 Redisson (具体是 RedisExecutor 类),它们就是用这种方式使得异步线程处理转化为同步处理。

Client端请求响应同步化处理

按照前面的例子,首先新增一个 ResponseFuture 用于承载已发送但未响应的请求:

@ToString
public class ResponseFuture {

    private final long beginTimestamp = System.currentTimeMillis();
    @Getter
    private final long timeoutMilliseconds;
    @Getter
    private final String requestId;
    @Setter
    @Getter
    private volatile boolean sendRequestSucceed = false;
    @Setter
    @Getter
    private volatile Throwable cause;
    @Getter
    private volatile ResponseMessagePacket response;

    private final CountDownLatch latch = new CountDownLatch(1);

    public ResponseFuture(String requestId, long timeoutMilliseconds) {
        this.requestId = requestId;
        this.timeoutMilliseconds = timeoutMilliseconds;
    }

    public boolean timeout() {
        return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
    }

    public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException {
        latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
        return response;
    }

    public void putResponse(ResponseMessagePacket response) throws InterruptedException {
        this.response = response;
        latch.countDown();
    }
}

接着需要新增一个 HashMap 去缓存这些返送成功但是未得到响应处理的 ResponseFuture

Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();

这里的 KEY 选用 requestId ,而 requestId 之前已经定义为 UUID ,确保每个请求不会重复。为了简单起见,目前所有的逻辑都编写在契约代理工厂 ContractProxyFactory ,添加下面的功能:

  • 添加一个同步发送方法 sendRequestSync() 处理消息包的发送和同步响应, RequestMessagePacket 转换为调用代理目标方法返回值类型的逻辑暂时也编写在此方法中。
  • 添加一个核心线程数量为逻辑核心数量 * 2的线程池用于处理请求。
  • 添加一个单线程的调度线程池用于定时清理那些过期的 ResponseFuture ,清理方法为 scanResponseFutureTable()

修改后的 ContractProxyFactory 如下:

@Slf4j
public class ContractProxyFactory {

    private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
    private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
    static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
    // 定义请求的最大超时时间为3秒
    private static final long REQUEST_TIMEOUT_MS = 3000;
    private static final ExecutorService EXECUTOR;
    private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER;
    private static final Serializer SERIALIZER = FastJsonSerializer.X;


    @SuppressWarnings("unchecked")
    public static <T> T ofProxy(Class<T> interfaceKlass) {
        // 缓存契约接口的代理类实例
        return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
                    RequestArgumentExtractInput input = new RequestArgumentExtractInput();
                    input.setInterfaceKlass(interfaceKlass);
                    input.setMethod(method);
                    RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
                    // 封装请求参数
                    RequestMessagePacket packet = new RequestMessagePacket();
                    packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
                    packet.setVersion(ProtocolConstant.VERSION);
                    packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
                    packet.setMessageType(MessageType.REQUEST);
                    packet.setInterfaceName(output.getInterfaceName());
                    packet.setMethodName(output.getMethodName());
                    packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
                    packet.setMethodArguments(args);
                    Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
                    return sendRequestSync(channel, packet, method.getReturnType());
                }));
    }

    /**
     * 同步发送请求
     *
     * @param channel channel
     * @param packet  packet
     * @return Object
     */
    static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) {
        long beginTimestamp = System.currentTimeMillis();
        ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS);
        RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture);
        try {
            // 获取到承载响应Packet的Future
            Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> {
                channel.writeAndFlush(packet).addListener((ChannelFutureListener)
                        future -> responseFuture.setSendRequestSucceed(true));
                return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp));
            });
            ResponseMessagePacket responsePacket = packetFuture.get(
                    REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS);
            if (null == responsePacket) {
                // 超时导致响应包获取失败
                throw new SendRequestException(String.format("ResponseMessagePacket获取超时,请求ID:%s", packet.getSerialNumber()));
            } else {
                ByteBuf payload = (ByteBuf) responsePacket.getPayload();
                byte[] bytes = ByteBufferUtils.X.readBytes(payload);
                return SERIALIZER.decode(bytes, returnType);
            }
        } catch (Exception e) {
            log.error("同步发送请求异常,请求包:{}", JSON.toJSONString(packet), e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new SendRequestException(e);
            }
        }
    }

    static void scanResponseFutureTable() {
        log.info("开始执行ResponseFutureTable清理任务......");
        Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, ResponseFuture> entry = iterator.next();
            ResponseFuture responseFuture = entry.getValue();
            if (responseFuture.timeout()) {
                iterator.remove();
                log.warn("移除过期的请求ResponseFuture,请求ID:{}", entry.getKey());
            }
        }
        log.info("执行ResponseFutureTable清理任务结束......");
    }

    static {
        int n = Runtime.getRuntime().availableProcessors();
        EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50), runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("CLIENT_REQUEST_EXECUTOR");
            return thread;
        });
        CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("CLIENT_HOUSE_KEEPER");
            return thread;
        });
        CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS);
    }
}

接着添加一个客户端入站处理器,用于通过 reuqestId 匹配目标 ResponseFuture 实例,同时设置 ResponseFuture 实例中的 response 属性为响应包,同时释放闭锁:

@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
        log.info("接收到响应包,内容:{}", JSON.toJSONString(packet));
        ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber());
        if (null != responseFuture) {
            responseFuture.putResponse(packet);
        } else {
            log.warn("接收响应包查询ResponseFuture不存在,请求ID:{}", packet.getSerialNumber());
        }
    }
}

最后,客户端启动类 ClientApplication 中添加 ClientHandlerNetty 的处理器流水线中即可:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
        ch.pipeline().addLast(new ResponseMessagePacketDecoder());
        ch.pipeline().addLast(new ClientHandler());
    }
});

先运行之前- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》 中编写好的 ServerApplication ,再启动 ClientApplication ,日志输出如下:

// 服务端
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}

// 客户端
2020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO  club.throwable.client.ClientHandler - 接收到响应包,内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
2020-01-18 14:32:59 [main] INFO  c.throwable.client.ClientApplication - HelloService[throwable]调用结果:"throwable say hello!"
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO  c.t.client.ContractProxyFactory - 开始执行ResponseFutureTable清理任务......
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN  c.t.client.ContractProxyFactory - 移除过期的请求ResponseFuture,请求ID:21d131d26fc74f91b4691e0207826b90

可见异步线程模型已经被改造为同步化,现在可以通过契约接口通过 RPC 同步调用服务端。

小结

Client 端的请求-响应同步化处理基本改造完毕,到此为止,一个 RPC 框架大致已经完成,接下来会对 Client 端和 Server 端进行一些改造,让契约相关组件托管到 IOC 容器,实现契约接口自动注入等等功能。

Demo 项目地址:

(本文完e-a-20200118 c-2-d)


以上所述就是小编给大家介绍的《基于Netty和SpringBoot实现一个轻量级RPC框架-Client端请求响应同步化处理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First Design Patterns

Head First Design Patterns

Elisabeth Freeman、Eric Freeman、Bert Bates、Kathy Sierra、Elisabeth Robson / O'Reilly Media / 2004-11-1 / USD 49.99

You're not alone. At any given moment, somewhere in the world someone struggles with the same software design problems you have. You know you don't want to reinvent the wheel (or worse, a flat tire),......一起来看看 《Head First Design Patterns》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具