标签:打开 size logging ast user VID assert auto hat
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
final ChannelFuture initAndRegister() { Channel channel = this.channelFactory().newChannel(); try { this.init(channel); } catch (Throwable var3) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } ChannelFuture regFuture = this.group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
public T newChannel() { try { return (Channel)this.clazz.newInstance(); } catch (Throwable var2) { throw new ChannelException("Unable to create Channel from class " + this.clazz, var2); } }
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
NioServerSocketChannel通过newSocket创建java nio中的channel,newSocket(DEFAULT_SELECTOR_PROVIDER)
实例的关系,他们都是一个socket channel,但是是两个不同的东西。newSocket(DEFAULT_SELECTOR_PROVIDER)
protected AbstractChannel(Channel parent) { this.parent = parent; this.unsafe = this.newUnsafe(); this.pipeline = new DefaultChannelPipeline(this); }
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) { super((Channel)null, channel, 16); this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket()); }
void init(Channel channel) throws Exception { Map<ChannelOption<?>, Object> options = this.options(); synchronized(options) { channel.config().setOptions(options); } Map<AttributeKey<?>, Object> attrs = this.attrs(); synchronized(attrs) { Iterator i$ = attrs.entrySet().iterator(); while(true) { if (!i$.hasNext()) { break; } Entry<AttributeKey<?>, Object> e = (Entry)i$.next(); AttributeKey<Object> key = (AttributeKey)e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (this.handler() != null) { p.addLast(new ChannelHandler[]{this.handler()}); } final EventLoopGroup currentChildGroup = this.childGroup; final ChannelHandler currentChildHandler = this.childHandler; Map var9 = this.childOptions; final Entry[] currentChildOptions; synchronized(this.childOptions) { currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size())); } var9 = this.childAttrs; final Entry[] currentChildAttrs; synchronized(this.childAttrs) { currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size())); } p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() { public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); } }}); }
ChannelFuture regFuture = this.group().register(channel);
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } else if (AbstractChannel.this.isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); } else if (!AbstractChannel.this.isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); } else { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { this.register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { public void run() { AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4); this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var4); } } } }
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); if (inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if (this.isShutdown() && this.removeTask(task)) { reject(); } } if (!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } }
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !this.ensureOpen(promise)) { return; } boolean firstRegistration = this.neverRegistered; AbstractChannel.this.doRegister(); this.neverRegistered = false; AbstractChannel.this.registered = true; this.safeSetSuccess(promise); AbstractChannel.this.pipeline.fireChannelRegistered(); if (firstRegistration && AbstractChannel.this.isActive()) { AbstractChannel.this.pipeline.fireChannelActive(); } } catch (Throwable var3) { this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var3); } }
protected void doRegister() throws Exception { boolean selected = false; while(true) { try { this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this); return; } catch (CancelledKeyException var3) { if (selected) { throw var3; } this.eventLoop().selectNow(); selected = true; } } }
doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
// AbstractChannel public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } // DefaultChannelPipeline public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { // 因为bind是一个outbound事件,从pipeline链尾tailContext开始执行 return tail.bind(localAddress, promise); } // tail context的父类AbstractChannelHandlerContext public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } // 应用程序没有添加outbound的情况下,找到的next context是head context final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } // head context的父类AbstractChannelHandlerContext private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } // 还在head context的父类AbstractChannelHandlerContext public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { // 通过unsafe调用bind了,意味着会调用JVM的功能,操作底层的一些函数了 unsafe.bind(localAddress, promise); } // AbstractUnsafe public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can‘t receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can‘t receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { // 做实际的bind工作 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } // NioServerSocketChannel protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { // 最终通过JVM调用server socket的bind、listen等函数,启动服务端 javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
标签:打开 size logging ast user VID assert auto hat