内容简介:阅读和理解本文需要以下技术储备:深入理解现代计算机CPU计算架构,精通至少一门常用编程语言,至少5年web项目经验。好吧,这是我编的,如果这三条你都满足,对不起打扰了,关闭按钮应该在左上角。为什么会突然想到跟你们聊聊Pipeline呢?有一次跟同事一起去门口的拉面馆拉面吃,漫长的等待中,我发现这儿的后厨师傅只有一个人,一个人负责和面拉面、煮熟、加汤、加牛肉、加萝卜、加香菜。。。因为等的时间太长了,所以记得这么清楚[手动捂脸]。我们等在外面就像这样:
阅读和理解本文需要以下技术储备:深入理解现代计算机CPU计算架构,精通至少一门常用编程语言,至少5年web项目经验。好吧,这是我编的,如果这三条你都满足,对不起打扰了,关闭按钮应该在左上角。
为什么会突然想到跟你们聊聊Pipeline呢?有一次跟同事一起去门口的拉面馆拉面吃,漫长的等待中,我发现这儿的后厨师傅只有一个人,一个人负责和面拉面、煮熟、加汤、加牛肉、加萝卜、加香菜。。。因为等的时间太长了,所以记得这么清楚[手动捂脸]。我们等在外面就像这样:
作为一枚程序猿脑子里就蹦出来Pipeline,不要把逻辑都耦合在一起啊,代码又多又难看。我们进去不到10分钟,排队的人已经排到门外面去了,假如把和面拉面交给一个人;煮面交给一个人;加牛肉、加萝卜、加香菜交给另外一个人,老板啊你一天少说多卖100碗。
什么是Pipeline
Pipeline的工作其实就是将一件需要重复做的事情切割成各个不同的阶段,每一个阶段由独立的单元负责,所有待执行的对象依次进入作业队列。业界用到Pipeline的地方很多,例如CPU的计算单元里,linux管道命令里,netty框架里等等,今天我就以netty框架为依托,从为什么要使用Pipeline,netty里的读写事件是如何依托Pipeline传播的两点跟大家聊聊。
为什么要使用Pipeline
netty的使用场景里一个最常见的就是IM系统的开发,而一个请求从客户端发起之后,大概会经历一下几个步骤:
在“一系列逻辑”部分,最先让人想到的处理方式就是根据某个标识,分辨消息类型,用if/else来进行处理,为什么第一反应是if/else呢?这跟我们的思维方式有很大关系,人第一反应想到的都是先干什么,然后再干什么,这样的处理方式在逻辑很简单的情况下既清晰又快捷,可是如果这个过程包括很多复杂的逻辑(加牛肉、加萝卜、加香菜),势必会导致编写逻辑的类越来越臃肿,直到有一天你自己看自己写的代码都头皮发麻。Pipeline的作用就是组织不同处理器,在netty里就相当于它的大动脉,一个请求在生命周期内都是在Pipeline上的不同位置扭转的,这样既保证了逻辑的清晰,又使代码变得优雅。
读写事件在Pipeline中是如何传播的
在聊事件传播在之前先说下Pipeline的结构,Pipeline的结构是双向链表,每个节点上是包装了具体逻辑处理器channelHandler的channelHandlerContext对象,绕死我了,参考下图:
Pipeline会在创建对应的channel时创建,同时会初始化Pipeline的head和tail两个节点,而我们编写的业务逻辑处理器,调用Pipeline的addLast方法会被添加到head节点和tail节点之间。netty的事件从类型上可以分为inbound和outbound事件,我们在编写具体逻辑处理器时也会继承ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter,也就标识了事件处理的方向。在Pipeline的源码里,有个很贴心的图,贴出来给大家回忆一下:
先说结论:inbound事件的传播顺序是和添加顺序正相关,比如添加顺序是A—>B—>C,则netty在处理时的顺序也是A—>B—>C;而outbound事件的传播顺序是和添加顺序逆相关,比如添加顺序是A—>B—>C,则处理顺序是C—>B—>A;下面重点以inbound事件为例讲解。
从ChannelInboundHandlerAdapter开始:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
读事件的传播会调用ctx.fireChannelRead(msg)方法,该方法回调到AbstractChannelHandlerContext
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
我们再看看findContextInbound()方法
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
恍然大悟有没有,我们说过Pipeline是双向链表结构,在处理完当前handler后,针对读事件,会去从当前节点开始往后循环,找到下一个inbound事件,执行具体读事件后继续调用fireChannelRead,就是一个递归的过程;
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {//事件在netty线程组里 next.invokeChannelRead(m); } else { ... } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg);//此处会调用到添加的inboundhandler } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
那netty又是怎么判断事件是inbound还是outbound的呢?
我们上面提过,Pipeline的每个节点上是一个channelHandlerContext对象,具体业务逻辑处理器在放入Pipeline之前,会被封装成channelHandlerContext对象,到DefaultChannelPipeline里:
@Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { … newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); … }
关键代码就是这个newContext操作,可以看到这里传入的handler被封装成一个channelHandlerContext对象,然后再执行addLast0操作,继续往newContext方法里看,来到DefaultChannelHandlerContext里:
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }
我们在DefaultChannelHandlerContext的构造器里,看到一个isInbound方法,对,没错,就是这里判断的添加的handler是入还是出,逻辑很简单,一个instanceof关键字搞定。
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; }
还有个点需要和小伙伴们强调一下,在具体的逻辑处理器中,我们可以有两种主动发起事件传播的方式:
ctx.pipeline().fireChannelRead(msg); ctx.fireChannelRead(msg);
第一种是从头节点开始往后开始查找,而第二种则是从调用方法的当前节点开始查找紧邻的inbound事件,两种方式的区别不言而喻,那如何选择呢?netty这么优秀,怎么会让你自己负责事件传播这种和业务无关的事情呢?实际开发中只需要将自定义handler继承SimpleChannelInboundHandler类,实现channelRead0方法即可,上面说的buf释放,事件传播,在父类里都给帮你干了。
没看过瘾?写长了您也没时间看啊。留个家庭作业,各位看官自己研究下netty里写事件是怎么传播的,异常事件又是怎么传播的吧?提个醒:我们是聊Pipeline的,不要身陷源码细节~
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Distributed Systems
Sukumar Ghosh / Chapman and Hall/CRC / 2014-7-14 / USD 119.95
Distributed Systems: An Algorithmic Approach, Second Edition provides a balanced and straightforward treatment of the underlying theory and practical applications of distributed computing. As in the p......一起来看看 《Distributed Systems》 这本书的介绍吧!