Netty源码分析-7-MessageToByteEncoder

栏目: 后端 · 发布时间: 7年前

内容简介:下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemo

前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从 Java 对象转换成二进制数据的组件,这就是MessageToByteEncoder。

Netty源码分析-7-MessageToByteEncoder

基本使用

下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中

class StringToByteEncoderextends MessageToByteEncoder<String>{

        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
            byteBuf.writeBytes(s.getBytes());
        }
}
值得注意的是MessageToByteEncoder 的I可以是ByteBuf,即可以从一种格式的二进制转换为另一种格式的二进制。下面的LengthFieldPrepender是LengthFieldBasedFrameDecoder解码器对应的编码器,负责将已经编码好的ByteBuf按照消息长度在前面追加length字段。虽然继承于MessageToMessageEncoder 但其实也是MessageToByteEncoder

,因为它的接收和输出都是ByteBuf类型。

public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{

    private final ByteOrder byteOrder;
    private final int lengthFieldLength;
    private final boolean lengthIncludesLengthFieldLength;
    private final int lengthAdjustment;

    public LengthFieldPrepender(
ByteOrder byteOrder,int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength){
        // 只支持1,2,3,4,8字节长的length字段编码
        if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
            lengthFieldLength != 3 && lengthFieldLength != 4 &&
            lengthFieldLength != 8) {
            throw new IllegalArgumentException(
                    "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
                    lengthFieldLength);
        }
        ObjectUtil.checkNotNull(byteOrder, "byteOrder");
        this.byteOrder = byteOrder;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {
        int length = msg.readableBytes() + lengthAdjustment;
        if (lengthIncludesLengthFieldLength) {
            length += lengthFieldLength;
        }

        if (length < 0) {
            throw new IllegalArgumentException(
                    "Adjusted frame length (" + length + ") is less than zero");
        }

        switch (lengthFieldLength) {
        case 1:
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
            break;
        ... 中间省略
        case 8:
            out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
            break;
        default:
            throw new Error("should not reach here");
        }
        out.add(msg.retain());
    }
}

实现分析

MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe 类则使用其获取DirectMemory否则降级为heapMemory。

public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{
    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;

    // 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

作为Encoder,需要处理的是write事件。

write逻辑为

  1. 判断是否为符合当前类型的对象
  2. 根据当前对象和是否使用DirectMemory来申请一个ByteBuf
  3. 调用需要子类override的encoder方法,将需要编码的msg对象encoder到ByteBuf中,然后release msg对象。
  4. 调用ChannelContextWrite.write方法,交给下一个ChannelHandlerContext处理这个ByteBuf
  5. 如果不符合1, 则直接交给下一个ChannelHandlerContext
    @Override
       public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {
           ByteBuf buf = null;
           try {
               // 判断对象类型
               if (acceptOutboundMessage(msg)) {
                   @SuppressWarnings("unchecked")
                   I cast = (I) msg;
                   // 申请ByteBuf
                   buf = allocateBuffer(ctx, cast, preferDirect);
                   try {
                       // 编码写入ByteBuf中
                       encode(ctx, cast, buf);
                   } finally {
                       // release msg对象
                       ReferenceCountUtil.release(cast);
                   }
                   // 如果ByteBuf不空则ctx write
                   if (buf.isReadable()) {
                       ctx.write(buf, promise);
                   } else {
                       // 否则release byteBuf然后写入一个空BUFFER
                       buf.release();
                       ctx.write(Unpooled.EMPTY_BUFFER, promise);
                   }
                   // 帮助GC回收
                   buf = null;
               } else {
                   // 其他类型的对象不处理交给后面的处理
                   ctx.write(msg, promise);
               }
           } catch (EncoderException e) {
               // encode方法的异常抛给上层处理
               throw e;
           } catch (Throwable e) {
               // 其他包装成EncoderException的异常抛给上层处理
               throw new EncoderException(e);
           } finally {
               // 防止encode方法异常导致申请的ByteBuf没有释放
               if (buf != null) {
                   buf.release();
               }
           }
       }
    
       protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                  boolean preferDirect) throws Exception {
           if (preferDirect) {
               // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory
               return ctx.alloc().ioBuffer();
           } else {
               return ctx.alloc().heapBuffer();
           }
       }
       // 留给子类去实现的具体编码方法
       protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;
    

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms for Image Processing and Computer Vision

Algorithms for Image Processing and Computer Vision

Parker, J. R. / 2010-12 / 687.00元

A cookbook of algorithms for common image processing applications Thanks to advances in computer hardware and software, algorithms have been developed that support sophisticated image processing with......一起来看看 《Algorithms for Image Processing and Computer Vision》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换