码迷,mamicode.com
首页 > 其他好文 > 详细

[编织消息框架][设计协议]解决粘包半包(下)

时间:2017-03-14 13:46:41      阅读:426      评论:0      收藏:0      [点我收藏+]

标签:his   限制   state   case   slice   选项   expec   int   abstract   

接下来介绍netty如何切割分包

学习目的,了解处理业务,方便以后脱离依赖

读者如果不感兴趣或看不懂可以先忽略,难度比较大

LengthFieldBasedFrameDecoder.class

    public LengthFieldBasedFrameDecoder(
            ByteOrder byteOrder,     //大小端模式 默认大端 ByteOrder BIG_ENDIAN
            int maxFrameLength,      //包Frame netty叫帧概念 最大上限
            int lengthFieldOffset,     //包长度信息偏移多少bytes
            int lengthFieldLength,    //包长度单位为bytes
            int lengthAdjustment,     //包附加信息占多少位,如包尾部checksum 也可以不用设置
            int initialBytesToStrip,//忽悠包头信息,返给上层时会去掉这部份bytes
            boolean failFast)        //解包出错,控制抛异常,默认为true 无需关心这选项

解读netty源码

分四部份

1.netty解码介绍

2.边界判断

3.计算逻辑

4.切割包

第一部份

先看下LengthFieldBasedFrameDecoder继承类之间的关系

技术分享

ByteToMessageDecoder处理比较复杂,先不考虑

其实解码只要工作方法是

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
//公开调用方法 out 是解码后保存返回,为什么是个数组?原因有可能出现粘包情况多次解码,合并结果一次返回上层业务
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    //调用实现解码方法
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

提示:如果自己实现解码 继承ByteToMessageDecoder类,逻辑写在decode(ChannelHandlerContext ctx, ByteBuf in)方法即可

第三部份

技术分享
 1     //计算包长度偏移坐标 已读坐标+lengthFieldOffset参数
 2     int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
 3     //算出包长度
 4     long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
 5     
 6     //包实际长度  加上开始忽悠的 lengthFieldEndOffset参数 加上 lengthAdjustment 参数
 7     frameLength += lengthAdjustment + lengthFieldEndOffset;
 8     
 9     //跳过忽悠头部 initialBytesToStrip参数
10     in.skipBytes(initialBytesToStrip);
11         
12     //length 单位是byte 如2 是占一个short长度 4占length长度 8占long长度
13     protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
14         //转换大小端模式
15         buf = buf.order(order);
16         long frameLength;
17         switch (length) {
18         case 1:
19             frameLength = buf.getUnsignedByte(offset);
20             break;
21         case 2:
22             frameLength = buf.getUnsignedShort(offset);
23             break;
24         case 3:
25             frameLength = buf.getUnsignedMedium(offset);
26             break;
27         case 4:
28             frameLength = buf.getUnsignedInt(offset);
29             break;
30         case 8:
31             frameLength = buf.getLong(offset);
32             break;
33         default:
34             throw new DecoderException(
35                     "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
36         }
37         return frameLength;
38     }
计算逻辑

第四部份

技术分享
 1 int frameLengthInt = (int) frameLength;
 2     // extract frame
 3     int readerIndex = in.readerIndex();
 4     //计算实际包长度 上面逻辑已跑过 in.skipBytes(initialBytesToStrip) 所以要减去 initialBytesToStrip
 5     int actualFrameLength = frameLengthInt - initialBytesToStrip;
 6     //切割包 
 7     ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
 8     //记录已读坐标
 9     in.readerIndex(readerIndex + actualFrameLength);
10         
11         
12     //切割实现,直接调用ByteBuf 方法切割
13     protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
14         return buffer.retainedSlice(index, length);
15     }
16     
17     AbstractUnpooledSlicedByteBuf.class
18     //深追ByteBuf切割实现,发现返回的ByteBuf只是保留引用,只是简单记录了下offsetStart adjustment writerIndex
19     //如果内存吃紧ByteBuf回收会产生意想不到bug
20     AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
21         super(length);
22         checkSliceOutOfBounds(index, length, buffer);
23 
24         if (buffer instanceof AbstractUnpooledSlicedByteBuf) {
25             this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer;
26             adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index;
27         } else if (buffer instanceof DuplicatedByteBuf) {
28             this.buffer = buffer.unwrap();
29             adjustment = index;
30         } else {
31             this.buffer = buffer;
32             adjustment = index;
33         }
34 
35         initLength(length);
36         writerIndex(length);
37     }
切割包

第二部份

现在回过头来看边界判断,主要是看处理出错逻辑部份

技术分享
 1 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
 2         //解码出错
 3         if (frameLength < 0) {
 4             //无法计算包内容时netty策略选择跳过 包头信息跟包长度大小
 5             in.skipBytes(lengthFieldEndOffset);
 6             throw new CorruptedFrameException(
 7                     "negative pre-adjustment length field: " + frameLength);
 8         }
 9 
10          frameLength += lengthAdjustment + lengthFieldEndOffset;
11         //奇怪的逻辑,上面已加上 lengthFieldEndOffset 除非 lengthAdjustment 参数或 lengthFieldEndOffset参数设置为负值
12         //一般没什么复杂情况最好不要设置负值
13         if (frameLength < lengthFieldEndOffset) {
14             in.skipBytes(lengthFieldEndOffset);
15             throw new CorruptedFrameException(
16                     "Adjusted frame length (" + frameLength + ") is less " +
17                     "than lengthFieldEndOffset: " + lengthFieldEndOffset);
18         }
19         
20         // never overflows because it‘s less than maxFrameLength
21         int frameLengthInt = (int) frameLength;
22         //缓冲剩余可读bytes少于包长度,说明出现半包情况,忽略处理
23         if (in.readableBytes() < frameLengthInt) {
24             return null;
25         }
26         //如果包长度少于 initialBytesToStrip 参数会抛异常并且跳过包内容
27         //没什么特殊情况最好不要设置initialBytesToStrip参数
28         if (initialBytesToStrip > frameLengthInt) {
29             in.skipBytes(frameLengthInt);
30             throw new CorruptedFrameException(
31                     "Adjusted frame length (" + frameLength + ") is less " +
32                     "than initialBytesToStrip: " + initialBytesToStrip);
33         }
基本处理
技术分享
 1         //包长度大于 maxFrameLength
 2         //看下netty处理策略
 3         if (frameLength > maxFrameLength) {
 4             //算出超出范围
 5             long discard = frameLength - in.readableBytes();
 6             //记录包长度
 7             tooLongFrameLength = frameLength;
 8 
 9             //有两种情况处理
10             
11             //存在粘包情况,直接 in.skipBytes((int) frameLength);跳过当前包 什么也不用记录
12             if (discard < 0) {
13                 // buffer contains more bytes then the frameLength so we can discard all now
14                 in.skipBytes((int) frameLength);
15             } else {
16                 //当 discard>=0 
17                 //说明没存在粘包情况,有可能出现半包情况,in.skipBytes(in.readableBytes()); 清空所有接收数据
18                 //设置discardingTooLongFrame,bytesToDiscard记录超出范围,下次解码时用上
19                 // Enter the discard mode and discard everything received so far.
20                 discardingTooLongFrame = true;
21                 bytesToDiscard = discard;
22                 in.skipBytes(in.readableBytes());
23             }
24             failIfNecessary(true);
25             return null;
26         }
27         
28         //当上次包出现超过最大包限制,要忽悠上个包半包情况
29         if (discardingTooLongFrame) {
30             long bytesToDiscard = this.bytesToDiscard;
31             //剩余半包长度
32             int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
33             //忽略剩余半包长度
34             in.skipBytes(localBytesToDiscard);
35             //记录忽略已忽略多少bytes
36             bytesToDiscard -= localBytesToDiscard;
37             this.bytesToDiscard = bytesToDiscard;
38             //如果this.bytesToDiscard==0 说明已全部忽略上一个包 会重置 discardingTooLongFrame
39             failIfNecessary(false);
40         }
41         
42     private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
43         if (bytesToDiscard == 0) {
44             // Reset to the initial state and tell the handlers that
45             // the frame was too large.
46             long tooLongFrameLength = this.tooLongFrameLength;
47             this.tooLongFrameLength = 0;
48             discardingTooLongFrame = false;
49             if (!failFast ||
50                 failFast && firstDetectionOfTooLongFrame) {
51                 fail(tooLongFrameLength);
52             }
53         } else {
54             // Keep discarding and notify handlers if necessary.
55             if (failFast && firstDetectionOfTooLongFrame) {
56                 fail(tooLongFrameLength);
57             }
58         }
59     }
大于maxFrameLength策略

当大于maxFrameLength 处理核心忽略当前包,如果存在粘包直接忽略,如果存在半包记录剩余包大小bytesToDiscard,下次循环解码时不停跳过bytesToDiscard

 

关于分包处理

为何需要分包?比如要传送10G大小的文件,如果把数据全读进内存再发送出去,明显是不合理的,这时要把10G大小的数据切割分批读,分批发送出去

为什么不做在应用协议里?因业太多业务上的逻辑,底层是无法控制的,再说是特定的功能,没必要添加支持,看作成业务上一个小功能实现就行了

[编织消息框架][设计协议]解决粘包半包(下)

标签:his   限制   state   case   slice   选项   expec   int   abstract   

原文地址:http://www.cnblogs.com/solq111/p/6542239.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!