内容简介:对于这里咱们可以从一步步往下走,可以发现最终调用到构造函数:
前言
对于 NioEventLoopGroup
这个对象,在我的理解里面它就和 ThreadGroup
类似, NioEventLoopGroup
中有一堆 NioEventLoop
小弟, ThreadGroup
中有一堆 Thread
小弟,真正意义上干活的都是 NioEventLoop
和 Thread
这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。(本文基于netty-4.1.32.Final)
NioEventLoopGroup
这里咱们可以从 NioEventLoopGroup
最简单的无参构造函数开始。
1 public NioEventLoopGroup() { 2 this(0); 3 } 复制代码
一步步往下走,可以发现最终调用到构造函数:
1 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, 2 final SelectStrategyFactory selectStrategyFactory) { 3 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); 4 } 复制代码
参数说明:
- nThreads:在整个方法链的调用过程中,其值到这里为止一直为0,在没有主动配置的情况下后面会进行设置。若配置
io.netty.eventLoopThreads
系统环境变量,则优先考虑,否则设置成为CPU核心数*2
。 - executor: 到目前为止是
null
。 - selectorProvider: 这里为JDK的默认实现
SelectorProvider.provider()
。 - selectStrategyFactory:这里的值是DefaultSelectStrategyFactory的一个实例
SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
。 - RejectedExecutionHandlers:这里是个拒绝策略,这里默认的实现是队列溢出时抛出
RejectedExecutionException
异常。
MultithreadEventLoopGroup
继续往下面走,调用父类 MultithreadEventLoopGroup
中的构造函数:
1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); 3 } 复制代码
这里可以看到判断 nThreads == 0
后就会给其附上一个默认值。继续走,调用父类 MultithreadEventExecutorGroup
中的构造方法。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { 2 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); 3 } 复制代码
DefaultEventExecutorChooserFactory
这里有个关注的点, DefaultEventExecutorChooserFactory
。这是一个chooserFactory,用来生产 EventExecutorChooser
选择器的。而 EventExecutorChooser
的功能是用来选择哪个 EventExecutor
去执行咱们的任务。咱们从下面的代码中可以观察到 DefaultEventExecutorChooserFactory
一共给咱们提供了两种策略。
1 public EventExecutorChooser newChooser(EventExecutor[] executors) { 2 if (isPowerOfTwo(executors.length)) { 3 return new PowerOfTwoEventExecutorChooser(executors); 4 } else { 5 return new GenericEventExecutorChooser(executors); 6 } 7 } 复制代码
这里的策略也很简单:
- 如果给的线程数是2^n个,那么选择
PowerOfTwoEventExecutorChooser
这个选择器,因为这样可以采用位运算去获取执行任务的EventExecutor
。
1 public EventExecutor next() { 2 return executors[idx.getAndIncrement() & executors.length - 1]; 3 } 复制代码
-
GenericEventExecutorChooser
选择器,这里采用的是取模的方式去获取执行任务的EventExecutor
。
1 public EventExecutor next() { 2 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; 3 } 复制代码
相比而言, 位运算的效率要比取模的效率高 ,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。
干正事
到达最终调用的函数
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 if (nThreads <= 0) { 4 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 5 } 6 7 if (executor == null) { 8 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 9 } 10 11 children = new EventExecutor[nThreads]; 12 13 for (int i = 0; i < nThreads; i ++) { 14 boolean success = false; 15 try { 16 children[i] = newChild(executor, args); 17 success = true; 18 } catch (Exception e) { 19 // TODO: Think about if this is a good exception type 20 throw new IllegalStateException("failed to create a child event loop", e); 21 } finally { 22 if (!success) { 23 for (int j = 0; j < i; j ++) { 24 //创建NioEventLoop失败后进行资源的一些释放 25 children[j].shutdownGracefully(); 26 } 27 28 for (int j = 0; j < i; j ++) { 29 EventExecutor e = children[j]; 30 try { 31 while (!e.isTerminated()) { 32 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 33 } 34 } catch (InterruptedException interrupted) { 35 // Let the caller handle the interruption. 36 Thread.currentThread().interrupt(); 37 break; 38 } 39 } 40 } 41 } 42 } 43 //这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍 44 chooser = chooserFactory.newChooser(children); 45 46 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 47 @Override 48 public void operationComplete(Future<Object> future) throws Exception { 49 if (terminatedChildren.incrementAndGet() == children.length) { 50 terminationFuture.setSuccess(null); 51 } 52 } 53 }; 54 55 for (EventExecutor e: children) { 56 // 给每一个成功创建的EventExecutor 绑定一个监听终止事件 57 e.terminationFuture().addListener(terminationListener); 58 } 59 60 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); 61 Collections.addAll(childrenSet, children); 62 // 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常 63 readonlyChildren = Collections.unmodifiableSet(childrenSet); 64 } 复制代码
从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为 ThreadPerTaskExecutor
的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:
1 public void execute(Runnable command) { 2 threadFactory.newThread(command).start(); 3 } 复制代码
对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下 newChild
这个方法,可以说是上面整个流程中的重点:
newChild
newChild
这个方法在 NioEventLoopGroup
中被重写了:
1 protected EventLoop newChild(Executor executor, Object... args) throws Exception { 2 return new NioEventLoop(this, executor, (SelectorProvider) args[0], 3 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); 4 } 复制代码
细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。
继续往下走流程:
1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, 2 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { 3 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); 4 if (selectorProvider == null) { 5 throw new NullPointerException("selectorProvider"); 6 } 7 if (strategy == null) { 8 throw new NullPointerException("selectStrategy"); 9 } 10 provider = selectorProvider; 11 final SelectorTuple selectorTuple = openSelector(); 12 selector = selectorTuple.selector; 13 unwrappedSelector = selectorTuple.unwrappedSelector; 14 selectStrategy = strategy; 15 } 复制代码
在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里 openSelector
方法调用后返回 SelectorTuple
实例主要是为了能同时得到包装前后的 selector
与 unwrappedSelector
。
1 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, 2 boolean addTaskWakesUp, int maxPendingTasks, 3 RejectedExecutionHandler rejectedHandler) { 4 super(parent); 5 this.addTaskWakesUp = addTaskWakesUp; 6 this.maxPendingTasks = Math.max(16, maxPendingTasks); 7 this.executor = ObjectUtil.checkNotNull(executor, "executor"); 8 taskQueue = newTaskQueue(this.maxPendingTasks); 9 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); 10 } 复制代码
这里会有一个 taskQueue
队列的初始化( Queue<Runnable> taskQueue
),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法 newTaskQueue
在 NioEventLoop
中重写了的。具体如下:
1 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { 2 // This event loop never calls takeTask() 3 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() 4 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); 5 } 复制代码
这里生成的是一个 MPSC队列(Multi Producer Single Consumer) ,这是一个多生产者单消费的无锁队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的 maxPendingTasks
值为 Integer.MAX_VALUE
。然后最终生成的是 MpscUnboundedArrayQueue
这样一个无边界的队列。
这样 newChild
这个方法到这里就走完了。
terminationListener
简单介绍下这个环节,在上面的创建 NioEventLoopGroup
有个环节是给每个 NioEventLoop
儿子绑定一个terminationListener监听事件
1 for (EventExecutor e: children) { 2 e.terminationFuture().addListener(terminationListener); 3 } 复制代码
这个事件的回调方法是:
1 @Override 2 public void operationComplete(Future<Object> future) throws Exception { 3 if (terminatedChildren.incrementAndGet() == children.length) { 4 terminationFuture.setSuccess(null); 5 } 6 } 复制代码
在每一个 NioEventLoop
关闭后,就会回调这个方法,然后给 NioEventLoopGroup
实例中的 terminatedChildren
字段自增1,并与初始化成功的 NioEventLoop
的总个数进行比较,如果
terminatedChildren
的值与 NioEventLoop
的总个数相等,则调用 bossGroup.terminationFuture().get()
方法就不会阻塞,并正常返回 null
。
同样, future.channel().closeFuture().sync()
这段代码也将不会阻塞住了,调用 sync.get()
也会返回 null
。
下面给一段测试代码,完整示例大家可以到我的 github 中去获取:
上面的代码只是一个简单的测试,后面还有别的发现的话会继续在 github 中与大家一起分享~
End
以上所述就是小编给大家介绍的《Netty系列(一):NioEventLoopGroup源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。