标签:处理器 tee debug instance 作用 specified RoCE readonly expected
在此声明,Netty 是基于java NIO 的,建议知道java NIO 运行机制(Selector,Channel ,ByteBuffer ,zeroCopy) ,再阅读此篇文章,不然会一头雾水
EventLoopGroup :事件循环组:
EventLoopGroup bossGroup =new NioEventLoopGroup(); EventLoopGroup workGroup =new NioEventLoopGroup();
主要是完成一些变量的赋值
主要发生了什么:Look,源码就是一层一层的调用构造函数,往里面赋值;
1.extends 多线程事件循环组, 被用于基于channel 的NIO selector 实现
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. */ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
2.我们一步一步点击下来,发现就是在它的构造函数里面调来调去,我们发现他创建了一个空的 Executor 对象 (java 1. 5 并发库重要内容,执行器,进行线程执行),以及 nThreads 线程数量为 0(这里的0并不是说我们给我们创建 0个线程,后面会有判断);
*/ public NioEventLoopGroup() { this(0); } /** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
3.我们可以看到在这个构造方法里调用了 SelectorProvider.provider(),这个不陌生吧,在java nio 中创建selector 的Selector.open() 方法中其实调用的就是这个
/** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
4.继续点,可以看到它又添加了一些新的内容 DefaultSelectStrategyFactory 工厂 Factory which uses the default select strategy. 默认的选择策略
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } //在这个构造方法里用添加了一个参数 DefaultSelectStrategyFactory.INSTANCE ,提供一个默认选择策略,工厂模式 /** * Factory which uses the default select strategy. */ public final class DefaultSelectStrategyFactory implements SelectStrategyFactory { public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory(); private DefaultSelectStrategyFactory() { } @Override public SelectStrategy newSelectStrategy() { return DefaultSelectStrategy.INSTANCE; } }
5. 继续走,这里就开始调用父类super(MultithreadEventLoopGroup)方法了,在这里我们就可以知道默认给我们创建多少线程了;
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
//分析线程数量 //线程数量判断,如果是0的话,就是 DEFAULT_EVENT_LOOP_THREADS ,是多少呢?我们点进去看一看,我们会看到一个静态代码块 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } //调用Runtime.availableProcessors将会获取 可用的处理器 @SuppressForbidden(reason = "to obtain default number of available processors") synchronized int availableProcessors() { if (this.availableProcessors == 0) { final int availableProcessors = SystemPropertyUtil.getInt( "io.netty.availableProcessors", Runtime.getRuntime().availableProcessors()); setAvailableProcessors(availableProcessors); } return this.availableProcessors; } 由此可以看到 默认创建的线程数不是0 而是根据不同电脑的处理器个数*2
6,接下来就是Excutor 的赋值了,因为从第二部可以看到,初始的Excutor 的null;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
//**********源码解释***********//
//我们看红色部分,就是对 executor 进行初始化操作,这里我们需要了解的是Excutor 接口 以及ThreadFactory 接口的作用
//在netty 里实现了ThreadFactory关于自己的DefaultThreadFactory
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
Bootstrap: 服务辅助类
标签:处理器 tee debug instance 作用 specified RoCE readonly expected
原文地址:https://www.cnblogs.com/iscys/p/9693796.html