标签:apache mina 线程模型
Apache Mina 中关于线程模型的关键源代码:
在创建 NioSocketAcceptor acceptor = new NioSocketAcceptor(); 时刻,
创建一个SimpleIoProcessorPool 线程池,该线程池最小数量为1个,
默认数量为
/** The default pool size, when no size is provided. */
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
在Acceptor 绑定端口时 acceptor.bind(new InetSocketAddress(PORT));
Set<SocketAddress> addresses = bindInternal(localAddressesCopy);//绑定本地端口
//在线程池中创建了一个Acceptor线程NioSocketAcceptor-1 用来接收链接事件
//该AbstractPollingIoAcceptor 的一个内部线程类Acceptor
protected final void executeWorker(Runnable worker, String suffix)
{
String actualThreadName = threadName;
if (suffix != null) {
actualThreadName = actualThreadName + ‘-‘ + suffix;
}
executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}
Acceptor 线程只接受isAcceptable 事件
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception
{
//当Acceptor接收新链接时,创建新NioSocketSession
SelectionKey key = null;
if (handle != null)
{
key = handle.keyFor(selector);
}
if ((key == null) || (!key.isValid()) || (!key.isAcceptable()))
{
return null;
}
// accept the connection from the client
//接受新链接
SocketChannel ch = handle.accept();
if (ch == null)
{
return null;
}
return new NioSocketSession(this, processor, ch);
}
public abstract class NioSession extends AbstractIoSession
{
/** The NioSession processor */
protected final IoProcessor<NioSession> processor;
/** The communication channel */
protected final Channel channel;
/** The SelectionKey used for this session */
protected SelectionKey key;
/** The FilterChain created for this session */
private final IoFilterChain filterChain;
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel)
{
super(service);
this.channel = channel;
this.processor = processor;
filterChain = new DefaultIoFilterChain(this);
}
}
/**
* Find the processor associated to a session. If it hasen‘t be stored into
* the session‘s attributes, pick a new processor and stores it.
*/
private IoProcessor<S> getProcessor(S session)
{
IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
//把IoSession和线程池中的线程关联在一起
if (processor == null) {
if (disposed || disposing) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
//通过IoSession和线程关联
processor = pool[Math.abs((int) session.getId()) % pool.length];
if (processor == null) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
//在NioSession中管理着自己的线程
session.setAttributeIfAbsent(PROCESSOR, processor);
}
return processor;
}
//此时把Acceptor 线程接收到的NioSocket链接通过并发队列加入到AbstractPollingIoProcessor 中Processor 内部线程中.
//以后都由该线程NioProcessor-X处理I/O的输入输出流.至此,Acceptor工作线程流程到此处结束.
public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S>
{
/** A Session queue containing the newly created sessions */
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
}
NioProcessor 线程仅仅处理数据的输入和输出
/**
* Deal with session ready for the read or write operations, or both.
*/
private void process(S session)
{
// Process Reads
if (isReadable(session) && !session.isReadSuspended()) {
read(session);
}
// Process writes
if (isWritable(session) && !session.isWriteSuspended()) {
// add the session to the queue, if it‘s not already there
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
}
}
}
//NioProcessor 线程读取数据主要流程.读取过程中,所有操作会在该线程中完成
//至此,NioProcessor线程流程完成了.
private void read(S session)
{
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
}
小结:
至此Apache Mina 的线程模型解析完成.Apache Mina 是一个典型的Reactor Pattern 模式的实现.
在IoProcessor线程中,每一个IoSession和固定的线程关联,这样就避免的对IoSession的读写操作的加锁处理.
并且,IoSession的读写操作是在同一个线程中完成,这样不设计多线程处理.
但是每一个IoSession 都在不同的线程中,也就是说当多个IoSession需要访问共享资源时,需要对共享资源进行加锁处理.
Acceptor 线程和IoProcessor线程之间对于共享资源时通过
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
这个JavaSE中提供的并发队列完成的.
如果要处理在逻辑层的单线程可以通过所有IoSession来共享同一个并发队列,所有入口都是通过并发队列,这样可以保证逻辑层单线程.
这样可以屏蔽IO操作的发杂行和逻辑线程的单一性,容易调试.
与IOCP模型比较:
我们公司游戏服务器才有IOCP模型,对应输入输出流的处理是通过工作线程(IOCP可以指定最大工作线程数量),这样就避免不了多线程的处理.
用Competition Key参数来关联SocketWrap.SocketWrap 里面含有封装的临界区.在window操作系统中的一个轻量级快速锁.每次对应SocketWrap的读写操作都进行加锁操作.
当时锁在SocketWrap内部,这样SocketWrap 就会与多个工作线程关联,形成并发操作.但是操作系统会对工作线程调度.
当SocketWrap 数据读取完成后,把数据放入到一个并发队列中,所有数据的入口都是通过该并发队列.
只不过,IOCP是C++直接调用Window API和系统调用工作线程,而Apache Mina是通过java语言 JNI接口和JavaSE提供的线程池.效率必然比c++低.
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:apache mina 线程模型
原文地址:http://blog.csdn.net/sunning9001/article/details/46843901