内容简介:根据k.attachment()获取附加的对象,那我们是在哪里附加上去的呢?上一篇
上一章节《Netty 源码解析系列-服务端启动流程解析》 我们完成了服务端启动,那么服务端启动完成后,客户端接入以及读 I/O 事件是怎么哪里开始的?以及 netty 的 boss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池?带着这些疑问,我们开始客户端连接接入及读写 I/O 解析。
1.NioEventLoop run()开始
processSelectedKeys(); 复制代码
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } 复制代码
根据 selectedKeys 是否为空,判断是否采用优化后的 selectedKeys ,进到 processSelectedKeysOptimized 。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } ... } } 复制代码
k.attachment()获取附加的对象,那我们是在哪里附加上去的呢?上一篇 《Netty 源码解析-服务端启动流程解析》 注册时 attach 上去的对象,其实就是 NioServerSocketChannel 自身。
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ... } } 复制代码
我们再回到 k.attachment() ,在取出附加对象后,判断类型是否为 AbstractNioChannel ,从这里我们可以看到,不是附加 AbstractNioChannel 类型,那么就是附加的 NioTask 对象,在这里我们只看关于 AbstractNioChannel 的,进到 processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } ... } 复制代码
当操作类型是读操作或者连接操作,进入 unsafe.read() ,有两个类实现了这个方法,一个是 AbstractNioByteChannel 的内部类 NioByteUnsafe ,一个是 AbstractNioMessageChannel 的内部类 NioMessageUnsafe ,这两个类都是 NioUnsafe 实现类 AbstractNioChannel 的子类,那到底是哪一个子类?我们看看 NioServerSocketChannel 创建时是创建的 NioByteUnsafe 还是 NioMessageUnsafe 。
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } } 复制代码
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } 复制代码
public abstract class AbstractNioMessageChannel extends AbstractNioChannel { protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } } 复制代码
public abstract class AbstractNioChannel extends AbstractChannel { protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); } } 复制代码
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); } } 复制代码
NioServerSocketChannel是 AbstractNioMessageChannel 的子类, AbstractNioMessageChannel 是 AbstractNioChannel 的子类, newUnsafe() 是 AbstractChannel 的抽象方法,那么我们从这里就知道, AbstractNioMessageChannel 实现了 AbstractChannel的newUnsafe() 抽象方法,由此判断,我们选择 AbstractNioMessageChannel 的内部类 NioMessageUnsafe 的 read() 。
private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { ... for (;;) { int localRead = doReadMessages(readBuf); ... } setReadPending(false); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); ... } 复制代码
这里分两部分,一个是处理消息,一个是处理事件。
1.处理消息
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); ... buf.add(new NioSocketChannel(this, ch)); return 1; ... } 复制代码
接受了一个客户端 SocketChannel ,封装到 NioSocketChannel ,添加到 list 集合中,我们看看 new NioSocketChannel() 。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } } 复制代码
public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); } protected class NioByteUnsafe extends AbstractNioUnsafe { @Override public final void read() { ... } } } 复制代码
AbstractNioByteChannel也继承了 AbstractNioChannel ,并实现了 newUnsafe() 方法,由此我们可以推断出当客户端第一次连接时,走的是 AbstractNioMessageChannel 的子类 NioMessageUnsafe的read() ,当客户端发送数据时,走的是 AbstractNioByteChannel 的内部类 AbstractNioUnsafe 的 read() 方法。
2.处理事件
for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } 复制代码
@Override public ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this; } 复制代码
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; } 复制代码
从 next 的 debug 可以看出,当前 handler 是 ServerBootstrapAcceptor 这个处理器来处理 ChannelRead() 方法,如果看了 上一篇《Netty 源码解析-服务端启动流程解析》 就会知道,这是在 init() 方法中 pipeline.addLast(new ServerBootstrapAcceptor()) 。为什么不是 p.addLast(new ChannelInitializer())? 因为在 ChannelInitializer.channelRegistered() 会删除当前 initChannel
处理器。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { initChannel((C) ctx.channel()); ctx.pipeline().remove(this); ctx.fireChannelRegistered(); } 复制代码
我们继续看 ServerBootstrapAcceptor 的 ChannelRead() 方法。
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } 复制代码
这里分三个步骤
(1)将 childHandler 添加到处理器上,这个从哪里来?就是从最开始设置 serverBootstrap.childHandler(new IOChannelInitialize()) 。
(2)设置一些参数。
(3) work线程池 register 客户端的 channel 。
@Override public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码
@Override public EventLoop next() { return (EventLoop) super.next(); } 复制代码
@Override public EventExecutor next() { return chooser.next(); } 复制代码
private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } } 复制代码
从 work 线程池选一个线程来执行 register 。
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } 复制代码
@Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { ... channel.unsafe().register(this, promise); return promise; } 复制代码
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ... } } } 复制代码
@Override protected void doRegister() throws Exception { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ... } 复制代码
后面的流程和 上一篇《Netty 源码解析-服务端启动流程解析》 的注册流程是一样的,区别在于服务启动时注册是在 boss 线程池任务队列中执行注册,客户端新接入注册是在 work 线程池任务队列中执行 register0() 方法,并将 work 线程池的 selector 注册到 Java NIO 到这里,我们就可以回答开篇的的几个问题:客户端是如何接入? netty 的 boss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池? 现在我们还剩下一个问题:读写 I/O 事件是怎么哪里开始的?
我们回到文章开头
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } ... } } 复制代码
前面 boss 线程池在这里完成了客户端连接接入,并将链路注册到 worker 线程池任务队列,添加了 read 事件的监听,那么现在 work 线程不停循环 selectedKeys 中有没有待处理的事件,当有待处理事件,那么会执行 processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); ... } ... } 复制代码
在这里 unsafe.read() 选择 AbstractNioByteChannel 的 read() 。
@Override public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); byteBuf = null; close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } } 复制代码
把这一大段代码分解成几部分
1.设置循环读,16次,未读完则会等到下一轮 select 继续读取, maxMessagesPerRead 默认等于16。
2.获取缓存操作 handler , config.getRecvByteBufAllocator().newHandle() 。
3.申请缓存空间, allocHandle.allocate(allocator) 。
4.从 socket 中读取数据到 byteBuf 中。
5.传递读事件到下一个 handler 处理器。
6.读完之后发送读完时间到下一个 handler 处理器 我们只看读事件,其他细节后面的文章再详细解析。
@Override public ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this; } 复制代码
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; } 复制代码Handler 事件顺序是 HeadContextHandler --> IdleStateHandler -->IOHandler --> TailContext
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } 复制代码
进到 IdleStateHandler
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } ctx.fireChannelRead(msg); } 复制代码
设置读事件为true,为后面状态检测做准备,继续向下传递读事件,这次是 IOHandler 的读事件。
public class IOHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); System.out.println(msg.toString()); } ... } 复制代码
交给用户自定义 handler 处理读事件,自此读 I/O 事件是怎么哪里开始,如何交给用户 handler 处理已解析完毕。
总结:
1.boss线程处理 NioServerSocketChannel 的 accept 事件,并将客户端添加到 work 任务队列,任务队列执行 redister0() 方法, 将 read 事件注册到 work 线程的 selector 。
2.work线程轮询 selectkeys ,当有事件上来时,将缓存数据发送到用户 handler 。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 蚂蚁金服 mPaaS 服务端核心组件:亿级并发下的移动端到端网络接入架构解析
- 云转码接入视频网站解决方案 express-ffmpeg接入discuz方案
- 数据接入治理平台
- 【Netty】如何接入新连接
- 有赞统一接入层架构演进
- Bytom矿池接入协议指南
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。