内容简介:Netty中另外两个重要组件—— ChannelHandle,ChannelHandleContext,Pipeline。Netty中I/O事件的传播机制以及数据的过滤和写出均由它们负责。head与tail它们都会调用父类AbstractChannelHandlerContext构造器去完成初始化,由此我们可以预见ChanelPipeline里面存放的是一个个ChannelHandlerContext,根据DefaultChannelPipeline构造方法我们可以知道它们数据结构为双向链表,根据Abstr
Netty中另外两个重要组件—— ChannelHandle,ChannelHandleContext,Pipeline。Netty中I/O事件的传播机制以及数据的过滤和写出均由它们负责。
pipeline的初始化
步骤
- pipeline在创建Channel的时候被创建
- AbstractChannel中 pipeline = newChannelPipeline()-DefaultChannelPipeline
- pipeline节点数据结构:ChannelHandlerContext的双向链表
- pipeline中的两大哨兵:head和tail
分析
- pipeline在创建Channel的时候被创建
- 在服务端Channel和客户端Channel创建的时候,调用父类AbstractChannel初始化时候会对pipeline进行初始化。
// AbstractChannel(..) protected AbstractChannel(Channel parent) { ... // 创建pipeline pipeline = newChannelPipeline(); } // newChannelPipeline() protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } // DefaultChannelPipeline(..) protected DefaultChannelPipeline(Channel channel) { ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } 复制代码
- 看下HeadContext和TailContext构造方法
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { ... HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); ... } final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ... } 复制代码
head与tail它们都会调用父类AbstractChannelHandlerContext构造器去完成初始化,由此我们可以预见ChanelPipeline里面存放的是一个个ChannelHandlerContext,根据DefaultChannelPipeline构造方法我们可以知道它们数据结构为双向链表,根据AbstractChannelHandlerContext构造方法,我们可以发现head指定的为出栈处理,而tail指定的为入栈处理器。
- pipeline中的两大哨兵:head和tail
pipeline里面的事件传播机制我们接下来验证,但是我们可以推测出入栈从head开始传播,因为它是出栈处理器,所以它只管往下传播不做任何处理,一直到tail会结束。出栈从tail开始传播,因为他是入栈处理器,所以它只管往下传播事件即可,也不做任何处理。这么看来对于入栈,从head开始到tail结束;对于出栈恰恰相反,从tail开始到head结束。
添加删除ChannelHandler
步骤
- 添加ChannelHandler流程
- 判断是否重复添加
- filterName(..)
- 创建节点并添加至链表
- 回调添加完成事件
- callHandlerAdded0(..)
- 判断是否重复添加
- 删除ChannelHandler流程
- 找到节点
- 链表的删除
- 回调删除handler事件
分析
- 判断是否重复添加
// filterName(..) private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; } // 判断重名 private void checkDuplicateName(String name) { if (context0(name) != null) { throw new IllegalArgumentException("Duplicate handler name: " + name); } } // 找有没有同名的context private AbstractChannelHandlerContext context0(String name) { AbstractChannelHandlerContext context = head.next; while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; } 复制代码
- 创建节点并添加至链表
// 插入到链表中tail节点的前面。 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } 复制代码
- 顺着callHandlerAdded0(..)方法一直跟到AbstractChannelHandlerContext的callHandlerAdded(..)
final void callHandlerAdded() throws Exception { ... if (setAddComplete()) { // 调用具体handler的handlerAdded方法 handler().handlerAdded(this); } } 复制代码
- 删除ChannelHandler流程
- 找到节点
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; } } // 相同堆内地址即为找到 public final ChannelHandlerContext context(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return null; } if (ctx.handler() == handler) { return ctx; } ctx = ctx.next; } } 复制代码
- 链表的删除
private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; } 复制代码
- 回调handler remove方法
final void callHandlerRemoved() throws Exception { try { // Only call handlerRemoved(...) if we called handlerAdded(...) before. if (handlerState == ADD_COMPLETE) { handler().handlerRemoved(this); } } finally { // Mark the handler as removed in any case. setRemoved(); } } 复制代码
事件和异常的传播
步骤
- inBound事件的传播
- ChannelRead事件的传播
- SimpleInBoundHandler处理器
- outBound事件的传播
- write事件的传播
- 异常的传播
- 异常触发链
分析
- inBound事件的传播分析
// 省略代码 ... serverBootstrap ... .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new Inbound1()) .addLast(new InBound2()) .addLast(new Inbound3()); } }); ... public class Inbound1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("at InBound1: " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("hello cj"); } } public class Inbound2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("at InBound2: " + msg); ctx.fireChannelRead(msg); } } public class Inbound3 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("at InBound3: " + msg); ctx.fireChannelRead(msg); } } 复制代码
- 我们来画一下现在客户端接入做入栈处理时,客户端Channel的pipeline中的情况:
从head开始一直向下一个inboud传播直到tail结束,也可以看到ChannelHandlerContext起到的正是中间纽带的作用, 它能拿到handle也可以向上获取到channel与pipeline,一个channel只会有一个pipeline,一个pipeline可以有多个入栈handler和出栈handler,而且每个handler都会被ChannelHandlerContext包裹着。事件传播依赖的ChannelHandlerContext的fire*方法。
- 我们来运行下代码验证下: telnet创建一个客户端连接
- 控制台打印
按照我们上边说的那样 InBoud1 -> InBound2 -> InBoud3
- outBound事件的传播分析
public class Outbound1 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("oubound1 write:" + msg); ctx.write(msg, promise); } } public class Outbound2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("oubound2 write:" + msg); ctx.write(msg, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(()-> { ctx.channel().pipeline().write("hello cj..."); }, 5, TimeUnit.SECONDS); } } public class Outbound3 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("oubound3 write:" + msg); ctx.write(msg, promise); } } 复制代码
- 我们来画一下现在向客户端写出时间做出栈处理时,客户端Channel的pipeline中的情况:
与入栈事件传递顺序是完全相反的,也就是从链表尾部开始。
- 我们验证下结果
- 异常的传播
public class Inbound1 extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Inbound1..."); super.exceptionCaught(ctx, cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { throw new RuntimeException("cj test throw caught..."); } } public class Inbound3 extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Inbound2..."); super.exceptionCaught(ctx, cause); } } public class Outbound1 extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Outbound1..."); super.exceptionCaught(ctx, cause); } } public class Outbound2 extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Outbound2..."); super.exceptionCaught(ctx, cause); } } public class Outbound3 extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Outbound3..."); super.exceptionCaught(ctx, cause); } } 复制代码
异常的传播过程是从head一直遍历到tail结束,并在tail中将其打印出来。
- 直接验证下结果
- 对于ctx.write()和ctx.pipeline().write()有和不同
ctx.write("hello cj..."); ctx.pipeline().write("hello cj..."); 复制代码
ctx.write(..) 我们按照上面的内容是可以想到的,ctx.write其实是直接激活当前节点的下一个节点write,所以它不会从尾部开始向前遍历所有的outbound,而ctx.pipeline().write(..)我们看源码可以知道,它先调用pipeline的write方法,跟踪源码(下图)可以发现,他是从tail开始遍历的,所有的outboud会依次被执行。同理inbound也是如此
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python语言程序设计
[美]梁勇(Lang Y. D.) / 李娜 / 机械工业出版社 / 2015-4 / 79.00元
本书采用“问题驱动”、“基础先行”和“实例和实践相结合”的方式,讲述如何使用Python语言进行程序设计。本书首先介绍Python程序设计的基本概念,接着介绍面向对象程序设计方法,最后介绍算法与数据结构方面的内容。为了帮助学生更好地掌握相关知识,本书每章都包括以下模块:学习目标,引言,关键点,检查点,问题,本章总结,测试题,编程题,注意、提示和警告。 本书可以作为高等院校计算机及相关专业Py......一起来看看 《Python语言程序设计》 这本书的介绍吧!
MD5 加密
MD5 加密工具
XML、JSON 在线转换
在线XML、JSON转换工具