内容简介:本系列为本人Java编程方法论 响应式解读系列的Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840Reactor源码解读与分享:https://www.bilibili.com/video/av35326911
本系列为本人 Java 编程方法论 响应式解读系列的 Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:
Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840
Reactor源码解读与分享:https://www.bilibili.com/video/av35326911
NIO源码解读相关视频分享: https://www.bilibili.com/video/av43230997
NIO源码解读视频相关配套文章:
BIO到NIO源码的一些事儿之NIO 下 之 Selector
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下
Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装
其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。
本书主要针对 Netty
服务器来讲,所以读者应具备有关 Netty
的基本知识和应用技能。接下来,我们将对 Reactor-netty
从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release
这个版本,但现在已有 0.8
版本,而且 0.7
与 0.8
版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8
版本进行全新的解读并在未来出版的书中进行展示。
TcpServer的功能实现
这里,我们会首先解读 Reactor Netty
是如何针对 Netty
中 Bootstrap
的 ChildHandler
进行封装以及响应式拓展等一些细节探究。接着,我们会涉及到 HttpHandler
的引入,以此来对接我们上层web服务。
针对Bootstrap的ChildHandler的封装
因为这是我们切入自定义逻辑的地方,所以,我们首先来关注下与其相关的 ChannelHandler
,以及前文并未提到的,服务器到底是如何启动以及如何通过响应式来做到优雅的关闭,首先我们会接触关闭服务器的设定。
ChannelHandler引入与使用响应式优雅关闭服务器
我们再回到 reactor.ipc.netty.http.server.HttpServer#HttpServer
这个构造器中,由上一章我们知道请求是 HTTP
层面的(应用层),必须依赖于 TCP
的连接实现,所以这里就要有一个 TCPServer
的实现,其实就是 Channel
上 Pipeline
的操作。
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
...
this.options = serverOptionsBuilder.build();
this.server = new TcpBridgeServer(this.options);
}
复制代码
这里的话在 DiscardServer Demo
中, TCPServer
我们主要针对 childHandler
的内容的封装,也就是如下内容:
b.group(bossGroup, workerGroup)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
...
复制代码
那 childHandler
到底代表什么类型,我们可以在 io.netty.bootstrap.ServerBootstrap
找到其相关定义:
//io.netty.bootstrap.ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
synchronized (bootstrap.childAttrs) {
childAttrs.putAll(bootstrap.childAttrs);
}
}
...
}
复制代码
由字段定义可知, childHandler
代表的是 ChannelHandler
,顾名思义,是关于 Channel
的一个处理类,这里通过查看其定义可知它是用来拦截处理 Channel
中的 I/O
事件,并通过 Channel
下的 ChannelPipeline
将处理后的事件转发到其下一个处理程序中。
那这里如何实现 DiscardServer Demo
中的 b.childHandler(xxx)
行为,通过 DiscardServer Demo
我们可以知道,我们最关注的其实是 ch.pipeline().addLast(new DiscardServerHandler());
中的 DiscardServerHandler
实现,但是我们发现,这个核心语句是包含在 ChannelInitializer
内,其继承了 ChannelInboundHandlerAdapter
,它的最顶层的父类接口就是 ChannelHandler
,也就对应了 io.netty.bootstrap.ServerBootstrap
在执行 b.childHandler(xxx)
方法时,其需要传入 ChannelHandler
类型的设定。这里就可以分拆成两步来做,一个是 b.childHandler(xxx)
行为包装,一个是此 ChannelHandler
的定义拓展实现。
那么,为了 API
的通用性,我们先来看Netty的客户端的建立的一个Demo(摘自本人RPC项目的一段代码):
private Channel createNewConChannel() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new RpcDecoder(10 * 1024 * 1024))
.addLast(new RpcEncoder())
.addLast(new RpcClientHandler())
;
}
});
try {
final ChannelFuture f =
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.TCP_NODELAY, true)
.connect(ip, port).sync(); // <1>
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
LOGGER.info("Connect success {} ", f);
}
});
final Channel channel = f.channel();
channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
return channel;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
复制代码
将 Netty
的客户端与服务端的建立进行对比,我们可以发现 b.childHandler(xxx)
与相应的启动( Server
端的话是 serverBootstrap.bind(port).sync();
,客户端的话是上述Demo中 <1>
处的内容)都可以抽取出来作为一个接口来进行功能的聚合,然后和相应的 Server
(如 TcpServer
)或 Client
(如 TcpClient
)进行其特有的实现。在 Reactor Netty
内的话,就是定义一个 reactor.ipc.netty.NettyConnector
接口,除了做到上述的功能之外,为了适配响应式的理念,也进行了响应式的设计。即在 netty
客户端与服务端在启动时,可以保存其状态,以及提供结束的对外接口方法,这种在响应式中可以很优雅的实现。接下来,我们来看此 reactor.ipc.netty.NettyConnector
的接口定义:
//reactor.ipc.netty.NettyConnector
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {
Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}
}
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
facade.installShutdownHook();
if (onStart != null) {
onStart.accept(facade);
}
facade.getContext()
.onClose()
.block();
}
复制代码
其中, newHandler
可以是我们上层web处理,里面包含了 INBOUND, OUTBOUND
,具体的话就是 request,response
,后面会专门来涉及到这点。
接着就是提供了一个启动方法 start
,其内创建了一个 BlockingNettyContext
实例,而逻辑的核心就在其构造方法内,就是要将配置好的服务器启动,整个启动过程还是放在 newHandler(handler)
中,其返回的 Mono<? extends NettyContext>
中的 NettyContext
类型元素是管理 io.netty.channel.Channel
上下文信息的一个对象,这个对象更多的是一些无状态的操作,并不会对此对象做什么样的改变,也是通过对此对象的一个 Mono<? extends NettyContext>
包装然后通过 block
产生订阅,来做到 sync()
的效果,通过,通过 block
产生订阅后返回的 NettyContext
对象,可以使中断关闭服务器的操作也可以做到更优雅:
public class BlockingNettyContext {
private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);
private final NettyContext context;
private final String description;
private Duration lifecycleTimeout;
private Thread shutdownHook;
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
String description) {
this(contextAsync, description, Duration.ofSeconds(45));
}
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
String description, Duration lifecycleTimeout) {
this.description = description;
this.lifecycleTimeout = lifecycleTimeout;
this.context = contextAsync
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
.block();
}
...
/**
* Shut down the {@link NettyContext} and wait for its termination, up to the
* {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
*/
public void shutdown() {
if (context.isDisposed()) {
return;
}
removeShutdownHook(); //only applies if not called from the hook's thread
context.dispose();
context.onClose()
.doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
.doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
.block();
}
...
}
复制代码
这里,我们来接触下在Reactor中并没有深入接触的 blockXXX()
操作,其实整个逻辑还是比较简单的,这里拿 reactor.core.publisher.Mono#block()
来讲,就是获取并返回这个下发的元素:
//reactor.core.publisher.Mono#block()
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}
//reactor.core.publisher.BlockingMonoSubscriber
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {
@Override
public void onNext(T t) {
if (value == null) {
value = t;
countDown();
}
}
@Override
public void onError(Throwable t) {
if (value == null) {
error = t;
}
countDown();
}
}
//reactor.core.publisher.BlockingSingleSubscriber
abstract class BlockingSingleSubscriber<T> extends CountDownLatch
implements InnerConsumer<T>, Disposable {
T value;
Throwable error;
Subscription s;
volatile boolean cancelled;
BlockingSingleSubscriber() {
super(1);
}
...
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
try {
await();
}
catch (InterruptedException ex) {
dispose();
throw Exceptions.propagate(ex);
}
}
Throwable e = error;
if (e != null) {
RuntimeException re = Exceptions.propagate(e);
//this is ok, as re is always a new non-singleton instance
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
}
return value;
}
...
@Override
public final void onComplete() {
countDown();
}
}
复制代码
可以看到,此处使用的 CountDownLatch
的一个特性,在元素下发赋值之后,等待数值减1,这里刚好也就这一个限定(由 super(1)
定义),解除所调用的 blockingGet
中的等待,得到所需的值,这里,为了保证 block()
的语义,其 onComplete
方法也调用了 countDown();
,即当上游为 Mono<Void>
时,做到匹配。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上
- Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下
- Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装
- 架构设计方法论
- 性能分析方法论
- 架构制图:工具与方法论
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Masterminds of Programming
Federico Biancuzzi、Chromatic / O'Reilly Media / 2009-03-27 / USD 39.99
Description Masterminds of Programming features exclusive interviews with the creators of several historic and highly influential programming languages. Think along with Adin D. Falkoff (APL), Jame......一起来看看 《Masterminds of Programming》 这本书的介绍吧!