EventLoopGroup group = new NioEventLoopGroup();这里我们就以NioEventLoopGroup来说明。先看一下它的继承关系
NioEventLoopGroup extends MultithreadEventLoopGroup extends MultithreadEventExecutorGroup看看NioEventLoopGroup的构造函数
public NioEventLoopGroup() { this(0); } //它会连续调用内部的构造函数,直到用下面的构造去执行父类的构造 //nThreads此时为0,马上就会提到这个参数的用处 public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { super(nThreads, executor, selectorProvider); }基类MultithreadEventLoopGroup的构造
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } //nThreads:内部线程数,如果为0,就取默认值DEFAULT_EVENT_LOOP_THREADS:优先读取系统属性io.netty.eventLoopThreads,未设置时为处理器个数*2,且至少为1 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));继续调用再上一级的MultithreadEventExecutorGroup的构造
//这里会根据nThreads创建执行者数组 private final EventExecutor[] children; protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //这里创建EventExecutor数组对象 children = new EventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } //此处循环children数组,来创建内部的NioEventLoop对象 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //newChild是abstract方法,运行期会执行具体子类中重写(override)的实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果没有成功,做关闭处理 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } ...... }因为我们最初创建的是NioEventLoopGroup对象,所以newChild会执行NioEventLoopGroup的newChild方法,创建NioEventLoop对象。
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0]); }看看NioEventLoop的继承关系
NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor
NioEventLoop通过自己的构造函数,一直调用到SingleThreadEventExecutor的构造
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent); if (executor == null) { throw new NullPointerException("executor"); } this.addTaskWakesUp = addTaskWakesUp; this.executor = executor; taskQueue = newTaskQueue(); }至此,Group和内部的Loop对象以及Executor就创建完毕,那么它们是什么时候被调用的呢?就是在服务端bind和客户端connect时。
@SuppressWarnings("unchecked") public B validate() { if (group == null) { throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }所以,假如初始时,没有设置Bootstrap的group的话,就会报错。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }我们这里,channel.eventLoop()得到的是NioEventLoop对象,所以这里调用的是NioEventLoop的execute方法(继承自SingleThreadEventExecutor):任务会被放入taskQueue,如果该EventLoop的线程尚未启动,就会先启动线程,再由这个线程去执行NioEventLoop的run方法。