Netty源码分析3-ChannelPipeline

栏目: 编程工具 · 发布时间: 7年前

内容简介:ChannelPipeline是Channel的负责组织ChannelHandler的组件,如上图所示,想象远端为上方,最上面为head,近端为我们的程序,最下面为tail。一个inbound事件,通常为读到的消息、用户自定义事件等会从上而下经过各个ChannelInboundHandler。而Outbound事件通常为write消息等,会经过ChannelOutboundHandler处理。如果要在程序中发起一个事件,可以通过ChannelHandlerContext,ChannelHandlerCon

Netty源码分析3-ChannelPipeline

ChannelPipeline是Channel的负责组织ChannelHandler的组件,如上图所示,想象远端为上方,最上面为head,近端为我们的程序,最下面为tail。一个inbound事件,通常为读到的消息、用户自定义事件等会从上而下经过各个ChannelInboundHandler。而Outbound事件通常为write消息等,会经过ChannelOutboundHandler处理。

如果要在程序中发起一个事件,可以通过ChannelHandlerContext,ChannelHandlerContext的方法和Channel方法的区别是ChannelHandlerContext的事件会传递给下一个ChannelHandler来处理,而Channel发出的事件会从头ChannelHandler(head或tail)开始处理。ChannelPipeline类似Servlet中的Filter,或其他的Interceptor模式。

Inbound事件传播方法:

  • ChannelHandlerContext#fireChannelRegistered()
  • ChannelHandlerContext#fireChannelActive()
  • ChannelHandlerContext#fireChannelRead(Object)
  • ChannelHandlerContext#fireChannelReadComplete()
  • ChannelHandlerContext#fireExceptionCaught(Throwable)
  • ChannelHandlerContext#fireChannelUserEventTriggered(Object)
  • ChannelHandlerContext#fireChannelChannelInactive()
  • ChannelHandlerContext#fireChannelChannelUnRegistered()
  • ChannelHandlerContext#fireChannelChannelWritabilityChanged()

Outbound事件传播方法包括

  • ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
  • ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
  • ChannelHandlerContext#write(Object, ChannelPromise)
  • ChannelHandlerContext#flush()
  • ChannelHandlerContext#read() //
  • ChannelHandlerContext#close(ChannelPromise)
  • ChannelHandlerContext#disconnect(ChannelPromise)
  • ChannelHandlerContext#deregister(SocketAddress, ChannelPromise)

ChannelPipeline上的ChannelHandler通常分为以下几类

  • Protocol Decoder - 将二进制数据转换为 Java 对象或将一种Java对象转换为另一种Java对象
  • Protocol Encoder - 将Java对象转换成二进制数据或将一种Java对象转换为另一种Java对象
  • 业务逻辑Handler - 针对不同的事件做出业务逻辑

ChannelPipeline最常用的方法就是在pipeline最后添加ChannelHandler了

ChannelPipeline addLast(ChannelHandler... handlers);

除此之外,pipeline是线程安全的,还能动态地添加删除ChannelHandler。

另外pipeline也包括了firestChannelxxx方法

@Override
    ChannelPipelinefireChannelRegistered();

     @Override
    ChannelPipelinefireChannelUnregistered();

    @Override
    ChannelPipelinefireChannelActive();

    @Override
    ChannelPipelinefireChannelInactive();

    @Override
    ChannelPipelinefireExceptionCaught(Throwable cause);

    @Override
    ChannelPipelinefireUserEventTriggered(Object event);

    @Override
    ChannelPipelinefireChannelRead(Object msg);

    @Override
    ChannelPipelinefireChannelReadComplete();

    @Override
    ChannelPipelinefireChannelWritabilityChanged();

    @Override
    ChannelPipelineflush();

DefaultChannelPipeline

ChannelPipeline实现-DefaultChannelPipeline

很自然的我们可以想到使用双向链表来实现pipeline。

DefaultChannelPipeline中包含了两个特殊的ChannelHandler, head和tail, 实现类分别是HeadContext和TailContext,分别作为队列的头和尾。

两个节点在ChannelPipeline创建的时候被设置。

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

protected DefaultChannelPipeline(Channel channel){
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

HeadContext

HeadContext主要负责把Outbound相关事件交给AbstractChannel.Unsafe来处理,如bind、write等。

final class HeadContextextends AbstractChannelHandlerContext
implements ChannelOutboundHandler,ChannelInboundHandler{
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
    @Override
    public ChannelHandler handler(){
        return this;
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx)throws Exception {
        // NOOP
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)throws Exception {
        // NOOP
    }
    @Override
    public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
        unsafe.bind(localAddress, promise);
    }
    @Override
    public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise)throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.disconnect(promise);
    }
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.close(promise);
    }
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.deregister(promise);
    }
    @Override
    public void read(ChannelHandlerContext ctx){
        unsafe.beginRead();
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Excep
unsafe.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx)throws Exception {
        unsafe.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
        ctx.fireExceptionCaught(cause);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelUnregistered();
        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelInactive();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead(){
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

TailContext

TailContext的作用主要是最终给一些消息ReferenceCount减一、打印前面没有捕获的异常等。

final class TailContextextends AbstractChannelHandlerContextimplements ChannelInboundHandler{
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
    @Override
    public ChannelHandler handler(){
        return this;
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void channelActive(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelActive();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelInactive();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
        onUnhandledChannelWritabilityChanged();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        onUnhandledInboundUserEventTriggered(evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
        onUnhandledInboundException(cause);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        onUnhandledInboundMessage(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelReadComplete();
    }
}

addLast实现

addLast将一个ChannelHandler放在pipeline最后,内部实现是在tail之前。有可选参数executor和name。

executor表示执行ChannelHandler处理的线程池,如果没有设置或传入null则使用对应channel的eventLoop。

通常如果有耗时很大的处理,则会自定义一个线程池来执行,避免阻塞eventLoop导致不能及时处理IO事件。

name可以给这个ChannelPipeline上的Handler定义一个名字,方便之后replace等操作,如果没有传入则会自动生成一个。

addLast首先给this加锁,来保证线程安全,因为其中的队列指针操作有很多步骤。

  1. 创建一个ChannelHandlerContext, ChannelHandlerContext可以理解为ChannelPipeline和ChannelHandler的交集或交叉点,ctx中能得到ChannelPipeline和ChannelHandler。
  2. 追加到队尾,tail之前
  3. 设置ChannelHandlerContext状态为已添加,同时触发ChannelHandler的handlerAdded事件

这里还有一个细节,就是Channel在执行addLast的时候可能还没有完成register,如果此时回调handlerAdded则会

导致顺序问题先发生了added再register。所以这里判断是否已经注册,如果没有则先放到一个队列中,等注册完成后再执行。

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;

创建ChannelHandlerContext

private AbstractChannelHandlerContextnewContext(EventExecutorGroup group, String name, ChannelHandlerhandler){
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

final class DefaultChannelHandlerContextextends AbstractChannelHandlerContext{
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandlerhandler(){
        return handler;
    }
    ...
}

链表操作,放到队尾

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

设置ChannelHandlerContext已经添加、回调ChannelHandler的handlerAdded

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx){
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        ...
}

总结

ChannelPipeline是保存Channel上的ChannelHandler的组件,内部是双向链表结构,我们看到节点保存的是

ChannelHandlerContext,而ChannelHandlerContext又是通过ChannelPipeline和ChannelHandler构造出来的。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms + Data Structures = Programs

Algorithms + Data Structures = Programs

Niklaus Wirth / Prentice Hall / 1975-11-11 / GBP 84.95

It might seem completely dated with all its examples written in the now outmoded Pascal programming language (well, unless you are one of those Delphi zealot trying to resist to the Java/.NET dominanc......一起来看看 《Algorithms + Data Structures = Programs》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具