标签:也有 receive ada 计数 cto div end mini 最小
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
到目前为止,我们已经启动服务,并接收了客户端连接,双方已经可以正式通信。下面就要处理请求:接收数据、业务处理、发送数据。
接收数据我们会碰到以下问题:
我们再看一下,Netty 是如何解决这两个问题的。这部分才是本小节内容的核心。当然发送数据也有同样的问题,写的数据太多怎么办,可以将接收数据和发送数据两小节的内部对比起来学习。
自适应数据大小的分配器(AdaptiveRecvByteBufAllocator):
根据最近几次请求的数据包大小,猜测下一次数据包大小。AdaptiveRecvByteBufAllocator 对 ByteBuf 的猜测:放大果断,缩小谨慎(需要连续2 次判断)
连续读(defaultMaxMessagesPerRead):
默认每个连接最多连接读取 16 次数据,即使还有数据也暂时不处理了,先处理下一个连接。
NioEventLoop 不断的轮询,接收 OP_READ 事件;然后将读取到的数据通过 pipeline.fireChannelRead(byteBuf) 传播出去。
NioEventLoop#run
-> processSelectedKeys
-> AbstractNioMessageChannel.NioMessageUnsafe#read
-> NioServerSocketChannel#doReadMessages
-> pipeline#fireChannelRead
(1)读取数据本质
(2)fireChannelReadComplete 和 fireChannelRead 关系
一次数据取的数据可能有多条记录,每条记录都会触发一次 fireChannelRead 事件,但一次读只会触发一次 fireChannelReadComplete 事件。
(3)缓冲区大小自适应
AdaptiveRecvByteBufAllocator 对 byteBuf 的猜测:放大果断,缩小谨慎(需要连续2 次判断)
(4)高并发处理
默认最多只能读取16 次。“雨露均沾”
在上一小节中,我们知道 Netty 对 OP_READ 和 OP_ACCEPT 事件是统一处理的。不同的是接收客户端连接使用 NioMessageUnsafe#read,而读取数据使用 NioByteUnsafe#read。
我们就重点分析 NioByteUnsafe#read 这个方法。Netty 每次读取数据都要分以下几步:
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 每次读取数据时,都重新开始计数
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 1. 分配缓冲区,大小自适应
byteBuf = allocHandle.allocate(allocator);
// 2. 从 socket revbuf 中接收数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 3. 触发事件处理
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 4. 判断是否继续读
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
说明: 可以看到,接收数据时缓冲区自适应大小分配和是否继续读这两个重要的功能都委托给了 allocHandle。Netty 中默认的 allocHandle 是 AdaptiveRecvByteBufAllocator。
doReadBytes 方法从 socket revbuf 读取数据,但每次读取前都需要缓冲区中可写区域的大小,用于判断缓冲区是否读满,继而决定是否继续读取数据。
// NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 每次读取数据前,记录缓冲区中可写区域大小,判断是否将缓冲区读满
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
在分析代码前,我们先比较一下 ByteBufAllocator 和 RecvByteBufAllocator 的区别:
AdaptiveRecvByteBufAllocator 只是负责创建 Handle,真正的功能都委托给了 Handle 处理。相关默认配置见 DefaultChannelConfig。
(1)缓冲区分配
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
说明: 可以看到,缓冲区的分配直接委托给了 ByteBufAllocator。AdaptiveRecvByteBufAllocator 只是通过 guess() 方法决定分配缓冲区的大小。
(2)更新自适应缓冲区大小
guess() 方法直接返回 nextReceiveBufferSize 变量大小,默认为 1024 byte。每次最小读 64 byte,最大 64 KB。
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
每次调用 allocHandle.lastBytesRead(doReadBytes(byteBuf)) 读取数据后,都会根据上一次的读取的数据包大小决定扩缩容缓冲区大小。
@Override
public void lastBytesRead(int bytes) {
// attemptedBytesRead为读取前可写缓冲区大小,bytes表示当前读取的数据包大小。
// 如果二者相等,说明 socket revbuf 中还有数据可读,判断是否扩缩容
if (bytes == attemptedBytesRead()) {
// 核心方法:判断是否扩容或缩容
record(bytes);
}
super.lastBytesRead(bytes);
}
(3)自适应缓冲区策略
record 是最核心的方法,计算 AdaptiveRecvByteBufAllocator 缓冲区扩容或缩容的策略。
在分析 record 方法前,我们先看一下缓冲区大小是怎么分配的。AdaptiveRecvByteBufAllocator 将缓冲区按 512 byte 分隔,小于 512 byte 时按 16 byte 扩容或缩容,大于 512 byte 时按两倍大小进行扩容或缩容。也就是 [16, 32, 48, ..., 512, 1024, 2048, .., Integer.MAX_VALUE],这就是 SIZE_TABLE,每次分配的缓冲区大小必定是上述数组中的一个值。
private void record(int actualReadBytes) {
// 缩容
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
// 扩容
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
说明: record 的扩容或缩容时,都会重新调整 nextReceiveBufferSize 值。
自适应的整体策略是:放大果断,缩小谨慎。即缩容的条件需要连续 2 次,而扩容只需要读取 1 次即可。但要注意的是,INDEX_INCREMENT = 4,而 INDEX_DECREMENT = 1,如 512 KB,如果 512 KB * 24 则满足扩容条件,而 512 / 21 则满足缩容条件。
(4)继续读
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = ()->
attemptedBytesRead == lastBytesRead;
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
说明: continueReading 的参数默认是 defaultMaybeMoreSupplier。如果继续读需要满足以下条件:
每天用心记录一点点。内容也许不重要,但习惯很重要!
标签:也有 receive ada 计数 cto div end mini 最小
原文地址:https://www.cnblogs.com/binarylei/p/12640521.html