通过之前《Netty 4源码解析:服务端启动》的分析,我们知道在最前端“扛压力”的是NioEventLoop.run()方法。我们指定创建出的NioServerSocketChannel就是注册到了NioEventLoop中的Selector上。所以我们继续顺藤摸瓜,看看服务端启动完成后,Netty是如何处理每个请求的。
之前我们曾分析过到NioEventLoop.run()方法,但因为之前只关注启动流程所以“浅尝辄止”了,这次我们就以它为起点开始深入分析。NioEventLoop于Selector绑定,它是真正轮询Selector的地方。至于有哪一个或哪些Channel的事件绑定到Selector了,则是注册阶段决定的。对于MainReactor来说,只有一个NioEventLoop负责处理一个ServerSocketChannel的事件。
// NioEventLoop
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
}
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
if (!i.hasNext()) {
break;
}
}
}
不知道大家是否还记得,NioServerSocketChannel注册时有一个小细节,就是它将自己作为了attachment。所以上面处理SelectionKey时就能通过attachement取到注册时的Channel。为什么一定要拿到当时的Channel呢?继续往下看。
@Override
protected void doRegister() throws Exception {
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
// ...
}
}
}
原来跟之前介绍过的注册和绑定一样,最终都是通过Channel的unsafe工具类来完成的。
// NioEventLoop
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
还有一个小细节,就是doRegister()注册时实际传给register()的事件是0,也就是对任何事件都不感兴趣,这又是怎么回事呢?原来具体对什么事件感兴趣是在子类的构造方法里传入的。如果是isAutoRead(),那么一旦Channel连接成功就会自动触发一次读操作。真正注册感兴趣事件的地方就是第一次读操作的时候。
// NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
}
// AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
//...
}
}
// DefaultChannelPipeline
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
// AbstractNioChannel
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
紧接着刚才的processSelectedKey()说,既然OP_ACCEPT都已经注册上了,当接收到新用户连接时就会触发unsafe.read()方法。read()会不断调用doReadMessages(),将产生的readBuf逐一发送给Pipeline.fireChannelRead()去处理。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
}
}
}
来看看NioServerSocketChannel对doReadMessages()的覆写吧,原来接收并创建Channel的工作就是在这完成的。JDK Channel被保存到Netty包装后的NioSocketChannel中,然后传给Pipeline处理。
// NioServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
又到了“温故时间”,还记得初始化Channel时Netty是怎么做的吗?我们创建了一个叫做ServerBootstrapAcceptor的Handler,它持有的childGroup和childHandler就是SubReactor的NioEventLoopGroup和Handler。
// ServerBootstrap
@Override
void init(Channel channel) throws Exception {
// ...
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// ...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
紧接着前面unsafe.read()方法中的fireChannelRead(),会触发ServerBootstrapAcceptor的channelRead()。在这里,msg就是新创建的SocketChannel,将我们的定义的Handler都加入到子Pipeline中。所以说,ServerBootstrapAcceptor就是主从Reactor间的桥梁,它不断将从主Reactor接收到的Channel绑定到从Reactor的一个EventLoop上。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
try {
childGroup.register(child);
} catch (Throwable t) {
forceClose(child, t);
}
}
}
因为Netty 4中已经完全统一了EventLoopGroup的代码,已经不区分主从Reactor的逻辑了。所以实际上,这里的注册过程我们已经分析过了。子EventLoopGroup会选择出一个EventLoop负责轮询绑定上的Channel的事件,而Channel感兴趣的事件前面也提到了,就是Channel构造方法中传入的。
// NioSocketChannel
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
// AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
使用Netty时我们通常会在ChannelInitializer中初始化Handler,但Netty是什么时候调用它的呢?答案就在Channel注册到子EventLoop之后。之前看到的fireChannelRegistered()会触发ChannelInitializer。所以说:每个客户端Channel建立成功后会创建Handler,并且后续请求处理都由这一组Handler完成。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
/**
* This method will be called once the {@link Channel} was registered.
*/
protected abstract void initChannel(C ch) throws Exception;
@Override
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
}
}
}
通过前面的分析能够看到,EventLoop轮询到的事件最终会交给unsafe.read()去处理。NioSocketChannel与NioServerSocketChannel的一个重要区别就是:NioSocketChannel继承AbstractNioByteChannel,而后者继承AbstractNioMessageChannel,两者的unsafe工具类实现是不同的。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
}
}
在AbstractNioMessageChannel中接收到的是SocketChannel,所以并没有发生真正的读操作。而AbstractNioByteChannel是真正地从SocketChannel中读,所以这也是申请缓冲区的地方。每次发生读事件时,都会分配一块ByteBuf,然后尝试从Channel中读出数据写到ByteBuf中。之后触发fireChannelRead(),由Pipeline中的Handler继续处理,最终Tail处理器负责释放掉ByteBuf。
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
这次代码分析一个很直接的目的就是想了解Netty的线程和对象模型,至此已经差不多摸清了。首先,ServerSocketChannel会由一个EventLoop负责轮询接收事件,得到的SocketChannel是交给子Reactor中的一个EventLoop负责轮询读事件。也就是说多个客户端可能会对应一个EventLoop线程。每个SocketChannel注册完毕就会创建Handler,所以说每个客户端都对应自己的Handler实例,并且一直使用到连接断开。
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/dc_726/article/details/48084367