标签:这一 下标 amp out soc lis ORC 内容 二次
Netty 的解码器有很多种,比如基于长度的,基于分割符的,私有协议的。但是,总体的思路都是一致的。
拆包思路:当数据满足了 解码条件时,将其拆开。放到数组。然后发送到业务 handler 处理。
半包思路: 当读取的数据不够时,先存起来,直到满足解码条件后,放进数组。送到业务 handler 处理。
而实现这个逻辑的就是我们今天的主角:ByteToMessageDecoder。
看名字的意思是:将字节转换成消息的解码器。人如其名。而他本身也是一个入站 handler,所以,我们还是从他的 channelRead 方法入手。
精简过的代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 从对象池中取出一个List
CodecOutputList out = CodecOutputList.newInstance();
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 第一次解码
cumulation = data;// 累计
} else {
// 第二次解码,就将 data 向 cumulation 追加,并释放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加后的 cumulation 后,调用 decode 方法进行解码
// 解码过程中,调用 fireChannelRead 方法,主要目的是将累积区的内容 decode 到 数组中
callDecode(ctx, cumulation, out);
// 如果累计区没有可读字节了
if (cumulation != null && !cumulation.isReadable()) {
// 将次数归零
numReads = 0;
// 释放累计区
cumulation.release();
// 等待 gc
cumulation = null;
} // 如果超过了 16 次,就压缩累计区,主要是将已经读过的数据丢弃,将 readIndex 归零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
// 如果没有向数组插入过任何数据
decodeWasNull = !out.insertSinceRecycled();
// 循环数组,向后面的 handler 发送数据,如果数组是空,那不会调用
fireChannelRead(ctx, out, size);
// 将数组中的内容清空,将数组的数组的下标恢复至原来
out.recycle();
}
楼主已经在方法中写了注释,但还是说说主要的步骤:
下面来说说详细的步骤。
代码:
@1
CodecOutputList out = CodecOutputList.newInstance();
@2
static CodecOutputList newInstance() {
return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3
private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL =
new FastThreadLocal<CodecOutputLists>() {
@Override
protected CodecOutputLists initialValue() throws Exception {
// 16 CodecOutputList per Thread are cached.
return new CodecOutputLists(16);
}
};
@4
CodecOutputLists(int numElements) {
elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)];
for (int i = 0; i < elements.length; ++i) {
// Size of 16 should be good enough for the majority of all users as an initial capacity.
elements[i] = new CodecOutputList(this, 16);
}
count = elements.length;
currentIdx = elements.length;
mask = elements.length - 1;
}
@5
private CodecOutputList(CodecOutputListRecycler recycler, int size) {
this.recycler = recycler;
array = new Object[size];
}
@6
public CodecOutputList getOrCreate() {
if (count == 0) {
// Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
// low.
return new CodecOutputList(NOOP_RECYCLER, 4);
}
--count;
int idx = (currentIdx - 1) & mask;
CodecOutputList list = elements[idx];
currentIdx = idx;
return list;
}
代码分为 1,2,3,4,5, 6 步骤。
由于这个 getOrCreate 方法会被一个线程的多个地方使用,因此 16 是个统计值。当 16 不够的时候,就会创建一个新的 List。也就是 count == 0 的逻辑。而 & mask 的操作就是一个取模的操作。
代码如下:
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
这个 cumulator 默认是个 Cumulator 类型的 MERGE_CUMULATOR,该实例最主要的是从重写了 cumulate 方法:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
可以看到该方法,主要是将 unsafe.read 传递过来的 ByteBuf 的内容写入到 cumulation 累积区中,然后释放掉旧的内容,由于这个变量是成员变量,因此可以多次调用 channelRead 方法写入。
同时这个方法也考虑到了扩容的问题,总的来说就是 copy。
当然,ByteToMessageDecoder 中还有一个 Cumulator 实例,称之为 COMPOSITE_CUMULATOR,混合累积。由于上个实例的 cumulate 方法是使用内存拷贝的,因此,这里提供了使用混合内存。相较于拷贝,性能会更好点,但同时也会更复杂。
当数据追击到累积区之后,需要调用 decode 方法进行解码,代码如下:
@ 1
callDecode(ctx, cumulation, out);
@2
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 如果累计区还有可读字节
while (in.isReadable()) {
int outSize = out.size();
// 上次循环成功解码
if (outSize > 0) {
// 调用后面的业务 handler 的 ChannelRead 方法
fireChannelRead(ctx, out, outSize);
// 将 size 置为0
out.clear();//
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 得到可读字节数
int oldInputLength = in.readableBytes();
// 调用 decode 方法,将成功解码后的数据放入道 out 数组中,可能会删除当前节点,删除之前会将数据发送到最后的 handler
decodeRemovalReentryProtection(ctx, in, out);// decode()
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (isSingleDecode()) {
break;
}
}
}
该方法主要逻辑:只要累积区还有未读数据,就循环进行读取。
调用 decodeRemovalReentryProtection 方法,内部调用了子类重写的 decode 方法,很明显,这里是个模板模式。decode 方法的逻辑就是将累积区的内容按照约定进行解码,如果成功解码,就添加到数组中。同时该方法也会检查该 handler 的状态,如果被移除出 pipeline 了,就将累积区的内容直接刷新到后面的 handler 中。
如果 Context 节点被移除了,直接结束循环。如果解码前的数组大小和解码后的数组大小相等,且累积区的可读字节数没有变化,说明此次读取什么都没做,就直接结束。如果字节数变化了,说明虽然数组没有增加,但确实在读取字节,就再继续读取。
如果上面的判断过了,说明数组读到数据了,但如果累积区的 readIndex 没有变化,则抛出异常,说明没有读取数据,但数组却增加了,子类的操作是不对的。
如果是个单次解码器,解码一次就直接结束了。
所以,这段代码的关键就是子类需要重写 decode 方法,将累积区的数据正确的解码并添加到数组中。每添加一次成功,就会调用 fireChannelRead 方法,将数组中的数据传递给后面的 handler。完成之后将数组的 size 设置为 0.
所以,如果你的业务 handler 在这个地方可能会被多次调用。也可能一次也不调用。取决于数组中的值。当然,如果解码 handler 被移除了,就会将累积区的所有数据刷到后面的 handler。
上面的逻辑就是解码器最主要的逻辑:
将 read 方法的数据读取到累积区,使用解码器解码累积区的数据,解码成功一个就放入到一个数组中,并将数组中的数据一次次的传递到后面的handler。
从上面的逻辑看,除非 handler 被移除,否则不会调用后面的 handler 方法,也就是说,只要不满足解码器的解码规则,就不会传递给后面的 handler。
再看看后面的逻辑,主要在 finally 块中:
这样就能节省一些内存了,但这会引起一些内存复制的过程,以性能损耗为前提的。
如果是 true,则会判断 autoRead 属性,如果是 false 的话,那么 Netty 认为还有数据没有读到,不然数组为什么一直是空的?就主动调用 read 方法从 Socket 读取。
答:如果是单次解码器,就需要发送了,因此单词解码器是不会再 callDecode 方法中发送的。
最后一行的 recycler.recycle(this),有两种结果,如果是 CodecOutputLists 的 recycle 方法,内容如下:
恢复数组下标,对 count ++,表示有对象可用了。
还有第二种,当 16 个数组不够用了,就需要创建一个新的,在 getOrCreate 方法体现。而构造函数中的 recycler 是一个空对象。我们看看这个对象:
当调用 recycle 方法的时候,什么都不做。等待 GC 回收。因为这不是个对象池的引用。
好,到这里,关于 ByteToMessageDecoder 解码器的主要功能就解读完了。
可以说,ByteToMessageDecoder 是解码器的核心所做,Netty 在这里使用了模板模式,留给子类扩展的方法就是 decode 方法。
主要逻辑就是将所有的数据全部放入累积区,子类从累积区取出数据进行解码后放入到一个 数组中,ByteToMessageDecoder 会循环数组调用后面的 handler 方法,将数据一帧帧的发送到业务 handler 。完成这个的解码逻辑。
使用这种方式,无论是粘包还是拆包,都可以完美的实现。
还有一些小细节:
Netty 所有的解码器,都可以在此类上扩展,一切取决于 decode 的实现。只要遵守 ByteToMessageDecoder 的约定即可。
good luck!!!!
Netty 解码器抽象父类 ByteToMessageDecoder 源码解析
标签:这一 下标 amp out soc lis ORC 内容 二次
原文地址:https://www.cnblogs.com/stateis0/p/9062165.html