内容简介:前面我们看到了Java对象如何通过Encoder转换成ByteBuf对象的,那么现在问题来了,ByteBuf是如何写入到远程节点的呢,这就是本文要分析的内容。在前面的ChannelPipeline分析中我们提到过HeadContext和TailContext两个特殊的ChannelHandlerContexty,而HeadContext就是位于最前方的ChannelOutboundHandler,它也是最终负责处理write、flush等outbound事件的处理器。从HeadContext的write和
前面我们看到了 Java 对象如何通过Encoder转换成ByteBuf对象的,那么现在问题来了,ByteBuf是如何写入到远程节点的呢,这就是本文要分析的内容。
在前面的ChannelPipeline分析中我们提到过HeadContext和TailContext两个特殊的ChannelHandlerContexty,而HeadContext就是位于最前方的ChannelOutboundHandler,它也是最终负责处理write、flush等outbound事件的处理器。
从HeadContext的write和flush方法实现可以看到,其委托给了所在channelPipeline的channel的unsafe来实现。
final class HeadContextextends AbstractChannelHandlerContext implementsChannelOutboundHandler,ChannelInboundHandler{ private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
AbstractNioChannel使用的AbstractNioUnsafe和AbstractUnsafe差别不大。我们注重看一下AbstractUnsafe类。首先这里声明了一个ChannelOutboundBuffer类,这个类是用来保存已经write的ByteBuf但是还没有flush的ByteBuf以及已经flush但是还没有写到远端的数据。
protected abstract class AbstractUnsafeimplements Unsafe{ private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private RecvByteBufAllocator.Handle recvHandle; private boolean inFlush0;
ChannelOutboundBuffer由一个链表构成,链表的Entry中保存表示的消息、next指针等。
其中几个比较关键的变量
- flushedEntry表示从这个节点一直到unflushedEntry前的节点都是被标记为flushed的,但是还没有写入到channel中的
- unflushedEntry标记哪个节点开始还没有被flush
- tailEntry表示链表的最后一个元素
- flushed表示已经flush但是还没有write的Entry的数量
public final class ChannelOutboundBuffer{ private final Channel channel; // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) // // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry; // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry; // The Entry which represents the tail of the buffer private Entry tailEntry; // The number of flushed entries that are not written yet private int flushed; ...
addMessage
再来看下AbstractUnsafe的write实现,最关键的地方是调用outboundBuffer.addMessage(msg, size, promise),这样将write的消息放到了outboundBuffer链表中。
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
如果当前channel的outbound没有创建或之前的都已经write完成,则会是下图所示状态。这个Entry本身是unflushedEntry和tailEntry
再加一个消息Entry后
应用层调用完write后不一定会立即进行flush,flush会进行系统调用是一个相对耗时的操作,所以有些优化会在channelReadComplete时来进行flush。
flush
flush分为两步
- 调用outboundBuffer.addFlush,设置flushedEntry,标识flushedEntry及之前的Entry都可以真正写入channel了
- 调用flush0,进行真正写入到channel里,这里有的实现会进行选择性的是否flush到channel,例如AbstractNioChannel里的AbstractNioUnsafe实现为现在是否在等待flush,如果等待状态则说明一定会flush,这里就不需要再调用了
@Override public final void flush(){ assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
addFlush
addFlush会从unflushedEntry开始,如果之前flushedEntry指向空,则将flushEntry指向unflushedEntry,然后开始遍历到tailEntry并且记录flushed数量
最后将unflushedEntry设置为null
如果这个时候又调用了addMessage,则此时的结构是这样的
flushedEntry –> entry –> entry –> unflushedEntry –> entry –> tailEntry
public void addFlush(){ // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
- flush0中首先判断当前channel是否已经在flush了,如果是则跳过,否则设置inFlush0状态并继续
- 做一些辅助判断,然后调用doWrite(outboundBuffer)
@SuppressWarnings("deprecation") protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link#close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link#isActive()} , {@link#isOpen()} and {@link#isWritable()} * may still return {@codetrue} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } } finally { inFlush0 = false; } }
NioSocketChannel.doWrite
doWrite方法是抽象的,需要各个AbstractChannel的具体类实现的。
NioSocketChannel实现如下
- 获取到真正的jdk的SocketChannel
- 在一个循环执行
- 如果没有可写的数据了,清除selectionKey的OP_WRITE并返回
- 取出ChannelOutboundBuffer中的ByteBuffer数组,调用SocketChannel.write方法写入到channel。
- 如果写入数量小于等于0,设置SelectionKey的OP_WRITE
- 否则调用ChannelOutboundBuffer方法
protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }
ChannelOutboundBuffer.removeBytes
removeBytes负责从ChannelOutboundBuffer中的链表中删除已经真正写入完成的Entry
- 获取flushedEntry,判断当前Entry的ByteBuf中可读数量如果小于了writtenBytes,说明已经写入了不止当前Entry数据,则调用remove删除当前Entry
- 否则则移动当前Entry的ByteBuf的readerIndex
- 将nioBuffer数组元素均设置为null
public void removeBytes(long writtenBytes){ for (;;) { Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }
public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry e.recycle(); return true; }
removeEntry判断当前flushed的Entry是否已经删除完了,如果删完了则设置flushedEntry为null并且如果已经删到了tailEntry
则把tailEntry和unflushedEntry也设置为null
否则将flushedEntry指向移到next
private void removeEntry(Entry e) { if (-- flushed == 0) { // processed everything flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { flushedEntry = e.next; } }
至此,一个应用中的数据如何通过Encoder转换成ByteBuf,存储到ChannelOutboundBuffer,然后写入到SocketChannel中的过程已经清晰了。
以上所述就是小编给大家介绍的《Netty源码分析-8-ChannelOutboundBuffer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大学程序设计课程与竞赛训练教材
吴永辉、王建德 / 机械工业出版社 / 2013-6 / 69.00
本书每章为一个主题,实验内容安排紧扣大学算法和数学的教学,用程序设计竞赛中的算法和数学试题作为实验试题,将算法和数学的教学与程序设计竞赛的解题训练结合在一起;在思维方式和解题策略的训练方面,以问题驱动和启发式引导为主要方式,培养读者通过编程解决问题的能力。 本书特点: 书中给出的234道试题全部精选自ACM国际大学生程序设计竞赛的世界总决赛以及各大洲赛区现场赛和网络预赛、大学程序设计竞......一起来看看 《大学程序设计课程与竞赛训练教材》 这本书的介绍吧!