Java编程方法论-Spring WebFlux篇 Reactor-Netty下TcpServer的功能实现 1

栏目: Java · 发布时间: 5年前

内容简介:本系列为本人Java编程方法论 响应式解读系列的Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840Reactor源码解读与分享:https://www.bilibili.com/video/av35326911

本系列为本人 Java 编程方法论 响应式解读系列的 Webflux 部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:

Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840

Reactor源码解读与分享:https://www.bilibili.com/video/av35326911

NIO源码解读相关视频分享: https://www.bilibili.com/video/av43230997

NIO源码解读视频相关配套文章:

BIO到NIO源码的一些事儿之BIO

BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 中

BIO到NIO源码的一些事儿之NIO 下 之 Selector

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下

Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装

其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。

本书主要针对 Netty 服务器来讲,所以读者应具备有关 Netty 的基本知识和应用技能。接下来,我们将对 Reactor-netty 从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release 这个版本,但现在已有 0.8 版本,而且 0.70.8 版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8 版本进行全新的解读并在未来出版的书中进行展示。

TcpServer的功能实现

这里,我们会首先解读 Reactor Netty 是如何针对 NettyBootstrapChildHandler 进行封装以及响应式拓展等一些细节探究。接着,我们会涉及到 HttpHandler 的引入,以此来对接我们上层web服务。

针对Bootstrap的ChildHandler的封装

因为这是我们切入自定义逻辑的地方,所以,我们首先来关注下与其相关的 ChannelHandler ,以及前文并未提到的,服务器到底是如何启动以及如何通过响应式来做到优雅的关闭,首先我们会接触关闭服务器的设定。

ChannelHandler引入与使用响应式优雅关闭服务器

我们再回到 reactor.ipc.netty.http.server.HttpServer#HttpServer 这个构造器中,由上一章我们知道请求是 HTTP 层面的(应用层),必须依赖于 TCP 的连接实现,所以这里就要有一个 TCPServer 的实现,其实就是 ChannelPipeline 的操作。

//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
            ...
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
复制代码

这里的话在 DiscardServer Demo 中, TCPServer 我们主要针对 childHandler 的内容的封装,也就是如下内容:

b.group(bossGroup, workerGroup)
    ...
 .childHandler(new ChannelInitializer<SocketChannel>() { 
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new DiscardServerHandler());
            }
        })
    ...
复制代码

childHandler 到底代表什么类型,我们可以在 io.netty.bootstrap.ServerBootstrap 找到其相关定义:

//io.netty.bootstrap.ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

    public ServerBootstrap() { }

    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        childGroup = bootstrap.childGroup;
        childHandler = bootstrap.childHandler;
        synchronized (bootstrap.childOptions) {
            childOptions.putAll(bootstrap.childOptions);
        }
        synchronized (bootstrap.childAttrs) {
            childAttrs.putAll(bootstrap.childAttrs);
        }
    }
...
}
复制代码

由字段定义可知, childHandler 代表的是 ChannelHandler ,顾名思义,是关于 Channel 的一个处理类,这里通过查看其定义可知它是用来拦截处理 Channel 中的 I/O 事件,并通过 Channel 下的 ChannelPipeline 将处理后的事件转发到其下一个处理程序中。 那这里如何实现 DiscardServer Demo 中的 b.childHandler(xxx) 行为,通过 DiscardServer Demo 我们可以知道,我们最关注的其实是 ch.pipeline().addLast(new DiscardServerHandler()); 中的 DiscardServerHandler 实现,但是我们发现,这个核心语句是包含在 ChannelInitializer 内,其继承了 ChannelInboundHandlerAdapter ,它的最顶层的父类接口就是 ChannelHandler ,也就对应了 io.netty.bootstrap.ServerBootstrap 在执行 b.childHandler(xxx) 方法时,其需要传入 ChannelHandler 类型的设定。这里就可以分拆成两步来做,一个是 b.childHandler(xxx) 行为包装,一个是此 ChannelHandler 的定义拓展实现。 那么,为了 API 的通用性,我们先来看Netty的客户端的建立的一个Demo(摘自本人RPC项目的一段代码):

private Channel createNewConChannel() {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class)
                .group(new NioEventLoopGroup(1))
                .handler(new ChannelInitializer<Channel>() {
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(new RpcDecoder(10 * 1024 * 1024))
                        .addLast(new RpcEncoder())
                        .addLast(new RpcClientHandler())
                        ;
                    }
                });
    try {
        final ChannelFuture f =
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                  .option(ChannelOption.TCP_NODELAY, true)
                  .connect(ip, port).sync(); // <1>
        f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                LOGGER.info("Connect success {} ", f);
            }
        });
        final Channel channel = f.channel();
        channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
        return channel;
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}
复制代码

Netty 的客户端与服务端的建立进行对比,我们可以发现 b.childHandler(xxx) 与相应的启动( Server 端的话是 serverBootstrap.bind(port).sync(); ,客户端的话是上述Demo中 <1> 处的内容)都可以抽取出来作为一个接口来进行功能的聚合,然后和相应的 Server (如 TcpServer )或 Client (如 TcpClient )进行其特有的实现。在 Reactor Netty 内的话,就是定义一个 reactor.ipc.netty.NettyConnector 接口,除了做到上述的功能之外,为了适配响应式的理念,也进行了响应式的设计。即在 netty 客户端与服务端在启动时,可以保存其状态,以及提供结束的对外接口方法,这种在响应式中可以很优雅的实现。接下来,我们来看此 reactor.ipc.netty.NettyConnector 的接口定义:

//reactor.ipc.netty.NettyConnector
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {

Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
    return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}
}
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
    BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());

    facade.installShutdownHook();

    if (onStart != null) {
        onStart.accept(facade);
    }

    facade.getContext()
            .onClose()
            .block();
}
复制代码

其中, newHandler 可以是我们上层web处理,里面包含了 INBOUND, OUTBOUND ,具体的话就是 request,response ,后面会专门来涉及到这点。 接着就是提供了一个启动方法 start ,其内创建了一个 BlockingNettyContext 实例,而逻辑的核心就在其构造方法内,就是要将配置好的服务器启动,整个启动过程还是放在 newHandler(handler) 中,其返回的 Mono<? extends NettyContext> 中的 NettyContext 类型元素是管理 io.netty.channel.Channel 上下文信息的一个对象,这个对象更多的是一些无状态的操作,并不会对此对象做什么样的改变,也是通过对此对象的一个 Mono<? extends NettyContext> 包装然后通过 block 产生订阅,来做到 sync() 的效果,通过,通过 block 产生订阅后返回的 NettyContext 对象,可以使中断关闭服务器的操作也可以做到更优雅:

public class BlockingNettyContext {

	private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);

	private final NettyContext context;
	private final String description;

	private Duration lifecycleTimeout;
	private Thread shutdownHook;

	public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
			String description) {
		this(contextAsync, description, Duration.ofSeconds(45));
	}

	public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
			String description, Duration lifecycleTimeout) {
		this.description = description;
		this.lifecycleTimeout = lifecycleTimeout;
		this.context = contextAsync
				.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
				.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
				.block();
	}
    ...
    /**
	 * Shut down the {@link NettyContext} and wait for its termination, up to the
	 * {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
	 */
	public void shutdown() {
		if (context.isDisposed()) {
			return;
		}

		removeShutdownHook(); //only applies if not called from the hook's thread

		context.dispose();
		context.onClose()
		       .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
		       .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
		       .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
		       .block();
	}

	...
}
复制代码

这里,我们来接触下在Reactor中并没有深入接触的 blockXXX() 操作,其实整个逻辑还是比较简单的,这里拿 reactor.core.publisher.Mono#block() 来讲,就是获取并返回这个下发的元素:

//reactor.core.publisher.Mono#block()
@Nullable
public T block() {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
    return subscriber.blockingGet();
}

//reactor.core.publisher.BlockingMonoSubscriber
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {

	@Override
	public void onNext(T t) {
		if (value == null) {
			value = t;
			countDown();
		}
	}

	@Override
	public void onError(Throwable t) {
		if (value == null) {
			error = t;
		}
		countDown();
	}
}
//reactor.core.publisher.BlockingSingleSubscriber
abstract class BlockingSingleSubscriber<T> extends CountDownLatch
implements InnerConsumer<T>, Disposable {

T         value;
Throwable error;

Subscription s;

volatile boolean cancelled;

BlockingSingleSubscriber() {
super(1);
}
...
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
    throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
    try {
        await();
    }
    catch (InterruptedException ex) {
        dispose();
        throw Exceptions.propagate(ex);
    }
}

Throwable e = error;
if (e != null) {
    RuntimeException re = Exceptions.propagate(e);
    //this is ok, as re is always a new non-singleton instance
    re.addSuppressed(new Exception("#block terminated with an error"));
    throw re;
}
return value;
}

...
@Override
public final void onComplete() {
    countDown();
}
}
复制代码

可以看到,此处使用的 CountDownLatch 的一个特性,在元素下发赋值之后,等待数值减1,这里刚好也就这一个限定(由 super(1) 定义),解除所调用的 blockingGet 中的等待,得到所需的值,这里,为了保证 block() 的语义,其 onComplete 方法也调用了 countDown(); ,即当上游为 Mono<Void> 时,做到匹配。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective STL中文版

Effective STL中文版

[美]Scott Meyers / 潘爱民、陈铭、邹开红 / 清华大学出版社 / 2006-1 / 30.00元

STL是C++标准库的一部分。本书是针对STL的经验总结,书中列出了50个条款,绝大多数条款都解释了在使用STL时应该注意的某一个方面的问题,并且详尽地分析了问题的来源、解决方案的优劣。一起来看看 《Effective STL中文版》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器