标签:let 异步 替代 ide long prot sse exp sop
NioEventLoop启动触发条件:
1.服务端绑定本地端口
2.新连接接入通过chooser绑定一个NioEventLoop
服务端绑定本地端口
绑定本地端口,使用下面方法;
ChannelFuture future = bootstrap.bind(host, port).sync();
最终会调用doBind0()方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { public void run() { if(regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
这个时候就会调用channel对应NioEventLoop的execute方法,会判断是否在当前的eventloop对应的thread中,如果在,直接向任务队列中添加绑定端口的任务,如果不在,首先要start当前eventLoop对应的thread,再将任务放到任务队列中。这里的excute(task)方法,并不是让线程直接执行它,而是将它放到线程的任务队列中,等待线程去执行它。
public void execute(Runnable task) { if(task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); if(inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if(this.isShutdown() && this.removeTask(task)) { reject(); } } if(!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } }
NioEventLoop线程执行逻辑
NioEventLoop对应线程的run方法,run()方法里面是一个死循环,主要的逻辑是首先采用select检查是否有IO事件,如果有IO事件,就采用processSelectedKey()对IO事件进行处理,最后调用runAllTasks()处理任务队列中的任务。
protected void run() { while(true) { boolean oldWakenUp = this.wakenUp.getAndSet(false); try { if(this.hasTasks()) { this.selectNow(); } else { this.select(oldWakenUp); if(this.wakenUp.get()) { this.selector.wakeup(); } } this.cancelledKeys = 0; this.needsToSelectAgain = false; int t = this.ioRatio; if(t == 100) { this.processSelectedKeys(); this.runAllTasks(); } else { long e = System.nanoTime(); this.processSelectedKeys(); long ioTime = System.nanoTime() - e; this.runAllTasks(ioTime * (long)(100 - t) / (long)t); } if(this.isShuttingDown()) { this.closeAll(); if(this.confirmShutdown()) { return; } } } catch (Throwable var8) { logger.warn("Unexpected exception in the selector loop.", var8); try { Thread.sleep(1000L); } catch (InterruptedException var7) { ; } } } }
这段代码中的ioRadio是控制执行IO事件和执行任务队列中的任务的一个事件比,默认是50,代表执行IO事件处理和执行任务队列的任务事件比是1:1。
1)使用select检测IO事件
通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。Java中的Selector几个重载的select()方法:
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下:
Set selectedKeys=selector.selectedKeys();
Netty中首先判断任务队列是否为空,如果为空的话,就采用select(ltimeout)有超时设置的阻塞方法,如果不为空的话,就调用非阻塞的selectNow()方法,因为即使没有IO事件处理,也可以对任务队列中的任务进行处理。Netty中NioEventLoop的select和selectNow方法其实底层还是依靠selector的select方法。
void selectNow() throws IOException { try { this.selector.selectNow(); } finally { if(this.wakenUp.get()) { this.selector.wakeup(); } } } private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int e = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos); while(true) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if(timeoutMillis <= 0L) { if(e == 0) { selector.selectNow(); e = 1; } break; } int selectedKeys = selector.select(timeoutMillis); ++e; if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) { break; } if(Thread.interrupted()) { if(logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } e = 1; break; } long time = System.nanoTime(); if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { e = 1; } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e)); this.rebuildSelector(); selector = this.selector; selector.selectNow(); e = 1; break; } currentTimeNanos = time; } if(e > 3 && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1)); } } catch (CancelledKeyException var13) { if(logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13); } } }
可以看到调用selectNow方法是直接调用java nio的select.selectNow方法,而Netty的select方法中有一个参数oldWakeUp记录当前操作是否是唤醒状态(不太清楚这个唤醒状态的作用),每次进行select操作之前,会将其标志位false,表示要进行select操作,而且是未唤醒状态。
Netty中的select方法首先是根据当前时间时间去计算截止时间,这里使用到了超时队列(超时队列的作用也不太清楚),然后根据截止时间去计算超时时间,如果超时时间小于0,就执行selectNow操作,并退出此次select操作,否则执行带有超时时间的select方法,如果返回的selectKey不等于0,也就是有channel在select上注册了,或者该select操作被唤醒了(?),或者任务队列中有了任务,定时任务队列中有了任务,都会break出来。
接下来的代码逻辑是避免JDK空轮询的,当JDK发生了空轮训,select会直接返回,这时并没有IO事件到达,也没有超过超时时间,这样会导致线程进入死循环,CPU利用率飙升至100%,JDK到现在也并没有解决这个问题。
而Netty是通过记录空轮询的次数,如果这个次数达到了一个上限,上限默认是512,那么就新建一个selector,将注册在老selector上的channel注册到新的selector上,并且关闭老的selector,将新的selector替代老的selector。Netty通过rebuildSelector方法重建selector。
public void rebuildSelector() { if(!this.inEventLoop()) { this.execute(new Runnable() { public void run() { NioEventLoop.this.rebuildSelector(); } }); } else { Selector oldSelector = this.selector; if(oldSelector != null) { Selector newSelector; try { newSelector = this.openSelector(); } catch (Exception var9) { logger.warn("Failed to create a new Selector.", var9); return; } int nChannels = 0; label69: while(true) { try { Iterator t = oldSelector.keys().iterator(); while(true) { if(!t.hasNext()) { break label69; } SelectionKey key = (SelectionKey)t.next(); Object a = key.attachment(); try { if(key.isValid() && key.channel().keyFor(newSelector) == null) { int e = key.interestOps(); key.cancel(); SelectionKey var14 = key.channel().register(newSelector, e, a); if(a instanceof AbstractNioChannel) { ((AbstractNioChannel)a).selectionKey = var14; } ++nChannels; } } catch (Exception var11) { logger.warn("Failed to re-register a Channel to the new Selector.", var11); if(a instanceof AbstractNioChannel) { AbstractNioChannel var13 = (AbstractNioChannel)a; var13.unsafe().close(var13.unsafe().voidPromise()); } else { NioTask task = (NioTask)a; invokeChannelUnregistered(task, key, var11); } } } } catch (ConcurrentModificationException var12) { ; } } this.selector = newSelector; try { oldSelector.close(); } catch (Throwable var10) { if(logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", var10); } } logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } } }
2)processSelectedKey()
netty中selectedKey的优化
通过调用Selector的selectedKeys()方法来访问已选择键集合,此时返回的是HashSet。但是netty是通过反射的方式,将HashSet替换成数组pssSelectedKeysOptimized去处理IO事件。
private Selector openSelector() { AbstractSelector selector; try { selector = this.provider.openSelector(); } catch (IOException var7) { throw new ChannelException("failed to open a new selector", var7); } if(DISABLE_KEYSET_OPTIMIZATION) { return selector; } else { try { SelectedSelectionKeySet t = new SelectedSelectionKeySet(); Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); if(!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, t); publicSelectedKeysField.set(selector, t); this.selectedKeys = t; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable var6) { this.selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6); } return selector; } }
首先会调用JDK的openSelector方法返回创建的selector,然后会判断是否要对keySet进行优化,通过判断DISABLE_KEYSET_OPTIMIZATION,是否要对keyset进行优化,默认是要对keyset进行优化的。这里的SelectedSelectionKeySet是优化过后的keyset,底层是通过两个数组加上两个数组的大小进行实现的,这样可以使得add操作达到o(1)的时间复杂度(但是是HashSet的add操作时间复杂度不也是o(1))嘛,
processSelectedKey调用processSelectedKeysOptimized
该方法的流程就是遍历数组中所有的selectedKey,一旦遍历完,就将该引用指向为空。获取每一个selectorKey对应的channel,然后通过调用processSelectedKey去处理该channel上感兴趣的事件。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { int i = 0;
//遍历SelectedKsys while(true) { SelectionKey k = selectedKeys[i]; if(k == null) { return; } selectedKeys[i] = null;
//获取selectKey对应的channel Object a = k.attachment(); if(a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a)); } else { NioTask task = (NioTask)a; processSelectedKey(k, (NioTask)task); } if(this.needsToSelectAgain) { while(selectedKeys[i] != null) { selectedKeys[i] = null; ++i; } this.selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } ++i; } }
这里处理selector上面的IO事件,底层其实都是通过channel的unsafe类进行操作的,这里read和accept事件对应的都是channel的read方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if(!k.isValid()) { unsafe.close(unsafe.voidPromise()); } else { try { int ignored = k.readyOps();
//如果是read或者accept事件就对channel进行读操作 if((ignored & 17) != 0 || ignored == 0) { unsafe.read(); if(!ch.isOpen()) { return; } }
//write事件 if((ignored & 4) != 0) { ch.unsafe().forceFlush(); }
//connect事件 if((ignored & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException var5) { unsafe.close(unsafe.voidPromise()); } } }
3)使用runAllTasks()执行任务队列中的事件
PriorityQueue(优先级队列),也是非线程安全的队列。
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if(task == null) { throw new NullPointerException("task"); } else { if(this.inEventLoop()) { this.delayedTaskQueue.add(task); } else { this.execute(new Runnable() { public void run() { SingleThreadEventExecutor.this.delayedTaskQueue.add(task); } }); } return task; } }
runAllTask首先从定时任务队列中拉取定时任务,将需要执行的定时任务加入到普通任务队列中,并计算截止时间,然后循环的从普通任务队列中拉取任务,并执行任务,这里判断是否到达超时时间,是每相隔64个任务,就判断是否到达最大任务执行时间。为啥要每隔64个任务判断是否超时呢?因为nanoTime也是比较费时的。
protected boolean runAllTasks(long timeoutNanos) { this.fetchFromDelayedQueue(); Runnable task = this.pollTask(); if(task == null) { return false; } else { long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0L; long lastExecutionTime; while(true) { try { task.run(); } catch (Throwable var11) { logger.warn("A task raised an exception.", var11); } ++runTasks; if((runTasks & 63L) == 0L) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if(lastExecutionTime >= deadline) { break; } } task = this.pollTask(); if(task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; } }
从定时队列中拉取任务,这里拉取的任务是拉取截止时间不超过nanoTime的任务,将任务从定时任务队列中删除,将任务加入到普通任务队列中。这个while循环执行完成之后,所有需要执行的定时任务全部都加入到普通任务队列中。
private void fetchFromDelayedQueue() { long nanoTime = 0L; while (true) { ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek(); if (delayedTask == null) { break; } if (nanoTime == 0L) { nanoTime = ScheduledFutureTask.nanoTime(); } if (delayedTask.deadlineNanos() > nanoTime) { break; } this.delayedTaskQueue.remove(); this.taskQueue.add(delayedTask); } }
定时任务队列是一个优先级队列,队列按照优先级进行排序,这里的优先级是每个任务的截止时间,队列是按照截止时间的早晚对任务进行排序的。
public int compareTo(Delayed o) { if(this == o) { return 0; } else { ScheduledFutureTask that = (ScheduledFutureTask)o; long d = this.deadlineNanos() - that.deadlineNanos(); if(d < 0L) { return -1; } else if(d > 0L) { return 1; } else if(this.id < that.id) { return -1; } else if(this.id == that.id) { throw new Error(); } else { return 1; } } }
总结:
1.默认情况下,NioEventLoopGroup会创建2*cpu个数的线程池,在调用NioEventLoop.execute(task)的时候,如果当前的NioEventLoop没有创建自己的线程,就会创建线程。
2.Netty如何解决JDK空轮训bug?通过计算空轮训操作的个数,这里的空轮训的判断是既没有IO事件的到达,也没有达到超时时间,如果空轮训的个数超过阈值(512),就会新建一个selector,将旧selector的selectorKey注册到新的selector上,将旧的selector关闭,用新的selector替代旧的selector。
3.Netty在所有外部线程调用NioEventLoop的操作时,如果通过InEventLoop判断是否在NioEventLoop所属的线程,如果不在通过startThread启动NioEventLoop的线程,并且将任务添加到NioEventLoop的任务队列中,所有NioEventLoop对应一个线程,其中的操作只会被一个线程所执行,实现了异步串行无锁化。
标签:let 异步 替代 ide long prot sse exp sop
原文地址:https://www.cnblogs.com/xiaobaituyun/p/10801336.html