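How does a Netty server tie into Java NIO? I have long been curious about this, so here I trace through the code. The next post will look at Channel-related topics.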
    // AbstractBootstrap
    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            // Close the channel immediately without firing any events.
            channel.unsafe().closeForcibly();
            // The channel is not registered with an EventLoop yet, so we have to force the use of GlobalEventExecutor.
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // Register the channel with an EventLoop.
        ChannelFuture regFuture = group().register(channel);
        // Registration failed.
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return regFuture;
    }
    // MultithreadEventLoopGroup: pick one child EventLoop and delegate to it.
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
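`next()` hands back one of the group's child event loops. The following is a minimal sketch of how such a selection could work, assuming a plain round-robin policy. `RoundRobinChooser` and its members are hypothetical names for illustration, not Netty API, and the real implementation may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event loop group; not Netty's API.
class RoundRobinChooser<T> {
    private final List<T> children;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(List<T> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
    }

    // Returns the children in turn, wrapping around at the end of the list.
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return children.get(Math.floorMod(index.getAndIncrement(), children.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Under this policy each newly registered channel lands on the next loop in turn, which spreads channels evenly across the loop threads.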
    // SingleThreadEventLoop: hand the registration over to the channel's Unsafe.
    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
        channel.unsafe().register(this, promise);
        return promise;
    }
    // AbstractChannel.AbstractUnsafe
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        // Run register0 directly when already on the loop thread; otherwise queue it as a task.
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
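The `inEventLoop()` check above decides whether `register0` runs inline or is queued for the loop thread. A rough sketch of that dispatch pattern using only the JDK follows. `MiniEventLoop` and `runOnLoop` are hypothetical names, and a single-thread executor stands in for the real event loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded loop; only the name inEventLoop mirrors the code above.
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread = t; // remember which thread owns this loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task inline when already on the loop thread, otherwise queues it.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Stops accepting tasks and waits briefly for queued ones to finish.
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniEventLoop loop = new MiniEventLoop();
        loop.runOnLoop(() -> System.out.println("on loop thread: " + loop.inEventLoop()));
        loop.shutdown();
    }
}
```

Because every state change goes through the one loop thread, code that runs there needs no extra locking, which is the point of routing registration through the loop.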
    // AbstractNioChannel: this is where the Netty channel meets the JDK Selector.
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Register the underlying java.nio channel with interest ops 0 and attach this Netty channel.
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
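The `javaChannel().register(selector, 0, this)` call is plain Java NIO. As a minimal sketch with only JDK classes, the snippet below registers a `ServerSocketChannel` with interest ops `0` and an attachment, then switches on `OP_ACCEPT` afterwards. The class and method names are hypothetical, and the assumption that Netty enables the real interest ops in a later step is mine rather than something shown in the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class NioRegisterSketch {
    // Returns {interest ops right after register, interest ops after enabling accept,
    //          1 if the attachment round-trips through the key, else 0}.
    static int[] interestOpsBeforeAndAfter() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a Selector.
            server.configureBlocking(false);

            // Placeholder for the Netty channel object that doRegister() attaches as `this`.
            Object attachment = "channel-placeholder";

            // Interest ops 0: the key exists, but the selector reports nothing for it yet.
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();

            // Assumed later step: start listening for incoming connections.
            key.interestOps(SelectionKey.OP_ACCEPT);
            int after = key.interestOps();

            return new int[] {before, after, key.attachment() == attachment ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = interestOpsBeforeAndAfter();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

The attachment is what lets the selector loop map a ready `SelectionKey` back to the higher-level channel object that owns it.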
The result obtained from the wakeup method
    // AbstractBootstrap
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
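`doBind0` submits the bind as a task to the same event loop that received the registration task, so the bind cannot overtake the registration. A small sketch of that ordering guarantee follows, assuming a single-thread executor as a stand-in for the event loop. `TaskOrderSketch` and the event labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskOrderSketch {
    // Submits a "register" task and then a "bind" task to one thread and records the order they ran in.
    static List<String> registerThenBind() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService loop = Executors.newSingleThreadExecutor();

        loop.execute(() -> events.add("register")); // queued first
        loop.execute(() -> events.add("bind"));     // queued second, runs after the first task

        loop.shutdown();
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(registerThenBind());
    }
}
```

A single worker thread draining a FIFO queue is what makes it safe to call `bind()` right after `register()` without waiting for the registration future.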
    // NioEventLoop: the selector loop that alternates between I/O and queued tasks.
    @Override
    protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
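The expression `ioTime * (100 - ioRatio) / ioRatio` in the loop above turns the time just spent on I/O into a time budget for queued tasks. A small worked example of that arithmetic is sketched here. `IoRatioSketch` and `taskBudgetNanos` are hypothetical names, and the range check is my addition (the loop above handles `ioRatio == 100` in a separate branch).

```java
class IoRatioSketch {
    // Time budget for non-I/O tasks, given the time spent on I/O and an ioRatio strictly between 0 and 100.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        // Same formula as in the loop: tasks get the remaining share, scaled relative to the I/O time.
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // 1000 ns of I/O at ioRatio 80 leaves 1000 * 20 / 80 = 250 ns for tasks.
        System.out.println(taskBudgetNanos(1000, 80));
        // At ioRatio 50, I/O and tasks get equal time.
        System.out.println(taskBudgetNanos(1000, 50));
    }
}
```

A lower `ioRatio` therefore gives queued tasks more time per loop iteration: with `ioRatio` 20, 1000 ns of I/O yields a 4000 ns task budget.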
Original article: http://www.cnblogs.com/gaoxing/p/4401791.html