Let's use EchoServer, the classic example from netty-example, as our starting point.
// Create bossGroup and workerGroup; together they implement the reactor pattern
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
    // Configure the ServerSocketChannel and SocketChannel settings on the ServerBootstrap
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .attr(AttributeKey.newInstance("attr"), "attr")
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     })
     .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
     .childAttr(AttributeKey.newInstance("childAttr"), "childAttr");

    // bind() starts the server; sync() waits until the bind has completed
    ChannelFuture f = b.bind(PORT).sync();

    // The main thread blocks in this sync() until the server channel is closed
    f.channel().closeFuture().sync();
} finally {
    // Shut down both groups so their threads are released
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
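Stepping through the chain of overloaded constructors, we can see what gets configured for bossGroup: the thread count, the Executor used to create threads, the SelectorProvider, the select strategy factory, the rejected-execution handler, and so on.
Pay particular attention to the thread count: the no-arg constructor passes 0, which MultithreadEventLoopGroup replaces with DEFAULT_EVENT_LOOP_THREADS, whose default value is twice the number of available CPU cores.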
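The thread-count rule can be sketched in plain Java. This is only an illustration of the "0 means use the default" logic described above: `DefaultThreadCount` and `resolve` are hypothetical names, and the sketch ignores any system property that the real library may consult when computing its default.

```java
final class DefaultThreadCount {
    /**
     * Hypothetical helper: returns the requested thread count, or
     * twice the core count (at least 1) when the request is 0.
     */
    static int resolve(int requested, int availableProcessors) {
        int defaultThreads = Math.max(1, availableProcessors * 2);
        return requested == 0 ? defaultThreads : requested;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // bossGroup in the example asks for exactly one thread
        System.out.println("boss threads   = " + resolve(1, cores));
        // workerGroup uses the no-arg constructor, i.e. a request of 0
        System.out.println("worker threads = " + resolve(0, cores));
    }
}
```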
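We then arrive at the constructor that does the real work for NioEventLoopGroup, in its superclass MultithreadEventExecutorGroup: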
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // 1: Create the executor. ThreadPerTaskExecutor starts a new thread for each task it is given;
    //    the thread factory mainly sets the name prefix of those threads.
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 2: Create each event loop; each one will be paired with exactly one thread
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 3: Set up the chooser that picks the next NioEventLoop in round-robin fashion
    chooser = chooserFactory.newChooser(children);

    // 4: Add a termination listener to every NioEventLoop; the group's terminationFuture
    //    succeeds once all children have terminated
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
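Step 3 above only says that a chooser is created. As a rough idea of what round-robin selection over the children array might look like, here is a minimal sketch. `RoundRobinChooser` is a hypothetical class written for this article, not the library's chooser; the power-of-two bit mask is a common optimization and is included here as an assumption about how such a chooser could be tuned.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
        // A length is a power of two when it has exactly one bit set
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    /** Returns the next child, cycling through the array in order. */
    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // Masking is a cheap modulo when the length is a power of two
            return children[i & (children.length - 1)];
        }
        // floorMod keeps the index non-negative even after the counter overflows
        return children[Math.floorMod(i, children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next());
        }
    }
}
```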
Let's step into the second marked spot, where each NioEventLoop is created, and look closely at its openSelector() method.
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // The unwrapped selector, i.e. the plain JDK NIO selector
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // If the selected-key-set optimization is disabled, return the plain JDK selector as is
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    // (The Netty source checks here that the lookup really returned a usable Class and otherwise
    // falls back to the plain selector; this excerpt leaves that check out.)
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    // The optimized SelectedSelectionKeySet is backed by an array instead of a hash set,
    // and is swapped into the Selector's fields via reflection
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ReflectionUtil.trySetAccessible(selectedKeysField, true);
                ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException | IllegalAccessException e) {
                return e;
            }
        }
    });

    return new SelectorTuple(unwrappedSelector,
            new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
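To make the field swap easier to see in isolation, the sketch below applies the same idea to a toy class. Everything here is hypothetical and written for illustration: `FakeSelector` stands in for the JDK selector implementation, and `ArrayKeySet` is a simplified array-backed set, not Netty's SelectedSelectionKeySet. The point is only that a private Set field can be replaced through reflection so that later writes land in an array.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Toy stand-in for a selector that records ready keys in a private Set. */
final class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();

    void onReady(String key) {
        selectedKeys.add(key);
    }

    Set<String> selectedKeys() {
        return selectedKeys;
    }
}

/** Simplified array-backed set: add() appends, nothing is hashed. */
final class ArrayKeySet extends AbstractSet<String> {
    private String[] keys = new String[4];
    private int size;

    @Override
    public boolean add(String key) {
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size * 2); // grow by doubling
        }
        keys[size++] = key;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<String> iterator() {
        return Arrays.asList(keys).subList(0, size).iterator();
    }
}

final class KeySetSwapDemo {
    /** Replaces the selector's private Set with an ArrayKeySet via reflection. */
    static ArrayKeySet install(FakeSelector selector) {
        ArrayKeySet replacement = new ArrayKeySet();
        try {
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not swap the key set", e);
        }
        return replacement;
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = install(selector);
        selector.onReady("key-1");
        System.out.println("keys seen through the array: " + keys.size());
    }
}
```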
That completes the creation of a group. bossGroup and workerGroup are identical apart from their thread counts.
Next comes the ServerBootstrap configuration.
// group() sets bossGroup as the parentGroup and workerGroup as the childGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

// channel() installs a factory that produces objects of the generic type passed in. Judging by the
// factory's name (Reflective) and its field (a constructor), we can guess that it combines
// reflection, generics and the factory pattern to create the configured ServerSocketChannel.
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}
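The reflection + generics + factory combination can be reproduced with the JDK alone. The following is a minimal sketch under that assumption; `ReflectiveFactory` is a hypothetical class that mirrors the idea of looking up the public no-arg constructor once and reusing it for every new instance.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look the constructor up once, at configuration time
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    /** Creates a fresh instance on every call. */
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getSimpleName());
    }
}
```

Failing fast in the constructor means a class without a usable constructor is rejected while the bootstrap is being configured, not later when the first channel is created.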
Once the ServerBootstrap is configured, the real server startup begins. Step into b.bind() and follow the calls down to AbstractBootstrap.doBind(SocketAddress localAddress).
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize and register
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    // If registration has already completed, bind right away; otherwise add a listener
    // that binds once registration completes
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener((ChannelFutureListener) future ->
                doBind0(regFuture, channel, localAddress, promise));
        return promise;
    }
}
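The "bind now if registration is done, otherwise bind from a listener" shape of doBind can be sketched with the JDK's CompletableFuture. This is an analogy, not Netty code: `BindAfterRegister`, `bindWhenRegistered` and the "bound:" result string are all hypothetical, and CompletableFuture is swapped in for Netty's ChannelFuture and ChannelPromise.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegister {
    /** Returns a promise that completes once the (simulated) bind has run. */
    static CompletableFuture<String> bindWhenRegistered(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> bindPromise = new CompletableFuture<>();
        if (registration.isDone()) {
            // Registration already finished: bind immediately
            completeBind(registration, port, bindPromise);
        } else {
            // Registration still pending: bind from a completion callback
            registration.whenComplete((ok, err) -> completeBind(registration, port, bindPromise));
        }
        return bindPromise;
    }

    private static void completeBind(CompletableFuture<Void> registration, int port,
                                     CompletableFuture<String> bindPromise) {
        if (registration.isCompletedExceptionally()) {
            // A failed registration fails the bind promise too
            bindPromise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            bindPromise.complete("bound:" + port);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = bindWhenRegistered(registration, 8007);
        System.out.println("done before registration? " + bound.isDone());
        registration.complete(null);
        System.out.println(bound.join());
    }
}
```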
Look at initAndRegister() first. It begins by creating the NioServerSocketChannel, instantiating the channel through the factory.
final ChannelFuture initAndRegister() {
    Channel channel = channelFactory.newChannel();
    init(channel);
    ChannelFuture regFuture = config().group().register(channel);

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    // In short: if we get here and the promise has not failed, registration has either already
    // completed on the event loop or has been queued on the event loop as a task. Either way it is
    // safe to call bind() or connect() now, because they run on the same thread after registration.
    return regFuture;
}

// As guessed, the ServerSocketChannel is instantiated through the constructor obtained by reflection.
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

// Note that OP_ACCEPT is passed in here as the interest op
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// The parent constructor stores the ServerSocketChannel created by the SelectorProvider together
// with the OP_ACCEPT op, and switches the channel to non-blocking mode
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                    "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

// The top-level abstract parent creates the id, unsafe and pipeline. Netty's unsafe is the class tied
// to the socket (or other component capable of I/O) that performs the actual I/O operations. Like
// the JDK's Unsafe object, it is normally of no concern to users.
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
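Initialization comes next. The option and attr values from the ServerBootstrap are applied to the server socket channel, and a ChannelInitializer is added to the pipeline.
That ChannelInitializer in turn stores childOptions, childAttrs, workerGroup and childHandler in a handler called ServerBootstrapAcceptor.
Judging by the name, this handler deals with the events raised when clients connect. The ChannelInitializer removes itself from the pipeline once its subclass-implemented initChannel method has run.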
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(() -> pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)));
}
});
}
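The self-removing behaviour of the initializer is easy to miss in the code above, so here is a toy model of it. All names are hypothetical (`SelfRemovingInitializer`, `Handler`, `Named`, `fireRegistered`), and a plain List stands in for the pipeline; the sketch only shows the pattern of a one-shot handler that installs the real handlers and then takes itself out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SelfRemovingInitializer {
    /** Toy handler: by default it ignores the registered event. */
    interface Handler {
        default void onRegistered(List<Handler> pipeline) {
        }
    }

    /** A regular handler identified by name. */
    static final class Named implements Handler {
        final String name;

        Named(String name) {
            this.name = name;
        }
    }

    /** One-shot handler: runs the init callback, then removes itself. */
    static final class Initializer implements Handler {
        private final Consumer<List<Handler>> init;

        Initializer(Consumer<List<Handler>> init) {
            this.init = init;
        }

        @Override
        public void onRegistered(List<Handler> pipeline) {
            try {
                init.accept(pipeline);
            } finally {
                // Remove the initializer even if init throws
                pipeline.remove(this);
            }
        }
    }

    /** Delivers the registered event to a snapshot of the current handlers. */
    static void fireRegistered(List<Handler> pipeline) {
        for (Handler handler : new ArrayList<>(pipeline)) {
            handler.onRegistered(pipeline);
        }
    }

    public static void main(String[] args) {
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(new Initializer(p -> p.add(new Named("acceptor"))));
        fireRegistered(pipeline);
        System.out.println("handlers left: " + pipeline.size());
    }
}
```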
After initialization comes registration, which is a bit more involved.
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    // If the current thread is the one bound to this NioEventLoop, register directly;
    // otherwise submit the registration to the eventLoop as a task
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(() -> register0(promise));
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

// Run the task on the NioEventLoop's thread
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

// Check whether the current thread is the NioEventLoop thread
@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Register the JDK ServerSocketChannel with the selector. Note that the interest ops
            // registered here are 0, meaning no events are of interest yet.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
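The full register method first checks whether the channel is already registered and whether the eventLoop passed in is a NioEventLoop (the excerpt above omits these checks). It then checks whether the current thread is the NioEventLoop's own thread:
if so, it registers directly; otherwise it submits the registration to the eventLoop as a task. Inside the eventLoop's execute method the task is added to the task queue and the same thread check is made again.
If the caller is not the event loop thread, startThread() starts the loop's thread if it is not already running, and that newly started thread is the NioEventLoop thread that runs the queued task.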
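The "run directly if already on the loop thread, otherwise enqueue and lazily start the thread" pattern can be modelled in a few lines of plain Java. `TinyEventLoop` and `runInLoop` are hypothetical names for this sketch; the real event loop also handles selection, shutdown and rejection, all of which are left out here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class TinyEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    /** True only when called from the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    /** Queues the task and starts the loop thread on first use from outside. */
    void execute(Runnable task) {
        tasks.add(task);
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "tiny-event-loop");
            t.setDaemon(true); // do not keep the JVM alive in this sketch
            t.start();
        }
    }

    /** The register() pattern: run inline on the loop thread, else enqueue. */
    void runInLoop(Runnable action) {
        if (inEventLoop()) {
            action.run();
        } else {
            execute(action);
        }
    }

    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TinyEventLoop loop = new TinyEventLoop();
        loop.runInLoop(() -> System.out.println("in loop? " + loop.inEventLoop()));
        Thread.sleep(100); // give the daemon thread time to print
    }
}
```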
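Following the register0 to doRegister call chain, we see Netty ultimately call the underlying JDK channel to register it with the selector. Since the port has not been bound yet, ops (the interest set) is 0.
At the same time, this, i.e. the NioServerSocketChannel, is attached to the SelectionKey as its attachment,
so that the channel can be retrieved and operated on later when select returns events.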
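The same sequence can be reproduced with the JDK NIO API alone: register with an interest set of 0 and an attachment, bind, and only then switch on OP_ACCEPT. The class name `RegisterThenAccept` and the choice of loopback address with an ephemeral port are assumptions made for this sketch; the plain Object attachment stands in for the NioServerSocketChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterThenAccept {
    /** Returns {ops after register, ops after enabling accept, 1 if the attachment round-trips}. */
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1: register with no interest ops, attaching an object for later lookup
            Object attachment = new Object();
            SelectionKey key = server.register(selector, 0, attachment);
            int afterRegister = key.interestOps();

            // Step 2: bind to an ephemeral port on loopback with a backlog of 100
            server.bind(new InetSocketAddress("127.0.0.1", 0), 100);

            // Step 3: only now declare interest in incoming connections
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);

            return new int[] {
                afterRegister,
                key.interestOps(),
                key.attachment() == attachment ? 1 : 0
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("ops after register: " + result[0]);
        System.out.println("ops after enabling accept: " + result[1]);
    }
}
```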
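With registration complete, we return to AbstractBootstrap and run doBind0. This also has to be executed by the NioEventLoop, so it too is submitted to the event loop as a task.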
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    // Hand over to the next outbound handler context; by default the call ends up at headContext
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

// At headContext the call is delegated to unsafe.
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

// AbstractUnsafe.bind (abridged)
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // Remember whether the channel was active before the bind
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

// unsafe finally calls the JDK channel to perform the bind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
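The NioEventLoop performs the bind through the pipeline: the pipeline calls bind on tailContext, which keeps looking for the next outbound handler to run bind.
By default the call reaches headContext, which invokes the underlying unsafe to do the actual bind. Once unsafe has finished binding, it has the pipeline call fireChannelActive.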
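The tail-to-head walk can be illustrated with a small linked structure. `MiniPipeline` and `Ctx` are hypothetical names, and marking a context as outbound with a boolean is a simplification of how handler types are detected; the sketch only shows that an outbound call starts at the tail, skips inbound-only handlers, and ends at the head.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    /** A handler context linked to the one before it. */
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // In this sketch head handles outbound calls and tail is inbound-only
    private final Ctx head = new Ctx("head", true);
    private final Ctx tail = new Ctx("tail", false);

    MiniPipeline() {
        tail.prev = head;
    }

    /** Inserts a context just before the tail. */
    MiniPipeline addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        ctx.prev = tail.prev;
        tail.prev = ctx;
        return this;
    }

    /** Walks from tail toward head and records every outbound context visited. */
    List<String> bind() {
        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        do {
            ctx = ctx.prev;
            if (ctx.outbound) {
                visited.add(ctx.name);
            }
        } while (ctx != head);
        return visited;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast("logging", true)     // handles both directions
                .addLast("acceptor", false);  // inbound only, skipped by bind
        System.out.println(pipeline.bind());
    }
}
```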
fireChannelActive is delivered to headContext first:
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}
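headContext first propagates the active event and then calls readIfIsAutoRead. The resulting read call travels through the pipeline until it reaches headContext, which calls unsafe's beginRead; that in turn ends up in doBeginRead: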
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
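In beginRead, the channel's interest set is updated to include the OP_ACCEPT event that was passed in when the NioServerSocketChannel was created.
At this point creation, initialization, registration and binding are all complete. The main thread then calls sync, which calls await, and blocks there while Netty does its work.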
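Summary:
⑴ Server startup has four steps: creation, initialization, registration and binding.
⑵ Netty uses reflection to replace the selector's selected-key Set with an array-backed implementation.
⑶ In this example bossGroup has one thread (set explicitly), while workerGroup uses the default of twice the number of CPU cores.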
Original article: https://www.cnblogs.com/spiritsx/p/11839066.html