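Previously, we analyzed how a Netty Pipeline is initialized and how nodes are added to and removed from it. Next, we analyze the Pipeline's event propagation mechanism.
We use the following example to demonstrate how events propagate through a Netty Pipeline.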
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.net.InetSocketAddress;

    public class NettyPipelineInboundExample {
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

    class InboundHandlerA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler A : " + msg);
            // Propagate the read event to the next ChannelHandler
            ctx.fireChannelRead(msg);
        }
    }

    class InboundHandlerB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler B : " + msg);
            // Propagate the read event to the next ChannelHandler
            ctx.fireChannelRead(msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // When the channel becomes active, fire a channelRead event that
            // propagates from the pipeline's HeadContext node onward
            ctx.channel().pipeline().fireChannelRead("Hello world");
        }
    }

    class InboundHandlerC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler C : " + msg);
            // Propagate the read event to the next ChannelHandler
            ctx.fireChannelRead(msg);
        }
    }
Connect to the Netty server started above with telnet to trigger the channelActive event:
    $ telnet localhost 8888
    InboundHandler A : Hello world
    InboundHandler B : Hello world
    InboundHandler C : Hello world
If the handlers are added in a different order, the output order changes with it:

    ...
    ch.pipeline().addLast(new InboundHandlerB());
    ch.pipeline().addLast(new InboundHandlerA());
    ch.pipeline().addLast(new InboundHandlerC());
    ...
    InboundHandler B : Hello world
    InboundHandler A : Hello world
    InboundHandler C : Hello world
The channelRead event is fired from the following entry point:
    public class DefaultChannelPipeline implements ChannelPipeline {
        ...
        // Fire a channelRead event
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            // Propagate the read event starting from the head node
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
        ...
    }
This calls invokeChannelRead(head, msg) in AbstractChannelHandlerContext:
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
        ...
        // Invoke channelRead on the given context
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            // Obtain the message (touch() records the access for leak detection)
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            // Obtain the EventExecutor
            EventExecutor executor = next.executor();
            // true in this example
            if (executor.inEventLoop()) {
                // Call the instance method below: invokeChannelRead(Object msg)
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }

        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // handler() returns the ChannelHandler of the current node;
                    // the first one is HeadContext and the last one is TailContext.
                    // Call the handler's channelRead method
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
        ...
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            // Calls back into the static method above:
            // invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
        ...
        // Find the next inbound ChannelHandler
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                // Move to the next node
                ctx = ctx.next;
                // Keep going until the node is of inbound type
            } while (!ctx.inbound);
            return ctx;
        }
        ...
    }
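The `executor.inEventLoop()` check above decides whether the handler is invoked inline or handed to the owning executor as a task. The following is a minimal sketch of that dispatch pattern in plain Java, without Netty. `SingleThreadLoop`, `dispatch`, and `DispatchSketch` are hypothetical names made up for this illustration; they are not Netty types and do not reproduce Netty's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an event loop: a single worker thread that remembers its identity.
class SingleThreadLoop {
    private final AtomicReference<Thread> worker = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop-thread");
        worker.set(t);
        return t;
    });

    // True when the calling thread is the loop's own worker thread.
    boolean inEventLoop() {
        return Thread.currentThread() == worker.get();
    }

    // Run the task inline when already on the loop thread, otherwise enqueue it.
    void dispatch(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Stop accepting tasks and wait for the queued ones to finish.
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class DispatchSketch {
    // Returns the names of the threads that ran the outer task and the nested task.
    static String[] run() {
        SingleThreadLoop loop = new SingleThreadLoop();
        String[] threads = new String[2];
        // Called from the caller's thread: the task is enqueued and runs on the loop thread.
        loop.dispatch(() -> {
            threads[0] = Thread.currentThread().getName();
            // Called from the loop thread itself: the nested task runs inline.
            loop.dispatch(() -> threads[1] = Thread.currentThread().getName());
        });
        loop.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println(threads[0] + " / " + threads[1]); // prints "loop-thread / loop-thread"
    }
}
```

In the walkthrough above the event is fired from a handler callback, which already runs on the channel's event loop, so the inline branch is the one taken.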
    final class HeadContext extends AbstractChannelHandlerContext ... {
        ...
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // HeadContext passes the channelRead event on by calling
            // fireChannelRead(final Object msg) on the ChannelHandlerContext
            ctx.fireChannelRead(msg);
        }
        ...
    }
The loop continues in this way, calling channelRead(ChannelHandlerContext ctx, Object msg) on InboundHandlerA, InboundHandlerB, and InboundHandlerC in turn.
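The traversal described above can be modelled as a walk over a doubly linked list that skips nodes that are not inbound. This is a simplified sketch, not Netty's code: `InboundNode`, `link`, and `fireRead` are hypothetical names, and every node is assumed to forward the event, as the handlers in the example do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified pipeline node; not Netty's AbstractChannelHandlerContext.
class InboundNode {
    final String name;
    final boolean inbound;
    InboundNode next;
    InboundNode prev;

    InboundNode(String name, boolean inbound) {
        this.name = name;
        this.inbound = inbound;
    }

    // Same shape as the do/while loop above: advance until an inbound node is found.
    InboundNode findNextInbound() {
        InboundNode ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
}

class InboundPropagationSketch {
    // Links the nodes in the given order and returns the first one (the head).
    static InboundNode link(InboundNode... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes[0];
    }

    // Walks from the head toward the tail and records every inbound node visited.
    static List<String> fireRead(InboundNode head) {
        List<String> visited = new ArrayList<>();
        InboundNode ctx = head;
        visited.add(ctx.name);
        // The tail is the last node and is inbound, so the inner loop always terminates.
        while (ctx.next != null) {
            ctx = ctx.findNextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        InboundNode head = link(
                new InboundNode("head", true),
                new InboundNode("A", true),
                new InboundNode("X", false), // an outbound-only node is skipped
                new InboundNode("B", true),
                new InboundNode("tail", true));
        System.out.println(fireRead(head)); // prints [head, A, B, tail]
    }
}
```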
    public class DefaultChannelPipeline implements ChannelPipeline {

        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
            ...
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // Call onUnhandledInboundMessage
                onUnhandledInboundMessage(msg);
            }
            ...
        }
        ...
        // Final handling for inbound messages that no handler consumed
        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. "
                        + "Please check your pipeline configuration.", msg);
            } finally {
                // Decrement the reference count of msg by 1; when the count
                // reaches 0, the memory held by the object is released
                ReferenceCountUtil.release(msg);
            }
        }
        ...
    }
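The comment on `ReferenceCountUtil.release(msg)` describes reference counting: each release decrements a counter, and the resource is freed when the counter reaches zero. A minimal sketch of that idea follows. `CountedMessage` is a hypothetical class written for this illustration; it is not Netty's `ByteBuf` or `ReferenceCounted`, and real implementations do more (for example, returning memory to a pool).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted resource; a stand-in for a pooled buffer.
class CountedMessage {
    // A new object starts with one reference, owned by its creator.
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private boolean freed;

    int refCnt() {
        return refCnt.get();
    }

    boolean isFreed() {
        return freed;
    }

    // Adds one reference, for a second owner that will release it later.
    CountedMessage retain() {
        refCnt.incrementAndGet();
        return this;
    }

    // Drops one reference; frees the resource and returns true when the count reaches 0.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (remaining == 0) {
            freed = true;
            return true;
        }
        return false;
    }
}

class RefCountSketch {
    public static void main(String[] args) {
        CountedMessage msg = new CountedMessage();
        msg.retain();                          // count: 2
        System.out.println(msg.release());     // prints false (count: 1, still alive)
        System.out.println(msg.release());     // prints true  (count: 0, freed)
        System.out.println(msg.isFreed());     // prints true
    }
}
```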
We can also extend SimpleChannelInboundHandler to write a custom ChannelHandler. Its channelRead method releases the message object once it has been handled, which prevents memory leaks.
    public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
        ...
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    // Decrement the reference count of msg by 1; when the count
                    // reaches 0, the memory held by the object is released
                    ReferenceCountUtil.release(msg);
                }
            }
        }
        ...
    }
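The try/finally structure above is a template method: messages of the accepted type are handled and then released, while all others are passed on untouched so that a later handler takes responsibility for them. The sketch below mirrors that control flow in plain Java. `TrackedMessage`, `AutoReleasingHandler`, `handle`, and `passedOn` are hypothetical names for this illustration, and a boolean flag stands in for the reference count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message wrapper; the flag stands in for a reference-count release.
class TrackedMessage {
    final Object payload;
    boolean released;

    TrackedMessage(Object payload) {
        this.payload = payload;
    }
}

// Hypothetical base class that mirrors the accept / handle / release flow shown above.
abstract class AutoReleasingHandler<I> {
    private final Class<I> type;
    // Messages this handler did not accept and forwarded instead.
    final List<TrackedMessage> passedOn = new ArrayList<>();

    AutoReleasingHandler(Class<I> type) {
        this.type = type;
    }

    void channelRead(TrackedMessage msg) {
        boolean release = true;
        try {
            if (type.isInstance(msg.payload)) {
                handle(type.cast(msg.payload));
            } else {
                // Not our type: forward it and leave the release to whoever consumes it.
                release = false;
                passedOn.add(msg);
            }
        } finally {
            // Runs even when handle() throws, so accepted messages are never leaked.
            if (release) {
                msg.released = true;
            }
        }
    }

    // Subclasses implement only the business logic for accepted messages.
    abstract void handle(I payload);
}

class AutoReleaseSketch {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        AutoReleasingHandler<String> handler = new AutoReleasingHandler<String>(String.class) {
            @Override
            void handle(String payload) {
                seen.add(payload);
            }
        };
        TrackedMessage text = new TrackedMessage("hello");
        TrackedMessage number = new TrackedMessage(42);
        handler.channelRead(text);
        handler.channelRead(number);
        System.out.println(seen + " " + text.released + " " + number.released);
        // prints [hello] true false
    }
}
```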
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.net.InetSocketAddress;
    import java.util.concurrent.TimeUnit;

    public class NettyPipelineOutboundExample {
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

    class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Print the message
            System.out.println("OutboundHandlerA: " + msg);
            // Propagate the write event to the next node
            ctx.write(msg, promise);
        }
    }

    class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Print the message
            System.out.println("OutboundHandlerB: " + msg);
            // Propagate the write event to the next node
            ctx.write(msg, promise);
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // Three seconds after handlerAdded fires, simulate a write event
            ctx.executor().schedule(() -> {
                // ctx.write("Hello world ! ");
                ctx.channel().write("Hello world ! ");
            }, 3, TimeUnit.SECONDS);
        }
    }

    class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Print the message
            System.out.println("OutboundHandlerC: " + msg);
            // Propagate the write event to the next node
            ctx.write(msg, promise);
        }
    }
Connect to the Netty server started above with telnet, which triggers the handlerAdded event:
    $ telnet localhost 8888
    OutboundHandlerC: Hello world !
    OutboundHandlerB: Hello world !
    OutboundHandlerA: Hello world !
If the outbound handlers are added in a different order, the output order changes with it:

    ...
    ch.pipeline().addLast(new OutboundHandlerB());
    ch.pipeline().addLast(new OutboundHandlerA());
    ch.pipeline().addLast(new OutboundHandlerC());
    ...
    OutboundHandlerC: Hello world !
    OutboundHandlerA: Hello world !
    OutboundHandlerB: Hello world !
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        ...
        @Override
        public ChannelFuture write(Object msg) {
            // Delegate to the pipeline to propagate the write event
            return pipeline.write(msg);
        }
        ...
    }
    public class DefaultChannelPipeline implements ChannelPipeline {
        ...
        @Override
        public final ChannelFuture write(Object msg) {
            // Propagation starts from the tail node
            return tail.write(msg);
        }
        ...
    }
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
        ...
        @Override
        public ChannelFuture write(Object msg) {
            // Delegate to write(msg, promise)
            return write(msg, newPromise());
        }

        @Override
        public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            if (msg == null) {
                throw new NullPointerException("msg");
            }

            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return promise;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
            // Delegate to write(msg, flush, promise)
            write(msg, false, promise);

            return promise;
        }
        ...
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            // Find the next outbound ChannelHandlerContext
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    // Call invokeWrite(Object msg, ChannelPromise promise)
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                } else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }

        // Find the next outbound ChannelHandlerContext (walking toward the head)
        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }

        private void invokeWrite(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                // Continue with invokeWrite0
                invokeWrite0(msg, promise);
            } else {
                write(msg, promise);
            }
        }

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                // Get the current ChannelHandler and call its write method.
                // handler() returns OutboundHandlerC, OutboundHandlerB, OutboundHandlerA in turn
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
        ...
    }
    // HeadContext.write: the last stop of the write event
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Hand the data to unsafe, which performs the actual write
        unsafe.write(msg, promise);
    }
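Outbound events travel in the opposite direction: `findContextOutbound()` follows `prev` links from the starting node toward the head. That also explains the commented-out `ctx.write(...)` in OutboundHandlerB: `channel.write` starts at the tail and visits every outbound handler, whereas `ctx.write` starts at the current node and visits only the outbound handlers before it. The sketch below models both cases. `OutNode`, `link`, and `writeFrom` are hypothetical names, and the tail is modelled as inbound-only, as in the source shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified pipeline node; not Netty's AbstractChannelHandlerContext.
class OutNode {
    final String name;
    final boolean outbound;
    OutNode next;
    OutNode prev;

    OutNode(String name, boolean outbound) {
        this.name = name;
        this.outbound = outbound;
    }

    // Same shape as findContextOutbound(): step backwards until an outbound node is found.
    OutNode findPrevOutbound() {
        OutNode ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
}

class OutboundPropagationSketch {
    // Links the nodes in the given order and returns them for later lookup.
    static OutNode[] link(OutNode... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes;
    }

    // Propagates a write that originates at 'start' and records the handlers it reaches.
    static List<String> writeFrom(OutNode start) {
        List<String> visited = new ArrayList<>();
        OutNode ctx = start;
        // The head is the first node and is outbound, so the inner loop always terminates.
        while (ctx.prev != null) {
            ctx = ctx.findPrevOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        OutNode[] p = link(
                new OutNode("head", true),
                new OutNode("A", true),
                new OutNode("B", true),
                new OutNode("C", true),
                new OutNode("tail", false));
        System.out.println(writeFrom(p[4])); // like channel.write: prints [C, B, A, head]
        System.out.println(writeFrom(p[2])); // like ctx.write in B: prints [A, head]
    }
}
```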
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.net.InetSocketAddress;

    public class NettyPipelineExceptionCaughtExample {
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

        static class InboundHandlerA extends ChannelInboundHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerA.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }

        static class InboundHandlerB extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                throw new Exception("ERROR !!!");
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerB.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }

        static class InboundHandlerC extends ChannelInboundHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerC.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }

        static class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerA.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }

        static class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerB.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }

        static class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerC.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    }
Connect to the Netty server started above with telnet and send any characters from the console:
    $ telnet localhost 8888
This fires a channelRead event that throws an exception, and the console prints the following:
    InboundHandlerB.exceptionCaught:ERROR !!!
    InboundHandlerC.exceptionCaught:ERROR !!!
    OutboundHandlerA.exceptionCaught:ERROR !!!
    OutboundHandlerB.exceptionCaught:ERROR !!!
    OutboundHandlerC.exceptionCaught:ERROR !!!
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
        ...
        @Override
        public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
            // Call the static invokeExceptionCaught with the next node
            invokeExceptionCaught(next, cause);
            return this;
        }

        static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
            ObjectUtil.checkNotNull(cause, "cause");
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                // Call invokeExceptionCaught on the next node
                next.invokeExceptionCaught(cause);
            } else {
                try {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            next.invokeExceptionCaught(cause);
                        }
                    });
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to submit an exceptionCaught() event.", t);
                        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                    }
                }
            }
        }
        ...
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // The handler throws an exception here
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    // Catch the exception and propagate it
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }

        // Notify the handler that an exception occurred
        private void notifyHandlerException(Throwable cause) {
            if (inExceptionCaught(cause)) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "An exception was thrown by a user handler " +
                                    "while handling an exceptionCaught event", cause);
                }
                return;
            }
            // Continue with the instance method invokeExceptionCaught
            invokeExceptionCaught(cause);
        }

        private void invokeExceptionCaught(final Throwable cause) {
            if (invokeHandler()) {
                try {
                    // Call exceptionCaught on the current ChannelHandler.
                    // In our example this is called in turn on InboundHandlerB, InboundHandlerC,
                    // OutboundHandlerA, OutboundHandlerB, OutboundHandlerC
                    handler().exceptionCaught(this, cause);
                } catch (Throwable error) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "An exception {}" +
                                        "was thrown by a user handler's exceptionCaught() " +
                                        "method while handling the following exception:",
                                ThrowableUtil.stackTraceToString(error), cause);
                    } else if (logger.isWarnEnabled()) {
                        logger.warn(
                                "An exception '{}' [enable DEBUG level for full stacktrace] " +
                                        "was thrown by a user handler's exceptionCaught() " +
                                        "method while handling the following exception:", error, cause);
                    }
                }
            } else {
                fireExceptionCaught(cause);
            }
        }
        ...
    }
    public class DefaultChannelPipeline implements ChannelPipeline {
        ...
        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
            ...
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                onUnhandledInboundException(cause);
            }
            ...
        }
        ...
        // Final handling for exceptions that no handler consumed
        protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }
        ...
    }
To keep exceptions from reaching the tail unhandled, add a dedicated exception handler as the last handler in the pipeline:

    ...
    class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof Exception) {
                // TODO
                System.out.println("Successfully caught exception ! ");
            } else {
                // TODO
            }
        }
    }
    ...
    ch.pipeline().addLast(new ExceptionCaughtHandler());
    ...
    InboundHandlerB.exceptionCaught:ERROR !!!
    InboundHandlerC.exceptionCaught:ERROR !!!
    OutboundHandlerA.exceptionCaught:ERROR !!!
    OutboundHandlerB.exceptionCaught:ERROR !!!
    OutboundHandlerC.exceptionCaught:ERROR !!!
    Successfully caught exception !   // log line printed by the handler that caught the exception
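Unlike read and write events, the exception walk shown in the source follows `next` directly and does not filter by inbound or outbound type. It starts at the node whose handler threw and stops at the first handler that does not call `fireExceptionCaught`. The sketch below models that behavior. `ExNode`, `chain`, and `fireFrom` are hypothetical names, and the `forwards` flag is an assumption standing in for whether a handler re-fires the exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified pipeline node; 'forwards' models a handler that re-fires the exception.
class ExNode {
    final String name;
    final boolean forwards;
    ExNode next;

    ExNode(String name, boolean forwards) {
        this.name = name;
        this.forwards = forwards;
    }
}

class ExceptionPropagationSketch {
    // Links the nodes in the given order and returns them for later lookup.
    static ExNode[] chain(ExNode... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
        }
        return nodes;
    }

    // Starts at the node that threw and follows 'next' regardless of handler type.
    static List<String> fireFrom(ExNode origin) {
        List<String> visited = new ArrayList<>();
        ExNode ctx = origin;
        while (ctx != null) {
            visited.add(ctx.name);
            if (!ctx.forwards) {
                break; // this handler consumed the exception
            }
            ctx = ctx.next;
        }
        return visited;
    }

    public static void main(String[] args) {
        // Pipeline from the example: the exception is thrown in InboundHandlerB (index 2).
        ExNode[] p = chain(
                new ExNode("head", true),
                new ExNode("InboundHandlerA", true),
                new ExNode("InboundHandlerB", true),
                new ExNode("InboundHandlerC", true),
                new ExNode("OutboundHandlerA", true),
                new ExNode("OutboundHandlerB", true),
                new ExNode("OutboundHandlerC", true),
                new ExNode("ExceptionCaughtHandler", false), // terminal handler, does not forward
                new ExNode("tail", false));
        System.out.println(fireFrom(p[2]));
        // prints [InboundHandlerB, InboundHandlerC, OutboundHandlerA, OutboundHandlerB,
        //         OutboundHandlerC, ExceptionCaughtHandler]
    }
}
```

InboundHandlerA never sees the exception because it sits before the node that threw, and the tail is not reached because the terminal handler consumes the exception.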
