标签:ide 局部变量 限制 粘包 expand param getc 非阻塞 抽象方法
TCP 是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP 作为传输层协议并不了解上层业务数据的具体含义,它会根据TCP 缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP 粘包和拆包问题。同样, 在Netty 的编码器中, 也会对半包和粘包问题做相应的处理。什么是半包, 顾名思义, 就是不完整的数据包, 因为netty 在轮询读事件的时候, 每次将channel 中读取的数据, 不一定是一个完整的数据包, 这种情况, 就叫半包。粘包同样也不难理解, 如果Client 往Server 发送数据包, 如果发送频繁很有可能会将多个数据包的数据都发送到通道中, 如果在server 在读取的时候可能会读取到超过一个完整数据包的长度, 这种情况叫粘包。有关半包和粘包, 如下图所示:
由于底层的TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下:
Netty 对半包的或者粘包的处理其实也很简单, 通过之前的学习, 我们知道, 每个handler 是和channel 唯一绑定的, 一个handler 只对应一个channel(初始化Channel中config的时候初始化), 所以将channel 中的数据读取时候经过解析, 如果不是一个完整的数据包, 则解析失败, 将这块数据包进行保存, 等下次解析时再和这个数据包进行组装解析, 直到解析到完整的数据包, 才会将数据包进行向下传递。
通常我们也习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。反之,解码(Decode)/反序列化(deserialization)把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。进行远程跨进程服务调用时(例如RPC 调用),需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用。
作为一个高性能的异步、NIO 通信框架,编解码框架是Netty 的重要组成部分。尽管站在微内核的角度看,编解码框架并不是Netty 微内核的组成部分,但是通过ChannelHandler 定制扩展出的编解码框架却是不可或缺的。然而,我们已经知道在Netty 中,从网络读取的Inbound 消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息,才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的Outbound 业务消息,需要经过编码转换成二进制字节数组(对于Netty 就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO 框架的有机组成部分,无论是由业务定制扩展实现,还是NIO 框架内置编解码能力,该功能是必不可少的。
为了降低用户的开发难度,Netty 对常用的功能和API 做了装饰,以屏蔽底层的实现细节。编解码功能的定制,对于熟悉Netty 底层实现的开发者而言,直接基于ChannelHandler 扩展开发,难度并不是很大。但是对于大多数初学者或者不愿意去了解底层实现细节的用户,需要提供给他们更简单的类库和API,而不是ChannelHandler。Netty 在这方面做得非常出色,针对编解码功能,它既提供了通用的编解码框架供用户扩展,又提供了常用的编解码类库供用户直接使用。在保证定制扩展性的基础之上,尽量降低用户的开发工作量和开发门槛,提升开发效率。Netty 预置的编解码功能列表如下:Base64、Protobuf、JBoss Marshalling、Spdy 等。
Netty 默认提供了多个解码器,可以进行分包的操作,满足99%的编码需求。
使用NIO 进行网络编程时,往往需要将读取到的字节数组或者字节缓冲区解码为业务可以使用的POJO 对象。为了方便业务将ByteBuf 解码成业务POJO 对象,Netty 提供了ByteToMessageDecoder 抽象工具解码类。用户自定义解码器继承ByteToMessageDecoder,只需要实现void decode(ChannelHandler Context ctx, ByteBuf in,List<Object> out)抽象方法即可完成ByteBuf 到POJO 对象的解码。
由于ByteToMessageDecoder 并没有考虑TCP 粘包和拆包等场景,用户自定义解码器需要自己处理“读半包”问题。正因为如此,大多数场景不会直接继承ByteToMessageDecoder,而是继承另外一些更高级的解码器来屏蔽半包的处理。实际项目中,通常将LengthFieldBasedFrameDecoder 和ByteToMessageDecoder 组合使用,前者负责将网络读取的数据报解码为整包消息,后者负责将整包消息解码为最终的业务对象。除了和其它解码器组合形成新的解码器之外,ByteToMessageDecoder 也是很多基础解码器的父类,它的继承关系如下图所示:
下面我们来看源码,ByteToMessageDecoder 类的定义:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ...... }
从源码中可以看出,ByteToMessageDecoder 继承了ChannelInboundHandlerAdapter, 根据之前的学习, 我们知道,这是个inbound 类型的handler, 也就是处理流向自身事件的handler。其次,该类通过abstract 关键字修饰, 说明是个抽象类, 在我们实际使用的时候, 并不是直接使用这个类, 而是使用其子类, 类定义了解码器的骨架方法, 具体实现逻辑交给子类, 同样, 在半包处理中也是由该类进行实现的。Netty 中很多解码器都实现了这个类, 并且, 我们也可以通过实现该类进行自定义解码器。
我们重点关注一下该类的cumulation 这个属性(ByteBuf类型), 它就是有关半包处理的关键属性, 从概述中我们知道,Netty 会将不完整的数据包进行保存, 这个数据包就是保存在这个属性中。之前的学习我们知道, ByteBuf 读取完数据会传递channelRead 事件, 传播过程中会调用handler 的channelRead 方法, ByteToMessageDecoder 的channelRead方法, 就是编码的关键部分。我们来看其channelRead()方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //如果message 是byteBuf 类型 if (msg instanceof ByteBuf) { //简单当成一个arrayList, 用于盛放解析到的对象 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //当前累加器为空, 说明这是第一次从io 流里面读取数据 first = cumulation == null; if (first) { //如果是第一次, 则将累加器赋值为刚读进来的对象 cumulation = data; } else { //如果不是第一次, 则把当前累加的数据和读进来的数据进行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); }
//调用子类的方法进行解析 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); }
//记录list 长度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下传播 fireChannelRead(ctx, out, size); out.recycle(); } } else { //不是byteBuf 类型则向下传播 ctx.fireChannelRead(msg); } }
这方法比较长, 我带大家一步步剖析。首先判断如果传来的数据是ByteBuf, 则进入if 块中,CodecOutputList out =CodecOutputList.newInstance() 这里就当成一个ArrayList 就好, 用于保存解码完成的数据ByteBuf data = (ByteBuf)msg 这步将数据转化成ByteBuf;first = cumulation == null 表示如果cumulation == null, 说明没有存储半包数据,则将当前的数据保存在属性cumulation 中;如果cumulation != null , 说明存储了半包数据, 则通过cumulator.cumulate(ctx.alloc(), cumulation, data)将读取到的数据和原来的数据进行累加, 保存在属性cumulation 中,我们看cumulator 属性的定义:
private Cumulator cumulator = MERGE_CUMULATOR;
这里调用了其静态属性MERGE_CUMULATOR, 我们跟进去:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; //不能到过最大内存 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; }
//将当前数据buffer buffer.writeBytes(in); in.release(); return buffer; } };
这里创建了Cumulator 类型的静态对象, 并重写了cumulate()方法, 这个cumulate()方法, 就是用于将ByteBuf 进行拼接的方法。在方法中, 首先判断cumulation 的写指针+in 的可读字节数是否超过了cumulation 的最大长度, 如果超过了, 将对cumulation 进行扩容, 如果没超过, 则将其赋值到局部变量buffer 中。然后,将in 的数据写到buffer 中, 将in 进行释放, 返回写入数据后的ByteBuf。回到channelRead()方法:最后调用callDecode(ctx, cumulation, out)方法进行解码, 这里传入了Context 对象, 缓冲区cumulation 和集合out。我们跟进到callDecode(ctx, cumulation, out)方法:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //只要累加器里面有数据 while (in.isReadable()) { int outSize = out.size(); //判断当前List 是否有对象 if (outSize > 0) { //如果有对象, 则向下传播事件 fireChannelRead(ctx, out, outSize); //清空当前list out.clear(); //解码过程中如ctx 被removed 掉就break if (ctx.isRemoved()) { break; }
outSize = 0; }
//当前可读数据长度 int oldInputLength = in.readableBytes(); //子类实现 //子类解析, 解析玩对象放到out 里面 decode(ctx, in, out); if (ctx.isRemoved()) { break; }
//List 解析前大小和解析后长度一样(什么没有解析出来) if (outSize == out.size()) { //原来可读的长度==解析后可读长度 //说明没有读取数据(当前累加的数据并没有拼成一个完整的数据包) if (oldInputLength == in.readableBytes()) { //跳出循环(下次在读取数据才能进行后续的解析) break; } else { //没有解析到数据, 但是进行读取了 continue; } }
//out 里面有数据, 但是没有从累加器读取数据 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
首先循环判断传入的ByteBuf 是否有可读字节, 如果还有可读字节说明没有解码完成, 则循环继续解码。然后判断集合out 的大小, 如果大小大于1, 说明out 中盛放了解码完成之后的数据, 然后将事件向下传播, 并清空out。因为我们第一次解码out 是空的, 所以这里不会进入if 块, 这部分我们稍后分析, 所以继续往下看,通过int oldInputLength =in.readableBytes() 获取当前ByteBuf, 其实也就是属性cumulation 的可读字节数, 这里就是一个备份用于比较。我们继续往下看,decode(ctx, in, out)方法是最终的解码操作, 这部会读取cumulation 并且将解码后的数据放入到集合out中, 在ByteToMessageDecoder 中的该方法是一个抽象方法, 让子类进行实现, 我们使用的netty 很多的解码都是继承了ByteToMessageDecoder 并实现了decode 方法从而完成了解码操作, 同样我们也可以遵循相应的规则进行自定义解码器, 在之后的小节中会讲解netty 定义的解码器, 并剖析相关的实现细节。继续往下看,if (outSize == out.size()) 这个判断表示解析之前的out 大小和解析之后out 大小进行比较, 如果相同, 说明并没有解析出数据, 我们进入到if 块中。if (oldInputLength == in.readableBytes()) 表示cumulation 的可读字节数在解析之前和解析之后是相同的, 说明解码方法中并没有解析数据, 也就是当前的数据并不是一个完整的数据包, 则跳出循环, 留给下次解析, 否则, 说明没有解析到数据, 但是读取了, 所以跳过该次循环进入下次循环。最后判断if (oldInputLength == in.readableBytes()) , 这里代表out 中有数据, 但是并没有从cumulation 读数据, 说明这个out 的内容是非法的, 直接抛出异常。现在回到channRead()方法,我们关注finally 代码块中的内容:
finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); }
//记录list 长度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下传播 fireChannelRead(ctx, out, size); out.recycle(); }
首先判断cumulation 不为null, 并且没有可读字节, 则将累加器进行释放, 并设置为null,之后记录out 的长度, 通过fireChannelRead(ctx, out, size)将channelRead 事件进行向下传播, 并回收out 对象。我们跟到fireChannelRead(ctx,out, size)方法来看代码:
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { //遍历List for (int i = 0; i < numElements; i ++) { //逐个向下传递 ctx.fireChannelRead(msgs.getUnsafe(i)); } }
这里遍历out 集合, 并将里面的元素逐个向下传递,以上就是有关解码的骨架逻辑。
LineBasedFrameDecoder 是回车换行解码器,如果用户发送的消息以回车换行符(以\r\n 或者直接以\n 结尾)作为消息结束的标识,则可以直接使用Netty 的LineBasedFrameDecoder 对消息进行解码,只需要在初始化Netty 服务端或者客户端时将LineBasedFrameDecoder 正确的添加到ChannelPipeline 中即可,不需要自己重新实现一套换行解码器。LineBasedFrameDecoder 的工作原理是它依次遍历ByteBuf 中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。防止由于数据报没有携带换行符导致接收到ByteBuf 无限制积压,引起系统内存溢出。它的使用效果如下:
通常情况下,LineBasedFrameDecoder 会和StringDecoder 配合使用,组合成按行切换的文本解码器,对于文本类协议的解析,文本换行解码器非常实用,例如对HTTP 消息头的解析、FTP 协议消息的解析等。下面我们简单给出文本换行解码器的使用示例:
pipeline.addLast(new LineBasedFrameDecoder(1024)); pipeline.addLast(new StringDecoder());
初始化Channel 的时候,首先将LineBasedFrameDecoder 添加到ChannelPipeline 中,然后再依次添加字符串解码器StringDecoder,业务Handler。接下来,我们来看LineBasedFrameDecoder 的源码,LineBasedFrameDecoder 也继承了ByteToMessageDecoder。首先看其参数定义:
//数据包的最大长度, 超过该长度会进行丢弃模式 private final int maxLength; //超出最大长度是否要抛出异常 private final boolean failFast; //最终解析的数据包是否带有换行符 private final boolean stripDelimiter; //为true 说明当前解码过程为丢弃模式 private boolean discarding; //丢弃了多少字节 private int discardedBytes;
其中的丢弃模式, 我们会在源码中看到其中的含义,我们看其decode()方法:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
这里的decode()方法调用重载的decode()方法, 并将解码后的内容放到out 集合中。我们跟到重载的decode()方法中:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //找这行的结尾 final int eol = findEndOfLine(buffer); if (!discarding) { if (eol >= 0) { final ByteBuf frame; //计算从换行符到可读字节之间的长度 final int length = eol - buffer.readerIndex(); //拿到分隔符长度, 如果是\r\n 结尾, 分隔符长度为2 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; //如果长度大于最大长度 if (length > maxLength) { //指向换行符之后的可读字节(这段数据完全丢弃) buffer.readerIndex(eol + delimLength); //传播异常事件 fail(ctx, length); return null; }
//如果这次解析的数据是有效的 //分隔符是否算在完整数据包里 //true 为丢弃分隔符 if (stripDelimiter) { //截取有效长度 frame = buffer.readRetainedSlice(length); //跳过分隔符的字节 buffer.skipBytes(delimLength); } else { //包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { //如果没找到分隔符(非丢弃模式) //可读字节长度 final int length = buffer.readableBytes(); //如果朝超过能解析的最大长度 if (length > maxLength) { //将当前长度标记为可丢弃的 discardedBytes = length; //直接将读指针移动到写指针 buffer.readerIndex(buffer.writerIndex()); //标记为丢弃模式 discarding = true; //超过最大长度抛出异常 if (failFast) { fail(ctx, "over " + discardedBytes); } }
//没有超过, 则直接返回 return null; } } else { //丢弃模式 if (eol >= 0) { //找到分隔符 //当前丢弃的字节(前面已经丢弃的+现在丢弃的位置-写指针) final int length = discardedBytes + eol - buffer.readerIndex(); //当前换行符长度为多少 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; //读指针直接移到换行符+换行符的长度 buffer.readerIndex(eol + delimLength); //当前丢弃的字节为0 discardedBytes = 0; //设置为未丢弃模式 discarding = false; //丢弃完字节之后触发异常 if (!failFast) { fail(ctx, length); } } else { //累计已丢弃的字节个数+当前可读的长度 discardedBytes += buffer.readableBytes(); //移动 buffer.readerIndex(buffer.writerIndex()); }
return null; } }
final int eol = findEndOfLine(buffer) 这里是找当前行的结尾的索引值, 也就是\r\n 或者是\n:
从上图中不难看出, 如果是以\n 结尾的, 返回的索引值是\n 的索引值, 如果是\r\n 结尾的, 返回的索引值是\r 的索引值我们看findEndOfLine(buffer)方法:
private static int findEndOfLine(final ByteBuf buffer) { //找到/n 这个字节 int i = buffer.forEachByte(ByteProcessor.FIND_LF); //如果找到了, 并且前面的字符是-r, 则指向/r 字节 if (i > 0 && buffer.getByte(i - 1) == ‘\r‘) { i--; }
return i; }
从上面代码看到,通过一个forEachByte()方法找\n 这个字节, 如果找到了, 并且前面是\r, 则返回\r 的索引, 否则返回\n 的索引。回到重载的decode()方法,if (!discarding) 判断是否为非丢弃模式, 默认是就是非丢弃模式, 所以进入if中;if (eol >= 0) 如果找到了换行符, 我们看非丢弃模式下找到换行符的相关逻辑:
final ByteBuf frame; //计算从换行符到可读字节之间的长度 final int length = eol - buffer.readerIndex(); //拿到分隔符长度, 如果是\r\n 结尾, 分隔符长度为2 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; //如果长度大于最大长度 if (length > maxLength) { //指向换行符之后的可读字节(这段数据完全丢弃) buffer.readerIndex(eol + delimLength); //传播异常事件 fail(ctx, length); return null; }
//如果这次解析的数据是有效的 //分隔符是否算在完整数据包里 //true 为丢弃分隔符 if (stripDelimiter) { //截取有效长度 frame = buffer.readRetainedSlice(length); //跳过分隔符的字节 buffer.skipBytes(delimLength); } else { //包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame;
首先获得换行符到可读字节之间的长度, 然后拿到换行符的长度, 如果是\n 结尾, 那么长度为1, 如果是\r 结尾, 长度为2。if (length > maxLength) 带表如果长度超过最大长度, 则直接通过readerIndex(eol + delimLength) 这种方式, 将读指针指向换行符之后的字节, 说明换行符之前的字节需要完全丢弃。
丢弃之后通过fail 方法传播异常, 并返回null。继续往下看, 走到下一步, 说明解析出来的数据长度没有超过最大长度,说明是有效数据包。if (stripDelimiter) 表示是否要将分隔符放在完整数据包里面, 如果是true, 则说明要丢弃分隔符,然后截取有效长度, 并跳过分隔符长度,将包含分隔符进行截取。以上就是非丢弃模式下找到换行符的相关逻辑,我们再看非丢弃模式下没有找到换行符的相关逻辑, 也就是非丢弃模式下, if (eol >= 0) 中的else 块:
//如果没找到分隔符(非丢弃模式) //可读字节长度 final int length = buffer.readableBytes(); //如果超过能解析的最大长度 if (length > maxLength) { //将当前长度标记为可丢弃的 discardedBytes = length; //直接将读指针移动到写指针 buffer.readerIndex(buffer.writerIndex()); //标记为丢弃模式 discarding = true; //超过最大长度抛出异常 if (failFast) { fail(ctx, "over " + discardedBytes); } } //没有超过, 则直接返回 return null;
首先通过final int length = buffer.readableBytes() 获取所有的可读字节数。然后判断可读字节数是否超过了最大值,如果超过最大值, 则属性discardedBytes 标记为这个长度, 代表这段内容要进行丢弃。
buffer.readerIndex(buffer.writerIndex()) 这里直接将读指针移动到写指针, 并且将discarding 设置为true, 就是丢弃模式。如果可读字节没有超过最大长度, 则返回null, 表示什么都没解析出来, 等着下次解析。我们再看丢弃模式的处理逻辑, 也就是if (!discarding) 中的else 块。首先这里也分两种情况, 根据if (eol >= 0) 判断是否找到了分隔符, 我们首先看找到分隔符的解码逻辑:
//找到分隔符 //当前丢弃的字节(前面已经丢弃的+现在丢弃的位置-写指针) final int length = discardedBytes + eol - buffer.readerIndex(); //当前换行符长度为多少 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; //读指针直接移到换行符+换行符的长度 buffer.readerIndex(eol + delimLength); //当前丢弃的字节为0 discardedBytes = 0; //设置为未丢弃模式 discarding = false; //丢弃完字节之后触发异常 if (!failFast) { fail(ctx, length); }
如果找到换行符, 则需要将换行符之前的数据全部丢弃掉。
final int length = discardedBytes + eol - buffer.readerIndex() 这里获得丢弃的字节总数, 也就是之前丢弃的字节数+现在需要丢弃的字节数。然后计算换行符的长度, 如果是\n 则是1, \r\n 就是2。buffer.readerIndex(eol + delimLength)这里将读指针移动到换行符之后的位置,然后将discarding 设置为false, 表示当前是非丢弃状态。我们再看丢弃模式未找到换行符的情况, 也就是丢弃模式下, if (eol >= 0) 中的else 块:
//累计已丢弃的字节个数+当前可读的长度 discardedBytes += buffer.readableBytes(); //移动 buffer.readerIndex(buffer.writerIndex());
这里做的事情非常简单, 就是累计丢弃的字节数, 并将读指针移动到写指针, 也就是将数据全部丢弃。最后在丢弃模式下, decode()方法返回null, 代表本次没有解析出任何数据。以上就是行解码器的相关逻辑。
DelimiterBasedFrameDecoder 分隔符解码器, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包。回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder 解码器。分隔符解码器在实际工作中也有很广泛的应用,很多简单的文本私有协议,都是以特殊的分隔符作为消息结束的标识,特别是对于那些使用长连接的基于文本的私有协议。分隔符的指定:与大家的习惯不同,分隔符并非以char 或者string 作为构造参数,而是ByteBuf,下面我们就结合实际例子给出它的用法。假如消息以“$_”作为分隔符,服务端或者客户端初始化ChannelPipeline 的代码实例如下:
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); pipeline.addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); pipeline.addLast(new StringDecoder());
首先将“$_”转换成ByteBuf 对象,作为参数构造DelimiterBasedFrameDecoder,将其添加到ChannelPipeline 中,然后依次添加字符串解码器(通常用于文本解码)和用户Handler,请注意解码器和Handler 的添加顺序,如果顺序颠倒,会导致消息解码失败。DelimiterBasedFrameDecoder 同样继承了ByteToMessageDecoder 并重写了decode()方法,我们来看其中的一个构造方法:
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter); }
这里参数maxFrameLength 代表最大长度, delimiters 是个可变参数, 可以说可以支持多个分隔符进行解码。我们进入decode()方法:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
这里同样调用了其重载的decode()方法并将解析好的数据添加到集合list 中, 其父类就可以遍历out, 并将内容传播。我们跟到重载decode()方法里面:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //行处理器(1) if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); }
int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; //找到最小长度的分隔符(2) for (ByteBuf delim: delimiters) { //每个分隔符分隔的数据包长度 int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } }
//解码(3) //已经找到分隔符 if (minDelim != null) { int minDelimLength = minDelim.capacity(); ByteBuf frame; //当前分隔符否处于丢弃模式 if (discardingTooLongFrame) { //首先设置为非丢弃模式 discardingTooLongFrame = false; //丢弃 buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); }
return null; }
//处于非丢弃模式 //当前找到的数据包, 大于允许的数据包 if (minFrameLength > maxFrameLength) { //当前数据包+最小分隔符长度全部丢弃 buffer.skipBytes(minFrameLength + minDelimLength); //传递异常事件 fail(minFrameLength); return null; }
//如果是正常的长度 //解析出来的数据包是否忽略分隔符 if (stripDelimiter) { //如果不包含分隔符 //截取 frame = buffer.readRetainedSlice(minFrameLength); //跳过分隔符 buffer.skipBytes(minDelimLength); } else { //截取包含分隔符的长度 frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { //如果没有找到分隔符 //非丢弃模式 if (!discardingTooLongFrame) { //可读字节大于允许的解析出来的长度 if (buffer.readableBytes() > maxFrameLength) { //将这个长度记录下 tooLongFrameLength = buffer.readableBytes(); //跳过这段长度 buffer.skipBytes(buffer.readableBytes()); //标记当前处于丢弃状态 discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); }
return null; } }
这里的方法也比较长, 这里也通过拆分进行剖析:1、行处理器;2、找到最小长度分隔符;3、解码。首先看第1 步行处理器:
这里首先判断成员变量lineBasedDecoder 是否为空, 如果不为空则直接调用lineBasedDecoder 的decode 的方法进行解码, lineBasedDecoder 实际上就是上一小节剖析的LineBasedFrameDecoder 解码器。这个成员变量, 会在分隔符是\n 和\r\n 的时候进行初始化。我们看初始化该属性的构造方法:
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) { //代码省略 //如果是基于行的分隔 if (isLineBased(delimiters) && !isSubclass()) { //初始化行处理器 lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else { //代码省略 }
//代码省略 }
这里isLineBased(delimiters)会判断是否是基于行的分隔, 跟到isLineBased()方法中:
private static boolean isLineBased(final ByteBuf[] delimiters) { //分隔符长度不为2 if (delimiters.length != 2) { return false; }
//拿到第一个分隔符 ByteBuf a = delimiters[0]; //拿到第二个分隔符 ByteBuf b = delimiters[1]; if (a.capacity() < b.capacity()) { a = delimiters[1]; b = delimiters[0]; }
//确保a 是/r/n 分隔符, 确保b 是/n 分隔符 return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == ‘\r‘ && a.getByte(1) == ‘\n‘ && b.getByte(0) == ‘\n‘; }
首先判断长度等于2, 直接返回false。然后拿到第一个分隔符a 和第二个分隔符b, 然后判断a 的第一个分隔符是不是\r, a 的第二个分隔符是不是\n, b 的第一个分隔符是不是\n, 如果都为true, 则条件成立。我们回到decode()方法中, 看第2 步, 找到最小长度的分隔符。这里最小长度的分隔符, 意思就是从读指针开始, 找到最近的分隔符:
for (ByteBuf delim: delimiters) { //每个分隔符分隔的数据包长度 int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; }
}
这里会遍历所有的分隔符, 然后找到每个分隔符到读指针到数据包长度。然后通过if 判断, 找到长度最小的数据包的长度, 然后保存当前数据包的的分隔符, 如下图:
这里假设A 和B 同为分隔符, A 分隔符到读指针的长度小于B 分隔符到读指针的长度, 这里会找到最小的分隔符A, 分隔符的最小长度, 就readIndex 到A 的长度。我们继续看第3 步, 解码。if (minDelim != null) 表示已经找到最小长度分隔符, 我们继续看if 块中的逻辑:
int minDelimLength = minDelim.capacity(); ByteBuf frame; //当前分隔符否处于丢弃模式 if (discardingTooLongFrame) { //首先设置为非丢弃模式 discardingTooLongFrame = false; //丢弃 buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } //处于非丢弃模式 //当前找到的数据包, 大于允许的数据包 if (minFrameLength > maxFrameLength) { //当前数据包+最小分隔符长度全部丢弃 buffer.skipBytes(minFrameLength + minDelimLength); //传递异常事件 fail(minFrameLength); return null; } //如果是正常的长度 //解析出来的数据包是否忽略分隔符 if (stripDelimiter) { //如果不包含分隔符 //截取 frame = buffer.readRetainedSlice(minFrameLength); //跳过分隔符 buffer.skipBytes(minDelimLength); } else { //截取包含分隔符的长度 frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame;
if (discardingTooLongFrame) 表示当前是否处于非丢弃模式, 如果是丢弃模式, 则进入if 块。因为第一个不是丢弃模式,所以这里先分析if 块后面的逻辑。if (minFrameLength > maxFrameLength) 这里是判断当前找到的数据包长度大于最大长度, 这里的最大长度是我们创建解码器的时候设置的, 如果超过了最大长度, 就通过buffer.skipBytes(minFrameLength + minDelimLength) 方式, 跳过数据包+分隔符的长度, 也就是将这部分数据进行完全丢弃。继续往下看, 如果长度不大最大允许长度, 则通过if (stripDelimiter) 判断解析的出来的数据包是否包含分隔符, 如果不包含分隔符, 则截取数据包的长度之后, 跳过分隔符。我们再回头看if (discardingTooLongFrame) 中的if 块中的逻辑, 也就是丢弃模式。首先将discardingTooLongFrame 设置为false, 标记非丢弃模式, 然后通过buffer.skipBytes(minFrameLength + minDelimLength) 将数据包+分隔符长度的字节数跳过, 也就是进行丢弃, 之后再进行抛出异常。分析完成了找到分隔符之后的丢弃模式非丢弃模式的逻辑处理, 我们在分析没找到分隔符的逻辑处理,也就是if (minDelim != null) 中的else 块:
//如果没有找到分隔符 //非丢弃模式 if (!discardingTooLongFrame) { //可读字节大于允许的解析出来的长度 if (buffer.readableBytes() > maxFrameLength) { //将这个长度记录下 tooLongFrameLength = buffer.readableBytes(); //跳过这段长度 buffer.skipBytes(buffer.readableBytes()); //标记当前处于丢弃状态 discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null;
首先通过if (!discardingTooLongFrame) 判断是否为非丢弃模式, 如果是, 则进入if 块:。在if 块中, 首先通过if(buffer.readableBytes() > maxFrameLength) 判断当前可读字节数是否大于最大允许的长度, 如果大于最大允许的长度, 则将可读字节数设置到tooLongFrameLength 的属性中, 代表丢弃的字节数, 然后通过buffer.skipBytes(buffer.readableBytes()) 将累计器中所有的可读字节进行丢弃,最后将discardingTooLongFrame 设置为true, 也就是丢弃模式, 之后抛出异常。如果if (!discardingTooLongFrame) 为false, 也就是当前处于丢弃模式, 则追加tooLongFrameLength 也就是丢弃的字节数的长度, 并通过buffer.skipBytes(buffer.readableBytes()) 将所有的字节继续进行丢弃。以上就是分隔符解码器的相关逻辑。
FixedLengthFrameDecoder 固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP 的粘包/拆包等问题,非常实用。对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。利用FixedLengthFrameDecoder 解码器,无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码,如果是半包消息,FixedLengthFrameDecoder 会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。假如单条消息的长度是20 字节,使用FixedLengthFrameDecoder 解码器的效果如下:
来看其类的定义:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { //长度大小 private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException( "frameLength must be a positive integer: " + frameLength); }//保存当前frameLength this.frameLength = frameLength; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//通过ByteBuf 去解码.解码到对象之后添加到out 上 Object decoded = decode(ctx, in); if (decoded != null) {
//将解析到byteBuf 添加到对象里面 out.add(decoded); } } protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//字节是否小于这个固定长度 if (in.readableBytes() < frameLength) { return null; } else {
//当前累加器中截取这个长度的数值 return in.readRetainedSlice(frameLength); } } }
我们看到FixedLengthFrameDecoder 类继承了ByteToMessageDecoder, 重写了decode()方法,这个类只有一个属性叫frameLength, 并在构造方法中初始化了该属性。再看decode()方法, 在decode()方法中又调用了自身另一个重载的decode()方法进行解析, 解析出来之后将解析后的数据放在集合out 中。再看重载的decode()方法,重载的decode()方法中首先判断累加器的字节数是否小于固定长度, 如果小于固定长度则返回null, 代表不是一个完整的数据包, 直接返回null。如果大于等于固定长度, 则直接从累加器中截取这个长度的数值in.readRetainedSlice(frameLength) 会返回一个新的截取后的ByteBuf, 并将原来的累加器读指针后移frameLength 个字节。如果累计器中还有数据, 则会通过ByteToMessageDecoder 中callDecode()方法里while 循环的方式, 继续进行解码。这样, 就是实现了固定长度的解码工作。
了解TCP 通信机制的该都知道TCP 底层的粘包和拆包,当我们在接收消息的时候,不能认为读取到的报文就是个整包消息,特别是对于采用非阻塞I/O 和长连接通信的程序。如何区分一个整包消息,通常有如下4 种做法:
大多数的协议(私有或者公有),协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求的通用性, 以及为了降低用户的协议开发难度, Netty 提供了LengthFieldBasedFrameDecoder,自动屏蔽TCP 底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。
下面我们看看如何通过参数组合的不同来实现不同的“半包”读取策略。第一种常用的方式是消息的第一个字段是长度字段,后面是消息体,消息头中只包含一个长度字段。它的消息结构定义如图所示:
使用以下参数组合进行解码:
解码后的字节缓冲区内容如图所示:
通过ByteBuf.readableBytes()方法我们可以获取当前消息的长度,所以解码后的字节缓冲区可以不携带长度字段,由于长度字段在起始位置并且长度为2,所以将initialBytesToStrip 设置为2,参数组合修改为:
解码后的字节缓冲区内容如图所示:
解码后的字节缓冲区丢弃了长度字段,仅仅包含消息体,对于大多数的协议,解码之后消息长度没有用处,因此可以丢弃。在大多数的应用场景中,长度字段仅用来标识消息体的长度,这类协议通常由消息长度字段+消息体组成,如上图所示的几个例子。但是,对于某些协议,长度字段还包含了消息头的长度。在这种应用场景中,往往需要使用lengthAdjustment 进行修正。由于整个消息(包含消息头)的长度往往大于消息体的长度,所以,lengthAdjustment为负数。下图展示了通过指定lengthAdjustment 字段来包含消息头的长度:
解码之前的码流:
解码之后的码流:
由于协议种类繁多,并不是所有的协议都将长度字段放在消息头的首位,当标识消息长度的字段位于消息头的中间或者尾部时,需要使用lengthFieldOffset 字段进行标识,下面的参数组合给出了如何解决消息长度字段不在首位的问题:
其中lengthFieldOffset 表示长度字段在消息头中偏移的字节数,lengthFieldLength 表示长度字段自身的长度,解码效果如下:解码之前:
解码之后:
由于消息头1 的长度为2,所以长度字段的偏移量为2;消息长度字段Length 为3,所以lengthFieldLength 值为3。由于长度字段仅仅标识消息体的长度,所以lengthAdjustment 和initialBytesToStrip 都为0。
最后一种场景是长度字段夹在两个消息头之间或者长度字段位于消息头的中间,前后都有其它消息头字段,在这种场景下如果想忽略长度字段以及其前面的其它消息头字段,则可以通过initialBytesToStrip 参数来跳过要忽略的字节长度,它的组合配置示意如下:
解码之前的码流(16 字节):
解码之后的码流(13 字节):
由于HDR1 的长度为1,所以长度字段的偏移量lengthFieldOffset 为1;长度字段为2 个字节,所以lengthFieldLength为2。由于长度字段是消息体的长度,解码后如果携带消息头中的字段,则需要使用lengthAdjustment 进行调整,此处它的值为1,代表的是HDR2 的长度,最后由于解码后的缓冲区要忽略长度字段和HDR1 部分,所以lengthAdjustment为3。解码后的结果为13 个字节,HDR1 和Length 字段被忽略。事实上,通过4 个参数的不同组合,可以达到不同的解码效果,用户在使用过程中可以根据业务的实际情况进行灵活调整。
由于TCP 存在粘包和组包问题,所以通常情况下用户需要自己处理半包消息。利用LengthFieldBasedFrameDecoder解码器可以自动解决半包问题,它的习惯用法如下:
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536,0,2));
在pipeline 中增加LengthFieldBasedFrameDecoder 解码器,指定正确的参数组合,它可以将Netty 的ByteBuf 解码成整包消息,后面的用户解码器拿到的就是个完整的数据报,按照逻辑正常进行解码即可,不再需要额外考虑“读半包”问题,降低了用户的开发难度。
标签:ide 局部变量 限制 粘包 expand param getc 非阻塞 抽象方法
原文地址:https://www.cnblogs.com/wuzhenzhao/p/11231420.html