内容简介:如果你对netty的reactor线程不了解,建议先看下上一篇文章我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节我们进入到reactor线程的
如果你对netty的reactor线程不了解,建议先看下上一篇文章 netty源码分析之揭开reactor线程的面纱(一) ,这里再把reactor中的三个步骤的图贴一下
我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节
我们进入到reactor线程的 run
方法,找到处理IO事件的代码,如下
processSelectedKeys(); 复制代码
跟进去
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } 复制代码
我们发现处理IO事件,netty有两种选择,从名字上看,一种是处理优化过的selectedKeys,一种是正常的处理
我们对优化过的selectedKeys的处理稍微展开一下,看看netty是如何优化的,我们查看 selectedKeys
被引用过的地方,有如下代码
private SelectedSelectionKeySet selectedKeys; private Selector NioEventLoop.openSelector() { //... final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // selectorImplClass -> sun.nio.ch.SelectorImpl Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); //... selectedKeys = selectedKeySet; } 复制代码
首先,selectedKeys是一个 SelectedSelectionKeySet
类对象,在 NioEventLoop
的 openSelector
方法中创建,之后就通过反射将selectedKeys与 sun.nio.ch.SelectorImpl
中的两个field绑定
sun.nio.ch.SelectorImpl
中我们可以看到,这两个field其实是两个HashSet
// Public views of the key sets private Set<SelectionKey> publicKeys; // Immutable private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition protected SelectorImpl(SelectorProvider sp) { super(sp); keys = new HashSet<SelectionKey>(); selectedKeys = new HashSet<SelectionKey>(); if (Util.atBugLevel("1.4")) { publicKeys = keys; publicSelectedKeys = selectedKeys; } else { publicKeys = Collections.unmodifiableSet(keys); publicSelectedKeys = Util.ungrowableSet(selectedKeys); } } 复制代码
selector在调用 select()
族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的 selectionKey
(具体怎么塞有待研究),即相当于往一个hashSet中add元素,既然netty通过反射将jdk中的两个field替换掉,那我们就应该意识到是不是netty自定义的 SelectedSelectionKeySet
在 add
方法做了某些优化呢?
带着这个疑问,我们进入到 SelectedSelectionKeySet
类中探个究竟
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { private SelectionKey[] keysA; private int keysASize; private SelectionKey[] keysB; private int keysBSize; private boolean isA = true; SelectedSelectionKeySet() { keysA = new SelectionKey[1024]; keysB = keysA.clone(); } @Override public boolean add(SelectionKey o) { if (o == null) { return false; } if (isA) { int size = keysASize; keysA[size ++] = o; keysASize = size; if (size == keysA.length) { doubleCapacityA(); } } else { int size = keysBSize; keysB[size ++] = o; keysBSize = size; if (size == keysB.length) { doubleCapacityB(); } } return true; } private void doubleCapacityA() { SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1]; System.arraycopy(keysA, 0, newKeysA, 0, keysASize); keysA = newKeysA; } private void doubleCapacityB() { SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1]; System.arraycopy(keysB, 0, newKeysB, 0, keysBSize); keysB = newKeysB; } SelectionKey[] flip() { if (isA) { isA = false; keysA[keysASize] = null; keysBSize = 0; return keysA; } else { isA = true; keysB[keysBSize] = null; keysASize = 0; return keysB; } } @Override public int size() { if (isA) { return keysASize; } else { return keysBSize; } } @Override public boolean remove(Object o) { return false; } @Override public boolean contains(Object o) { return false; } @Override public Iterator<SelectionKey> iterator() { throw new UnsupportedOperationException(); } } 复制代码
该类其实很简单,继承了 AbstractSet
,说明该类可以当作一个set来用,但是底层使用两个数组来交替使用,在 add
方法中,判断当前使用哪个数组,找到对应的数组,然后经历下面三个步骤 1.将SelectionKey塞到该数组的逻辑尾部 2.更新该数组的逻辑长度+1 3.如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容
我们可以看到,待程序跑过一段时间,等数组的长度足够长,每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey
塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度
这里关于为何使用两个数组循环交替使用,其实我也是很费解,思考了很久,查找 SelectedSelectionKeySet
所有使用的地方,我觉得使用一个数组就能够达到优化目的,并且不用每次都判断使用哪个数组,所以对于该问题,我提了一个issue给netty官方,官方也给出了答复说会跟进,issue链接: github.com/netty/netty… 版本中,netty已经将 SelectedSelectionKeySet.java
底层使用一个数组了, 链接
关于netty对 SelectionKeySet
的优化我们暂时就跟这么多,下面我们继续跟netty对IO事件的处理,转到 processSelectedKeysOptimized
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { // 1.取出IO事件以及对应的channel final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); // 2.处理该channel if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 3.判断是否该再来次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } } } 复制代码
我们可以将该过程分为以下三个步骤
1.取出IO事件以及对应的netty channel类
这里其实也能体会到优化过的 SelectedSelectionKeySet
的好处,遍历的时候遍历的是数组,相对jdk原生的 HashSet
效率有所提高
拿到当前SelectionKey之后,将 selectedKeys[i]
置为null,这里简单解释一下这么做的理由:想象一下这种场景,假设一个NioEventLoop平均每次轮询出N个IO事件,高峰期轮询出3 N个事件,那么 selectedKeys
的物理长度要大于等于3 N,如果每次处理这些key,不置 selectedKeys[i]
为空,那么高峰期一过,这些保存在数组尾部的 selectedKeys[i]
对应的 SelectionKey
将一直无法被回收, SelectionKey
对应的对象可能不大,但是要知道,它可是有attachment的,这里的attachment具体是什么下面会讲到,但是有一点我们必须清楚,attachment可能很大,这样一来,这些元素是GC root可达的,很容易造成gc不掉,内存泄漏就发生了
这个bug在 4.0.19.Final
版本中被修复,建议使用netty的项目升级到最新版本^^
2.处理该channel
拿到对应的attachment之后,netty做了如下判断
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } 复制代码
源码读到这,我们需要思考为啥会有这么一条判断,凭什么说attachment可能会是 AbstractNioChannel
对象?
我们的思路应该是找到底层selector, 然后在selector调用register方法的时候,看一下注册到selector上的对象到底是什么鬼,我们使用intellij的全局搜索引用功能,最终在 AbstractNioChannel
中搜索到如下方法
protected void doRegister() throws Exception { // ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); // ... } 复制代码
javaChannel()
返回netty类 AbstractChannel
对应的jdk底层channel对象
protected SelectableChannel javaChannel() { return ch; } 复制代码
我们查看到SelectableChannel方法,结合netty的 doRegister()
方法,我们不难推论出,netty的轮询注册机制其实是将 AbstractNioChannel
内部的jdk类 SelectableChannel
对象注册到jdk类 Selctor
对象上去,并且将 AbstractNioChannel
作为 SelectableChannel
对象的一个attachment附属上,这样再jdk轮询出某条 SelectableChannel
有IO事件发生时,就可以直接取出 AbstractNioChannel
进行后续操作
下面是jdk中的register方法
//* //* @param sel //* The selector with which this channel is to be registered //* //* @param ops //* The interest set for the resulting key //* //* @param att //* The attachment for the resulting key; may be <tt>null</tt> public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; 复制代码
由于篇幅原因,详细的 processSelectedKey(SelectionKey k, AbstractNioChannel ch)
过程我们单独写一篇文章来详细展开,这里就简单说一下 1.对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理 2.对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
上面处理attachment的时候,还有个else分支,我们也来分析一下 else部分的代码如下
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); 复制代码
说明注册到selctor上的attachment还有另外一中类型,就是 NioTask
,NioTask主要是用于当一个 SelectableChannel
注册到selector的时候,执行一些任务
NioTask的定义
public interface NioTask<C extends SelectableChannel> { void channelReady(C ch, SelectionKey key) throws Exception; void channelUnregistered(C ch, Throwable cause) throws Exception; } 复制代码
由于 NioTask
在netty内部没有使用的地方,这里不过多展开
3.判断是否该再来次轮询
if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } 复制代码
我们回忆一下netty的reactor线程经历前两个步骤,分别是抓取产生过的IO事件以及处理IO事件,每次在抓到IO事件之后,都会将 needsToSelectAgain 重置为false,那么什么时候needsToSelectAgain会重新被设置成true呢?
还是和前面一样的思路,我们使用intellij来帮助我们查看needsToSelectAgain被使用的地方,在NioEventLoop类中,只有下面一处将needsToSelectAgain设置为true
NioEventLoop.java
void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } } 复制代码
继续查看 cancel
函数被调用的地方
AbstractChannel.java
@Override protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey()); } 复制代码
不难看出,在channel从selector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达 CLEANUP_INTERVAL
的时候,设置needsToSelectAgain为true, CLEANUP_INTERVAL
默认值为256
private static final int CLEANUP_INTERVAL = 256; 复制代码
也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码
if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } 复制代码
每满256次,就会进入到if的代码块,首先,将selectedKeys的内部数组全部清空,方便被jvm垃圾回收,然后重新调用 selectAgain
重新填装一下 selectionKey
private void selectAgain() { needsToSelectAgain = false; try { selector.selectNow(); } catch (Throwable t) { logger.warn("Failed to update SelectionKeys.", t); } } 复制代码
netty这么做的目的我想应该是每隔256次channel断线,重新清理一下selectionKey,保证现存的SelectionKey及时有效
到这里,我们初次阅读源码的时候对reactor的第二个步骤的了解已经足够了。总结一下:netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类 AbstractChannel
对象作为attachment,在处理每个SelectionKey的时候,就可以找到 AbstractChannel
,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法
下一篇文章,我们将一起来看下netty中reactor线程中最后一步, runTasks
,你将了解到netty中异步执行任务机制的细节,尽请期待
如果你想系统地学Netty,我的小册 《Netty 入门与实战:仿写微信 IM 即时通讯系统》 可以帮助你,如果你想系统学习Netty原理,那么你一定不要错过我的Netty源码分析系列视频: coding.imooc.com/class/230.h…
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- netty源码分析之揭开reactor线程的面纱(一)
- netty源码分析之揭开reactor线程的面纱(三)
- 揭开「拓扑排序」的神秘面纱
- 揭开隐藏算法的神秘面纱
- 『MySQL』揭开索引神秘面纱
- 揭开区块链渗透测试的神秘面纱
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。