【Netty】如何接入新连接

栏目: 编程工具 · 发布时间: 5年前

内容简介:欢迎关注公众号:【如果有需要后台回复前文再续,书接上一回【

欢迎关注公众号:【 爱编程

如果有需要后台回复 2019 赠送 1T的学习资料 哦!!

前文再续,书接上一回【 NioEventLoop 】。

在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接),处理IO事件,执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。

Channel的设计

在开始分析前,先了解一下Channel的设计

【Netty】如何接入新连接

顶层Channel接口定义了socket事件如读、写、连接、绑定等事件,并使用AbstractChannel作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里

第二层AbstractNioChannel定义了以NIO,即Selector的方式进行读写事件的监听。其成员变量保存了selector相关的一些属性。

第三层内容比较多,定义了服务端channel(左边继承了AbstractNioMessageChannel的NioServerSocketChannel)以及客户端channel(右边继承了AbstractNioByteChannel的NioSocketChannel)。

如何接入新连接?

本文开始探索一下Netty是如何接入新连接?主要分为四个部分

1.检测新连接

2.创建NioSocketChannel

3.分配线程和注册Selector

4.向Selector注册读事件

1.检测新连接

Netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在 bind() 绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从: NioEventLoop#processSelectedKeys() 开始。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //省略代码
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

            //新连接接入以及读事件处理入口
            unsafe.read();
        }
      }

关键的新连接接入以及读事件处理入口 unsafe.read();

a).这里的 unsafe 是在Channel创建过程的时候,调用了父类 AbstractChannel#AbstractChannel() 的构造方法,和 pipeline 一起初始化的。

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

服务端:

unsafe 为 NioServerSockeChanne l的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel的内部类 NioMessageUnsafe ;

客户端:

unsafe为 NioSocketChannel 的的父类AbstractNioUnsafe#newUnsafe()创建的话,它对应的是AbstractNioByteChannel的内部类 NioByteUnsafe

b).unsafe.read()

NioMessageUnsafe.read() 中主要的操作如下:

1.循环调用jdk底层的代码创建channel,并用netty的NioSocketChannel包装起来,代表新连接成功接入一个通道。

2.将所有获取到的channel存储到一个容器当中,检测接入的连接数,默认是一次接16个连接

3.遍历容器中的channel,依次调用方法fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught来触发对应的传播事件。

private final class NioMessageUnsafe extends AbstractNioUnsafe {
        //临时存储读到的连接
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();

            //服务端接入速率处理器
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    //while循环调用doReadMessages()创建新连接对象
                    do {
                        //获取jdk底层的channel,并加入readBuf容器
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环
                        allocHandle.incMessagesRead(localRead);
                        
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                
                //触发readBuf容器内所有的传播事件:ChannelRead 读事件
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清空容器
                readBuf.clear();
                allocHandle.readComplete();
                //触发传播事件:ChannelReadComplete,所有的读事件完成
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    //触发传播事件:exceptionCaught,触发异常
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

而这一段关键代码逻辑中 int localRead = doReadMessages(readBuf); 它创建jdk底层channel并且用NioSocketChannel包装起来,将该channel添加到传入的容器保存起来,同时返回一个计数。

protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
  //将jdk底层的channel封装到netty的channel,并存储到传入的容器当中
                //this为服务端channel
                buf.add(new NioSocketChannel(this, ch));
 //成功和创建 客户端接入的一条通道,并返回
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

2.创建NioSocketChannel

通过检测IO事件轮询新连接,当前成功检测到连接接入事件之后,会调用 NioServerSocketChannel#doReadMessages() 方法,进行创建 NioSocketChannel ,即客户端channel的过程。

下面就来了解一下 NioSocketChannel 的主要工作:

.查看原代码做了两件事,调用父类构造方法,实例化一个NioSocketChannelConfig。

public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        //实例化一个NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }

1)、查看NioSocketChannel父类构造方法,主要是 保存客户端注册的读事件、channel为成员变量,以及设置阻塞模式为非阻塞。

public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        //实例化一个NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        //传入感兴趣的读事件:客户端channel的读事件
        super(parent, ch, SelectionKey.OP_READ);
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        //保存客户端channel为成员变量
        this.ch = ch;
        //保存感兴趣的读事件为成员变量
        this.readInterestOp = readInterestOp;
        try {
            //配置阻塞模式为非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

最后调用父类的构造方法,是设置 该客户端channel对应的服务端channel,以及channel的id和两大组件unsafe和pipeline

protected AbstractChannel(Channel parent) {
        //parent为创建次客户端channel的服务端channel(服务端启动过程中通过反射创建的)
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

2)、再看NioSocketChannelConfig实例化。主要是保存了javaSocket,并且通过 setTcpNoDelay(true); 禁止了tcp的Nagle算法,目的是为了尽量让小的数据包整合成大的发送出去,降低延时.

private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
            super(channel, javaSocket);
            calculateMaxBytesPerGatheringWrite();
        }

    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        //保存socket
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default if possible.
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                //禁止Nagle算法,目的是为了让小的数据包尽量集合成大的数据包发送出去
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }

3.分配线程和注册Selector

服务端启动初始化的时候 ServerBootstrap#init() ,主要做了一些参数的配置。其中对于 childGroup,childOptions,childAttrs,childHandler 等参数被进行了单独配置。作为参数和 ServerBootstrapAcceptor 一起,被当作一个特殊的handle,封装到pipeline中。 ServerBootstrapAcceptor 中的 eventLoopworkGroup

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
  //省略了很多代码.............
    @Override
    void init(Channel channel) throws Exception {

        //配置AbstractBootstrap.option
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        //配置AbstractBootstrap.attr
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //配置pipeline
        ChannelPipeline p = channel.pipeline();

        //获取ServerBootstrapAcceptor配置参数
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;

        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                //配置AbstractBootstrap.handler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        //配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

//省略了很多代码.............
}

可见,整个服务端pipeline的结构如下图所示。 bossGroup 控制IO事件的检测与处理,整个 bossGroup 对应的pipeline只包括头( HeadContext )尾( TailContext )以及中部的 ServerBootstrap.ServerBootstrapAcceptor

【Netty】如何接入新连接

当新连接接入的时候 AbstractNioMessageChannel.NioMessageUnsafe#read() 方法被调用,最终调用 fireChannelRead() ,方法来触发下一个Handler的 channelRead 方法。而这个Handler正是 ServerBootstrapAcceptor

它是ServerBootstrap的内部类,同时继承自 ChannelInboundHandlerAdapter 。也是一个 ChannelInboundHandler 。其中channelRead主要做了以下几件事。

1.为客户端channel的pipeline添加childHandler

2.设置客户端TCP相关属性childOptions和自定义属性childAttrs

3.workGroup选择NioEventLoop并注册Selector

1)、为客户端channel的pipeline添加childHandler

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

       //省略了一些代码。。。。。 
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //该channel为客户端接入时创建的channel
            final Channel child = (Channel) msg;

            //添加childHandler
            child.pipeline().addLast(childHandler);

            //设置TCP相关属性:childOptions
            setChannelOptions(child, childOptions, logger);

            //设置自定义属性:childAttrs
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //选择NioEventLoop并注册Selector
                childGroup.register(child)
                        .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
      //省略了一些代码。。。。。
    }

客户端 channel 的pipeline添加 childHandler ,在服务端EchoServer创建流程中,childHandler的时候,使用了 ChannelInitializer 的一个自定义实例。并且覆盖了其 initChannel 方法,改方法获取到pipeline并添加具体的Handler。查看 ChannelInitializer 具体的添加逻辑, handlerAdded 方法。其实在 initChannel 逻辑中,首先是 回调到用户代码执行 initChannel ,用户代码执行添加Handler的添加操作,之后将ChannelInitializer自己从pipeline中删除

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

 @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.

            //初始化Channel
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //回调到用户代码
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //删除本身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

}

2)、设置客户端TCP相关属性childOptions和自定义属性childAttrs

这点在 ServerBootstrapAcceptor#init() 方法中已经体现

3)、workGroup选择NioEventLoop并注册Selector

这要从 AbstractBootstrap#initAndRegister() 方法开始,然后跟踪源码会来到 AbstractUnsafe#register() 方法

protected abstract class AbstractUnsafe implements Unsafe {
      //省略了一些代码。。。。。
  @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
      //省略了一些代码。。。。。
}

最后调用 AbstractNioUnsafe#doRegister() 方法通过jdk的 javaChannel().register 完成注册功能。

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
      //省略了一些代码。。。。。
  @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
      //省略了一些代码。。。。。
}

4.向Selector注册读事件

a)、入口: ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register() ;

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

b)、实际上调用了 AbstractChannel.AbstractUnsafe#register0() ,触发了通道激活事件;

//触发通道激活事件,调用HeadContent的
   pipeline.fireChannelActive();

c)、 pipeline 的头部开始,即 DefaultChannelPipeline.HeadContext#channelActive() 从而触发了 readIfIsAutoRead() ;

@Override
  public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            readIfIsAutoRead();
  }

d)、读事件将从尾部的TailContent#read()被触发,从而依次执行ctx.read(),从尾部开始,每个outboundHandler的read()事件都被触发。直到头部。

@Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }


    @Override
    public ChannelHandlerContext read() {
        //获取最近的outboundhandler
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();

        //并依次执行其read方法
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Tasks tasks = next.invokeTasks;
            if (tasks == null) {
                next.invokeTasks = tasks = new Tasks(next);
            }
            executor.execute(tasks.invokeReadTask);
        }

        return this;
    }

e)、进入头部HeadContext#read(),并且最终更改了selectionKey,向selector注册了读事件

HeadContext#read()

@Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

AbstractChannel#beginRead()

@Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

AbstractNioMessageChannel#doBeginRead

@Override
    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }
        super.doBeginRead();
    }

AbstractNioChannel#doBeginRead()

@Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

参考文章:

Jorgezhong

总结

Netty如何接入新连接基本流程如上所述,如果有误,还望各位指正。建议先从前两篇看起比较好理解点。

最后

如果对 Java 、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。

关注公众号 【爱编码】 ,回复 2019 有相关资料哦。

【Netty】如何接入新连接


以上所述就是小编给大家介绍的《【Netty】如何接入新连接》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Paradigms of Artificial Intelligence Programming

Paradigms of Artificial Intelligence Programming

Peter Norvig / Morgan Kaufmann / 1991-10-01 / USD 77.95

Paradigms of AI Programming is the first text to teach advanced Common Lisp techniques in the context of building major AI systems. By reconstructing authentic, complex AI programs using state-of-the-......一起来看看 《Paradigms of Artificial Intelligence Programming》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具