探究netty的观察者设计模式

栏目: IT技术 · 发布时间: 6年前 · 78

观察者的核心思想就是,在适当的时机回调观察者的指定动作函数

我们知道,在使用netty创建channel时,一般都是把这个channel设置成非阻塞的模式,这意味着什么呢? 意味着所有io操作一经调用,即刻返回

这让netty对io的吞吐量有了飞跃性的提升,但是异步编程相对于传统的串行化的编程模式来说,控制起来可太麻烦了

jdk提供了原生的Futrue接口,意为在未来任务,其实就是把任务封装起来交给新的线程执行,在这个线程执行任务的期间,我们的主线程可以腾出时间去做别的事情

下面的netty给出的实例代码,我们可以看到,任务线程有返回一个Futrue对象,这个对象中封装着任务执行的情况

Copy *  *   void showSearch(final String target)
 *  *       throws InterruptedException {
 *  *     Future future
 *  *       = executor.submit(new Callable() {
 *  *         public String call() {
 *  *             return searcher.search(target);
 *  *         }});
 *  *     displayOtherThings(); // do other things while searching
 *  *     try {
 *  *       displayText(future.get()); // use future
 *  *     } catch (ExecutionException ex) { cleanup(); return; }
 *  *   }
 *

虽然jdk原生Futrue可以实现异步提交任务,并且返回了任务执行信息的Futrue,但是有一个致命的缺点,从futrue获取任务执行情况方法,是阻塞的,这是不被允许的,因为在netty中,一条channel可能关系着上千的客户端的链接,其中一个客户端的阻塞导致几千的客户端不可用是不被允许的,netty的Future设计成,继承jdk原生的future,而且进行扩展如下

Copy// todo 这个接口继承了  java 并发包总的Futrue  , 并在其基础上增加了很多方法
// todo  Future 表示对未来任务的封装
public interface Future extends java.util.concurrent.Future {

   // todo 判断IO是否成功返回
   boolean isSuccess();

   // todo 判断是否是 cancel()方法取消
   boolean isCancellable();

   // todo 返回IO 操作失败的原因
   Throwable cause();

   /**
    *  todo 使用了观察者设计模式, 给这个future添加监听器, 一旦Future 完成, listenner 立即被通知
    */
   Future addListener(GenericFutureListener> listener);

   // todo 添加多个listenner
   Future addListeners(GenericFutureListener>... listeners);

   Future removeListener(GenericFutureListener> listener);

   // todo 移除多个 listenner
   Future removeListeners(GenericFutureListener>... listeners);

   // todo  sync(同步) 等待着 future 的完成, 并且,一旦future失败了,就会抛出 future 失败的原因
   // todo bind()是个异步操作,我们需要同步等待他执行成功
   Future sync() throws InterruptedException;

   // todo  不会被中断的 sync等待
   Future syncUninterruptibly();

   // todo 等待
   Future await() throws InterruptedException;

   Future awaitUninterruptibly();

   // todo 无阻塞的返回Future对象, 如果没有,返回null
   // todo  有时 future成功执行后返回值为null, 这是null就是成功的标识, 如 Runable就没有返回值, 因此文档建议还要 通过isDone() 判断一下真的完成了吗

   V getNow();

    @Override
   boolean cancel(boolean mayInterruptIfRunning);
   ...

netty的观察者模式

最常用的关于异步执行的方法writeAndFlush()就是典型的观察者的实现, 在netty中,当一个IO操作刚开始的时候,一个ChannelFutrue对象就会创建出来,此时,这个futrue对象既不是成功的,也不是失败的,更不是被取消的,因为这个IO操作还没有结束

如果我们想在IO操作结束后立刻执行其他的操作时,netty推荐我们使用addListenner()添加监听者的方法而不是使用await()阻塞式等待,使用监听者,我们就不用关系具体什么时候IO操作结束,只需要提供回调方法就可以,当IO操作结束后,方法会自动被回调

在netty中,一个IO操作是状态分为如下几种

Copy *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------- -----------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |   cause() = non-null  非空|
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+

源码追踪

对writeAndFlush的使用

CopyChannelFuture channelFuture = ctx.writeAndFlush("from client : " + UUID.randomUUID());
channelFuture.addListener(future->{
    if(future.isSuccess()){
        todo
    }else{
        todo
    }
});

注意点: 我们使用writeAndFlush() 程序立即返回,随后我们使用返回的对象添加监听者,添加回调,这个时writeAndFlush()有可能已经完成了,也有可能没有完成,这是不确定的事

首先我们知道,writeAndFlush()是出站的动作,属于channelOutboundHandler,而且他是从pipeline的尾部开始传播的,源码如下:

Copy@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

尾节点数据AbstractChannelHandlerContext类, 继续跟进查看源码如下:

Copy@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }

悄无声息的做了一个很重要的事情,创建了Promise,这个DefaultChannelPromise就是被观察者,过一会由它完成方法的回调

继续跟进writeAndFlush() ,源码如下, 我们可以看到promise被返回了, DefaultChannelPromiseChannelPromise的实现类,而ChannelPromise又继承了ChannelFuture,这也是为什么明明每次使用writeAndFlush()返回的都是ChannelFuture而我们这里却返回了DafaultChannelPromise

Copy// todo 调用本类的 write(msg, true, promise)
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        return promise;
    }
    write(msg, true, promise);
    return promise;

在去目标地之前,先看一下addListenner()干了什么,我们进入到DefaultChannelPromise 源码如下:

Copy@Override
public ChannelPromise addListener(GenericFutureListener> listener) {
    super.addListener(listener);
    return this;
}

随机进入它的父类 DefaultChannelPromise中

@Override
public Promise addListener(GenericFutureListener> listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

这个函数分两步进行

第一步: 为什么添加监听事件的方法需要同步?

在这种多线程并发执行的情况下,这个addListener0(listener);任意一个线程都能使用,存在同步添加的情况 这个动作不像将channel和EventLoop做的唯一绑定一样,没有任何必须使用inEventloop()去判断在哪个线程中,直接使用同步

接着进入addListener0(listener)

Copyprivate void addListener0(GenericFutureListener> listener) {
    if (listeners == null) {
        listeners = listener; // todo 第一次添加直接在这里赋值
    } else if (listeners instanceof DefaultFutureListeners) {
         // todo 第三次添加调用这里
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        // todo 第二次添加来这里复制, 由这个 DefaultFutureListeners 存放观察者 
        listeners = new DefaultFutureListeners((GenericFutureListener) listeners, listener);
    }
}

第二步: 为什么接着判断isDone()

writeAndFlush()是异步执行的,而且在我们添加监听者的操作之前已经开始执行了,所以在添加完监听者之后,立即验证一把,有没有成功

思考一波:#

回顾writeAndFlush()的调用顺序,从tail开始传播两波事件,第一波write,紧接着第二波flush,一直传播到header,进入unsafe类中,由他完成把据写入jdk原生ByteBuffer的操作, 所以按理说,我们添加是listenner的回调就是在header的unsafe中完成的,这是我们的目标地

任何方法的回调都是提前设计好了的,就像pipeline中的handler中的方法的回调,就是通过遍历pipeline内部的链表实现的,这里的通知观察者,其实也是调用观察者的方法,而且他使用的一定是观察的父类及以上的引用实现的方法回调

回到我们的writeAndFlush()这个方法,在第二波事务传递完成,将数据真正写入jdk原生的ByteBuffer之前,只有进行的所有回调都是设置失败的状态,直到把数据安全发出后才可能是 回调成功的操作

此外,想要进行回调的操作,就得有被观察的对象的引用,所以一会我就回看到,Promise 一路被传递下去

我们进入的unsafe的write()就可以看到与回调相关的操作safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);,源码如下

Copy@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { // todo 缓存 写进来的 buffer

    safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);

    ReferenceCountUtil.release(msg);
    return;
}

我们继续跟进本类方法safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);, 源码如下:

Copyprotected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
    if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
        logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
    }
}

其中重要的方法,就是回调 被观察者的 tryFailure(cause), 这个被观察者的类型是ChannelPromise, 我们去看它的实现,源码如下

Copy@Override
public boolean tryFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return true;
    }
    return false;
}

调用本类方法notifyListeners()

继续跟进本类方法notifyListenersNow();

接着跟进本类方法notifyListener0(this, (GenericFutureListener) listeners);

继续l.operationComplete(future); 终于看到了调用了监听者的完成操作,实际上就是回调用户的方法,虽然是完成的,但是失败了


下面我们去flush()中去查看通知成功的回调过程, 方法的调用顺序如下

Copyflush();
flush0();
doWrite(outboundBuffer);

在doWrite()方法中,就会使用自旋的方式往尝试把数据写出去, 数据被写出去后,有一个标识 done=true, 证明是成功写出了, 紧接着就是把当前的盛放ByteBuf的entry从链表上移除,源码出下

Copyif (done) {
    // todo 跟进去
    in.remove();
} else {

我们继续跟进remove(), 终于我们找到了成功回调的标志,在remove()的底端safeSuccess(promise);, 下一步就是用回调用户添加的监听者操作完成了,并且完成的状态是Success成功的

Copypublic boolean remove() {
// todo 获取当前的 Entry
Entry e = flushedEntry;
if (e == null) {
    clearNioBuffers();
    return false;
}
Object msg = e.msg;

ChannelPromise promise = e.promise;
int size = e.pendingSize;

// todo 将当前的Entry进行移除
removeEntry(e);

if (!e.cancelled) {
    // only release message, notify and decrement if it was not canceled before.
    ReferenceCountUtil.safeRelease(msg);
    safeSuccess(promise);
    decrementPendingOutboundBytes(size, false, true);
}
猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Data Mining

Data Mining

Jiawei Han、Micheline Kamber、Jian Pei / Morgan Kaufmann / 2011-7-6 / USD 74.95

The increasing volume of data in modern business and science calls for more complex and sophisticated tools. Although advances in data mining technology have made extensive data collection much easier......一起来看看 《Data Mining》 这本书的介绍吧!

SHA 加密

SHA 加密

SHA 加密工具

XML、JSON 在线转换

XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换

UNIX 时间戳转换

UNIX 时间戳转换