标签:lin 项目 new 设计 完美 抽象 strong version 流程
本文的源码工程:Netty 粘包/半包原理与拆包实战 源码
本实例是《Netty 粘包/半包原理与拆包实战》 一文的源代码工程。
大家好,我是作者尼恩。
为了完成了一个高性能的 Java 聊天程序,在前面的文章中,尼恩已经再一次的进行了通讯协议的重新选择。
这就是:放弃了大家非常熟悉的json 格式,选择了性能更佳的 Protobuf协议。
在上一篇文章中,并且完成了Netty 和 Protobuf协议整合实战。
具体的文章为: Netty+Protobuf 整合一:实战案例,带源码
另外,专门开出一篇文章,介绍了通讯消息数据包的几条设计准则。
具体的文章为: Netty +Protobuf 整合二:protobuf 消息通讯协议设计的几个准则
在开始聊天器实战开发之前,还有一个非常基础的问题,需要解决:
这就是通讯的粘包和半包问题。
先从数据包的发送和接收开始讲起。
我们知道, Netty 发送和读取数据的单位,可以形象的使用 ByteBuf 来充当。
每一次发送,就是向Channel 写入一个 ByteBuf ;每一次读取,就是从 Channel 读到一个 ByteBuf 。
发送一次数据,举例如下:
channel.writeAndFlush(buffer);
读取一次数据,举例如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; //.... } ?
我们的理想是:发送端每发送一个buffer,接收端就能接收到一个一模一样的buffer。
然而,理想很丰满,现实很骨感。
在实际的通讯过程中,并没有大家预料的那么完美。
一种意料之外的情况,如期而至。这就是粘包和半包。
那么,什么是粘包和半包?
粘包和半包定义如下:
粘包和半包,指的都不是一次是正常的 ByteBuf 缓存区接收。
粘包,就是接收端读取的时候,多个发送过来的 ByteBuf “粘”在了一起。
换句话说,接收端读取一次的 ByteBuf ,读到了多个发送端的 ByteBuf ,是为粘包。
半包,就是接收端将一个发送端的ByteBuf “拆”开了,形成一个破碎的包,我们定义这种 ByteBuf 为半包。
换句话说,接收端读取一次的 ByteBuf ,读到了发送端的一个 ByteBuf的一部分,是为半包。
上面的理论比较抽象,下面用一幅图来形象说明。
下图中,发送端发出4个数据包,接受端也接受到了4个数据包。但是,通讯过程中,接收端出现了 粘包和半包。
接收端收到的第一个包,正常。
接收端收到的第二个包,就是一个粘包。 将发送端的第二个包、第三个包,粘在一起了。
接收端收到的第三个包,第四个包,就是半包。将发送端的的第四个包,分开成了两个了。
由于在前文 Netty+Protobuf 整合一:实战案例,带源码 的源码中,没有看到异常的现象。是因为代码屏蔽了半包的输出,所以看到的都是正常的数据包。
稍微调整一下,在前文解码器的代码,加上半包的提示信息输出,就可以看到半包的提示。
示意图如下:
调整过的半包警告的代码,如下:
/** * 解码器 * */ public class ProtobufDecoder extends ByteToMessageDecoder { //.... protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //... ? // 读取传送过来的消息的长度。 int length = in.readUnsignedShort(); ? //... if (length > in.readableBytes()) { // 读到的半包 // ... LOG.error("告警:读到的消息体长度小于传送过来的消息长度"); return; } //... 省略了正常包的处理 } }
具体的源码,请参见本文的源码工程:Netty 粘包/半包原理与拆包实战 源码
源码中,客户端向服务器循环发了1000个数据包,服务器接收端,出现了很多的半包的场景。
可以下载源码,进行实验。
实验时,服务器端运行 ChatServerApp 的main方法,客户端运行 ChatClientApp 的main方法,就可以看到上面图片中所示的半包的结果。
上面的实例,只能看到半包的结果,看不到粘包的结果。
为了看到粘包的场景,这里,不使用protobuf 协议,直接使用缓冲区进行读写通讯,设计了一个的简单的演示实验案例。
案例已经设计好,可以下载源码,进行实验。
运行实例,不仅可以看到半包的提示信息输出,而且可以看到粘包的提示信息输出,示意图如下:
我们可以看到,服务器收到的数据包,有包含多个发送端数据包的,这就是粘包了。
另外,接收端还有出现乱码的数据包,就是只包含部分发送端数据,这就是半包了。
这个实例的源码,直接简化了前面的基于Protobuf协议通讯的实例源码。代码的逻辑结构,是一样的。
源码中,客户端向服务器循环发了1000个数据包,服务器接收端,收到数据包,直接在屏幕输出。
服务器端运行:DemoServerApp 的main方法,客户端运行 DemoClientApp的main方法,就可以看到上面图片中所示的半包的结果。
本实验的具体的源码,还是请参见本文的源码工程:Netty 粘包/半包原理与拆包实战 源码
这得从底层说起。
在操作系统层面来说,我们使用了 TCP 协议。
在Netty的应用层,按照 ByteBuf 为 单位来发送数据,但是到了底层操作系统仍然是按照字节流发送数据,因此,从底层到应用层,需要进行二次拼装。
操作系统底层,是按照字节流的方式读入,到了 Netty 应用层面,需要二次拼装成 ByteBuf。
这就是粘包和半包的根源。
在Netty 层面,拼装成ByteBuf时,就是对底层缓冲的读取,这里就有问题了。
首先,上层应用层每次读取底层缓冲的数据容量是有限制的,当TCP底层缓冲数据包比较大时,将被分成多次读取,造成断包,在应用层来说,就是半包。
其次,如果上层应用层一次读到多个底层缓冲数据包,就是粘包。
如何解决呢?
基本思路是,在接收端,需要根据自定义协议来,来读取底层的数据包,重新组装我们应用层的数据包,这个过程通常在接收端称为拆包。
拆包基本原理,简单来说:
接收端应用层不断从底层的TCP 缓冲区中读取数据。
每次读取完,判断一下是否为一个完整的应用层数据包。如果是,上层应用层数据包读取完成。
如果不是,那就保留该数据在应用层缓冲区,然后继续从 TCP 缓冲区中读取,直到得到一个完整的应用层数据包为止。
至此,半包问题得以解决。
如果从TCP底层读到了多个应用层数据包,则将整个应用层缓冲区,拆成一个一个的独立的应用层数据包,返回给调用程序。
至此,粘包问题得以解决。
拆包这个工作,Netty 已经为大家备好了很多不同的拆包器。本着不重复发明轮子的原则,我们直接使用Netty现成的拆包器。
Netty 中的拆包器大致如下:
固定长度的拆包器 FixedLengthFrameDecoder
每个应用层数据包的都拆分成都是固定长度的大小,比如 1024字节。
这个显然不大适应在 Java 聊天程序 进行实际应用。
行拆包器 LineBasedFrameDecoder
每个应用层数据包,都以换行符作为分隔符,进行分割拆分。
这个显然不大适应在 Java 聊天程序 进行实际应用。
分隔符拆包器 DelimiterBasedFrameDecoder
每个应用层数据包,都通过自定义的分隔符,进行分割拆分。
这个版本,是LineBasedFrameDecoder 的通用版本,本质上是一样的。
这个显然不大适应在 Java 聊天程序 进行实际应用。
基于数据包长度的拆包器 LengthFieldBasedFrameDecoder
将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度。
这个显然比较适和在 Java 聊天程序 进行实际应用。下面我们来应用这个拆分器。
在使用LengthFieldBasedFrameDecoder 拆包器之前 ,在发送端需要对protobuf 的消息包进行一轮包装。
发送端包装的方法是:
在实际的protobuf 二进制消息包的前面,加上四个字节。
前两个字节为版本号,后两个字节为实际发送的 protobuf 的消息长度。
强调一下,二进制消息包装,在发送端进行。
修改发送端的编码器 ProtobufEncoder ,代码如下:
/** * 编码器 */ public class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> { @Override protected void encode(ChannelHandlerContext ctx, ProtoMsg.Message msg, ByteBuf out) throws Exception { ? byte[] bytes = msg.toByteArray();// 将对象转换为byte int length = bytes.length;// 读取 ProtoMsg 消息的长度 ByteBuf buf = Unpooled.buffer(2 + length); // 先将消息协议的版本写入,也就是消息头 buf.writeShort(Constants.PROTOCOL_VERSION); // 再将 ProtoMsg 消息的长度写入 buf.writeShort(length); // 写入 ProtoMsg 消息的消息体 buf.writeBytes(bytes); //发送 out.writeBytes(buf); ? } ? }
发送端的步骤是:
先将消息协议的版本写入,也就是消息头
? buf.writeShort(Constants.PROTOCOL_VERSION);
再将 ProtoMsg 消息的长度写入 buf.writeShort(length);
最后,写入 ProtoMsg 消息的消息体 buf.writeBytes(bytes);
使用Netty中,基于长度域拆包器 LengthFieldBasedFrameDecoder,按照实际的应用层数据包长度来拆分。
需要做两个工作:
设置长度信息(长度域)在数据包中的位置。
设置长度信息(长度域)自身的长度,也就是占用的字节数。
在前面的小节中,我们的长度信息(长度域)的占用字节数为 2个字节; 在报文中的所处的位置,长度信息(长度域)处于版本号之后。
版本号是2个字节,从0开始数,长度信息(长度域)的在数据包中的位置为2。
这些数据定义在Constansts常量类中。
public class Constants { //协议版本号 public static final short PROTOCOL_VERSION = 1; //头部的长度: 版本号 + 报文长度 public static final short PROTOCOL_HEADLENGTH = 4; //长度的偏移 public static final short LENGTH_OFFSET = 2; //长度的字节数 public static final short LENGTH_BYTES_COUNT = 2; ? }
有了这些数据之后,可以基于Netty 的长度拆包器 LengthFieldBasedFrameDecoder, 开发自己的长度分割器。
新开发的分割器为PackageSpliter,代码如下:
package com.crazymakercircle.chat.common.codec; ? ? public class PackageSpliter extends LengthFieldBasedFrameDecoder { ? public PackageSpliter() { super(Integer.MAX_VALUE, Constants.LENGTH_OFFSET,Constants.LENGTH_BYTES_COUNT); } ? @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ? return super.decode(ctx, in); } }
分割器 PackageSpliter 继承了 LengthFieldBasedFrameDecoder,传入了三个参数。
长度的偏移量 ,这里是 Constants.LENGTH_OFFSET,值为 2
长度的字节数,这里是 Constants.LENGTH_BYTES_COUNT,值为 2
最大的应用包长度,这里是 Integer.MAX_VALUE,表示不限制
分割器 写好之后,只需要在 pipeline 的最前面加上这个分割器,就可以使用这个分割器(自定义的拆包器)。
在服务器端的 pipeline 的最前面加上这个分割器,代码如下:
package com.crazymakercircle.chat.server; //... ? @Service("ChatServer") public class ChatServer { static final Logger LOGGER = LoggerFactory.getLogger(ChatServer.class); //... //有连接到达时会创建一个channel protected void initChannel(SocketChannel ch) throws Exception { //应用自定义拆包器 ch.pipeline().addLast(new PackageSpliter()); ch.pipeline().addLast(new ProtobufDecoder()); ch.pipeline().addLast(new ProtobufEncoder()); // pipeline管理channel中的Handler // 在channel队列中添加一个handler来处理业务 ch.pipeline().addLast("serverHandler", serverHandler); } }); //.... }
在发送端的 pipeline 的最前面加上这个分割器,代码也是类似的, 这里不再赘述。大家可以下载源码查看。
这一点,需要从PackageSpliter 的根源讲起。
下面是自定义分割器 PackageSpliter 的继承关系图。
由此可见,分割器 PackageSpliter 继承了ChannelInboundHandlerAdapter。
本质上,它是一个入站处理器。
在 关于Netty的入站处理流程一文 Pipeline inbound 中, 我们已经知道,Netty的入站处理的顺序,是从pipelin 流水线的前面到后面。
由于在入站过程中,解码器 ProtobufDecoder 进行应用层 protobuf 的数据包的解码,而在此之前,必须完成应用包的正确分割。
所以, 分割器 PackageSpliter 必须处于入站流水线处理的第一站,放在最前面。
题外话, PackageSpliter 分割器 和 ProtobufEncoder 编码器 是否有关系呢?
从流水线处理的角度来说,是没有次序关系的。
PackageSpliter 是入站处理器。 在入站流程中用到。
ProtobufEncoder 是出站处理器,在出站流程中用到。
特别提示一下: 发送端不存在粘包和半包问题。这是接收端的事情。
总之,在出站和入站处理流程上,分割器 PackageSpliter 和 编码器ProtobufEncoder , 没有半毛钱关系的。
至此为止,终于完成了 Java 聊天程序【 亿级流量】实战的一些基础开发工作。
包括了协议的编码解码。包括了粘包和半包的拆包处理。
大家好,我是作者尼恩。 为大家预告一下接下来的工作:
下一步,基本上可以开始[ 疯狂创客圈 IM] 聊天器的正式设计和开发的详细讲解了。
Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战
Netty 源码、原理、JAVA NIO 原理
Java 面试题 一网打尽
疯狂创客圈 【 博客园 总入口 】
标签:lin 项目 new 设计 完美 抽象 strong version 流程
原文地址:https://www.cnblogs.com/crazymakercircle/p/9941658.html