标签:
本来我打算单独开一章,专门说明粘包和断包,但是觉得这个事儿我在做的时候挺头疼的,但是对于别人或许不那么重要,于是就在这里写吧。
那么何谓粘包、何谓断包呢?
粘包:我们知道客户端在写入报文给服务端的时候,首先要将需要写入的内容写入Buffer,以ByteBuffer为例,如果你Buffer定义的足够大,并且你发送的报文足够快,此时就会产生粘包现象,举例来说 你发送一个 报文“ M|A”,然后你有发送了一个“M|B”,如果产生粘包,服务端从缓冲区里面读出的就是“M|AM|B”,这样的字符串;也就是说,客户端的第一条报文和第二条报文粘在了一起。
断包:断包往往是在粘包之后产生的,按照刚才的例子,假设你的缓冲区大小设置为4(当然没人会设置这么小的缓冲区,举例子,凑合看吧),如果你发送的报文足够快,就会产生发送给服务器的报文变为这样:第一个包“M|AM”,第二个包“|B”
在大多数的NIO例子中,均不包括此过程的处理,而且很多的例子中也不会浮现这个情况,甚至程序上线,如果系统压力不大,这样的情况都出现的很少(尤其是断包)。值得庆幸的是,这两种情况,我均重现了,我在客户端不做任何停顿的情况下,for循环发送10万条报文给服务端,当我的缓冲区服务端缓冲区设置为4096,客户端缓冲区设置为1024的时候,出现的频率还是蛮高的,可以加大缓冲区来减少断包的情况发生,但是不能避免,粘包则是必然发生的。
好、我回答在第7小点中提到的问题,为什么要在通讯协议的外层在加上四位?这四位就是用来标记我报文指令的长度的,一旦我知道了这个长度,我就可以根据长度对断包和粘包进行相关的处理。具体代码如下:
/** * 处理断包和粘包现象 * * @param socketChannel * @param byteBuffer */ private void handlePacket(SocketChannel socketChannel, ByteBuffer byteBuffer) { //标记读取缓冲区起始位置 int location = 0; //如果缓冲区从0到limit的数量大于包体大小标记数字 while (byteBuffer.remaining() > PACKET_HEAD_LENGTH) { //包体大小标记 String strBsize; //如果endPacket的字节length大于0,则证明:断包的前一截为包含包头和包体的; if (endPacketStr.getBytes().length > 0) { String strPacket = endPacketStr.substring(PACKET_HEAD_LENGTH) + new String(byteBuffer.array(), 0, remainBodySize); byteBuffer.position(remainBodySize); location = remainBodySize; // if(logger.isDebugEnabled()) { logger.info("【断包处理】(包含包体)合并后的报文:" + strPacket + ",缓冲区的position:" + location); // } offerPacket(socketChannel, strPacket); //处理完毕,清理断包的前一截,以便于下次使用; endPacketStr = ""; //清理后一截报文的字节数标记; remainBodySize = 0; continue; //如果endBufferStr的字节length大于0,则证明:断包的前一截仅包含包头或包头的一部分,不包含包体; } else if (endBufferStr.getBytes().length > 0) { strBsize = (new StringBuffer(endBufferStr).append(new String(byteBuffer.array(), location, PACKET_HEAD_LENGTH - endBufferStr.getBytes().length))).toString(); //移动缓冲区position byteBuffer.position(PACKET_HEAD_LENGTH - endBufferStr.getBytes().length); location = byteBuffer.position(); //得到包体大小 int byteBufferSize = Integer.parseInt(strBsize.trim()); //进行报文合并,把保存的仅包含包头或包头一部分的前一截与后一截合并 String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH - endBufferStr.getBytes().length, byteBufferSize)); byteBuffer.position(location + byteBufferSize);//将缓冲区的位置移动到下一个包体大小标记位置 location = byteBuffer.position(); logger.info("【断包处理】(不包含包体)合并后的报文:" + strPacket + ",缓冲区的position:" + location); offerPacket(socketChannel, strPacket); endBufferStr = ""; continue; //进入正常处理(规范的报文处理,不考虑断包) } else { strBsize = new String(byteBuffer.array(), location, PACKET_HEAD_LENGTH); //移动缓冲区position byteBuffer.position(location + PACKET_HEAD_LENGTH); } if (logger.isDebugEnabled()) { logger.debug("收到客户端包体大小:" + strBsize + ",查看position变化:" + byteBuffer.position()); } //得到包体大小 int byteBufferSize = Integer.parseInt(strBsize.trim()); //如果从缓冲区当前位置到limit大于包体大小,证明粘包了,进行包体处理。等于则为正常包体,不存在粘包现象。 if (byteBuffer.remaining() >= byteBufferSize) { String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH + location, byteBufferSize)); byteBuffer.position(location + PACKET_HEAD_LENGTH + byteBufferSize);//将缓冲区的位置移动到下一个包体大小标记位置 if (logger.isDebugEnabled()) { logger.debug("收到客户端包体内容:" + strPacket + ",2查看position变化:" + byteBuffer.position()); } //将字符串报文封装为类 offerPacket(socketChannel, strPacket); location = byteBuffer.position();//设定读取缓冲区起始位置 //如果缓冲区当前位置到limit小于包体,证明断包了,进行断包处理 } else { endPacketStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location); remainBodySize = Integer.parseInt(endPacketStr.substring(0, PACKET_HEAD_LENGTH).trim()) - endPacketStr.getBytes().length + PACKET_HEAD_LENGTH; //已经找到断包前半截,所以把整个buffer的position调整至最后,不再处理。等待新的key进入 byteBuffer.position(byteBuffer.limit()); logger.info("处理断包仅包含完整包头的尾部报文,缓冲区位置:" + location + ",缓冲区limit:" + byteBuffer.limit() + ",包含完全包头的剩余字符:" + endPacketStr + ",bodySize:" + remainBodySize); } } //处理仅包含包头前一截的报文; if (byteBuffer.remaining() > 0) { //缓冲区中剩余的仅包含包头前一截的报文 endBufferStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location); logger.info("处理断包仅包含包头前一截的尾部报文,缓冲区位置:" + location + ",缓冲区limit:" + byteBuffer.limit() + ",不包含完全包头的剩余字符:" + endBufferStr); //移动缓冲区指针到最后,代表已经保存了前一截报文,无需再进行处理; byteBuffer.position(byteBuffer.limit()); } //我也不知道这是否有用,能不能释放内存资源 byteBuffer.clear(); }
这块儿很可能有不合理的地方,因为对于一个接近40岁的程序员来说,逻辑在头脑中已经比较混乱了。我知道要对如下几种情况进行处理:
1、粘包,粘包比较好处理,主要是根据包头的前四位,确定包体的大小,然后移动buffer的位置(position),把整个包读出来放入队列就行了;
2、断包:断包分为两种情况,第一种从包头开始就断了,这是你无法获得包体大小,需要把前面的一截保存起来,就必须等下一个报文来了之后,把他们连在一起,然后再做处理;第二种,已经读到完整的包头,仍然需要把前面一截保存起来,确定后面还有多少,然后再处理;我利用了三个类成员:
//断包处理,前一截包含完整包头; private String endPacketStr = ""; //断包处理,前一截不包含完整包头; private String endBufferStr = ""; //断包处理,前一截包含完整包头时,包体的大小标记; private int remainBodySize = 0;
注意这些类的成员需要在使用后,清空,以便于下次使用,否则就乱套了。这块儿代码,我写完就没再看过,挺费神。如果有人能提供更好地办法,不胜感激。
基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理
标签:
原文地址:http://my.oschina.net/u/2397619/blog/494159