内容简介:原文链接:前面的一些章节,我们分析了Netty的三大组件 ——Channel 、本文内容主要分为以下四部分:
原文链接: wangwei.one/posts/netty…
前面的一些章节,我们分析了Netty的三大组件 ——Channel 、 EventLoop 、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。
本文内容主要分为以下四部分:
- 新连接检测
- NioSocketChannel创建
- NioSocketChannel初始化与注册
- NioSocketChannel注册READ兴趣集
新连接检测
前面,我们在讲 EventLoop的启动过程源码分析 时,解读过下面这段代码:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
try {
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 读取read事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
...
}
...
}
复制代码
我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read() 接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
// 用于保存新建立的 NioSocketChannel 的集合
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
// 确保在当前线程与EventLoop中的一致
assert eventLoop().inEventLoop();
// 获取 NioServerSocketChannel config配置
final ChannelConfig config = config();
// 获取 NioServerSocketChannel 绑定的 pipeline
final ChannelPipeline pipeline = pipeline();
// 获取RecvByteBuf 分配器 Handle
// 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据
// 关于allocHandle后面再去做详细介绍
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 调用后面的 doReadMessages 接口,读取到message则返回1
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 对当前read循环所读取到的message数量计数+1
allocHandle.incMessagesRead(localRead);
// 判断是否继续读取message
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 调用pipeline传播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
// 清空readBuf
readBuf.clear();
allocHandle.readComplete();
// 调用pipeline传播 ChannelReadComplete 事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
...
}
复制代码
对于 doReadMessages(...) 的分析:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
...
// 读取消息
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 获取 SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 使用SocketChannel创建NioSocketChannel,将其存入buf list中
// 关于NioSocketChannel的创建请看后面的分析
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
...
}
复制代码
对于 continueReading() 接口的分析,至于结果为什么返回false,后面会单独分析:
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
private volatile int maxMessagesPerRead;
private volatile boolean respectMaybeMoreData = true;
...
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
// 每次读取最大的消息数
private int maxMessagePerRead;
private int totalMessages;
private int totalBytesRead;
private int attemptedBytesRead;
private int lastBytesRead;
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
...
// 判断是否继续读取message
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
// 判断是否继续读取message
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
// 默认情况下 config.isAutoRead() 为true
// respectMaybeMoreData 默认为 true
// maybeMoreDataSupplier.get() 为false
// totalMessages第一次循环则为1
// maxMessagePerRead为16
// 结果返回false
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
...
}
...
}
复制代码
NioSocketChannel创建
上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 NioServerSocketChannel创建 大体类似。
构造器
先来看看 NioSocketChannel 的构造函数:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
...
public NioSocketChannel(Channel parent, SocketChannel socket) {
// 调用父类构造器
super(parent, socket);
// 创建NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
...
}
复制代码
父类 AbstractNioByteChannel 构造器:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣
super(parent, ch, SelectionKey.OP_READ);
}
...
}
复制代码
父类 AbstractNioChannel 构造器:
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 调用父类构造器
super(parent);
// 设置channel
this.ch = ch;
// 设置兴趣集
this.readInterestOp = readInterestOp;
try {
// 设置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
}
复制代码
父类 AbstractChannel 构造器:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected AbstractChannel(Channel parent) {
// 设置parent
this.parent = parent;
// 创建channelId
id = newId();
// 创建unsafe
unsafe = newUnsafe();
// 创建pipeline
pipeline = newChannelPipeline();
}
...
}
复制代码
ChannelConfig创建
接着我们看看 NioSocketChannelConfig 的创建逻辑:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
...
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
// 调用父类构造器
super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
}
...
}
复制代码
父类 DefaultSocketChannelConfig 构造器:
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
...
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
// 调用父类构造器,绑定socketchannel
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
// 绑定java socket
this.javaSocket = javaSocket;
// Enable TCP_NODELAY by default if possible.
// netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回true
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
// 开启 TCP_NODELAY ,开启TCP的nagle算法
// 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
...
}
复制代码
NioSocketChannel初始化与注册
上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。
在前面小节分析新连接检测的有如下小段代码:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
...
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 调用pipeline传播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
...
}
复制代码
调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?
那这个 ServerBootstrapAcceptor 是从哪里来的呢?
在此前,我们分析 NioServerSocketChannel初始化 时,有过下面这段代码:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
// NioServerSocketChannel初始化
void init(Channel channel) throws Exception {
// 获取启动器 启动时配置的option参数,主要是TCP的一些属性
final Map<ChannelOption<?>, Object> options = options0();
// 将获得到 options 配置到 ChannelConfig 中去
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 获取 ServerBootstrap 启动时配置的 attr 参数
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 配置 Channel attr,主要是设置用户自定义的一些参数
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline
ChannelPipeline p = channel.pipeline();
// 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup
final EventLoopGroup currentChildGroup = childGroup;
// 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 保存用户设置的 childOptions 到局部变量 currentChildOptions
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
// 保存用户设置的 childAttrs 到局部变量 currentChildAttrs
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 获取启动器上配置的handler
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 handler 到 pipeline 中
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor
// 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去
// 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
...
}
复制代码
ServerBootstrapAcceptor
NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 子EventLoopGroup,即为workGroup
private final EventLoopGroup childGroup;
// ServerBootstrap启动时配置的 childHandler
private final ChannelHandler childHandler;
// ServerBootstrap启动时配置的 childOptions
private final Entry<ChannelOption<?>, Object>[] childOptions;
// ServerBootstrap启动时配置的 childAttrs
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
// 构造函数
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
// 处理Pipeline所传播的channelRead事件
// 也就是前面新连接检测时看到的那段代码
// pipeline.fireChannelRead(readBuf.get(i));
// ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 获取传播事件的对象数据,即为前面的readBuf.get(i)
// readBuf.get(i)取出的对象为 NioSocketChannel
final Channel child = (Channel) msg;
// 向 NioSocketChannel 添加childHandler,也就是我们常看到的
// ServerBootstrap在启动时配置的代码:
// ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )
// 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler
// 用于处理客户端的channel连接
child.pipeline().addLast(childHandler);
// 配置 NioSocketChannel的TCP属性
setChannelOptions(child, childOptions, logger);
// 配置 NioSocketChannel 一些用户自定义数据
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
...
}
...
}
复制代码
关于 ChannelInitializer 的讲解,可以看此前Pipeline源码分析 文章。
后面的register逻辑,就与我们前面讲解 NioServerSocketChannel注册 大体类似了,这里简单介绍一下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
// 注册NioSocketChannel
// eventLoop为childGroup
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 绑定eventLoop到NioSocketChannel上
AbstractChannel.this.eventLoop = eventLoop;
// 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 这里来调用register0方法
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 注册
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// 调用 doRegister()
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接
// 所以,NioSocketChannel是处于激活状态,isActive()返回ture
if (isActive()) {
// 对于新连接,是第一次注册
if (firstRegistration) {
// 传播ChannelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
...
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
...
}
复制代码
调用到NioSocketChannel中的doRegister()方法:
public abstract class AbstractNioChannel extends AbstractChannel {
...
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象
// 兴趣集设置为0,表示不关心任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
...
}
复制代码
NioSocketChannel 注册OP_READ兴趣集
紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集
if (isActive()) {
// 对于新连接,是第一次注册
if (firstRegistration) {
// 传播ChannelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
复制代码
通过Pipeline的传播机制 ,最终会调用到doBeginRead()接口,如下:
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
...
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// 保存selectionKey到局部变量
final SelectionKey selectionKey = this.selectionKey;
// 判断有效性
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 获取selectionKey的兴趣集
// 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0
final int interestOps = selectionKey.interestOps();
// 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值
// 为 SelectionKey.OP_READ,也就是1
if ((interestOps & readInterestOp) == 0) {
// 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ
// 表示对读事件感兴趣
selectionKey.interestOps(interestOps | readInterestOp);
}
}
...
}
...
}
复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【Netty】如何接入新连接
- Netty-新连接接入源码解读
- Netty 源码解析系列-客户端连接接入及读I/O解析
- 云转码接入视频网站解决方案 express-ffmpeg接入discuz方案
- 数据接入治理平台
- 有赞统一接入层架构演进
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Spring in Action
Craig Walls / Manning Publications / 2011-6-29 / USD 49.99
Spring in Action, Third Edition has been completely revised to reflect the latest features, tools, practices Spring offers to java developers. It begins by introducing the core concepts of Spring and......一起来看看 《Spring in Action》 这本书的介绍吧!