内容简介:前置文章:前一篇文章简单介绍了通过动态代理完成了需要的依赖如下:
前提
前置文章:
- 《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》
- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》
- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇》
前一篇文章简单介绍了通过动态代理完成了 Client
端契约接口调用转换为发送 RPC
协议请求的功能。这篇文章主要解决一个遗留的技术难题:请求-响应同步化处理。
需要的依赖如下:
JDK1.8+ Netty:4.1.44.Final SpringBoot:2.2.2.RELEASE
简单分析Netty请求-响应的处理流程
图中已经忽略了编码解码器和其他入站出站处理器,不同颜色的线程代表完全不相同的线程,不同线程之间的处理逻辑是完全异步,也就是 Netty IO
线程( n-l-g-1
)接收到 Server
端的消息并且解析完成的时候,用户调用线程( u-t-1
)无法感知到解析完毕的消息包,那么这里要做的事情就是让用户调用线程( u-t-1
)获取到 Netty IO
线程( n-l-g-1
)接收并且解析完成的消息包。
这里可以用一个简单的例子来说明模拟 Client
端调用线程等待 Netty IO
线程的处理结果再同步返回的过程。
@Slf4j public class NettyThreadSyncTest { @ToString private static class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); @Getter private final long timeoutMilliseconds; @Getter private final String requestId; @Setter @Getter private volatile boolean sendRequestSucceed = false; @Setter @Getter private volatile Throwable cause; @Getter private volatile Object response; private final CountDownLatch latch = new CountDownLatch(1); public ResponseFuture(String requestId, long timeoutMilliseconds) { this.requestId = requestId; this.timeoutMilliseconds = timeoutMilliseconds; } public boolean timeout() { return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds; } public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException { latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS); return response; } public void putResponse(Object response) throws InterruptedException { this.response = response; latch.countDown(); } } static ExecutorService REQUEST_THREAD; static ExecutorService NETTY_IO_THREAD; static Callable<Object> REQUEST_TASK; static Runnable RESPONSE_TASK; static String processBusiness(String name) { return String.format("%s say hello!", name); } private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap(); @BeforeClass public static void beforeClass() throws Exception { String requestId = UUID.randomUUID().toString(); String requestContent = "throwable"; REQUEST_TASK = () -> { try { // 3秒没有得到响应认为超时 ResponseFuture responseFuture = new ResponseFuture(requestId, 3000); RESPONSE_FUTURE_TABLE.put(requestId, responseFuture); // 这里忽略发送请求的操作,只打印日志和模拟耗时1秒 Thread.sleep(1000); log.info("发送请求成功,请求ID:{},请求内容:{}", requestId, requestContent); // 更新标记属性 responseFuture.setSendRequestSucceed(true); // 剩余2秒等待时间 - 这里只是粗略计算 return responseFuture.waitResponse(3000 - 1000); } catch (Exception e) { log.info("发送请求失败,请求ID:{},请求内容:{}", requestId, requestContent); throw new RuntimeException(e); } }; RESPONSE_TASK = () -> { String responseContent = processBusiness(requestContent); try { ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId); if (null != responseFuture) { log.warn("处理响应成功,请求ID:{},响应内容:{}", requestId, responseContent); responseFuture.putResponse(responseContent); } else { log.warn("请求ID[{}]对应的ResponseFuture不存在,忽略处理", requestId); } } catch (Exception e) { log.info("处理响应失败,请求ID:{},响应内容:{}", requestId, responseContent); throw new RuntimeException(e); } }; REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "REQUEST_THREAD"); thread.setDaemon(true); return thread; }); NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "NETTY_IO_THREAD"); thread.setDaemon(true); return thread; }); } @Test public void testProcessSync() throws Exception { log.info("异步提交请求处理任务......"); Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK); // 模拟请求耗时 Thread.sleep(1500); log.info("异步提交响应处理任务......"); NETTY_IO_THREAD.execute(RESPONSE_TASK); // 这里可以设置超时 log.info("同步获取请求结果:{}", future.get()); Thread.sleep(Long.MAX_VALUE); } }
执行 testProcessSync()
方法,控制台输出如下:
2020-01-18 13:17:07 [main] INFO c.t.client.NettyThreadSyncTest - 异步提交请求处理任务...... 2020-01-18 13:17:08 [REQUEST_THREAD] INFO c.t.client.NettyThreadSyncTest - 发送请求成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,请求内容:throwable 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 异步提交响应处理任务...... 2020-01-18 13:17:09 [NETTY_IO_THREAD] WARN c.t.client.NettyThreadSyncTest - 处理响应成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,响应内容:throwable say hello! 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 同步获取请求结果:throwable say hello!
上面这个例子里面的线程同步处理主要参考主流的 Netty
框架客户端部分的实现逻辑: RocketMQ
(具体是 NettyRemotingClient
类)以及 Redisson
(具体是 RedisExecutor
类),它们就是用这种方式使得异步线程处理转化为同步处理。
Client端请求响应同步化处理
按照前面的例子,首先新增一个 ResponseFuture
用于承载已发送但未响应的请求:
@ToString public class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); @Getter private final long timeoutMilliseconds; @Getter private final String requestId; @Setter @Getter private volatile boolean sendRequestSucceed = false; @Setter @Getter private volatile Throwable cause; @Getter private volatile ResponseMessagePacket response; private final CountDownLatch latch = new CountDownLatch(1); public ResponseFuture(String requestId, long timeoutMilliseconds) { this.requestId = requestId; this.timeoutMilliseconds = timeoutMilliseconds; } public boolean timeout() { return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds; } public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException { latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS); return response; } public void putResponse(ResponseMessagePacket response) throws InterruptedException { this.response = response; latch.countDown(); } }
接着需要新增一个 HashMap
去缓存这些返送成功但是未得到响应处理的 ResponseFuture
:
Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
这里的 KEY
选用 requestId
,而 requestId
之前已经定义为 UUID
,确保每个请求不会重复。为了简单起见,目前所有的逻辑都编写在契约代理工厂 ContractProxyFactory
,添加下面的功能:
- 添加一个同步发送方法
sendRequestSync()
处理消息包的发送和同步响应,RequestMessagePacket
转换为调用代理目标方法返回值类型的逻辑暂时也编写在此方法中。 - 添加一个核心线程数量为逻辑核心数量 * 2的线程池用于处理请求。
- 添加一个单线程的调度线程池用于定时清理那些过期的
ResponseFuture
,清理方法为scanResponseFutureTable()
。
修改后的 ContractProxyFactory
如下:
@Slf4j public class ContractProxyFactory { private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor(); private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap(); static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap(); // 定义请求的最大超时时间为3秒 private static final long REQUEST_TIMEOUT_MS = 3000; private static final ExecutorService EXECUTOR; private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER; private static final Serializer SERIALIZER = FastJsonSerializer.X; @SuppressWarnings("unchecked") public static <T> T ofProxy(Class<T> interfaceKlass) { // 缓存契约接口的代理类实例 return (T) CACHE.computeIfAbsent(interfaceKlass, x -> Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> { RequestArgumentExtractInput input = new RequestArgumentExtractInput(); input.setInterfaceKlass(interfaceKlass); input.setMethod(method); RequestArgumentExtractOutput output = EXTRACTOR.extract(input); // 封装请求参数 RequestMessagePacket packet = new RequestMessagePacket(); packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER); packet.setVersion(ProtocolConstant.VERSION); packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber()); packet.setMessageType(MessageType.REQUEST); packet.setInterfaceName(output.getInterfaceName()); packet.setMethodName(output.getMethodName()); packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0])); packet.setMethodArguments(args); Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get(); return sendRequestSync(channel, packet, method.getReturnType()); })); } /** * 同步发送请求 * * @param channel channel * @param packet packet * @return Object */ static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) { long beginTimestamp = System.currentTimeMillis(); ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS); RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture); try { // 获取到承载响应Packet的Future Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> { channel.writeAndFlush(packet).addListener((ChannelFutureListener) future -> responseFuture.setSendRequestSucceed(true)); return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp)); }); ResponseMessagePacket responsePacket = packetFuture.get( REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS); if (null == responsePacket) { // 超时导致响应包获取失败 throw new SendRequestException(String.format("ResponseMessagePacket获取超时,请求ID:%s", packet.getSerialNumber())); } else { ByteBuf payload = (ByteBuf) responsePacket.getPayload(); byte[] bytes = ByteBufferUtils.X.readBytes(payload); return SERIALIZER.decode(bytes, returnType); } } catch (Exception e) { log.error("同步发送请求异常,请求包:{}", JSON.toJSONString(packet), e); if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new SendRequestException(e); } } } static void scanResponseFutureTable() { log.info("开始执行ResponseFutureTable清理任务......"); Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, ResponseFuture> entry = iterator.next(); ResponseFuture responseFuture = entry.getValue(); if (responseFuture.timeout()) { iterator.remove(); log.warn("移除过期的请求ResponseFuture,请求ID:{}", entry.getKey()); } } log.info("执行ResponseFutureTable清理任务结束......"); } static { int n = Runtime.getRuntime().availableProcessors(); EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), runnable -> { Thread thread = new Thread(runnable); thread.setDaemon(true); thread.setName("CLIENT_REQUEST_EXECUTOR"); return thread; }); CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> { Thread thread = new Thread(runnable); thread.setDaemon(true); thread.setName("CLIENT_HOUSE_KEEPER"); return thread; }); CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS); } }
接着添加一个客户端入站处理器,用于通过 reuqestId
匹配目标 ResponseFuture
实例,同时设置 ResponseFuture
实例中的 response
属性为响应包,同时释放闭锁:
@Slf4j public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception { log.info("接收到响应包,内容:{}", JSON.toJSONString(packet)); ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber()); if (null != responseFuture) { responseFuture.putResponse(packet); } else { log.warn("接收响应包查询ResponseFuture不存在,请求ID:{}", packet.getSerialNumber()); } } }
最后,客户端启动类 ClientApplication
中添加 ClientHandler
到 Netty
的处理器流水线中即可:
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new ResponseMessagePacketDecoder()); ch.pipeline().addLast(new ClientHandler()); } });
先运行之前- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》 中编写好的 ServerApplication
,再启动 ClientApplication
,日志输出如下:
// 服务端 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)]) 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1} // 客户端 2020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO club.throwable.client.ClientHandler - 接收到响应包,内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1} 2020-01-18 14:32:59 [main] INFO c.throwable.client.ClientApplication - HelloService[throwable]调用结果:"throwable say hello!" 2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO c.t.client.ContractProxyFactory - 开始执行ResponseFutureTable清理任务...... 2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN c.t.client.ContractProxyFactory - 移除过期的请求ResponseFuture,请求ID:21d131d26fc74f91b4691e0207826b90
可见异步线程模型已经被改造为同步化,现在可以通过契约接口通过 RPC
同步调用服务端。
小结
Client
端的请求-响应同步化处理基本改造完毕,到此为止,一个 RPC
框架大致已经完成,接下来会对 Client
端和 Server
端进行一些改造,让契约相关组件托管到 IOC
容器,实现契约接口自动注入等等功能。
Demo
项目地址:
(本文完e-a-20200118 c-2-d)
以上所述就是小编给大家介绍的《基于Netty和SpringBoot实现一个轻量级RPC框架-Client端请求响应同步化处理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Head First Design Patterns
Elisabeth Freeman、Eric Freeman、Bert Bates、Kathy Sierra、Elisabeth Robson / O'Reilly Media / 2004-11-1 / USD 49.99
You're not alone. At any given moment, somewhere in the world someone struggles with the same software design problems you have. You know you don't want to reinvent the wheel (or worse, a flat tire),......一起来看看 《Head First Design Patterns》 这本书的介绍吧!