标签:empty cancel dup volatil cep 写入 ESS row tab
netty 中,每一个 channel 有一个写缓冲 ChannelOutboundBuffer
ChannelOutboundBuffer 类中维持一个 Entry 链表,Entry 是链表的节点,封装了待写入的 ByteBuf,而 netty 最终写入 socket 的是 ByteBuffer,所以最终会把 ByteBuf 转为 ByteBuffer
static final class Entry { // 毫无意外,使用对象池 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); private final Handle<Entry> handle; // 下个节点 Entry next; // 消息内容,即 ByteBuf Object msg; // 一般情况,一个 ByteBuf 底层对应一个 ByteBuffer // 所以 bufs 多数时候为空,只有 buf 会被赋值 ByteBuffer[] bufs; // 真正写入 socket 的数据结构 ByteBuffer buf; // 对应写入成功的回调 ChannelPromise promise; // ByteBuf 中已写入 socket 的字节数 long progress; // ByteBuf 可读的字节数 long total; int pendingSize; int count = -1; boolean cancelled; }
// 暂时不需要写到 socket 的 Entry 的指针 private Entry unflushedEntry; // 要写到 socket 的 Entry 的指针 private Entry flushedEntry; // 尾部 private Entry tailEntry; // 要写入 socket 的 Entry 的数量 // 等于从 flushedEntry 到 unflushedEntry 之间的 Entry 数量,不包括 unflushedEntry private int flushed;
每调用一次 HeadContext.write 最终触发 addMessage,把数据加在 tailEntry 后面
添加 Entry
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); }
每调用一次 HeadContext.flush 最终触发 addFlush 和 flush
// io.netty.channel.AbstractChannel.AbstractUnsafe#flush public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // 移动 flushedEntry 和 unflushedEntry 指针 outboundBuffer.addFlush(); // 真正写 socket flush0(); }
移动 flushedEntry 和 unflushedEntry 指针
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // 如果 flushedEntry 指针为空,则直接指向 unflushedEntry,最后把 unflushedEntry 置空 flushedEntry = entry; } // 如果 flushedEntry 指针不为空,则直接把 unflushedEntry 置空 do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
需要说明的是,缓冲中只有一条链表,需要写到 socket 的是从 flushedEntry 到 unflushedEntry 之间的 Entry,不包括 unflushedEntry
我们知道
flush 之后,如果数据充足,且每次都写成功,netty 默认会持续写 16 次
// io.netty.channel.socket.nio.NioSocketChannel#doWrite protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); // 默认 16 次 int writeSpinCount = config().getWriteSpinCount(); do { // 当 ChannelOutboundBuffer 无可写的数据,返回 if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 把 ChannelOutboundBuffer 中的 msg,转换成 ByteBuffer ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // ByteBuffer 的数量 int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // 最简单的情形 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); // 把 ByteBuffer 写入 socket final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { // 如果 socket 不可写,则注册 OP_WRITE 事件 incompleteWrite(true); return; } // 根据写入的字节数调整下次写入的量 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // 删除 ChannelOutboundBuffer 中的 Entry in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }
把所有 flushedEntry 中的 ByteBuf 转换成 ByteBuffer
// io.netty.channel.ChannelOutboundBuffer#nioBuffers(int, long) public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) { assert maxCount > 0; assert maxBytes > 0; long nioBufferSize = 0; int nioBufferCount = 0; final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; // 遍历 flushedEntry while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { break; } nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { entry.count = count = buf.nioBufferCount(); } int neededSpace = min(maxCount, nioBufferCount + count); if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount++] = nioBuf; } else { nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount); } if (nioBufferCount == maxCount) { break; } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
删除 Entry
根据写入的字节数,删除 Entry
public void removeBytes(long writtenBytes) { for (;;) { // 当前 flushedEntry 节点 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; // 写入的数据大于当前 flushedEntry 的数据,即该 flushedEntry 写完 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { // 更新进度 progress(readableBytes); writtenBytes -= readableBytes; } // 删除 flushedEntry 指向的节点,向后移动 flushedEntry remove(); } else { // readableBytes > writtenBytes // 该 flushedEntry 没有写完,则只更新进度 if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }
高水位线和低水位线
netty 统计 pending 的数据,超过了高水位线则改标志,注意,改了标志,也可以写入,需要用户自己判断继续写还是不写。
通过 ctx.channel().isWritable() 获取是否可写状态
// 利用 cas 设置 unwritable 的值 private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); // 0 可写,1 不可写 private volatile int unwritable; private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } } private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }
一旦设置为不可写,只有当水位降到低水位线,标志才会重新变回可写
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }
标签:empty cancel dup volatil cep 写入 ESS row tab
原文地址:https://www.cnblogs.com/allenwas3/p/12173915.html