内容简介:下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemo
前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从 Java 对象转换成二进制数据的组件,这就是MessageToByteEncoder。
基本使用
下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中
class StringToByteEncoderextends MessageToByteEncoder<String>{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(s.getBytes()); } }
值得注意的是MessageToByteEncoder
的I可以是ByteBuf,即可以从一种格式的二进制转换为另一种格式的二进制。下面的LengthFieldPrepender是LengthFieldBasedFrameDecoder解码器对应的编码器,负责将已经编码好的ByteBuf按照消息长度在前面追加length字段。虽然继承于MessageToMessageEncoder
但其实也是MessageToByteEncoder
,因为它的接收和输出都是ByteBuf类型。
public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{ private final ByteOrder byteOrder; private final int lengthFieldLength; private final boolean lengthIncludesLengthFieldLength; private final int lengthAdjustment; public LengthFieldPrepender( ByteOrder byteOrder,int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength){ // 只支持1,2,3,4,8字节长的length字段编码 if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { throw new IllegalArgumentException( "lengthFieldLength must be either 1, 2, 3, 4, or 8: " + lengthFieldLength); } ObjectUtil.checkNotNull(byteOrder, "byteOrder"); this.byteOrder = byteOrder; this.lengthFieldLength = lengthFieldLength; this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength; this.lengthAdjustment = lengthAdjustment; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception { int length = msg.readableBytes() + lengthAdjustment; if (lengthIncludesLengthFieldLength) { length += lengthFieldLength; } if (length < 0) { throw new IllegalArgumentException( "Adjusted frame length (" + length + ") is less than zero"); } switch (lengthFieldLength) { case 1: if (length >= 256) { throw new IllegalArgumentException( "length does not fit into a byte: " + length); } out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length)); break; ... 中间省略 case 8: out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length)); break; default: throw new Error("should not reach here"); } out.add(msg.retain()); } }
实现分析
MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe
类则使用其获取DirectMemory否则降级为heapMemory。
public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{ private final TypeParameterMatcher matcher; private final boolean preferDirect; // 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory protected MessageToByteEncoder(boolean preferDirect) { matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); this.preferDirect = preferDirect; }
作为Encoder,需要处理的是write事件。
write逻辑为
- 判断是否为符合当前类型的对象
- 根据当前对象和是否使用DirectMemory来申请一个ByteBuf
- 调用需要子类override的encoder方法,将需要编码的msg对象encoder到ByteBuf中,然后release msg对象。
- 调用ChannelContextWrite.write方法,交给下一个ChannelHandlerContext处理这个ByteBuf
- 如果不符合1, 则直接交给下一个ChannelHandlerContext
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception { ByteBuf buf = null; try { // 判断对象类型 if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; // 申请ByteBuf buf = allocateBuffer(ctx, cast, preferDirect); try { // 编码写入ByteBuf中 encode(ctx, cast, buf); } finally { // release msg对象 ReferenceCountUtil.release(cast); } // 如果ByteBuf不空则ctx write if (buf.isReadable()) { ctx.write(buf, promise); } else { // 否则release byteBuf然后写入一个空BUFFER buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } // 帮助GC回收 buf = null; } else { // 其他类型的对象不处理交给后面的处理 ctx.write(msg, promise); } } catch (EncoderException e) { // encode方法的异常抛给上层处理 throw e; } catch (Throwable e) { // 其他包装成EncoderException的异常抛给上层处理 throw new EncoderException(e); } finally { // 防止encode方法异常导致申请的ByteBuf没有释放 if (buf != null) { buf.release(); } } } protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } } // 留给子类去实现的具体编码方法 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。