Netty如何实现Reactor模式 原 荐

栏目: 编程工具 · 发布时间: 6年前

内容简介:在前面的文章中(Reactor模型详解),我们讲解了Reactor模式的各种演变形式,本文主要讲解的则是Netty是如何实现Reactor模式的。这里关于Netty实现的Reactor模式,需要说明的是,其实现的模式如下图所示:对于Netty使用的Reactor模式,其主要特点如下:

在前面的文章中(Reactor模型详解),我们讲解了Reactor模式的各种演变形式,本文主要讲解的则是Netty是如何实现Reactor模式的。这里关于Netty实现的Reactor模式,需要说明的是,其实现的模式如下图所示:

Netty如何实现Reactor模式 原 荐

对于Netty使用的Reactor模式,其主要特点如下:

  • 使用一个线程作为mainReactor,专门用于监听客户端的连接事件,当获取到事件之后就将该事件交由Acceptor处理,以获取客户端连接;
  • 在mainReactor获取客户端Channel之后,将其交由subReactor进行处理,这里的subReactor是一个线程池。
  • subReactor会完成诸如客户端Channel注册,数据读取,业务计算,以及数据写入的工作。这里需要注意的是,对于特定的客户端Channel而言,这一系列处理都是与subReactor中某个特定的线程进行绑定的。

1. 用法示例

关于Netty的用法,这里我们还是使用TimeServer展示其最简单的一个使用示例:

public class TimeServer {
  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();  // 声明一个bossGroup作为mainReactor
    EventLoopGroup workerGroup = new NioEventLoopGroup();// 声明一个workerGroup作为subReactor
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();  // 创建服务端启动类
      bootstrap.group(bossGroup, workerGroup)  // 指定使用的bossGroup和workerGroup
        .channel(NioServerSocketChannel.class) // 指定使用的Channel连接方式,这里是nio
        .option(ChannelOption.SO_BACKLOG, 1024)// 设置TCP属性
        .handler(new LoggingHandler(LogLevel.INFO))  // 添加公共handler
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());  // 在pipeline中添加自定义handler
          }
        });
      ChannelFuture future = bootstrap.bind(port).sync();  // 绑定端口并且启动
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new TimeServer().bind(8080);
  }
}

上面示例中,需要说明的有如下几点:

  • 示例首先声明了一个bossGroup和一个workerGroup,这两个group其实就是两个线程池,其每个线程都是使用一个NioEventLoop进行维护的。也就是说每个NioEventLoopGroup其实是由一系列的NioEventLoop组成的,而NioEventLoop的数量默认为“CPU数目 * 2”;
  • bossGroup对应于上面Reactor模式中的mainReactor。但是这里需要理解的是,mainReactor只使用了一个线程进行客户端连接的获取,而这里bossGroup实际上是一个线程池。这主要和Netty启动线程的方式有关,NioEventLoop虽然维护了一个线程,但是其只有在第一次使用的时候才会尝试启动线程。也就是说对于mainReactor,虽然其使用的是一个NioEventLoopGroup,但实际上只有一个ServerSocketChannel与其某个NioEventLoop进行了绑定,而也只有该线程启动了,对于其他的NioEventLoop虽然进行了声明,但是并未启动;
  • 示例中 bootstrap.bind(port).sync(); 其实是整个程序的启动部分,这里的 bind() 方法会返回一个 ChannelFuture 对象,该对象类似于 Java 中的Future对象,会等待线程中任务执行完成才会获取到结果,否则就会进行阻塞性等待。这里调用的 sync() 方法就是这里的阻塞性方法,也就是说这一行代码的作用主要就是,等待bossGroup和workerGroup绑定指定的端口,完成一些初始化动作,然后主线程才会从 sync() 方法的等待中唤醒,从而执行后面的代码;
  • future.channel().closeFuture().sync(); 的主要作用是等待服务器发出关闭命令,其实就是前一行代码将服务器成功绑定到某以端口之后,这里的 closeFuture().sync() 就会阻塞主线程,以等待服务器的bossGroup和workerGroup发出关闭命令。

2. NioEventLoopGroup线程池的构造

我们首先看一下线程池的构造,这里的构造过程其实比较简单,主要就是创建两个NioEventLoopGroup对象。我们跟踪其构造方法,最后进入了 MultithreadEventExecutorGroup 的构造方法中,如下是其初始化过程:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        // 这里如果用户没有设置Executor,则使用这里的ThreadPerTaskExecutor,
        // 该执行器会在每次接收到一个任务时都使用一个新的线程来执行任务
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 这里的children就是维护的线程池,nThreads的值就是"CPU数目 * 2"
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        // 实例化线程池的每个对象,对于NioEventLoopGroup,其返回的就是NioEventLoop对象
        children[i] = newChild(executor, args);
        success = true;
    }

    // 这里的chooser其实是一个选择器,用于进行线程池线程的选择
    chooser = chooserFactory.newChooser(children);

    // 为线程池创建一个只读的视图
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

这里线程池的构造代码省略了其余的不相关的异常处理部分,而只展示主要的逻辑结构。在构造一个线程池的时候,首先会为每个线程创建一个执行器,该执行器会在每次任务到达时都使用一个新的线程处理当前任务,如下是 ThreadPerTaskExecutorexecute() 方法:

@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

在创建完执行器后,就会调用 newChild() 方法创建线程池中的线程维护对象。该方法是一个抽象方法,具体的实现在 NioEventLoopGroup 中,实际上就是实例化一个 NioEventLoop 对象,如下是 newChild() 方法的实现:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
      ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
      (RejectedExecutionHandler) args[2]);
}

可以看到,这里的NioEventLoop不仅维护了前面实例化的Executor对象,还维护了包括SelectorProvider,SelectStrategyFactory和RejectedExecutionHandler等对象。这里需要说明的是,对于每个EventLoop,Netty都会为其创建一个Selector对象,用于监听Channel的事件,这里的SelectorProvider就是用于创建Selector的。

在NioEventLoop的创建过程中,后面又创建一个了一个 EventExecutorChooser 类型的chooser对象,该对象的主要作用就是一个EventLoop选择器。关于chooser对象的源码,其实其比较简单,就是通过依次轮换的方式不断的在NioEventLoopGroup中选择NioEventLoop来处理新的channel。如下是 chooserFactory.newChooser() 的源码:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

// 如果当前线程池的数量是2的指数,则使用当前Chooser,该Chooser能够优化选择过程为一个&操作,
// 这相对于取余操作会有很大的性能提升
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    
    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

// 如果线程池的数量不是2的指数,则使用取余的方式进行Chooser的获取
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

上面的代码中,可以看到,对于Chooser的获取,这里主要就是通过一个 idx 变量,不断递增,然后在EventLoopGroup中获取指定索引的EventLoop。只不过其会根据线程池是否为2的指数次方来使用不同的计算策略。

这里我们有必要从整体上对线程池的构造过程进行一下说明:可以看到在线程池的构造过程中,Netty创建了一个Executor对象,nThreads个EventLoop对象和一个EventExecutorChooser对象。读者应该注意到了,每一个EventLoop对象在构造的时候都是使用的同一个Executor对象。其实本质上讲,EventLoop实现了Runnable接口,而在 run() 方法中,其会在一个死循环中不断轮询所注册的事件(获取客户端连接或者读写事件)和执行提交给当前EventLoop的相关任务。这里的Executor其实执行的就是当前EventLoop所代表的任务。前面我们讲到了Executor会在每次新任务到达时新建一个线程来执行任务,而实际上这里的任务就是EventLoop,并且EventLoop在执行每个任务时,除非系统退出,是不会停止其轮询状态的,也就是说Executor创建的线程数最终与EventLoop的数目是一致的。关于EventExecutorChooser,它的与EventLoop的关系在于,每当一个新的channel到达时,chooser就会按照索引递增顺序在NioEventLoopGroup中获取一个EventLoop与当前的Channel进行绑定,而后面所有与该Channel有关的事件都是由该EventLoop的run()方法进行轮询时执行的。

3. NioServerSocketChannel的创建

在前面的示例程序中创建了一个ServerBootstrap的实例,然后调用其gourp(),channel(),option()和childHandler()方法为其设置了相关的属性,这几个方法的调用只是简单的赋值操作,程序的主入口在于 ServerBootstrap.bind() 方法中,如下是该方法的主要实现:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 这里initAndRegister()方法的主要作用就是创建一个ServerSocketChannel对象,然后对其进行初始化,
    // 初始化的内容主要是将ServerBootstrap中的设置的TCP选项和一些属性设置到ServerSocketChannel中;
    // 接着将用于接收客户端请求的ServerBootstrapAcceptor添加到ServerSocketChannel所维护的
    // pipeline中;最后通过ServerSocketChannel所绑定的EventLoop对其进行注册。
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 走到这里,如果注册已经完成,则调用doBind0()方法将ServerSocketChannel绑定到对应的端口上,
    // 如果没有注册完成,则添加一个注册完成事件,在事件完成后调用doBind0()方法进行端口绑定。
    // 需要注意的是,这里虽然看起来doBind0()方法可能是由当前主线程执行,也可能是由注册完成时所在的
    // EventLoop的线程执行,但实际上不是的,doBind0()内部会判断当前是否是在对应的EventLoop中,
    // 如果在,则直接调用,不在则将绑定定位当做一个任务添加到EventLoop的执行队列中
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {  // 添加注册完成监听事件
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

在doBind()方法中,首先会进行ServerSocketChannel的创建,然后会对其进行初始化和注册,这里的注册实际上就是指将当前ServerSocketChannel注册到其所对应的EventLoop所维护的Selector上。在注册完成之后,就会通过doBind0()方法将ServerSocketChannel绑定到目标端口上。可以看出,这里的doBind()方法实际上完成了ServerSocketChannel从初始化到注册,再到绑定,以及事件监听的整个过程。这里我们首先看看其是如何创建,初始化和注册的,如下是 initAndRegister() 方法的主要逻辑的源码:

final ChannelFuture initAndRegister() {
    Channel channel = channelFactory.newChannel();
    init(channel);

    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

initAndRegister() 方法中,其主流程非常的清晰,首先通过channelFactory创建一个Channel对象,这里的channelFactory是 ReflectiveChannelFactory ,而创建的Channel对应的就是我们在示例中通过 ServerBootstrap.channel() 方法添加的Channel类型:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

这里的 ReflectiveChannelFactory 所实现的 newChannel() 方法其实就是通过反射创建一个 ServerSocketChannel 对象。在得到Channel对象之后,就会调用 init() 方法对其进行初始化,如下是初始化方法的主要逻辑:

@Override
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, 
                        currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init() 方法中,我们省略了对childOptions和childAttrs属性进行设置的代码,设置过程其实主要就是从ServerBootstrap中转移到这两个属性中,它们最终会在创建客户端Channel的时候将其应用到客户端Channel上。这里我们还是主要看ServerSocketChannel所对应的pipeline的变化过程。上面代码中,可以看到,主要是往pipeline中添加了一个 ChannelInitializer 对象,该对象的主要作用其实是在注册Channel注册完成之后通过 ChannelPipeline.channelRegistered() 事件触发时调用其 initChannel() 方法(也就是这里实现的方法)来完成初始化的。也就是说这里 init() 方法本质上只是往ServerSocketChannel所对应的pipeline中添加了ChannelInitializer对象,但是其并未执行任何动作。此时我们有必要对pipeline所对应的状态进行一个展示,其现在主要有三个节点:

HeadContext-->ChannelInitializer-->TailContext

在初始化完成之后,我们接下来看ServerSocketChannel是如何注册的。注册主要是通过 config().group().register(channel); 方法调用进行的。对于NioServerSocketChannel,这里的register()最终是调用的 NioMessageUnsafe.register() 方法。如下是该方法的源码:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop.inEventLoop()) {
        register0(promise);  // 进行注册
    } else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);  // 交由对应的EventLoop进行注册
            }
        });
    }
}

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;  // 标志是否是第一次注册
    doRegister();  // 调用Java的ServerSocketChannel进行注册
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();  // 触发handlerAdded()事件

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();  // 触发注册完成事件
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();  // 注册完成后触发channelActive()事件
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

这里 register() 方法的主要过程其实就是判断当前执行是否在EventLoop中,如果不在,则交由EventLoop执行,最后的注册过程在 register0() 方法中,在该方法中,首先会调用 doRegister() 方法进行 JavaServerSocketChannel 注册,如下是注册代码:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

可以看到,这里才是真正的将创建的 JavaServerSocketChannel 进行注册(前面我们所说的Channel其实都是Netty封装的NioServerSocketChannel的执行过程,最终其注册过程是委托到了Java的ServerSocketChannel的注册)。注意,这里注册时监听的事件是 0 ,表示当前只是注册,而不监听任何事件。在注册完成后,调用了 pipeline.fireChannelRegistered() 方法,我们前面讲到,在pipeline中不仅有头结点和尾节点,还有初始化时添加的ChannelInitializer对象。因而这里就会触发其register()事件,最终就会调用前面实现的 initChannel() 方法。翻看前面的代码可以看到,这里首先会将config.handler()添加到pipeline中,这个handler()其实就是最开始我们的示例程序中通过 ServerBootstrap.handler() 方法添加的 LoggingHandler ,然后创建了一个任务,交由EventLoop执行,该任务中会添加一个 ServerBootstrapAcceptor 的handler到pipeline中。当initChannel()方法执行完成之后,pipeline的节点结构变成了下面这种情况:

HeadContext-->ChannelInitializer-->LoggingHandler-->TailContext

这里要注意的一点是,ChannelInitializer的channelRegister()事件触发时,在调用initChannel()方法之后,会将当前handler,也就是当前ChannelInitializer本身从pipeline中移除。因而当 pipeline.fireChannelRegistered(); 调用完成时,整个pipeline的状态如下:

HeadContext-->LoggingHandler-->TailContext

这里需要说明的是,此时我们并没有将 ServerBootstrapAcceptor 展示在这里的pipeline中。这是因为前面只是将其当做一个任务添加到当前EventLoop的任务队列中的。而当前的 register0() 本身也是EventLoop需要执行的一个任务,因而该任务的执行是要晚于当前注册的任务的。也就是说如果在注册完成,并且执行了该任务之后,pipeline中的节点状态如下:

HeadContext-->LoggingHandler-->ServerBootstrapAcceptor-->TailContext

这里我们其实已经可以看到前面Reactor模式中的Acceptor了,也就是这里的ServerBootstrapAcceptor。它的主要作用也确实时不断监听客户端的连接事件,然后接收客户端连接,并且交由workerGroup进行处理。

我们继续看 register0() 的注册流程,在触发完注册事件之后,其会判断当前Channel是否处于活跃状态,然后调用 pipeline.fireChannelActive(); 触发channelActive()事件。这里的判断主要目的是为了保证channelActive()事件在Channel解注册之后又进行注册,此时会触发该事件。而正常情况下,channelActive()事件是会在端口绑定完成后触发。

这里其实我们的服务端初始化和注册流程基本已经完成。在整个 initAndRegister() 方法的执行过程中,主要完成了NioServerSocketChannel的创建,注册,然后主要触发register()事件,并且在事件触发过程中完成了pipeline节点的维护。下面我们来看看NioServerSocketChannel是如何进行事件绑定的,如下前面 doBind0() 跟踪主要逻辑之后,最终在NioMessageUnsafe中的主要逻辑:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    doBind(localAddress);  // 进行端口绑定
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
}

这里首先会获取当前Channel的状态,然后调用 doBind() 方法进行端口绑定,该方法中主要就是进行调用 JavaServerSocketChannel 进行绑定,如下是该方法的源码:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

在绑定完成之后,就会判断绑定之前当前Channel是非active的,然后绑定完成后当前Channel是active的,此时,就会触发channelActive()事件,表示当前Channel已经处于活跃状态了。这里需要重点指出的是,上面pipeline中的 HeadContext 在channelActive()中完成了一定的事件处理。如下是其实现源码:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();  // 将channelActive()事件往下传递
    readIfIsAutoRead();  // 注册读取事件
}

可以看到,HeadContext首先会将channelActive()事件往下传递。当监听动作完成之后,就会判断当前是否为autoRead,如果是,则触发pipeline的read()事件(这里默认都是autoRead的)。对于read()事件,其实就是通过HeadContext为当前Channel注册其感兴趣的事件,如下是其实现源码:

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

// NioMessageUnSafe中
@Override
public final void beginRead() {
    if (!isActive()) {
        return;
    }

    doBeginRead();
}

@Override
protected void doBeginRead() throws Exception {
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

可以看到,这里最终就是通过NioMessageUnsafe类进行Channel事件的注册的。这里 doBeginRead() 方法本质上就是判断当前selectionKey中是否包含有所感兴趣的目标事件,如果不包含,则对其进行设置。这里的目标事件保存在 readInterestOp 属性中。而该属性则是在创建 NioServerSocketChannelNioSocketChannel 对象的不同而传入不同的值。对于NioServerSocketChannel,其传入的是 SelectionKey.OP_ACCEPT ,而NioSocketChannel传入的则是 SelectionKey.OP_READ 。具体的代码如下所示:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

如此,NioServerSocketChannel的创建,注册,绑定,以及对监听事件的设置都已经完成。可以看到,虽然只是简简单单的几行代码,但实际上Netty在进行服务端创建时做了非常多的工作。下面我们继续看NioServerSocketChannel是如何接收客户端连接,并且进行处理的。

4. NioSocketChannel连接事件的处理

对于NioSocketChannel连接事件的处理主要是通过NioServerSocketChannel对SelectionKey.OP_ACCEPT事件的监听来进行的。前面我们讲到,每个Channel都会绑定一个NioEventLoop,本质上其是一个Runnable对象,也就是说NioServerSocketChannel绑定的NioEventLoop是通过其run()方法进行事件轮询的。如下是该方法的源码:

@Override
protected void run() {
    for (;;) {
        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
            case SelectStrategy.CONTINUE:
                continue;
            case SelectStrategy.BUSY_WAIT:
            case SelectStrategy.SELECT:
                select(wakenUp.getAndSet(false));  // 获取客户端事件
            default:
        }

        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();  // 处理客户端事件
        } finally {
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);  // 执行提交的任务
        }
    }
}

在run()方法中,可以看到,其通过一个无限循环,首先不断地监听客户端连接事件,如果有相应的事件,则调用 processSelectedKeys() 方法处理事件,在处理完事件之后,还会调用 runAllTasks() 执行所有注册到当前NioEventLoop中的任务,这里的任务其实就是我们前面讲到的诸如注册,绑定,添加handler等任务。这里我们主要看看 processSelectedKeys() 方法是如何处理客户端事件的,如下是其最终的调用方法:

// processSelectedKeys() -> processSelectedKeysOptimized() -> processSelectedKey()
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();  // 获取当前感兴趣的事件
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 
        || readyOps == 0) {  // 如果是OP_READ或OP_ACCEPT则进行处理
        unsafe.read();
    }
}

这里处理方式其实主要就是首先获取当前Channel监听的事件,如果是OP_READ或OP_ACCEPT,则调用unsafe.read()方法处理。对于NioServerSocketChannel而言,其最终调用的是 NioMessageUnsafe.read() 方法:

@Override
public void read() {
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }

        allocHandle.incMessagesRead(localRead);
    } while (allocHandle.continueReading());

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
}

上面代码中,readBuf是一个List<Object>类型的对象。在 doReadMessages() 方法中,主要就是调用Java的ServerSocketChannel获取客户端连接:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());  // 获取客户端连接
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));  // 封装为一个NioSocketChannel
        return 1;
    }
    return 0;
}

在获取到客户端连接之后,将其存入readBuf中。最后对获取到的每个readBuf对象调用 pipeline.fireChannelRead(readBuf.get(i)); 方法,从而触发其channelRead()事件。前面我们讲到,对于NioServerSocketChannel,其对应的pipeline如下:

HeadContext-->LoggingHandler-->ServerBootstrapAcceptor-->TailContext

因而这里最终会触发到 ServerBootstrapAcceptor 的channelRead()事件,这里我们来看其是如何处理该事件的:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;  // 这里获取到的是一个Channel对象
    child.pipeline().addLast(childHandler);  // 添加handler
    childGroup.register(child).addListener(new ChannelFutureListener() {  // 执行注册
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
}

channelRead() 方法中,这里首先是对传入的msg进行强转,得到一个Channel对象。然后将childHandler添加到pipeline中。这里的childHandler就是最开始我们的示例中通过 ServerBootstrap.childHandler() 方法设置的handler。我们已经知道,ChannelInitializer会在注册流程中通过触发channelRegister()事件从而得到触发,触发完成之后还会将当前handler从pipeline中移除。上面代码中,接下来就是进行NioSocketChannel的注册,这里的注册逻辑与NioServerSocketChannel的注册逻辑是完全一致的,只是在注册过程中有两点需要说明:

  • NioServerSocketChannel在注册完成之后当前Channel还不是active状态的,因而没有出发channelActive()事件,而是在端口绑定完成之后再触发;而NioSocketChannel在注册完成之后,当前Channel是处于活跃状态的,因而其在注册完成之后会立即触发channelActive()事件,最终完成read()事件的触发,也就是为当前Channel注册感兴趣的事件;
  • 对于感兴趣的事件,前面已经讲到,在构造NioSocketChannel的时候,会传入一个SelectionKey.OP_READ保存起来,如此对于NioSocketChannel而言,其监听的事件就是SelectionKey.OP_READ。

通过触发register()事件,最终就会实现客户端NioSocketChannel的注册以及相应事件的触发。对于客户端NioSocketChannel的数据读取,由于其也是通过绑定的NioEventLoop的不断轮询来实现的,其过程与上面讲的NioServerSocketChannel的轮询代码一致,只是需要注意的是客户端Channel使用的unsafe对象是 NioSocketChannelUnsafe ,在调用read()方法时需要查阅其是如何读取SocketChannel的数据的,这里不再赘述。

5. 小结

本文从宏观层面上对Netty是如何实现Reactor模式进行了讲解。通过上面的讲解,我们基本上可以将Netty中各个角色与本文一开始的图形进行一一对应:

  • mainReactor对应于这里的NioEventLoopGroup中的一个NioEventLoop,通过该EventLoop的不断轮询来实现客户端连接事件的监听;
  • Acceptor对应于ServerBootstrapAcceptor,在NioEventLoop监听到客户端事件之后,其就会触发ServerBootstrapAcceptor的channelRead()事件,从而获取客户端连接,并且交由workerGroup中的某个NioEventLoop处理;
  • subReactor对应于NioEventLoopGroup,其中包含的每一个NioEventLoop都会与一个客户端Channel对应,并且处理其事件;
  • handler就对应了最后的对于Channel的处理过程,包括read,decode,compute,encode,write。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java Message Service API Tutorial and Reference

Java Message Service API Tutorial and Reference

Hapner, Mark; Burridge, Rich; Sharma, Rahul / 2002-2 / $ 56.49

Java Message Service (JMS) represents a powerful solution for communicating between Java enterprise applications, software components, and legacy systems. In this authoritative tutorial and comprehens......一起来看看 《Java Message Service API Tutorial and Reference》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码