ChannelPipeline 和 ChannelHandler

栏目: Java · 发布时间: 5年前

内容简介:Channel 概念与 java.nio.channel 概念一致, 用以连接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).

ChannelHandler

Channel

Channel 概念与 java.nio.channel 概念一致, 用以连接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.

Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).

数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).

适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.

Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel , 再以此根据IO类型、数据传输类型、适用方类型实现. 类图可以一目了然, 如下图所示:

ChannelPipeline 和 ChannelHandler

Channel 状态

ChannelPipeline 和 ChannelHandler

channelRegistered 状态

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

从注释里面可以看到是在 Channel 绑定到 Eventloop 上面的时候调用的.

不管是 Server 还是 Client, 绑定到 Eventloop 的时候, 最终都是调用 Abstract.initAndRegister() 这个方法上(Server是在 AbstractBootstrap.doBind() 的时候调用的, Client 是在 Bootstrap.doConnect() 的时候调用的).

initAndRegister() 方法定义如下:

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 把channel绑定到Eventloop对象上面去
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 做实际的绑定动作。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实现在Abstract.doRegister()方法内
                doRegister();
                neverRegistered = false;
                registered = true;

                // 通过pipeline的传播机制,触发handlerAdded事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                // 通过pipeline的传播机制,触发channelRegistered事件
                pipeline.fireChannelRegistered();

                // 还没有绑定,所以这里的 isActive() 返回false.
                if (isActive()) {
                    if (firstRegistration) {
                        // 如果当前链路已经激活,则调用channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

从上面的代码也可以看出, 在调用完 pipeline.fireChannelRegistered() 之后, 紧接着会调用 isActive() 判断当前链路是否激活, 如果激活了则会调用 pipeline.fireChannelActive() 方法.

这个时候, 对于 Client 和 Server 都还没有激活, 所以, 这个时候不管是 Server 还是 Client 都不会调用 pipeline.fireChanenlActive() 方法.

channelActive 状态

从启动器的 bind() 接口开始, 往下调用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

doBind 方法又会调用 doBind0() 方法, 在 doBind0() 方法中会通过 EventLoop 去执行 channelbind() 任务.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 调用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0() 方法往下会调用到 pipeline.bind(localAddress, promise) ; 方法, 通过 pipeline 的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法, 这个方法主要做两件事情:

doBind()
pipeline.fireChannelActive()
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在绑定成功前为 false
    boolean wasActive = isActive();
    try {
        // 调用doBind()调用JDK底层API进行端口绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 完成绑定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

也就是说当有新客户端连接的时候, 会变成活动状态.

channelInactive 状态

fireChannelnactive() 方法在两个地方会被调用: Channel.close()Channel.disconnect() .

在调用前会先确认状态是从 Active ---> Inactive .

channelUnregistered 状态

fireChannelUnregistered() 方法是在 ChannelEventloop 中解除注册的时候被调用的. Channel.close() 的时候被触发执行.

ChannelHandler 的生命周期

handlerAdded() : 添加到 ChannelPipeline 时调用.

handlerRemoved() : 从 ChannelPipeline 中移除时调用.

exceptionCaught() : 处理过程中在 ChannelPipeline 中有错误产生时调用.

处理 I/O 事件或截获 I/O 操作, 并将其转发到 ChannelPipeline 中的下一个处理程序. ChannelHandler 本身不提供许多方法, 但通常必须实现其子类型之一:

ChannelInboundHandler
ChannelOutboundHandler

ChannelInboundHandler 接口

channelRegistered() : 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用.

channelUnregistered() : 当 Channel 从他的 EventLoop 注销并且无法处理任何 I/O 时被调用.

channelActive() : 当 Channel 处于活动状态时被调用.

channelInactive() : 当 Channel 离开活动状态并且不再连接远程节点时被调用.

channelRead() : 当从 Channel 读取数据时被调用.

channelReadComplete() : 当 Channel 上的一个读操作完成时被调用. 当所有可读字节都从 Channel 中读取之后, 将会调用该回调方法.

ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel ChannelPipeline 以及 ChannelHandlerContext 调用.

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或事件, 这使得可以通过一些复杂的方法来处理请求. 例如, 如果到远程节点的写入被暂停, 那么你可以推迟刷新操作并在稍后继续.

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) : 当请求将 Channel 连接到远程节点时被调用.

disconnect(ChannelHandlerContext ctx, ChannelPromise promise) : 当请求将 Channel 从远程节点断开时被调用.

deregister(ChannelHandlerContext ctx, ChannelPromise promise) : 当请求将 Channel 从它的 EventLoop 注销时被调用.

read(ChannelHandlerContext ctx) : 当请求从 Channel 读取更多的数据时被调用.

write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) : 当请求通过 Channel 将数据写到远程节点时被调用.

flush(ChannelHandlerContext ctx) : 当请求从 Channel 将入队数据冲刷到远程节点时被调用.

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中异步I/O操作的结果, 在 netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回, 可以通过 ChannelFuture 来获取 I/O 操作的结果或者状态信息.

当 I/O 操作开始时, 将创建一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 因为 I/O 操作还没有完成.

如果 I/O 操作已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障原因.

请注意, 即使失败和取消属于已完成状态.

ChannelPromiseChannelFuture 的一个子接口, 其定义了一些可写的方法, 如 setSuccess()setFailure() , 从而使 ChannelFuture 不可变.

ChannelPipeline 和 ChannelHandler

优先使用addListener(GenericFutureListener),而非await()

当做了一个 I/O 操作并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener) 的方式来获得通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它会把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 线程会在 I/O 操作相关的 future 完成的时候通知监听器.

ChannelFutureListener 会利于最佳的性能和资源的利用, 因为它一点阻塞都没有. 而且不会造成死锁.

ChannelHandler 适配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 这两个适配器类分别提供了

ChannelInboundHandlerChannelOutboundHandler 的基本实现, 它们继承了共同的父接口

ChannelHandler 的方法, 扩展了抽象类 ChannelHandlerAdapter .

ChannelPipeline 和 ChannelHandler

ChannelHandlerAdapter 还提供了实用方法 isSharable() .

如果其对应的实现被标注为 Sharable , 那么这个方法将返回 true , 表示它可以被添加到多个 ChannelPipeline 中.

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法, 从而将事件转发到了 ChannelPipeline 中的 ChannelHandler 中.

ChannelPipeline 接口

ChannelPipeline 将多个 ChannelHandler 链接在一起来让事件在其中传播处理. 一个 ChannelPipeline 中可能不仅有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.

每一个新创建的 Channel 都将会分配一个新的 ChannelPipeline , 不能附加另一个 ChannelPipeline , 也不能分离当前的.

通过调用 ChannelHandlerContext 实现, 它将被转发给同一个超类型的下一个 ChannelHandler .

ChannelPipeline 和 ChannelHandler

从事件途径 ChannelPilpeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的还是出站的.

而 Netty 总是将 ChannelPilpeline 的入站口 (左侧) 作为头部, 将出站口 (右侧) 作为尾端.

当通过调用 ChannelPilpeline.add*() 方法将入站处理器和出站处理器混合添加到 ChannelPilpeline 之后, 每一个 ChannelHandler 从头部到尾端的顺序就是我们添加的顺序.

ChannelPilpeline 传播事件时, 它会测试 ChannelPilpeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配. 如果不匹配, ChannelPilpeline 将跳过该 ChannelHandler 并前进到下一个, 直到它找到和该事件期望的方向相匹配的为止.

修改 ChannelPipeline

这里指修改 ChannelPipeline 中的 ChannelHandler 的编排.

通过调用 ChannelPipeline 上的相关方法, ChannelHandler 可以添加, 删除或者替换其他的 ChannelHandler , 从而实时地修改 ChannelPipeline 的布局.

addFirst  // 将 ChannelHandler 插入第一个位置
addBefore // 在某个 ChannelHandler 之前添加一个
addAfter  // 在某个 ChannelHandler 之后添加一个
addLast   // 将 ChannelHandler 插入最后一个位置
remove    // 移除某个 ChannelHandler
replace   // 将某个 ChannelHandler 替换成指定 ChannelHandler

ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChanelHandlerChannelPipeline 之间的关联, 每当有 ChanelHandler 添加到 ChannelPipeline 中, 都会创建 ChannelHandlerContext .

ChannelHandlerContext 的主要功能是管理它所关联的 ChannelPipeline 和同一个 ChannelPipeline 中的其他 ChanelHandler 之间的交互.

ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 ChannelChannelPipeline 上, 但是有一点重要的不同.

如果调用 ChannelChannelPipeline 上的这些方法将沿着 ChannelPipeline 进行传播(从头或尾开始).

而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler .

这样做可以减少 ChannelHandler 的调用开销.

使用 ChannelHandlerContext

ChannelPipeline 和 ChannelHandler

上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.


以上所述就是小编给大家介绍的《ChannelPipeline 和 ChannelHandler》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

High Performance Python

High Performance Python

Micha Gorelick、Ian Ozsvald / O'Reilly Media / 2014-9-10 / USD 39.99

If you're an experienced Python programmer, High Performance Python will guide you through the various routes of code optimization. You'll learn how to use smarter algorithms and leverage peripheral t......一起来看看 《High Performance Python》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试