内容简介:例如应用中使用protobuf协议,则可以将byte转换为Protobuf对象,再交给后面的Handler来处理。下面这段代码, 先将收到的ByteBuf数据按照换行符分割成一段一段的ByteBuf,然后将ByteBuf转换成String, 再将String转换成int, 最后把int加一后写回。
ByteToMessageDecoder 是一种常用的 ChannelInboundHandler ,可以称为解码器,负责将byte字节流(ByteBuf)转换成一种Message,Message是应用可以自己定义的一种 Java 对象。
例如应用中使用protobuf协议,则可以将byte转换为Protobuf对象,再交给后面的Handler来处理。
使用示例
下面这段代码, 先将收到的ByteBuf数据按照换行符分割成一段一段的ByteBuf,然后将ByteBuf转换成String, 再将String转换成int, 最后把int加一后写回。
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroupbossGroup=newNioEventLoopGroup();
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try{
bootstrap.channel(NioServerSocketChannel.class)
.handler(newLoggingHandler(LogLevel.DEBUG))
.group(bossGroup, workerGroup)
.childHandler(newChannelInitializer<SocketChannel>() {
protectedvoidinitChannel(SocketChannelch)throwsException{
ch.pipeline().addLast(newLineBasedFrameDecoder(1024))
.addLast(newByteToStringDecoder())
.addLast(newStringToIntegerDecoder())
.addLast(newIntegerToByteEncoder())
.addLast(newIntegerIncHandler());
}
});
ChannelFuturebind=bootstrap.bind(8092);
bind.sync();
bind.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
这里的ChannelPipeline的组织结构是
1. ByteToStringDecoder - 将byte转换成String的Decoder
2. StringToIntegerDecoder - String转换成Integer对象的Decoder
3. IntegerToByteEncoder - Integer转换成byte的Encoder
4. IntegerIncHandler - 将接受到的int加一后返回
###ByteToStringDecoder
本文主要讲解ByteToMessageDecoder所以先分析ByteToStringDecoder再分析ByteToMessageDecoder
ByteToStringMessageDecoder继承于ByteToMessageDecoder,并实现了ByteToMessageDecoder的
decode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<Object> out) 方法。
decode方法实现中要求将ByteBuf中的数据进行解码然后将解码后的对象增加到list中
public class ByteToStringDecoderextends ByteToMessageDecoder{
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] data = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(data);
list.add(new String(data, StandardCharsets.UTF_8));
}
}
ByteToMessageDecoder
ByteToMessageDecoder继承了ChannelInboundHandlerAdapter所以是一个处理Inbound事件的Handler。
其内部保存一个Cumulator用于保存待解码的ByteBuf,然后不断调用子类需要实现的抽象方法decode去取出byte数据转换处理。
/**
* Cumulate {@link ByteBuf}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
Cumulator有两种实现,MERGE_CUMULATOR和COMPOSITE_CMUMULATOR。MERGE_CUMULATOR通过memory copy的方法将in中的数据复制写入到cumulation中。COMPOSITE_CUMULATOR采取的是类似链表的方式,没有进行memory copy, 通过一种CompositeByteBuf来实现,在某些场景下会更适合。默认采用的是MERGE_CUMULATOR。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
// 如果cumulation是只读的、或者要超过capacity了,或者还有其他地方在引用, 则都通过创建一个新的byteBuf的方式来扩容ByteBuf
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
ByteToMessageDecoder中最主要的部分在channelRead处理上
- 收到一个msg后先判断是否是ByteBuf类型,是的情况创建一个CodecOutputList(也是一种list)保存转码后的对象列表
- 如果cumulation为null则把msg设置为cumulation,否则合并到cumulation里
- 调用callDecode方法,尝试解码
- finally中如果cumulation已经读完了,就release并置为null等待gc
- 调用fireChannelRead将解码后的out传递给后面的Handler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
callDecode中不断执行抽象decode(ctx, in, out)方法直到in可读数据没有减少或当前handler被remove。
protected void callDecode(ChannelHandlerContext ctx, ByteBufin, List<Object>out){
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// 检查当前handler是否被remove了
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// 检查当前handler是否被remove了
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) { // 这种情况是解码出了对象但是并没有移动in的readIndex
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} ...
}
fireChannelRead(ctx, msgs, numElements)的处理方式是对每个解码后的消息进行fireChannelRead,交给下一个Handler处理
/**
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
以上就是ByteToMessageDecoder的主要处理部分。关于Netty,面试中会喜欢问道“粘包/拆包”问题,指的是一个消息在网络中是二进制byte流的形式传过去的,接收方如何判断一个消息是否读完、哪里是分割点等,例如构建一个需要的对象需要100byte,但是ByteToMessageDecoder中只取到了50bytes,这时该如何处理。这些可以通过Netty中提供的一些Decoder来实现,例如DelimiterBasedFrameDecoder,FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder。其中最常见的应该是LengthFieldBasedFrameDecoder了,因为自定义的协议中通常会有一个协议头,里面有一个字段描述一个消息的大小长度,然后接收方就能知道消息读到什么时候是读完一个Frame了。这些解码器会在后续的文章中介绍。
以上所述就是小编给大家介绍的《Netty源码分析4-ByteToMessageDecoder》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。