IoService分析
AbstractIoAcceptor定义了所有的public接口,并定义了子类需要实现的bindInternal函数,AbstractPollingIoAcceptor<S extends AbstractIoSession, H>作为它的一个派生类,主要就是实现bindInternal函数,
AbstractPollingIoAcceptor<S extends AbstractIoSession, H>类定义了bindInternal的实现框架,NioSocketAcceptor使用selector实现了它需要的接口,比如select,wakeup,open等函数。
总体来说,bindInternal实现的功能就是开启一个新的线程,在这个线程中绑定监听的地址,并接受客户端请求。看如下两个函数:
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
// Create a bind request as a Future operation. When the selector
// have handled the registration, it will signal this future.
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);//这个future是一个用于线程间异步通信结果的类,它可以不被中断的等待异步操作的结果
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
// creates the Acceptor instance and has the local
// executor kick it off.
startupAcceptor();//这里启动acceptor线程
// As we just started the acceptor, we have to unblock the select()
// in order to process the bind request we just have added to the
// registerQueue.
try {
lock.acquire();//lock是一个semaphone,用于同步acceptor线程,保证该线程在成功创建并开始运行后,再执行后续的代码
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();//这是一个派生类需要实现的接口
} finally {
lock.release();
}
// Now, we wait until this request is completed.
request.awaitUninterruptibly();//这里异步等待acceptor线程设置结果,并导致当前线程被唤醒
if (request.getException() != null) {
throw request.getException();
}
// Update the local addresses.
// setLocalAddresses() shouldn‘t be called from the worker thread
// because of deadlock.
Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
for (H handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}
/**
* This method is called by the doBind() and doUnbind()
* methods. If the acceptor is null, the acceptor object will
* be created and kicked off by the executor. If the acceptor
* object is null, probably already created and this class
* is now working, then nothing will happen and the method
* will just return.
*/
private void startupAcceptor() throws InterruptedException {
// If the acceptor is not ready, clear the queues
// TODO : they should already be clean : do we have to do that ?
if (!selectable) {
registerQueue.clear();
cancelQueue.clear();
}
// start the acceptor if not already started
Acceptor acceptor = acceptorRef.get();
if (acceptor == null) {
lock.acquire();
acceptor = new Acceptor();//创建Acceptor实例
if (acceptorRef.compareAndSet(null, acceptor)) {
executeWorker(acceptor);//启动acceptor线程
} else {
lock.release();
}
}
}
现在来看看Acceptor线程
private class Acceptor implements Runnable {
public void run() {
assert (acceptorRef.get() == this);
int nHandles = 0;
// Release the lock
lock.release();//进程开始运行了,释放lock
while (selectable) {
try {
// Detect if we have some keys ready to be processed
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
int selected = select();//调用select接口,派生类需要实现之
// this actually sets the selector to OP_ACCEPT,
// and binds to the port on which this class will
// listen on
nHandles += registerHandles();//遍历registerQueue中的绑定请求,并调用open接口,实际上就是由派生类实现bind地址的功能。
// Now, if the number of registred handles is 0, we can
// quit the loop: we don‘t have any socket listening
// for incoming connection.
if (nHandles == 0) {
acceptorRef.set(null);
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
assert (acceptorRef.get() != this);
break;
}
if (!acceptorRef.compareAndSet(null, this)) {
assert (acceptorRef.get() != this);
break;
}
assert (acceptorRef.get() == this);
}
if (selected > 0) {
// We have some connection request, let‘s process
// them here.
processHandles(selectedHandles());//调用accept函数接收用户请求,创建Session,并把session加入到processor中
}
// check to see if any cancellation request has been made.
nHandles -= unregisterHandles();
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
// Cleanup all the processors, and shutdown the acceptor.
if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}
参考:
IoSession
先来看看和IoSession相关的类结构
AbstractIoSession实现了Session完成的主要功能,所谓Session其实是一个物理连接的逻辑抽象,所以在NioSession这一层,它与Nio的channel是相关的,Session需要通过这个底层连接实现它的逻辑功能。
Session的主要功能,包括,关闭连接,在连接上进行read,write、设置和读取Session级别的属性,进行流量控制等。为了深入理解Session,我们需要了解如下几个问题:
1.Session是在何时,由谁创建的?
在前文分析中,我们知道在Acceptor线程中,在接收用户请求,并创建Session,具体来说,这个Session是在NioSocketAcceptor.accept方法中创建的
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
SelectionKey key = null;
if (handle != null) {
key = handle.keyFor(selector);
}
if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
}
// accept the connection from the client
SocketChannel ch = handle.accept();
if (ch == null) {
return null;
}
return new NioSocketSession(this, processor, ch);
}
而该函数又是在AbstractPollingIoAcceptor<S extends AbstractIoSession, H>.processHandles中调用的
@SuppressWarnings("unchecked")
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor, handle);
if (session == null) {
continue;
}
initSession(session, null, null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
我们看到在创建session之后,会调用initSession对session进行初始化,然后把它加入到Processor中。
2.Session是如何进行读写的?
Session创建之后是如何收到对端数据,如何提供发送数据的接口的呢?先来看写操作,我们看看AbstractIoSession.write方法
public WriteFuture write(Object message, SocketAddress remoteAddress) {
。。。
// Now, we can write the message. First, create a future
WriteFuture writeFuture = new DefaultWriteFuture(this);
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
// Then, get the chain and inject the WriteRequest into it
IoFilterChain filterChain = getFilterChain();
filterChain.fireFilterWrite(writeRequest);
。。。
// Return the WriteFuture.
return writeFuture;
}
该方法中,要被write的message,被包装在WriteRequest中,并且返回的是一个writeFuture,这就是说,我们可以使用这个future不被中断的等待写操作完成。同时,该方法中调用了filterChain的fireFilterWrite函数,它的作用是遍历filterChain中的所有filter,触发他们的fireFilterWrite方法。AbstractIoSession.getFilterChain只是一个接口,需要在派生类中实现。在其派生类NioSession中,我们可以看到这个filterChain是DefaultIoFilterChain实例。它的fireFilterWrite方法实际上是,从tail到head遍历链表,既然是反向遍历,那么Head是最后一个被遍历到的filter,这个head是一个HeadFilter实例
private class HeadFilter extends IoFilterAdapter {
@SuppressWarnings("unchecked")
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
// Maintain counters.
if (writeRequest.getMessage() instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
// I/O processor implementation will call buffer.reset()
// it after the write operation is finished, because
// the buffer will be specified with messageSent event.
buffer.mark();
int remaining = buffer.remaining();
if (remaining > 0) {
s.increaseScheduledWriteBytes(remaining);
}
} else {
s.increaseScheduledWriteMessages();
}
WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();//获取session中的写请求队列
if (!s.isWriteSuspended()) {
if (writeRequestQueue.isEmpty(session)) {
// We can write directly the message
s.getProcessor().write(s, writeRequest);//使用processor写writeRequest,实际上还是把writeRequest写入到写请求队列中了
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
s.getProcessor().flush(s);
}
} else {
s.getWriteRequestQueue().offer(s, writeRequest);//把writeRequest写入到写请求队列中
}
}
@SuppressWarnings("unchecked")
@Override
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
((AbstractIoSession) session).getProcessor().remove(session);
}
}
从这个类,我们可以清楚的看到我们请求写的writeRequest被写入到了session的写队列中了。那么问题来了,这个写队列是从哪冒出来的,它是如何创建,又是谁从这个队列中把写请求取出来,发送出去的呢?
从AbstractIoSession.getWriteRequestQueue方法,我们知道其中的WriteRequestQueue实例,是被set进去的,到底是在哪里被set进去的呢,我们看前面提到的方法AbstractPollingIoAcceptor<S extends AbstractIoSession, H>.processHandles,在这个方法中调用了initSession方法,它是用来初始化session,按理说应该在这里,该方法是在AbstractIoService类中定义的.
@SuppressWarnings("unchecked")
protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
。。。
// Every property but attributeMap should be set now.
// Now initialize the attributeMap. The reason why we initialize
// the attributeMap at last is to make sure all session properties
// such as remoteAddress are provided to IoSessionDataStructureFactory.
try {
((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
.getAttributeMap(session));
} catch (IoSessionInitializationException e) {
throw e;
} catch (Exception e) {
throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
}
try {
((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
.getWriteRequestQueue(session));
} catch (IoSessionInitializationException e) {
throw e;
} catch (Exception e) {
throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
}
if ((future != null) && (future instanceof ConnectFuture)) {
// DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
}
if (sessionInitializer != null) {
sessionInitializer.initializeSession(session, future);
}
finishSessionInitialization0(session, future);
}
从该方法中可以看到,session的WriteRequestQueue实例,实际上就是session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session)。好吧,我们还得接着寻找,session.getService是谁呢?这里的session显然是NioSession,service是NioSocketAcceptor,getSessionDataStructureFactory方法是在基类AbstractIoService中定义的,在默认情况下,get出来的实例,是DefaultIoSessionDataStructureFactory的类实例,我们再来看这个类
public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
return new DefaultIoSessionAttributeMap();
}
public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
return new DefaultWriteRequestQueue();
}
private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
。。。
}
private static class DefaultWriteRequestQueue implements WriteRequestQueue {
/** A queue to store incoming write requests */
private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
。。。
}
}
最终我们看到session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session),实际上得到的是 new DefaultWriteRequestQueue(),这个类实际上是对ConcurrentLinkedQueue实例的封装,也就是说我们所添加的WriteRequest都是添加到ConcurrentLinkedQueue这个实例中的。
综上所述,在初始化session的时候,把IoService都会new一个WriteRequestQueue实例赋值给session,同时,为了防止多个线程在读写这个Queue的时候发生竞争,这里使用了ConcurrentLinkedQueue。
我们返回来再看HeadFilter.filterWrite方法,其中:
s.getProcessor().write(s, writeRequest);
其中的write方法,对应AbstractPollingIoProcessor.write
public void write(S session, WriteRequest writeRequest) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
writeRequestQueue.offer(session, writeRequest);
if (!session.isWriteSuspended()) {
this.flush(session);
}
}
这里可以看到WriteRequest被添加到session的WriteRequestQueue中,然后调用了AbstractPollingIoProcessor.flush方法,这里的flush,只是把session加入到flushingSessions队列中。在IoProcessor的分析中,我们会知道有一个processor的线程,专门会从session中读取WriteRequest,然后通过session的Channel把数据发送出去。
至此,我们来回顾整个发送数据的过程,首先是在IoService中创建IoSession的时候,会给它创建一个写队列,其次IoSession的写操作,都是放入到这个写队列中的,最后,IoProccessor的线程会去读这个写队列最终通过底层Channel把数据发送出去。
下面我们还需要分析读操作是如何处理的,既然是读数据,必然是从网络中获取数据,这就着落在processor线程中了,在AbstractPollingIoProcessor.processor类中调用了process方法,这个方法在判断session可读的情况下回调用read方法,read方法会从session的channel中读取数据,然后触发session的MessageReceived事件,如果session结束了,还会去触发InputClosed事件,当然,如果session出现了异常,会触发ExceptionCaught事件,这里的事件也是通过filterChain触发,前面分析过,这个filterChain实例是DefaultIoFilterChain,它的fireMessageReceived方法是从head到tail遍历链表,在它的tailFilter.messageReceived方法中触发了handler.messageReceived方法,也就是说,这个事件传递首先是传递给各个filter,最终再传递给handler的,这是符合我们要求filter先进行各种处理,最终交给handler来处理的需求。
同理,Inputclosed和ExceptionCaught这两个事件,也是从head到tail遍历的,最终交给handler处理。
3.Session是如何读取和设置属性的?
最后,我们再来看看Session是如何存取属性的,经过前面的分析,我们看到在初始化Session的时候initSession,除了给这个Session初始化了WriteRequestQueue,同时也初始化了AttributeMap
((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
.getAttributeMap(session));
同样的,在DefaultIoSessionDataStructureFactory中,也为每个session都生成了一个DefaultIoSessionAttributeMap的实例,这个实例封装了一个ConcurrentHashMap实例,这同样是为了在多线程读取该实例的时候,能够正常访问数据。
IoProcessor
IoProcessor及其附属类是一个很重要的类,它们是真正进行读写数据的类,在AbstractPollingIoProcessor类,要想深入了解IoProcess,需要回答以下两个问题:
1.谁创建了IoProcessor
在AbstractPollingIoAcceptor的构造函数中,需要指明IoProcessor的类,在其派生类NioSocketAcceptor类中指明使用NioProcessor.class。在AbstractPollingIoAcceptor的构造函数中,是这样使用这个类的,
new SimpleIoProcessorPool<S>(processorClass)
SimpleIoProcessorPool类在构造函数中使用class.newInstance,创建了若干个IoProcessor,个数可以是通过参数指定的,也可以使用默认的,即CPU核数+1。SimpleIoProcessorPool本身也是一个IoProcessor,它实际上对外提供了IoProcessor的接口,实现上是根据Session,在它的pool中选择一个Processor,然后设置给Session,后续的操作,如add,remove都是在这个特定的processor上执行的。
2.IoProcessor的运行机制
在前面分析IoService的时候,我们知道在acceptor线程accept一个新的session的时候,会把这个session加入到它的processor中,也就是会调用AbstractPollingIoProcessor.add方法,它实际上只是把session加入到newSessions队列中,并启动了一个新的线程processor,当然,此时是运行在acceptor线程上的。具体的读写数据是在processor线程上执行的。当然为了线程间的竞争,newSessions也是用了ConcurrentLinkedQueue类。
我们来看AbstractPollingIoProcessor.Processor
private class Processor implements Runnable {
public void run() {
assert (processorRef.get() == this);
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
// This select has a timeout so that we can manage
// idle session when we get out of the select every
// second. (note : this is a hack to avoid creating
// a dedicated thread).
long t0 = System.currentTimeMillis();
int selected = select(SELECT_TIMEOUT);//select事件,看是否有读写,关闭事件发生
long t1 = System.currentTimeMillis();
long delta = (t1 - t0);
if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
// Last chance : the select() may have been
// interrupted because we have had an closed channel.
if (isBrokenConnection()) {
LOG.warn("Broken connection");
// we can reselect immediately
// set back the flag to false
wakeupCalled.getAndSet(false);
continue;
} else {
LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
// Ok, we are hit by the nasty epoll
// spinning.
// Basically, there is a race condition
// which causes a closing file descriptor not to be
// considered as available as a selected channel, but
// it stopped the select. The next time we will
// call select(), it will exit immediately for the same
// reason, and do so forever, consuming 100%
// CPU.
// We have to destroy the selector, and
// register all the socket on a new one.
registerNewSelector();
}
// Set back the flag to false
wakeupCalled.getAndSet(false);
// and continue the loop
continue;
}
// Manage newly created session first
nSessions += handleNewSessions();//这里会处理newSessions,就是在acceptor线程中add进来的,基本上来说,就是创建filterChain,并触发sessionCreated事件和sessionOpen事件,
updateTrafficMask();
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process();//这里会对发生了事件的session进行处理,如果session是读事件,会调用session底层的channel去读数据,并触发session的messageReceived时间,如果session是写事件,会把session加入到flushingSessions队列里
}
// Write the pending requests
long currentTime = System.currentTimeMillis();
flush(currentTime);//这里会处理flushingSessions队列,调用session底层的channel去发送数据
// And manage removed sessions
nSessions -= removeSessions();
// Last, not least, send Idle events to the idle sessions
notifyIdleSessions(currentTime);
// Get a chance to exit the infinite loop if there are no
// more sessions on this Processor
if (nSessions == 0) {
processorRef.set(null);
if (newSessions.isEmpty() && isSelectorEmpty()) {
// newSessions.add() precedes startupProcessor
assert (processorRef.get() != this);
break;
}
assert (processorRef.get() != this);
if (!processorRef.compareAndSet(null, this)) {
// startupProcessor won race, so must exit processor
assert (processorRef.get() != this);
break;
}
assert (processorRef.get() == this);
}
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
for (Iterator<S> i = allSessions(); i.hasNext();) {
scheduleRemove(i.next());
}
wakeup();
}
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
// But first, dump a stack trace
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
try {
synchronized (disposalLock) {
if (disposing) {
doDispose();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setValue(true);
}
}
}
根据以上对IoService,IoSession,IoProcessor的分析,我们知道对于服务器端的程序,在用户程序的主线程中调用acceptor的bind方法,实际上启动了一个acceptor线程用来accept新的session,如果有新的session到来,会有在session加入到processor的过程中,会启动一个processor线程,如果当前CPU是多核的话,下一个sesion的到来,会启动另外一个processor线程。这些processor线程是用来检查是否有读写事件的。用户添加到filterChain的filter都是在这个线程中执行的,最后会把事件传递给handler进行最终的处理。也就是说,当有多个session的时候,会有多个processor线程,session的个数是大于等于processor的个数的。同时,一个processor会对应多个session,单一个session只对应一个processor线程。
Mina的高性能,来源于nio的多路复用机制
参看http://ifeve.com/netty-mina-in-depth-1/
Mina的线程模型被称为所谓的reactors in threads,即一个线程负责接收用户请求,即acceptor线程,另外几个线程负责处理session的读写,注意线程之间是通过共享Concurrent的队列来实现请求的移交的,除此之外,他们并没有消息的交互,它们完全靠系统的线程切换来运行,这就降低了编程的复杂性。
对于参考文章中提到的reactors in threads和thread pools,其实实现上也类似,就是在发现读写事件的时候,把它加入到一个Concurrent的队列中,然后新启动一个线程专门用来读取这个队列来进行计算。