Tomcat的Http11NioProtocol协议使用Java NIO技术实现高性能Web服务器。本文通过分析Http11NioProtocol源码来学习Java NIO的使用。从中可以了解到阻塞IO和非阻塞IO的配合,NIO的读写操作以及Selector.wakeup的使用。
Java NIO服务器端实现的第一步是开启一个新的ServerSocketChannel对象。Http11NioProtocol的实现也不例外, 在NioEndPoint类的init方法可以看到这段代码。
代码1:NioEndPoint.init()方法
public void init() throws Exception { if (initialized ) return; //开启一个新的ServerSocketChannel serverSock = ServerSocketChannel.open(); //设置socket的性能偏好 serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(), socketProperties.getPerformanceLatency(), socketProperties.getPerformanceBandwidth()); InetSocketAddress addr = ( address!=null ?new InetSocketAddress(address ,port ):new InetSocketAddress(port)); //绑定端口号,并设置backlog serverSock.socket().bind(addr,backlog ); //将serverSock设置成阻塞IO serverSock.configureBlocking(true); //mimic APR behavior //初始化acceptor线程数 if (acceptorThreadCount == 0) { // FIXME: Doesn‘t seem to work that well with multiple accept threads acceptorThreadCount = 1; } //初始化poller线程数 if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; } // 根据需要,初始化SSL // 因为主要关注Java NIO, 所以这一块代码就省略掉了 if (isSSLEnabled()) { ...... } //OutOfMemoryError策略 if (oomParachute >0) reclaimParachute(true); //开启NioSelectorPool selectorPool.open(); initialized = true ; }
在NioEndPoint.init方法中,可以看到ServerSocketChannel被设置成阻塞IO,并且没有注册任何就绪事件。这样可以和阻塞ServerSocket一样方便地使用阻塞accept方法来接收客户端新来的连接。但不同的是当NioEndPoint.Accept线程通过accept方法获得一个新的SocketChannel后会构建一个OP_REGISTER类型的PollerEvent事件并放到Poller.events队列中。而我们使用ServerSocket实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。
在NioEndPoint.Accept的setSocketOptions方法中可以看到获得SocketChannel后的处理过程。步骤如下:
1)将SocketChannel设置成非阻塞;
2)构建OP_REGISTER类型的PollerEvent对象,并放入到Poller.events队列中。
代码2:NioEndPoint.Accept类的setSocketOptions方法
protected boolean setSocketOptions(SocketChannel socket) { try { //将客户端Socket设置为非阻塞, APR风格 socket.configureBlocking( false); Socket sock = socket.socket(); socketProperties.setProperties(sock); //从缓存中取NioChannel对象,如果取不到直接构建一个 NioChannel channel = nioChannels.poll(); if ( channel == null ) { // 如果sslContext不等于null, 需要启动ssl if (sslContext != null) { .... } //正常tcp启动 else { //构建NioBufferHandler对象 NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); //构建NioChannel对象 channel = new NioChannel(socket, bufhandler); } } else { //从缓存中取的NioChannel对象,将客户端socket设置进去 channel.setIOChannel(socket); if ( channel instanceof SecureNioChannel ) { SSLEngine engine = createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else { channel.reset(); } } //注册NioChannel对象 getPoller0().register(channel); } catch (Throwable t) { try { log.error("" ,t); } catch ( Throwable tt){} // Tell to close the socket return false ; } return true ; }
Poller线程会从Poller.events队列中取出PollerEvent对象,并运行PollerEvent.run()方法。在PollerEvent.run()方法中发现是OP_REGISTER事件,则会在Poller.selector上注册SocketChannel对象的OP_READ就绪事件。
代码3:PollerEvent.run()方法代码片段
public void run() { if ( interestOps == OP_REGISTER ) { try { //在Poller.selector上注册OP_READ就绪事件 socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key ); } catch (Exception x) { log.error("" , x); } } ...... }
至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的SocketChannel, 并注册OP_READ就绪事件到Poller.selector上(如图1)。下面就可以进行数据读写了。
图1:ServerSocketChannel和SocketChannel的初始化状态
Poller线程会做如下工作:
1) 通过selection操作获取已经选中的SelectionKey数量;
2) 执行Poller.events队列中的PollerEvent;
3) 处理已经选中的SelectionKey。
当有新PollerEvent对象加入Poller.events队列中,需要尽快执行第二步,而不应该阻塞的selection操作中。所以就需要配合Selector.wakeup()方法来实现这个需求。Tomcat使用信号量wakeupCounter来控制Selector.wakeup()方法,阻塞Selector.select()方法和非阻塞Selector.selectNow()方法的使用。
当有新PollerEvent对象加入Poller.events队列中,会按照如下条件执行Selector.wakeup()方法。
代码4:Poller.addEvent()方法,实现将PollerEvent对象加入Poller.events队列中。
public void addEvent(Runnable event) { events.offer(event); //如果wakeupCount加1后等于0,则调用wakeup方法 if ( wakeupCounter .incrementAndGet() == 0 ) selector.wakeup(); }
if (wakeupCounter .get()>0) { keyCount = selector.selectNow(); else { wakeupCounter.set(-1); keyCount = selector.select(selectorTimeout ); } wakeupCounter.set(0);
所以,Tomcat通过wakeupCounter信号量的变化来控制只有阻塞在selection操作的时候才调用Selector.wakeup()方法。当有新PollerEvent对象加入Poller.events队列中,并且没有处于阻塞在selection操作中,则直接调用非阻塞方法Selector.selectNow()。
Poller线程会调用Poller.processKey()方法处理已经选中的SelectionKey。
该方法会完成下面工作:
1)取消在Poller.selector上注册的OP_READ就绪事件;
2)启动工作线程来处理网络请求;
2-1)读取和解析http请求数据
2-2)如果是动态内容,则会调用用户自定义的Servlet类处理并返回结果给浏览器;如果是静态内容,则会直接返回静态资源数据给浏览器。
我们在这就不详细讨论http协议的实现以及Servlet的使用,直接跳到网络IO读写实现类NioSelectorPool。
NioSelectorPool类也提供了产生Selector对象的功能,通过NioSelectorPool.get()方法就可以获得一个Selector对象。
根据命令行参数-Dorg.apache.tomcat.util.net.NioSelectorShared的设置决定是否在SocketChannel中共享Selector。
从NioSelectorPool类中获得的Selector对象会传入到NioSelectorPool的read和write方法,并在网络IO读写时候使用。
NioSelectorPool类的读写方法提供了两种模式。通过方法的最后一个入参block控制。
1)读方法read():
2)写方法write():
另外如果是共享Selector(NioSelectorShared=true)并且阻塞模式(block=true),则会使用NioBlockingSelector类实现读写数据。该类与NioSelectorPool使用Java NIO的策略是类似的,但实现略有不同,本文就不详细分析了。
图2:事件注册在读写时候发生的变化
下面是NioSelectorPool的read方法,实现从网络IO中读取数据。该方法有5个参数:
代码6:NioSelectorPool的read()方法
public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现数据读取 if ( SHARED && block ) { return blockingSelector .read(buf,socket,readTimeout); } SelectionKey key = null; int read = 0; boolean timedout = false; //一开始我们认为是可以读的 int keycount = 1; //assume we can read //开始时间 long time = System.currentTimeMillis(); //start the timeout timer try { //当没有超时,则继续读数据 while ( (!timedout) ) { int cnt = 0; if ( keycount > 0 ) { //only read if we were registered for a read cnt = socket.read(buf); if (cnt == -1) throw new EOFException(); read += cnt; //如果读取到数据,则继续读 if (cnt > 0) continue; //read some more //如果没有读取到数据,并且不是block模式,则直接break if (cnt==0 && (read>0 || (!block) ) ) break; //we are done reading } if ( selector != null ) {//perform a blocking read //在NioSelectionPool提供的selector上注册OP_READ事件 if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ); else key.interestOps(SelectionKey.OP_READ); //调用Selector.select方法 keycount = selector.select(readTimeout); } //计算是否超时 if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout; } //while //如果超时,抛出SocketTimeoutException异常 if ( timedout ) throw new SocketTimeoutException(); } finally { //在返回前,取消SelectionKey, 并将所有的key从selector中删掉 if (key != null) { key.cancel(); if (selector != null) selector.selectNow();//removes the key from this selector } } return read; }
代码7:NioSelectorPool.write()方法
public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException { //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现写数据 if ( SHARED && block ) { return blockingSelector.write(buf,socket,writeTimeout,lastWrite); } SelectionKey key = null; int written = 0; boolean timedout = false; //一开始我们认为是可以读的 int keycount = 1; //assume we can write //记录开始时间 long time = System.currentTimeMillis(); //start the timeout timer try { while ( (!timedout) && buf.hasRemaining() ) { int cnt = 0; if ( keycount > 0 ) { //only write if we were registered for a write cnt = socket.write(buf); //write the data if (lastWrite!=null) lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; //如果写数据成功,重新记录超时开始时间,并继续读 if (cnt > 0) { time = System. currentTimeMillis(); //reset our timeout timer continue; //we successfully wrote, try again without a selector } //如果写入数据为0,并且是非阻塞模式,则直接退出 if (cnt==0 && (!block)) break; //don‘t block } if ( selector != null ) { //在NioSelectorPool的selector注册OP_WRITE事件 if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE); else key.interestOps(SelectionKey.OP_WRITE); keycount = selector.select(writeTimeout); } //是否超时 if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout; } //while //如果超时,则直接抛出SocketTimeoutException异常 if ( timedout ) throw new SocketTimeoutException(); } finally { //在返回前,取消SelectionKey, 并将所有的key从selector中删掉 if (key != null) { key.cancel(); if (selector != null) selector.selectNow();//removes the key from this selector } } return written; }
Tomcat在使用Java NIO的时候,将ServerSocketChannel配置成阻塞模式,这样可以方便地对ServerSocketChannel编写程序。当accept方法获得一个SocketChannel,并没有立即从线程池中取出一个线程来处理这个SocketChannel,而是构建一个OP_REGISTER类型的PollerEvent,并放到Poller.events队列中。Poller线程会处理这个PollerEvent,发现是OP_REGISTER类型,会在Poller.selector上注册一个这个SocketChannel的OP_READ就绪事件。如图1所示。
因为Java NIO的wakeup特性,使用wakeupCount信号量控制Selector.wakeup()方法,非阻塞方法Selector.selectNow()和阻塞方法Selector.select()的调用。我们在编写Java NIO程序时候也可以参考这种方式。
在SocketChannel上读的时候,分成非阻塞模式和阻塞模式。
在SocketChannel上写的时候也分成非阻塞模式和阻塞模式。
在写数据的时候,开始没有监听OP_WRITE就绪事件,直接调用write()方法。这是一个乐观设计,估计网络大部分情况都是正常的,不会拥塞。如果第一次写没有成功,则说明网络可能拥塞,那么再等待OP_WRITE就绪事件。
阻塞模式的读写方法没有在原有的Poller.selector上注册就绪事件,而是使用NioSelectorPool类提供的Selector对象注册就绪事件。这样的设计可以将各个Channel的就绪事件分散注册到不同的Selector对象中,避免大量Channel集中注册就绪事件到一个Selector对象,影响性能。
1)Tomcat6.0.18源码
2)Ron Hitchens的Java NIO
本博客系博主原创,转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/43909677
通过Tomcat的Http11NioProtocol源码学习Java NIO设计
原文地址:http://blog.csdn.net/jeff_fangji/article/details/43909677