内容简介:原文:前面 ,我们分析了Netty中的Channel组件,本篇我们来介绍一下与Channel关联的另一个核心的组件 ——Netty版本:4.1.30
前面 ,我们分析了Netty中的Channel组件,本篇我们来介绍一下与Channel关联的另一个核心的组件 —— EventLoop 。
Netty版本:4.1.30
概述
EventLoop定义了Netty的核心抽象,用于处理网络连接生命周期中所有发生的事件。
我们先来从一个比较高的视角来了解一下Channels、Thread、EventLoops、EventLoopGroups之间的关系。
上图是表示了拥有4个EventLoop的EventLoopGroup处理IO的流程图。它们之间的关系如下:
- 一个 EventLoopGroup包含一个或多个EventLoop
- 一个 EventLoop在它的生命周期内只和一个Thread绑定
- 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理
- 一个Channel在它的生命周期内只注册于一个EventLoop
- 一个EventLoop可能会被分配给一个或多个Channel
EventLoop 原理
下图是Netty EventLoop相关类的UML图。从中我们可以看到EventLoop相关的类都是实现了 java.util.concurrent 包中的 ExecutorService 接口。我们可以直接将任务(Runable 或 Callable) 提交给EventLoop去立即执行或定时执行。
例如,使用EventLoop去执行定时任务,样例代码:
public static void scheduleViaEventLoop() {
Channel ch = new NioSocketChannel();
ScheduledFuture<?> future = ch.eventLoop().schedule(
() -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS);
}
复制代码
Thread 管理
Netty线程模型的高性能主要取决于当前所执行线程的身份的确定。一个线程提交到EventLoop执行的流程如下:
- 将Task任务提交给EventLoop执行
- 在Task传递到execute方法之后,检查当前要执行的Task的线程是否是分配给EventLoop的那个线程
- 如果是,则该线程会立即执行
- 如果不是,则将线程放入任务队列中,等待下一次执行
其中,Netty中的每一个EventLoop都有它自己的任务队列,并且和其他的EventLoop的任务队列独立开来。
Thread 分配
服务于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根据不同的传输实现,EventLoop的创建和分配方式也不同。
NIO传输
在NIO传输方式中,使用尽可能少的EventLoop就可以服务多个Channel。如图所示,EventLoopGroup采用顺序循环的方式负责为每一个新创建的Channel分配EventLoop,每一个EventLoop会被分配给多个Channels。
一旦一个Channel被分配给了一个EventLoop,则这个Channel的生命周期内,只会绑定这个EventLoop。这就让我们在ChannelHandler的实现省去了对线程安全和同步问题的担心。
OIO传输
与NIO方式的不同在于,一个EventLoop只会服务于一个Channel。
NioEventLoop & NioEventLoopGroup 创建
初步了解了 EventLoop 以及 EventLoopGroup 的工作机制,接下来我们以 NioEventLoopGroup 为例,来深入分析 NioEventLoopGroup 是如何创建的,又是如何启动的,它的内部执行逻辑又是怎样的等等问题。
MultithreadEventExecutorGroup 构造器
我们从 NioEventLoopGroup 的构造函数开始分析:
EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1); 复制代码
NioEventLoopGroup构造函数会调用到父类 MultithreadEventLoopGroup 的构造函数,默认情况下,EventLoop的数量 = 处理器数量 x 2:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
// 默认情况下,EventLoop的数量 = 处理器数量 x 2
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}
复制代码
继续调用父类,会调用到 MultithreadEventExecutorGroup 的构造器,主要做三件事情:
- 创建线程任务执行器 ThreadPerTaskExecutor
- 通过for循环创建数量为 nThreads 个的 EventLoop
- 创建 EventLoop 选择器 EventExecutorChooser
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 创建任务执行器 ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建 EventExecutor 数组
children = new EventExecutor[nThreads];
// 通过for循环创建数量为 nThreads 个的 EventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 调用 newChild 接口
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码
创建线程任务执行器 ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
复制代码
线程任务执行器 ThreadPerTaskExecutor 源码如下,具体的任务都由 ThreadFactory 去执行:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
// 使用 threadFactory 执行任务
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
复制代码
来看看 newDefaultThreadFactory 方法:
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
复制代码
DefaultThreadFactory
接下来看看 DefaultThreadFactory 这个类,实现了 ThreadFactory 接口,我们可以了解到:
- EventLoopGroup的命名规则
- 具体的线程为 FastThreadLocalThread
public class DefaultThreadFactory implements ThreadFactory {
// 线程池ID编号自增器
private static final AtomicInteger poolId = new AtomicInteger();
// 线程ID自增器
private final AtomicInteger nextId = new AtomicInteger();
// 线程名称前缀
private final String prefix;
// 是否为守护进程
private final boolean daemon;
// 线程优先级
private final int priority;
// 线程组
protected final ThreadGroup threadGroup;
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
...
// 获取线程名,返回结果:nioEventLoopGroup
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
// nioEventLoopGroup-2-
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
@Override
public Thread newThread(Runnable r) {
// 创建新线程 nioEventLoopGroup-2-1
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
// 创建新线程 FastThreadLocalThread
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
复制代码
创建NioEventLoop
继续从 MultithreadEventExecutorGroup 构造器开始,创建完任务执行器 ThreadPerTaskExecutor 之后,进入for循环,开始创建 NioEventLoop:
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 nioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
}
...
}
复制代码
NioEventLoopGroup类中的 newChild() 方法:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码
NioEventLoop 构造器:
public final class NioEventLoop extends SingleThreadEventLoop{
...
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类 SingleThreadEventLoop 构造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// 设置 selectorProvider
provider = selectorProvider;
// 获取 SelectorTuple 对象,里面封装了原生的selector和优化过的selector
final SelectorTuple selectorTuple = openSelector();
// 设置优化过的selector
selector = selectorTuple.selector;
// 设置原生的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
// 设置select策略
selectStrategy = strategy;
}
...
}
复制代码
接下来我们看看 获取多路复用选择器 方法—— openSelector() ,
// selectKey 优化选项flag
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private SelectorTuple openSelector() {
// JDK原生的selector
final Selector unwrappedSelector;
try {
// 通过 SelectorProvider 创建获得selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果不优化,则直接返回
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 通过反射创建 sun.nio.ch.SelectorImpl 对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
// 如果 maybeSelectorImplClass 不是 selector 的一个实现,则直接返回原生的Selector
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
// 确保当前的选择器实现是我们可以检测的
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
// maybeSelectorImplClass 是selector的实现,则转化为 selector 实现类
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// 创建新的 SelectionKey 集合 SelectedSelectionKeySet,内部采用的是 SelectionKey 数组的形
// 式,而非 set 集合
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 selectedKeys
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
// 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 publicSelectedKeys
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// 设置字段 selectedKeys Accessible 为true
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
// 设置字段 publicSelectedKeys Accessible 为true
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
// 设置 SelectedSelectionKeySet
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 返回包含了原生selector和优化过的selector的SelectorTuple
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
复制代码
优化后的 SelectedSelectionKeySet 对象,内部采用 SelectionKey 数组的形式:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
// 使用数组,来替代HashSet,可以降低时间复杂度为O(1)
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
// 扩容
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
复制代码
SingleThreadEventLoop 构造器
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
// 调用 SingleThreadEventExecutor 构造器
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
...
}
复制代码
SingleThreadEventExecutor 构造器,主要做两件事情:
- 设置线程任务执行器。
- 设置任务队列。前面讲到EventLoop对于不能立即执行的Task会放入一个队列中,就是这里设置的。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 设置线程任务执行器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 设置任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
...
}
复制代码
NioEventLoop 中对 newTaskQueue 接口的实现,返回的是 JCTools 工具包 Mpsc 队列。后面我们写文章单独介绍 JCTools 中的相关队列。
Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)
多个生产者对单个消费者(无锁、有界和无界都有实现)
public final class NioEventLoop extends SingleThreadEventLoop {
...
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
...
}
复制代码
创建线程执行选择器chooser
接下来,我们看看 MultithreadEventExecutorGroup 构造器的最后一个部分内容,创建线程执行选择器chooser,它的主要作用就是 EventLoopGroup 用于从 EventLoop 数组中选择一个 EventLoop 去执行任务。
// 创建选择器 chooser = chooserFactory.newChooser(children); 复制代码
EventLoopGroup 中定义的 next() 接口:
public interface EventLoopGroup extends EventExecutorGroup {
...
// 选择下一个 EventLoop 用于执行任务
@Override
EventLoop next();
...
}
复制代码
MultithreadEventExecutorGroup 中对 next() 的实现:
@Override
public EventExecutor next() {
// 调用 DefaultEventExecutorChooserFactory 中的next()
return chooser.next();
}
复制代码
DefaultEventExecutorChooserFactory 对于如何从数组中选择任务执行器,也做了巧妙的优化。
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// 判断线程任务执行的个数是否为 2 的幂次方。e.g: 2、4、8、16
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
// 幂次方选择器
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 通过二级制进行 & 运算,效率更高
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// 普通选择器
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 按照最普通的取模的方式从index=0开始向后开始选择
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
复制代码
小结
通过本节内容,我们了解到了EventLoop与EventLoopGroup的基本原理,EventLoopGroup与EventLoop的创建过程:
- 创建线程任务执行器 ThreadPerTaskExecutor
- 创建EventLoop
- 创建任务选择器 EventExecutorChooser
参考资料
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Spring容器创建源码解析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- LayoutInflater创建View源码阅读
- Spring源码系列:BeanFactory的创建
- Sphinx源码学习笔记(一):索引创建
- Netty源码分析--创建Channel(三)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
An Introduction to Genetic Algorithms
Melanie Mitchell / MIT Press / 1998-2-6 / USD 45.00
Genetic algorithms have been used in science and engineering as adaptive algorithms for solving practical problems and as computational models of natural evolutionary systems. This brief, accessible i......一起来看看 《An Introduction to Genetic Algorithms》 这本书的介绍吧!