mina 是NIO 运行 封装 基于NIO 的网络通信框架. 每个链接都会创建一个IOSession。
创建的 每个链接. 服务端都会有对应的 处理IOFilterChain 是 一个过滤器 链. 每个链接创建的IOsession 可以有自己的IOFilterChain。 默认的 是DefaultIoFilterChain。
IoFuture 有多种实现.
比如:DefaultWriteFuture DefaultReadFuture DefaultConnectFuture
这些IoFuture 在IOSession里调度.
面对客户端的 是 SocketConnector
用户可以他 来创建链接. 默认实现是 NioSocketConnector
在此说下 服务端创建过程:
但它 也是皮包一层. 功能都依托 它 的父类 AbstractPollingIoConnector<NioSession,
SocketChannel>.它 是连接处理器.里面保存链接队列. 处理带链接的Socket 请求.当每个链接请求完成后. 并产生session。 过程在 connect0() 完成.然后把产生的session 交由IOProcessor 处理器.具体实现代码:以下为 connect0()核心过程:
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();
//产生ConnectFuture 管理器 通过他可以感知整个session 创建过程是否完成.
T session = newSession( processor,
handle); //在此生成 session
initSession(session, future, sessionInitializer); //初始化session 信息
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session); //把新创建session 交由IOProcessor 处理 tag1
success = true;
return future;
它提供用户自定义的 构造函数. 比如 指定 IOProcessor 的线程管理器个数 还会processor
具体参考 NioSocketConnector 实现.
AbstractPollingIoConnector 主要维护两个队列:
Queue<ConnectionRequest> connectQueue
Queue<ConnectionRequest> cancelQueue//处理失败
或取消的 链接 请求
服务端AbstractPollingIoAcceptor 与 AbstractPollingIoConnector 功能类似:
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
最主要的 是IoProcessor<T> processor;
IO处理器.所有 的session运行控制 调度 都在IO处理器完成.里面维护了三个session队列.
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); //保存所有 新创建的session
/** A queue used to store the sessions to be removed */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();//
需要移除 的 session 回话
/** A queue used to store the sessions to be flushed */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();//
处理完 需要flush 的回话
默认的IOProcessor 是AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S>
IOProcessor 在NioSocketConnector 构造函数指定:
从上可知. 初始化是父类 AbstractPollingIoProcessor的 参数
protected AbstractPollingIoConnector(IoSessionConfig
sessionConfig, Class<? extends IoProcessor<T>> processorClass)
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
过程 和客户端一样的 数据的读取还是交由 NioProcessor 执行.
注意其中的SimpleIoProcessorPool 它接受一个IOprocessor.Class
类型 参数. 并通过反射生成真正的IOprocessor 对象
SimpleIoProcessorPool 里面几个重要属性
The contained which is passed to the IoProcessor when they are created */
private final Executor executor;用于执行session
/** The pool table */
private final IoProcessor<S>[] pool;
用来处理session 的 IoProcessor 的个数 默认的个数的
private static final int DEFAULT_SIZE =
Runtime.getRuntime().availableProcessors() + 1; // 当前处理器数 +1
AbstractPollingIoProcessor具体初始化 参见其构造函数
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>>
processorType, Executor executor, int size)
触发IOprocessor 对session 读取操作 在NIOConnector 中 connect0() session.getProcessor().add(session);
public final void add(S
session) {
if ( disposed || disposing)
throw new IllegalStateException( "Already
// Adds the session to the newSession queue and starts the worker
startupProcessor ();
private void startupProcessor ()
Processor processor = processorRef.get();
if (processor
== null) {
processor = new Processor(); //
包装 session。 负责执行 session 把它 放入对应的flushingSessions newSessions removingSessions
if ( processorRef.compareAndSet( null,
processor)) {
executor.execute(new NamePreservingRunnable(processor, threadName )); //在此执行
// Just stop the select() and start it again, so that the processor
// can be activated immediately.
来看下 Processor 结构:
private class Processor implements Runnable
public void run()
assert ( processorRef.get()
== this); // processorRef
表示 当前 执行 session 对象引用.
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) { // 循环执行. 知道发生异常 退出
try {
// This select has a timeout so that we can manage
// idle session when we get out of the select every
// second. (note : this is a hack to avoid creating
// a dedicated thread).
long t0 = System. currentTimeMillis();
int selected = select(SELECT_TIMEOUT);
long t1 = System. currentTimeMillis();
long delta = (t1 - t0);
if ((selected == 0) && !wakeupCalled.get()
&& (delta < 100)) { // 如果当前多路复用器 上没有注册对象 或是 获取超时 则退出
// Last chance : the select() may have been
// interrupted because we have had an closed channel.
if (isBrokenConnection()) {
LOG.warn( "Broken
// we can reselect immediately
// set back the flag to false
wakeupCalled.getAndSet(false );
} else {
LOG.warn( "Create
a new selector. Selected is 0, delta = " + (t1 - t0));
// Ok, we are hit by the nasty epoll
// spinning.
// Basically, there is a race condition
// which causes a closing file descriptor not to be
// considered as available as a selected channel, but
// it stopped the select. The next time we will
// call select(), it will exit immediately for the same
// reason, and do so forever, consuming 100%
// CPU.
// We have to destroy the selector, and
// register all the socket on a new one.
// Set back the flag to false
wakeupCalled.getAndSet( false);
// and continue the loop
// Manage newly created session first
nSessions += handleNewSessions();
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process(); // 核心方法. 负责执行 调度 session
private void process () throws Exception
for (Iterator<S> i = selectedSessions(); i.hasNext();) {
S session = i.next();
private void process(S
session) {
// Process Reads
if (isReadable(session)
&& !session.isReadSuspended()) {
read(session); //读取 session 数据
// Process writes
if (isWritable(session)
&& !session.isWriteSuspended()) {
// add the session to the queue, if it‘s not already there
if (session.setScheduledForFlush( true))
private void read(S
session) {
IoSessionConfig config = session.getConfig();
int bufferSize
= config.getReadBufferSize();
IoBuffer buf = IoBuffer. allocate(bufferSize);
final boolean hasFragmentation
= session.getTransportMetadata().hasFragmentation();
try {
int readBytes
= 0;
int ret;
try {
if (hasFragmentation)
while ((ret
= read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining())
} else {
ret = read(session, buf);
if (ret
> 0) {
readBytes = ret;
} finally {
if (readBytes
> 0) {
IoFilterChain filterChain = session.getFilterChain();
buf = null;
if (readBytes
<< 1 < config.getReadBufferSize()) {
} else if (readBytes
== config.getReadBufferSize()) {
if (ret
< 0) {
} catch (Throwable
e) {
if (e instanceof IOException)
if (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class .isAssignableFrom(config.getClass())
|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
IoFilterChain filterChain = session.getFilterChain();