标签:
然后来说说IoHandle接口,Mina中的所有I/O事件都是通过这个接口来处理的,这些事件都是上面所说的I/O Processor发出来的,要注意的一点是同一个I/O Processor线程是负责处理多个会话的。包括下面这几个事件的处理:
public interface IoHandler
{
void sessionCreated(IoSession session) throws Exception;//会话创建
void sessionOpened(IoSession session) throws Exception;//打开会话,与sessionCreated最大的区别是它是从另一个线程处调用的
void sessionClosed(IoSession session) throws Exception;//会话结束,当连接关闭时被调用
void sessionIdle(IoSession session, IdleStatus status) throws Exception;//会话空闲
void exceptionCaught(IoSession session, Throwable cause) throws Exception;//异常捕获,Mina会自动关闭此连接
void messageReceived(IoSession session, Object message) throws Exception;//接收到消息
void messageSent(IoSession session, Object message) throws Exception;//发送消息
}
IoHandlerAdapter就不说了,简单地对IoHandler使用适配器模式封装了下,让具体的IoHandler子类从其继承后,从而可以对自身需要哪些事件处理拥有自主权。
来看看IoServiceListener接口,它用于监听IoService相关的事件。
public interface IoServiceListener extends EventListener
{
void serviceActivated(IoService service) throws Exception;//激活了一个新service
void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; // service闲置
void serviceDeactivated(IoService service) throws Exception;//挂起一个service
void sessionCreated(IoSession session) throws Exception;//创建一个新会话
void sessionDestroyed(IoSession session) throws Exception;//摧毁一个新会话
}
IoServiceListenerSupport类就是负责将上面的IoService和其对应的各个IoServiceListener包装到一起进行管理。
下面是它的成员变量:
private final IoService service;
private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();//被管理的会话集(其实就是服务所管理的会话集)
private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);//上面的会话集的只读版
private final AtomicBoolean activated = new AtomicBoolean();//被管理的服务是否处于激活状态
激活事件就以会话创建为例来说明:
public void fireSessionCreated(IoSession session)
{
boolean firstSession = false;
if (session.getService() instanceof IoConnector)
{//若服务类型是Connector,则说明是客户端的连接服务
synchronized (managedSessions)
{//锁住当前已经建立的会话集
firstSession = managedSessions.isEmpty();//看服务所管理的会话集是否为空集
}
}
if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) { // If already registered, ignore.
return;
}
if (firstSession)
{//第一个连接会话,fire一个虚拟的服务激活事件
fireServiceActivated();
}
//呼叫过滤器的事件处理
session.getFilterChain().fireSessionCreated();// 会话创建
session.getFilterChain().fireSessionOpened();//会话打开
int managedSessionCount = managedSessions.size();
//统计管理的会话数目
if (managedSessionCount > largestManagedSessionCount)
{
largestManagedSessionCount = managedSessionCount;
}
cumulativeManagedSessionCount ++;
//呼叫监听者的事件处理函数
for (IoServiceListener l : listeners)
{
try
{
l.sessionCreated(session);
} catch (Throwable e)
{
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
这里值得注意的一个地方是断开连接会话,设置了一个监听锁,直到所有连接会话被关闭后才放开这个锁。
private void disconnectSessions()
{
if (!(service instanceof IoAcceptor))
{//确保服务类型是IoAcceptor
return;
}
if (!((IoAcceptor) service).isCloseOnDeactivation())
{// IoAcceptor是否设置为在服务失效时关闭所有连接会话
return;
}
Object lock = new Object();//监听锁
IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
for (IoSession s : managedSessions.values())
{
s.close().addListener(listener);//为每个会话的close动作增加一个监听者
}
try
{
synchronized (lock)
{
while (!managedSessions.isEmpty())
{//所管理的会话还没有全部结束,持锁等待
lock.wait(500);
}
}
} catch (InterruptedException ie)
{
// Ignored
}
}
private static class LockNotifyingListener implements IoFutureListener<IoFuture>
{
private final Object lock;
public LockNotifyingListener(Object lock)
{
this.lock = lock;
}
public void operationComplete(IoFuture future)
{
synchronized (lock)
{
lock.notifyAll();
}
}
}
标签:
原文地址:http://my.oschina.net/anna153/blog/371909