内容简介:一般直接接触RPC框架的时候内部都做了对于粘包分包的解决方案,咱们来一起了解下这方便的含义,包括编码解码这块。源码:https://github.com/limingios/netFuture/tree/master/源码/『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)/nio
一般直接接触RPC框架的时候内部都做了对于粘包分包的解决方案,咱们来一起了解下这方便的含义,包括编码解码这块。
源码:https://github.com/limingios/netFuture/tree/master/源码/『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)/nio
(一)粘包分包概念
-
粘包
> TCP
由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象(确切来讲,对于基于TCP协议的应用,不应用包来描述,而应 用 流来描述),个人认为服务器接收端产生的粘包应该与 linux 内核处理socket的方式 select轮询机制的线性扫描频度无关。
UDP
本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。
-
分包
>可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。
-
TCP当中,只有流的概念,没有包的概念(根本原因)
>简单的概括
(1)粘包:
1.服务端
原因收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据
2.客户端
原因TCP为提高传输效率,要收集到足够多的数据后才发送一包数据
(2).分包:
1.应用程序写入的字节大小大于套接字发送缓冲区的大小
2.进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS
3.以太网帧的payload(净荷)大于MTU(1500字节)进行ip分片
(二)Netty粘包分包现象演示
源码:pack目录下的error
Server.java
package com.dig8.pack.error;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* netty服务端
*
* @author idig8.com
*/
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker线程负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 管道过滤器
pipeline.addLast("myHandler", new ServerHandler());
return pipeline;
}
});
// 服务类绑定端口
bootstrap.bind(new InetSocketAddress(8888));
}
}
ServerHandler.java
package com.dig8.pack.error;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* @author idig8.com
*/
public class ServerHandler extends SimpleChannelHandler{
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
byte[] bs = buffer.array();
System.out.println("server receive data: " +new String(bs));
}
}
Client.java
package com.dig8.pack.error;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
//socket工厂
bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("1", new StringEncoder());
pipeline.addLast("2", new ClientHandler());
return pipeline;
}
});
//连接服务端
bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync();
}
}
ClientHandler.java
package com.dig8.pack.error;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
/**
* 客户端消息处理类
* @author idig8.com
*/
public class ClientHandler extends SimpleChannelHandler {
// 包头
private static final int HEAD_FLAG = -32323231;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
String msg = "Hello,idig8.com";
for (int i = 0; i < 1000; i++) {
channel.write(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
运行后出现粘包和分包现象
(三)粘包分包问题解决思路
服务端和客户端约定好稳定的数据包结构
1.客户端根据约定的数据包结构发送数据
2.服务端根据约定的数据包结构来读取数据
通过MyDecoder集成FrameDecoder的方式来
源码:pack目录下的custom
Server.java
package com.dig8.pack.custom;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
* netty服务端
*
* @author idig8.com
*/
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker线程负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 管道过滤器
pipeline.addLast("myDecoder", new MyDecoder());
pipeline.addLast("myHandler", new ServerHandler());
return pipeline;
}
});
// 服务类绑定端口
bootstrap.bind(new InetSocketAddress(7778));
}
}
ServerHandler.java
package com.dig8.pack.custom;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* @author idig8.com
*/
public class ServerHandler extends SimpleChannelHandler{
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
/*ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
byte[] bs = buffer.array();*/
System.out.println("server receive data: " + e.getMessage());
}
}
Client.java
package com.dig8.pack.custom;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
//socket工厂
bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("1", new StringEncoder());
pipeline.addLast("2", new ClientHandler());
return pipeline;
}
});
//连接服务端
bootstrap.connect(new InetSocketAddress("127.0.0.1", 7778)).sync();
}
}
ClientHandler.java
package com.dig8.pack.custom;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* 客户端消息处理类
* @author idig8.com
*/
public class ClientHandler extends SimpleChannelHandler {
// 包头
private static final int HEAD_FLAG = -32323231;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
String msg = "Hello,idig8.com 通过定义包头+长度+数据 防止粘包和分包";
byte[] bytes = msg.getBytes();
// 定义数据包 ,结构为:包头 + 长度 + 数据
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
// 1.写包头
buffer.writeInt(HEAD_FLAG);// 4字节
// 2.写长度
buffer.writeInt(bytes.length);// 4字节
// 3.写数据本身
buffer.writeBytes(bytes);
for (int i = 0; i < 1000; i++) {
channel.write(buffer);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
MyDecoder.java
/**
*
*/
package com.dig8.pack.custom;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
* @author idig8.com
*/
public class MyDecoder extends FrameDecoder{
// 包头
private static final int HEAD_FLAG = -32323231;
// 数据包基本长度
private final static int BASE_LENGTH = 4 + 4;
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// 收到数据之后,先判断buffer中可读的数据长度是否大于数据包的基本长度
if(buffer.readableBytes() > BASE_LENGTH){
// 防止socket攻击:
if(buffer.readableBytes() > 4096 * 2){ // 4k
System.out.println("socket 攻击了");
buffer.skipBytes(buffer.readableBytes());
}
// 记录包头开始的位置
int headIndex;
while(true){
headIndex = buffer.readerIndex();
buffer.markReaderIndex();
// 代码很关键
if(buffer.readableBytes() < 4){// 包头的长度
buffer.readerIndex(headIndex);
return null;
}
// 此时说明包头的长度是足够的
// 正好读取的是包头
if(buffer.readInt() == HEAD_FLAG ){
break;
}
// [1,2,3,4] 1 1 1 1 1
// 如果不是包头,需要略过一个字节,在略过之前,需要还原读指针位置
buffer.resetReaderIndex();
buffer.readByte();// 略过一个字节
if(buffer.readableBytes() < BASE_LENGTH){
return null;
}
}
// 此时说明有数据包到来
// 做标记(记住当前读指针的位置)
// buffer.markReaderIndex();
// 1.读长度
int dataLength = buffer.readInt();
if(buffer.readableBytes() < dataLength){
// 说明数据本身的长度还不够, 肯定要继续等待后面的数据到来
// 还原读指针的位置
buffer.readerIndex(headIndex);
return null;
}
// 此时说明数据包已经位置
// 2.读数据本身
byte[] dst = new byte[dataLength];
buffer.readBytes(dst);
// 继续传递下去
// ?
// 如果此时buffer中的数据还没有读完,那么剩下的数据怎么办?
return new String(dst);
}
// return null 表示此时的数据包不完整,需要继续等待下一个数据包的到来 ?
return null;
}
}
(三)Netty自带粘包分包解决方案
消息定长
1.FixedLengthFrameDecoder
行分隔符
2.LineBasedFrameDecoder
自定义特殊符号进行分割
3.DelimiterBasedFrameDecoder
源码:pack目录下的nettysolution
Server.java
package com.dig8.pack.nettysolution;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
/**
* 服务端
* @author idig8.com
*/
public class Server {
public static void main(String[] args) throws Exception {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker线程负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 管道过滤器
// 方案1:消息定长
//pipeline.addLast("fixedLength", new FixedLengthFrameDecoder(18));
// 方案2:行分隔符
//pipeline.addLast("fixedLength", new LineBasedFrameDecoder(1024));
// 方案3:自定义特殊符号进行分割
pipeline.addLast("delimiter", new DelimiterBasedFrameDecoder(1024,
ChannelBuffers.copiedBuffer("#@#".getBytes())));
pipeline.addLast("1",new StringDecoder());
pipeline.addLast("2",new ServerMessageHandler());
return pipeline;
}
});
// 服务类绑定端口
bootstrap.bind(new InetSocketAddress(7777));
System.out.println("服务端启动...");
}
}
ServerMessageHandler.java
package com.dig8.pack.nettysolution;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* 服务端消息处理类
* @author idig8.com
*/
public class ServerMessageHandler extends SimpleChannelHandler {
/**
* 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("receive request: " + e.getMessage());
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
Client.java
package com.dig8.pack.nettysolution;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
//socket工厂
bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 方案1:消息定长
//pipeline.addLast("fixedLength", new FixedLengthFrameDecoder(18));
// 方案2:行分隔符
//pipeline.addLast("fixedLength", new LineBasedFrameDecoder(1024));
// 方案3:自定义特殊符号进行分割
pipeline.addLast("delimiter", new DelimiterBasedFrameDecoder(1024,
ChannelBuffers.copiedBuffer("#@#".getBytes())));
pipeline.addLast("1",new StringEncoder());
pipeline.addLast("2", new ClientMessageHandler());
return pipeline;
}
});
//连接服务端
@SuppressWarnings("unused")
ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 7777)).sync();
// Channel channel = connect.getChannel();
// System.out.println("client start");
// Scanner scanner = new Scanner(System.in);
// while(true){
// System.out.println("请输入:");
// channel.write(scanner.next());
// }
}
}
ClientMessageHandler.java
package com.dig8.pack.nettysolution;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* 客户端消息接受处理类
* @author idig8.com
*/
public class ClientMessageHandler extends SimpleChannelHandler {
/**
* 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("server response : " + e.getMessage());
}
/**
* 新连接
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
String separator = "#@#";//System.getProperty("line.separator");// 系统换行符
String msg = "idig8.com send cmd";
for (int i = 0; i < 1000; i++) {
channel.write(msg + i + separator);
}
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
PS:基本上netty针对tcp 分包粘包已经说完了,确实有了netty真的很方便比传统的socket方便很多。下次说说http 协议实现。
>>原创文章,欢迎转载。转载请注明:转载自IT人故事会,谢谢!
>>原文链接地址:上一篇:已是最新文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- mpvue 分包方案
- 小程序系列--如何使用分包加载
- 小程序多业务线融合【完整分包业务接入】
- 003. 如果将老项目的小程序快速改为分包模式
- 『互联网架构』软件架构-分布式架构(14)
- 『互联网架构』软件架构-电商系统架构(上)(69)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Approximation Algorithms
Vijay V. Vazirani / Springer / 2001-07-02 / USD 54.95
'This book covers the dominant theoretical approaches to the approximate solution of hard combinatorial optimization and enumeration problems. It contains elegant combinatorial theory, useful and inte......一起来看看 《Approximation Algorithms》 这本书的介绍吧!