/**
* Endpoint that provides low - level
network I/O - must be matched to the
* ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO
* Endpoint etc.).
*/
protected AbstractEndpoint<S>
endpoint = null ;
先对AbstractEndpoint(org.apache.tomcat.util.net.AbstractEndpoint)类做了解。
【AbstractEndpoint的线程池】
AbstractEndpoint有一个Executor的属性,是它所用的线程池。这个线程池可以是外界指定的,也可以是由AbstractEndpoint自己创建的。通过属性internalExecutor来标识使用的是外部的线程池,还是有Endpoint自己创建的线程池。
可以由外部调用显式指定endpoint使用的线程池
/**
* External Executor based thread pool.
*/
private Executor executor = null;
public void setExecutor(Executor
executor) {
this.executor =
executor;
this.internalExecutor =
(executor==null);
}
public Executor
getExecutor() { return executor ;
}
在当调用者没有显式指定所用线程池时,会创建一个自己所用的线程池,创建方法如下。
public void createExecutor ()
{
internalExecutor = true ;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName()
+ "-exec-" , daemon ,
getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS ,taskqueue,
tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
【AbstractEndpoint的Acceptor】
在AbstractEndpoint中定义了Acceptor类(实现了Runnable接口),同时定义了acceptors属,主要用于接收网络请求。
/**
* Threads used to accept new connections and pass them to worker threads.
*/
protected Acceptor []
acceptors;
启动acceptors时,并没有使用前面提到过的线程池,而是生成了新的守护线程(getDaemon方法,默认返回true),来运行。但,具体在acceptors中线程的执行体,则交由具体的子类负责实现(貌似template-method模式是各种框架的基础配置),通过重写抽象方法createAcceptor来完成。
protected final void startAcceptorThreads()
{
int count
= getAcceptorThreadCount();
acceptors = new Acceptor [count];
for (int i
= 0; i < count; i++) {
acceptors[i]
= createAcceptor();
String threadName = getName() + "-Acceptor-" +
i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors [i],
threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
AbstractEndpoint框架主要定义了一些基本的属性,同时规定了生命周期的调用顺序。
Endpoint的初始化和启动,主要执行具体子类的所实现的startInternal方法来完成。
public final void init() throws Exception
{
if (bindOnInit )
{
bind();
bindState =
BindState.BOUND_ON_INIT;
}
}
public final void start() throws Exception
{
if (bindState ==
BindState.UNBOUND) {
bind();
bindState =
BindState.BOUND_ON_START;
}
startInternal();
}
在Http11NioProtocol的构造函数中指定的是使用NioEndpoint实例,因此这里通过分析AbstractEndpoint的子类NioEndpoint来做进一步的了解。
这里主要关注bind和startInternal两个函数
【NioEndpoint】
bind操作的主体部分的代码如下
@Override
public void bind() throws Exception
{
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!= null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.socket().bind(addr,getBacklog());
serverSock.configureBlocking( true); //mimic
APR behavior
serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
// 这里省却了一些与配置设置和ssl相关的代码
selectorPool.open();
}
从代码上来看,bind操作主要是做一些配置参数的计算,以及开始对socket的监听
startInternal操作的代码如下
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception
{
if (!running)
{
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
keyCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getKeyCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
if (
getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0;
i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i],
getName() + "-ClientPoller-" +i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon( true);
pollerThread.start();
}
startAcceptorThreads();
}
}
在startInternal中,初始化线程池,创建和启动网络数据接收线程组,创建和启动poller线程组。
在startInternal中,会检查是否指定要使用外部的线程池,如果没有指定外部线程池,Endpoint就会创建一个内部的线程池。但这里面没有与线程的调度和使用相关的代码
【线程池的调用时机】
与Acceptor和Poller相关?
在NioEndpoint中对Acceptor的功能说明如下,acceptor主要监听网络连接并且进行任务分发的后台线程。
// ---------------------------------------------------
Acceptor Inner Class
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor
{
Acceptor负责接收网络请求,建立连接。连接建立之后,将这个socket连接交给Poller。由Poller来负责执行数据的读取和业务执行。
从代码上看,Acceptor的负责控制底层同时连接的socket数目,它的任务在把建立建立之后socket交给Poller之后就结束了。
运行的主体部分在重写的run方法中,将建立连接之后的socket交给Poller的工作在setSocketOptions方法中实现。
setSocketOptions方法是专门处理特定socket连接的方法,将一个SocketChannel对象包装成一个NioChannel之后,注册到Poller中。
/**
* Process the specified connection.
*/
protected boolean setSocketOptions(SocketChannel
socket) {
// Process the connection
try {
//disable blocking, APR style, we are
gonna be polling it
socket.configureBlocking( false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
// 这里省略了设置channel属性的一些语句
getPoller0().register(channel);
} catch (Throwable
t) {
// 省略异常处理的相关代码
return false ;
}
return true ;
}
getPoller0是从在startInternal方法中初始化的pollsers数组中取一个poller。
然后通过Poller对象的register方法把这个channel注册到此Poller对象上。
pollers数组的大小是根据当前的运行环境计算出来的,无法通过配置修改。
【Poller是什么】
Poller是实现了Runnable接口的,在NioEndpoint的时候,会初始化pollers数组,同时启动pollers数组中的线程,让pollers开始工作。
每个Poller都有一个自己的Selector对象,在Poller的构造函数中,通过调用Selector.open方法生成,虽然看上去这很像是一个单例模式,但实际上没法返回的都是一个全新的对象(可能与jdk的底层实现有关,目前从Oracle提供的jdk的试验来看,两次调用返回的是不同的对象)。
在Poller重写的run方法中,会首先根据当前endpoint的状态来选择操作。
如果endpoint被暂停,让Poller线程进行休眠,直到暂停被解除
// Loop if endpoint is paused
while (paused
&& (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException
e) {
// Ignore
}
}
endpoint是暂停状态,但没有被关闭,暂停状态是有可能恢复的,所以让poller休眠等待即可
如果endpoint被关闭,那就处理完已有的数据,这个Poller打开的selector。结束poller线程的执行。通过break跳出run方法体的while(true)循环。
// Time to terminate?
if (close)
{
events();
timeout(0, false);
try {
selector.close();
} catch (IOException
ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail" ),
ioe);
}
break;
} else {
hasEvents = events();
}
如果endpoint是正常工作状态,处理已有的数据。通过events方法来处理当前Poller中已有的事件(数据)。同时使用selector.select或者selectNow来获取这个Poller上
状态已经OK的渠道,并进行数据处理。
if (
!close ) {
if (wakeupCounter.getAndSet(-1)
> 0) {
//if we are here,
means we have other stuff to do
//do a non blocking
select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close)
{
events();
timeout(0, false);
try {
selector.close();
} catch (IOException
ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail" ),
ioe);
}
break;
}
正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法
Iterator<SelectionKey> iterator =keyCount
> 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection
of ready keys and dispatch
// any active event.
while (iterator
!= null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null
if another thread has called
// cancelledKey()
if (attachment
== null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
processKey(sk, attachment);
}
}
processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。
if ( isWorkerAvailable()
) {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket
= false;
// Read goes before
write
if (sk.isReadable())
{
if (!processSocket(attachment,
SocketStatus.OPEN_READ, true )) {
closeSocket = true;
}
}
if (!closeSocket
&& sk.isWritable()) {
if (!processSocket(attachment,
SocketStatus.OPEN_WRITE, true )) {
closeSocket = true;
}
}
if (closeSocket)
{
cancelledKey(sk,SocketStatus.DISCONNECT);
}
} else {
result = false;
}
根据socket的状态,来进行调用,于是这里又转到了NioEndpoint上。
这里绕回到NioEndpoint上的processSocket,才发现对前面提到过的线程池的使用,
attachment.setCometNotify(false); //will
get reset upon next reg
SocketProcessor sc = processorCache.pop();
if (
sc == null ) sc = new SocketProcessor(attachment,
status);
else sc.reset(attachment,
status);
Executor executor = getExecutor();
if (dispatch
&& executor != null) {
executor.execute(sc);
} else {
sc.run();
}
这里通过getExecutor来获取可用的线程池。任务被封装成SocketProcessor对象,在成功获取线程池后,则通过线程池来进行socket数据数据的读写操作。在此就使用到了启动tomcat时所配置的线程池了。
在此对NioEndpoint的架构做个总结
Acceptor:
负责监听并接收socket连接建立。由Acceptor来控制与服务端建立连接的客户端socket数目。具体的数目为一个服务可配置项,可以在启动服务时指定
Poller:
负责处理已建立连接的socket,将channel封装后,提交至线程池(Executors)来处理。Poller线程的数目与运行时环境有关,通过计算得出,不可配置。
Executors:
处理socket请求的线程池。线程池中线程的数目可在启动服务时配置。
[Poller中Selector的注册]
在Poller中,对selector的使用上,只看到通过selector.select活selector.selectNow来获取对应的渠道。但在java的nio中,一个渠道必须要先在selector上注册后,才能被
selector获取到。那么,各个channel是何时再selector上注册的呢?
答案在PollerEvent上。
前面提到过Acceptor的主要工作是把建立好连接的socket注册到Poller上,通过register上实现。
Poller的register把建立好连接的socket封装成一个PollerEvent对象,然后放入这个Poller所维护的事件队列中。
Poller内部所维护的事件队列,定义如下 private final SynchronizedQueue<PollerEvent>
events = new SynchronizedQueue<>();
在Poller的run方法中,通过events方法,来处理已有的事件。
public boolean events()
{
boolean result
= false;
PollerEvent pe = null;
while (
(pe = events.poll()) != null ) {
result = true;
try {
pe.run();
pe.reset();
if (running
&& !paused) {
eventCache.push(pe);
}
} catch (
Throwable x ) {
log.error( "",x);
}
}
return result;
}
events是一个PollerEvent类型队列。events方法中有一个while循环,取出队列中的每一个PollerEvent对象,然后执行它。
PollerEvent实现了Runnable接口,在其run方法中,完成了channel对selector的注册
if ( interestOps == OP_REGISTER
) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception
x) {
log.error( "",
x);
}
} else {
socket.getPoller()返回这个channel所注册的Poller对象。getSelector()返回这个Poller对象的selector。注册之后,当这个socket的channel有数据到达,便能通过selector.select活selector.selectNow被返回,放入到Executor中进行处理。