内容简介:下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemo
前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从 Java 对象转换成二进制数据的组件,这就是MessageToByteEncoder。
基本使用
下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中
class StringToByteEncoderextends MessageToByteEncoder<String>{
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(s.getBytes());
}
}
,因为它的接收和输出都是ByteBuf类型。
public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{
private final ByteOrder byteOrder;
private final int lengthFieldLength;
private final boolean lengthIncludesLengthFieldLength;
private final int lengthAdjustment;
public LengthFieldPrepender(
ByteOrder byteOrder,int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength){
// 只支持1,2,3,4,8字节长的length字段编码
if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
lengthFieldLength != 3 && lengthFieldLength != 4 &&
lengthFieldLength != 8) {
throw new IllegalArgumentException(
"lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
lengthFieldLength);
}
ObjectUtil.checkNotNull(byteOrder, "byteOrder");
this.byteOrder = byteOrder;
this.lengthFieldLength = lengthFieldLength;
this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {
int length = msg.readableBytes() + lengthAdjustment;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
if (length < 0) {
throw new IllegalArgumentException(
"Adjusted frame length (" + length + ") is less than zero");
}
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
... 中间省略
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
out.add(msg.retain());
}
}
实现分析
MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe 类则使用其获取DirectMemory否则降级为heapMemory。
public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
// 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory
protected MessageToByteEncoder(boolean preferDirect) {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
作为Encoder,需要处理的是write事件。
write逻辑为
- 判断是否为符合当前类型的对象
- 根据当前对象和是否使用DirectMemory来申请一个ByteBuf
- 调用需要子类override的encoder方法,将需要编码的msg对象encoder到ByteBuf中,然后release msg对象。
- 调用ChannelContextWrite.write方法,交给下一个ChannelHandlerContext处理这个ByteBuf
- 如果不符合1, 则直接交给下一个ChannelHandlerContext
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception { ByteBuf buf = null; try { // 判断对象类型 if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; // 申请ByteBuf buf = allocateBuffer(ctx, cast, preferDirect); try { // 编码写入ByteBuf中 encode(ctx, cast, buf); } finally { // release msg对象 ReferenceCountUtil.release(cast); } // 如果ByteBuf不空则ctx write if (buf.isReadable()) { ctx.write(buf, promise); } else { // 否则release byteBuf然后写入一个空BUFFER buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } // 帮助GC回收 buf = null; } else { // 其他类型的对象不处理交给后面的处理 ctx.write(msg, promise); } } catch (EncoderException e) { // encode方法的异常抛给上层处理 throw e; } catch (Throwable e) { // 其他包装成EncoderException的异常抛给上层处理 throw new EncoderException(e); } finally { // 防止encode方法异常导致申请的ByteBuf没有释放 if (buf != null) { buf.release(); } } } protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } } // 留给子类去实现的具体编码方法 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Algorithms for Image Processing and Computer Vision
Parker, J. R. / 2010-12 / 687.00元
A cookbook of algorithms for common image processing applications Thanks to advances in computer hardware and software, algorithms have been developed that support sophisticated image processing with......一起来看看 《Algorithms for Image Processing and Computer Vision》 这本书的介绍吧!