内容简介:ChannelPipeline是Channel的负责组织ChannelHandler的组件,如上图所示,想象远端为上方,最上面为head,近端为我们的程序,最下面为tail。一个inbound事件,通常为读到的消息、用户自定义事件等会从上而下经过各个ChannelInboundHandler。而Outbound事件通常为write消息等,会经过ChannelOutboundHandler处理。如果要在程序中发起一个事件,可以通过ChannelHandlerContext,ChannelHandlerCon
ChannelPipeline是Channel的负责组织ChannelHandler的组件,如上图所示,想象远端为上方,最上面为head,近端为我们的程序,最下面为tail。一个inbound事件,通常为读到的消息、用户自定义事件等会从上而下经过各个ChannelInboundHandler。而Outbound事件通常为write消息等,会经过ChannelOutboundHandler处理。
如果要在程序中发起一个事件,可以通过ChannelHandlerContext,ChannelHandlerContext的方法和Channel方法的区别是ChannelHandlerContext的事件会传递给下一个ChannelHandler来处理,而Channel发出的事件会从头ChannelHandler(head或tail)开始处理。ChannelPipeline类似Servlet中的Filter,或其他的Interceptor模式。
Inbound事件传播方法:
- ChannelHandlerContext#fireChannelRegistered()
- ChannelHandlerContext#fireChannelActive()
- ChannelHandlerContext#fireChannelRead(Object)
- ChannelHandlerContext#fireChannelReadComplete()
- ChannelHandlerContext#fireExceptionCaught(Throwable)
- ChannelHandlerContext#fireChannelUserEventTriggered(Object)
- ChannelHandlerContext#fireChannelChannelInactive()
- ChannelHandlerContext#fireChannelChannelUnRegistered()
- ChannelHandlerContext#fireChannelChannelWritabilityChanged()
Outbound事件传播方法包括
- ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
- ChannelHandlerContext#write(Object, ChannelPromise)
- ChannelHandlerContext#flush()
- ChannelHandlerContext#read() //
- ChannelHandlerContext#close(ChannelPromise)
- ChannelHandlerContext#disconnect(ChannelPromise)
- ChannelHandlerContext#deregister(SocketAddress, ChannelPromise)
ChannelPipeline上的ChannelHandler通常分为以下几类
- Protocol Decoder - 将二进制数据转换为 Java 对象或将一种Java对象转换为另一种Java对象
- Protocol Encoder - 将Java对象转换成二进制数据或将一种Java对象转换为另一种Java对象
- 业务逻辑Handler - 针对不同的事件做出业务逻辑
ChannelPipeline最常用的方法就是在pipeline最后添加ChannelHandler了
ChannelPipeline addLast(ChannelHandler... handlers);
除此之外,pipeline是线程安全的,还能动态地添加删除ChannelHandler。
另外pipeline也包括了firestChannelxxx方法
@Override
ChannelPipelinefireChannelRegistered();
@Override
ChannelPipelinefireChannelUnregistered();
@Override
ChannelPipelinefireChannelActive();
@Override
ChannelPipelinefireChannelInactive();
@Override
ChannelPipelinefireExceptionCaught(Throwable cause);
@Override
ChannelPipelinefireUserEventTriggered(Object event);
@Override
ChannelPipelinefireChannelRead(Object msg);
@Override
ChannelPipelinefireChannelReadComplete();
@Override
ChannelPipelinefireChannelWritabilityChanged();
@Override
ChannelPipelineflush();
DefaultChannelPipeline
ChannelPipeline实现-DefaultChannelPipeline
很自然的我们可以想到使用双向链表来实现pipeline。
DefaultChannelPipeline中包含了两个特殊的ChannelHandler, head和tail, 实现类分别是HeadContext和TailContext,分别作为队列的头和尾。
两个节点在ChannelPipeline创建的时候被设置。
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
protected DefaultChannelPipeline(Channel channel){
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
HeadContext
HeadContext主要负责把Outbound相关事件交给AbstractChannel.Unsafe来处理,如bind、write等。
final class HeadContextextends AbstractChannelHandlerContext
implements ChannelOutboundHandler,ChannelInboundHandler{
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler(){
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx)throws Exception {
// NOOP
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise)throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx){
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Excep
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx)throws Exception {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx)throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx)throws Exception {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx)throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead(){
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
ctx.fireChannelWritabilityChanged();
}
}
TailContext
TailContext的作用主要是最终给一些消息ReferenceCount减一、打印前面没有捕获的异常等。
final class TailContextextends AbstractChannelHandlerContextimplements ChannelInboundHandler{
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
@Override
public ChannelHandler handler(){
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx)throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx)throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception {
onUnhandledInboundChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx)throws Exception {
onUnhandledInboundChannelInactive();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
onUnhandledChannelWritabilityChanged();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx)throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
onUnhandledInboundUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
onUnhandledInboundMessage(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
onUnhandledInboundChannelReadComplete();
}
}
addLast实现
addLast将一个ChannelHandler放在pipeline最后,内部实现是在tail之前。有可选参数executor和name。
executor表示执行ChannelHandler处理的线程池,如果没有设置或传入null则使用对应channel的eventLoop。
通常如果有耗时很大的处理,则会自定义一个线程池来执行,避免阻塞eventLoop导致不能及时处理IO事件。
name可以给这个ChannelPipeline上的Handler定义一个名字,方便之后replace等操作,如果没有传入则会自动生成一个。
addLast首先给this加锁,来保证线程安全,因为其中的队列指针操作有很多步骤。
- 创建一个ChannelHandlerContext, ChannelHandlerContext可以理解为ChannelPipeline和ChannelHandler的交集或交叉点,ctx中能得到ChannelPipeline和ChannelHandler。
- 追加到队尾,tail之前
- 设置ChannelHandlerContext状态为已添加,同时触发ChannelHandler的handlerAdded事件
这里还有一个细节,就是Channel在执行addLast的时候可能还没有完成register,如果此时回调handlerAdded则会
导致顺序问题先发生了added再register。所以这里判断是否已经注册,如果没有则先放到一个队列中,等注册完成后再执行。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
创建ChannelHandlerContext
private AbstractChannelHandlerContextnewContext(EventExecutorGroup group, String name, ChannelHandlerhandler){
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
final class DefaultChannelHandlerContextextends AbstractChannelHandlerContext{
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandlerhandler(){
return handler;
}
...
}
链表操作,放到队尾
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
设置ChannelHandlerContext已经添加、回调ChannelHandler的handlerAdded
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx){
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
...
}
总结
ChannelPipeline是保存Channel上的ChannelHandler的组件,内部是双向链表结构,我们看到节点保存的是
ChannelHandlerContext,而ChannelHandlerContext又是通过ChannelPipeline和ChannelHandler构造出来的。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Algorithms + Data Structures = Programs
Niklaus Wirth / Prentice Hall / 1975-11-11 / GBP 84.95
It might seem completely dated with all its examples written in the now outmoded Pascal programming language (well, unless you are one of those Delphi zealot trying to resist to the Java/.NET dominanc......一起来看看 《Algorithms + Data Structures = Programs》 这本书的介绍吧!