Netty系列(一):NioEventLoopGroup源码解析

栏目: 后端 · 发布时间: 5年前

内容简介:对于这里咱们可以从一步步往下走,可以发现最终调用到构造函数:

前言

对于 NioEventLoopGroup 这个对象,在我的理解里面它就和 ThreadGroup 类似, NioEventLoopGroup 中有一堆 NioEventLoop 小弟, ThreadGroup 中有一堆 Thread 小弟,真正意义上干活的都是 NioEventLoopThread 这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。(本文基于netty-4.1.32.Final)

NioEventLoopGroup

这里咱们可以从 NioEventLoopGroup 最简单的无参构造函数开始。

1    public NioEventLoopGroup() {
2        this(0);
3    }
复制代码

一步步往下走,可以发现最终调用到构造函数:

1    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
2                             final SelectStrategyFactory selectStrategyFactory) {
3        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
4    }
复制代码

参数说明:

  1. nThreads:在整个方法链的调用过程中,其值到这里为止一直为0,在没有主动配置的情况下后面会进行设置。若配置 io.netty.eventLoopThreads 系统环境变量,则优先考虑,否则设置成为 CPU核心数*2
  2. executor: 到目前为止是 null
  3. selectorProvider: 这里为JDK的默认实现 SelectorProvider.provider()
  4. selectStrategyFactory:这里的值是DefaultSelectStrategyFactory的一个实例 SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
  5. RejectedExecutionHandlers:这里是个拒绝策略,这里默认的实现是队列溢出时抛出 RejectedExecutionException 异常。

MultithreadEventLoopGroup

继续往下面走,调用父类 MultithreadEventLoopGroup 中的构造函数:

1    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
2        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3    }
复制代码

这里可以看到判断 nThreads == 0 后就会给其附上一个默认值。继续走,调用父类 MultithreadEventExecutorGroup 中的构造方法。

1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
2        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
3    }
复制代码

DefaultEventExecutorChooserFactory

这里有个关注的点, DefaultEventExecutorChooserFactory 。这是一个chooserFactory,用来生产 EventExecutorChooser 选择器的。而 EventExecutorChooser 的功能是用来选择哪个 EventExecutor 去执行咱们的任务。咱们从下面的代码中可以观察到 DefaultEventExecutorChooserFactory 一共给咱们提供了两种策略。

1    public EventExecutorChooser newChooser(EventExecutor[] executors) {
2        if (isPowerOfTwo(executors.length)) {
3            return new PowerOfTwoEventExecutorChooser(executors);
4        } else {
5            return new GenericEventExecutorChooser(executors);
6        }
7    }
复制代码

这里的策略也很简单:

  1. 如果给的线程数是2^n个,那么选择 PowerOfTwoEventExecutorChooser 这个选择器,因为这样可以采用位运算去获取执行任务的 EventExecutor
1        public EventExecutor next() {
2            return executors[idx.getAndIncrement() & executors.length - 1];
3        }
复制代码
  1. GenericEventExecutorChooser 选择器,这里采用的是取模的方式去获取执行任务的 EventExecutor
1        public EventExecutor next() {
2            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3        }
复制代码

相比而言, 位运算的效率要比取模的效率高 ,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。

干正事

到达最终调用的函数

 1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
 2                                            EventExecutorChooserFactory chooserFactory, Object... args) {
 3        if (nThreads <= 0) {
 4            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
 5        }
 6
 7        if (executor == null) {
 8            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 9        }
10
11        children = new EventExecutor[nThreads];
12
13        for (int i = 0; i < nThreads; i ++) {
14            boolean success = false;
15            try {
16                children[i] = newChild(executor, args);
17                success = true;
18            } catch (Exception e) {
19                // TODO: Think about if this is a good exception type
20                throw new IllegalStateException("failed to create a child event loop", e);
21            } finally {
22                if (!success) {
23                    for (int j = 0; j < i; j ++) {
24                        //创建NioEventLoop失败后进行资源的一些释放
25                        children[j].shutdownGracefully();
26                    }
27
28                    for (int j = 0; j < i; j ++) {
29                        EventExecutor e = children[j];
30                        try {
31                            while (!e.isTerminated()) {
32                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
33                            }
34                        } catch (InterruptedException interrupted) {
35                            // Let the caller handle the interruption.
36                            Thread.currentThread().interrupt();
37                            break;
38                        }
39                    }
40                }
41            }
42        }
43       //这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍
44        chooser = chooserFactory.newChooser(children);
45
46        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
47            @Override
48            public void operationComplete(Future<Object> future) throws Exception {
49                if (terminatedChildren.incrementAndGet() == children.length) {
50                    terminationFuture.setSuccess(null);
51                }
52            }
53        };
54
55        for (EventExecutor e: children) {
56            // 给每一个成功创建的EventExecutor 绑定一个监听终止事件
57            e.terminationFuture().addListener(terminationListener);
58        }
59
60        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
61        Collections.addAll(childrenSet, children);
62        // 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常
63        readonlyChildren = Collections.unmodifiableSet(childrenSet);
64    }
复制代码

从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为 ThreadPerTaskExecutor 的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:

1    public void execute(Runnable command) {
2        threadFactory.newThread(command).start();
3    }
复制代码

对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下 newChild 这个方法,可以说是上面整个流程中的重点:

newChild

newChild 这个方法在 NioEventLoopGroup 中被重写了:

1    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
2        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
3            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
4    }
复制代码

细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。

继续往下走流程:

 1    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
 2                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
 3        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
 4        if (selectorProvider == null) {
 5            throw new NullPointerException("selectorProvider");
 6        }
 7        if (strategy == null) {
 8            throw new NullPointerException("selectStrategy");
 9        }
10        provider = selectorProvider;
11        final SelectorTuple selectorTuple = openSelector();
12        selector = selectorTuple.selector;
13        unwrappedSelector = selectorTuple.unwrappedSelector;
14        selectStrategy = strategy;
15    }
复制代码

在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里 openSelector 方法调用后返回 SelectorTuple 实例主要是为了能同时得到包装前后的 selectorunwrappedSelector

 1    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
 2                                        boolean addTaskWakesUp, int maxPendingTasks,
 3                                        RejectedExecutionHandler rejectedHandler) {
 4        super(parent);
 5        this.addTaskWakesUp = addTaskWakesUp;
 6        this.maxPendingTasks = Math.max(16, maxPendingTasks);
 7        this.executor = ObjectUtil.checkNotNull(executor, "executor");
 8        taskQueue = newTaskQueue(this.maxPendingTasks);
 9        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10    }
复制代码

这里会有一个 taskQueue 队列的初始化( Queue<Runnable> taskQueue ),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法 newTaskQueueNioEventLoop 中重写了的。具体如下:

1    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
2        // This event loop never calls takeTask()
3        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
4                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
5    }
复制代码

这里生成的是一个 MPSC队列(Multi Producer Single Consumer) ,这是一个多生产者单消费的无锁队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的 maxPendingTasks 值为 Integer.MAX_VALUE 。然后最终生成的是 MpscUnboundedArrayQueue 这样一个无边界的队列。

这样 newChild 这个方法到这里就走完了。

terminationListener

简单介绍下这个环节,在上面的创建 NioEventLoopGroup 有个环节是给每个 NioEventLoop 儿子绑定一个terminationListener监听事件

1        for (EventExecutor e: children) {
2            e.terminationFuture().addListener(terminationListener);
3        }
复制代码

这个事件的回调方法是:

1            @Override
2            public void operationComplete(Future<Object> future) throws Exception {
3                if (terminatedChildren.incrementAndGet() == children.length) {
4                    terminationFuture.setSuccess(null);
5                }
6            }
复制代码

在每一个 NioEventLoop 关闭后,就会回调这个方法,然后给 NioEventLoopGroup 实例中的 terminatedChildren 字段自增1,并与初始化成功的 NioEventLoop 的总个数进行比较,如果

terminatedChildren 的值与 NioEventLoop 的总个数相等,则调用 bossGroup.terminationFuture().get() 方法就不会阻塞,并正常返回 null

同样, future.channel().closeFuture().sync() 这段代码也将不会阻塞住了,调用 sync.get() 也会返回 null

下面给一段测试代码,完整示例大家可以到我的 github 中去获取:

Netty系列(一):NioEventLoopGroup源码解析
terminationListener_test

上面的代码只是一个简单的测试,后面还有别的发现的话会继续在 github 中与大家一起分享~

End


以上所述就是小编给大家介绍的《Netty系列(一):NioEventLoopGroup源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ruby Cookbook

Ruby Cookbook

Lucas Carlson、Leonard Richardson / O'Reilly Media / 2006-7-29 / USD 49.99

Do you want to push Ruby to its limits? The "Ruby Cookbook" is the most comprehensive problem-solving guide to today's hottest programming language. It gives you hundreds of solutions to real-world pr......一起来看看 《Ruby Cookbook》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码