标签:ESS try 服务端 news byte 文章 invoke opera which
前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类Bootstrap的参数设置以及启动过程。显然,我们还有另一个重要的部分--服务端的初始化和启动过程没有探究,所以这一节,我们就来从源码层面详细分析一下netty的服务端引导类ServerBootstrap的启动过程。
我们仍然以spark中对netty的使用为例,以此为源码分析的切入点,首先我们看一下spark的NettyRpc模块中创建netty服务端引导类的代码:
TransportServer的构造方法中会调用init方法,ServerBootstrap类就是在init方法中被创建并初始化以及启动的。
这个方法主要分为三块:
很显然,ServerBootstrap的启动入口就是bind方法。
// 初始化netty服务端
private void init(String hostToBind, int portToBind) {
// io模式,有两种选项NIO, EPOLL
IOMode ioMode = IOMode.valueOf(conf.ioMode());
// 创建bossGroup和workerGroup,即主线程组合子线程组
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
// 缓冲分配器,分为堆内存和直接内存
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
// 创建一个netty服务端引导对象,并设置相关参数
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
// 内存使用的度量对象
this.metrics = new NettyMemoryMetrics(
allocator, conf.getModuleName() + "-server", conf);
// 排队的连接数
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
// socket接收缓冲区大小
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
// socket发送缓冲区大小
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
// 子channel处理器
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
});
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
// 绑定到ip地址和端口
channelFuture = bootstrap.bind(address);
// 同步等待绑定成功
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
这里的校验主要是对group和channelFactory的非空校验
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
这个方法,我们之前在分析Bootstrap的启动过程时提到过,它的主要作用如下:
之前,我们分析了NioSocketChannel的构造过程,以及Bootstarp中对channel的初始化过程,
本节我们要分析NioServerSocketChannel的构造过程,以及ServerBootstrap的init方法的实现。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建一个channel,并对这个channel做一些初始化工作
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 将这个channel绑定到指定的地址
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {// 对于尚未注册成功的情况,采用异步的方式,即添加一个回调
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
这里通过调用jdk的api创建了一个ServerSocketChannel。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
与NioSocketChannelConfig类似,NioServerSocketChannelConfig也是一种门面模式,是对NioServerSocketChannel中的参数接口的封装。
此外,我们注意到,这里规定了NioServerSocketChannel的初始的感兴趣的事件是ACCEPT事件,即默认会监听请求建立连接的事件。
而在NioSocketChannel中的初始感兴趣的事件是read事件。
所以,这里与NioSocketChannel构造过程最主要的不同就是初始的感兴趣事件不同。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里首先调用了父类的构造方法,最终调用了AbstractNioChannel类的构造方法,这个过程我们在之前分析NioSocketChannel初始化的时候已经详细说过,主要就是创建了内部的Unsafe对象和ChannelPipeline对象。
分析完了channel的构造过程,我们再来看一下ServerBootstrap是怎么对channel对象进行初始化的。
所以,很显然,我们接下来就要看一下这个特殊的handler,ServerBootstrapAcceptor的read方法。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
// 设置参数
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 设置属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
// 子channel的group和handler参数
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 添加处理器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 一般情况下,对于ServerBootstrap用户无需设置handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 这里添加了一个关键的handler,并且顺手启动了对应的EventLoop的线程
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在分析ServerBootstrapAcceptor之前,我们首先来回顾一下NioEventLoop的循环中,对于accept事件的处理逻辑,这里截取其中的一小段代码:
// 处理read和accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
可见,对于accept事件和read事件一样,调用NioUnsafe的read方法
因为NioServerSocketChannel继承了AbstractNioMessageChannel,并且read方法的实现也是在AbstractNioMessageChannel中,
根据前面对channelPipeline的分析,我们知道,读事件对从头结点开始,向尾节点传播。上面我们也提到了,对于初始的那个NioServerSocketChannel,会在ServerBootstarp的init方法中向这个channel的处理链中加入一个ServerBootstrapAcceptor处理器,所以,很显然,接下来我们应该分析ServerBootstrapAcceptor中对读事件的处理。
public void read() {
// 确认当前代码的执行是在EventLoop的线程中
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 这里读取到的是建立的连接对应的channel,
// jdk的socketChannel被包装成了netty的NioSocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 把接收到的每一个channel作为消息,在channelPipeline中触发一个读事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 最后触发一个读完成的事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
代码逻辑还是比较简单的,因为有了前面的铺垫,即在ServerBootstrap的init方法对创始的那个serverChannel进行初始化时,将用户设置的子channel的参数,属性,子channel的handler和子group等参数作为构造参数全部传给了ServerBootstrapAcceptor,所以在这里直接用就行了。
其实这里的子channel的初始化和注册过程和Bootstrap中对一个新创建的channel的初始化过程基本一样,区别在于Bootstrap中channel是用户代码通过调用connect方法最终在initAndregistry中通过反射构造的一个对象;而在服务端,通过监听ServerSocketChannel的accept事件,当有新的连接建立请求时,会自动创建一个SocketChannel(jdk的代码实现),然后NioServerSocketChannel将其包装成一个NioSocketChannel,并作为消息在传递给处理器,所以在ServerSocketChannel中的子channel的创建是由底层的jdk的库实现的。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 类型转换,这里的强制转换是安全的的,
// 是由各种具体的AbstractNioMessageChannel子类型的实现保证的
// 各种具体的AbstractNioMessageChannel子类型的读方法确保它们读取并最终返回的是一个Channel类型
final Channel child = (Channel) msg;
// 给子channel添加handler
child.pipeline().addLast(childHandler);
// 给子channel设置参数
setChannelOptions(child, childOptions, logger);
// 给子channel设置属性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将子channel注册到子group中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
回到doBind方法中,在完成了channel的构造,初始化和注册逻辑后,接下来就要把这个server类型的channel绑定到一个地址上,这样才能接受客户端建立连接的请求。
从代码中可以看出,调用了channel的bind方法实现绑定的逻辑。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 调用了channel.bind方法完成绑定的逻辑
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
bind操作的传递是从尾节点开始向前传递,所以我们直接看Headcontext对于bind方法的实现
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
调用了unsafe的bind方法。
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
因为后面右有几个事件的触发,每个触发事件都是通过channel的相关方法来触发,然后又是通过channelpipeline的传递事件,这些事件最后基本都是由HeadContext处理了,所以这里我只简单地叙述一下后面的 大概逻辑,代码比较繁琐,而且很多都是相同的调用过程,所以就不贴代码了。
从代码中可以看出来,最终调用了jdk的api,将感兴趣的事件添加到selectionKey中。通过前面的 分析,我们知道对于NioSocketChannel,它的感兴趣的读事件类型是SelectionKey.OP_READ,也就是读事件;
而对于NioServerSocketChannel,根据前面对其构造方法的分析,它的感兴趣的事件是SelectionKey.OP_ACCEPT,也就是建立连接的事件。
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 将读事件类型加入到selectionKey的感兴趣的事件中
// 这样jdk底层的selector就会监听相应类型的事件
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
到这里,我们就把ServerBootstrap的主要功能代码分析完了,这里面主要包括三个方面:
netty服务端启动--ServerBootstrap源码解析
标签:ESS try 服务端 news byte 文章 invoke opera which
原文地址:https://www.cnblogs.com/zhuge134/p/11108493.html