码迷,mamicode.com
首页 > 其他好文 > 详细

Mina学习

时间:2016-08-23 16:40:50      阅读:379      评论:0      收藏:0      [点我收藏+]

标签:

 
     Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础
优点:
– 异步 
– 无阻塞 
– 事件驱动 
– 支持TCP, UDP, APR, 串口… 
– 通过 过滤器(Filters)实现扩展性 
– 同时提供协议框架
 
技术分享

 

 
 
Server端应用
对socket通信来说,使用比较广泛的是基于Server端的应用,尤其是并发规模达到一定程度后,颇具挑战性。那么我们来看一下,基于MINA框架的Server端应用:
1、IOAcceptor 监听指定的端口,处理新的网络连接;一旦一个新的连接到达后,IOAcceptor 就产生一个session,后续所有从这个IP和端口发送过来的请求就将通过这个Session被处理。
2、Session创建后,后续所有的数据包都被人到过滤器链中,通过过滤器将原始的字节码转变成高层的对象,这个环节PacketEncoder/Decoder就十分有用。
3、最后数据包或对象被传送给Handler做业务逻辑处理;
 
 
IoAcceptor
主要用于创建新的连接。MINA提供了多种实现,所以几乎不需要我们自己再去实现:
NioSocketAcceptor:无阻塞的Socket 传输Acceptor,针对TCP
NioDatagramAcceptor : 无阻塞的Socket 传输Acceptor,针对UDP
AprSocketAcceptor : 阻塞的Socket 传输Acceptor,基于 APR
VmPipeSocketAcceptor : the in-VM Acceptor
 

IoConnector

针对Client端的Socket连接,有多种实现:
NioSocketConnector : 无阻塞的Socket 传输Connector,针对TCP 
NioDatagramConnector : 无阻塞的Socket 传输Connector,针对UDP 
AprSocketConnector : 阻塞的Socket 传输Connector,基于 APR 
ProxyConnector : 一个支持代理服务的 Connector ,通过截取连接的请求,并将终端指向代理设置的地址。
SerialConnector : 针对串口传输的Connector
VmPipeConnector : the in-VM * Connector*
 
 
Session
任何时候只要有新的连接到来,都会生成一个Session对象,并且一致保存在内存中,只到连接断开;
 
Session有一系列状态,如下:
Connected : session被创建,并有效 
Idle : session至少在一个空闲周期(见配置)内没有处理过任何请求 
Idle for read : 在一个空闲周期内没有做实际的读操作
Idle for write : 在一个空闲周期内没有做实际的写操作
Idle for both : 在一个空闲周期内没有做实际的读和写操作 
Closing :session正在被关闭
Closed : session已经被关闭
 
 
实现断线重连 

一、断线重连的方式

    1. 在创建Mina客户端时增加一个监听器,或者增加一个拦截器,当检测到Session关闭时,自动进行重连。

 
    技术分享

 

 

    2. 在第1种方式的基础上,增加客户端的读写通道空闲检查,当发生Session关闭或者读写空闲时,进行重连。

 
    技术分享
 
 
第一种方式比较传统,优点是简单方便,适合网络稳定、数据量不大(1M带宽以下)的环境;不过缺点是不能对系统级的连接断开阻塞进行捕获。
第二种方式更加精细,基本上能捕获到应用、网络、系统级的断连。
 
 
二、重连目的:
        在使用Mina做为客户端时,往往因为网络、服务器、应用程序出现问题而导致连接断开,而自动重连,就是解决连接断开的唯一方式。如果网线断开、服务器宕机、应用程序挂了,都是断线的原因,这个时候,通过增加一个监听器或者拦截器,就能实现重连。但是生产环境中,断线的原因可能更复杂:网络不稳定、延时、服务器负载高、服务器或者应用程序的发送或者接收缓冲区满等等问题都可能导致数据传输过程出现类似于断线的情况,这个时候,光检测Session关闭是远远不够的,这个时候就需要一种重连机制,比如读写空闲超过30秒,就进行重连。对于数据不间断、实时性高、数据量大的应用场景,更是实用。  
 
三、代码实现
技术分享
package com.yitop.feng.service;

import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * @author fengzp
 * @date 16/8/23
 * @email fengzp@gzyitop.com
 * @company 广州易站通计算机科技有限公司
 */
public class MinaClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(MinaClient.class);

    private NioSocketConnector connector;
    private IoSession session;
    private String hostname = "127.0.0.1";
    private int port = 8899;

    public MinaClient(){
        init();
    }

    private void init(){
        connector = new NioSocketConnector();

        connector.getFilterChain().addLast("logger", new LoggingFilter());
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"), "]", "]")));

        /**
         * 一、使用监听器或拦截器实现断线重连
         * 二、在第1种方式的基础上,增加客户端的读写通道空闲检查,当发生Session关闭或者读写空闲时,进行重连
         */
        //使用拦截器实现断线重连
        connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter(){
            @Override
            public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
                reconnect();
            }
        });
        //使用监听器实现断线重连
        connector.addListener(new MyIoServiceListener());


        connector.getSessionConfig().setReceiveBufferSize(10240);   // 设置接收缓冲区的大小
        connector.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000);  //读写都空闲时间:30秒
        connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//读(接收通道)空闲时间:40秒
        connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//写(发送通道)空闲时间:50秒

        //在数据处理器IoHandler中sessionIdle方法中加入Session会话关闭的代码,这样session关闭就能传递到拦截器或者监听器中,然后实现重连。
        connector.setHandler(new MyIoHandlerConnector());

        connect();
    }

    private void connect(){
        ConnectFuture future = connector.connect(new InetSocketAddress(hostname, port));

        future.awaitUninterruptibly(10000);

        session = future.getSession();
    }

    private void reconnect(){
        while (true){
            try {
                Thread.sleep(3000);
                connect();
                if (session != null && session.isConnected()) {
                    LOGGER.info("断线重连成功");
                    break;
                }
            }catch (Exception e){
                LOGGER.error("断线重连失败", e);
            }
        }
    }

    class MyIoServiceListener implements IoServiceListener{

        public void serviceActivated(IoService ioService) throws Exception {

        }

        public void serviceIdle(IoService ioService, IdleStatus idleStatus) throws Exception {

        }

        public void serviceDeactivated(IoService ioService) throws Exception {

        }

        public void sessionCreated(IoSession ioSession) throws Exception {

        }

        public void sessionDestroyed(IoSession ioSession) throws Exception {
            reconnect();
        }
    }

    class MyIoHandlerConnector extends IoHandlerAdapter{

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            if(session != null){
                //这里关闭session就会触发拦截器或者监听器,从而实现读写空闲时重连
                session.close(true);
            }
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String req = message==null?"":message.toString();
            System.err.println("Client receive: "+req);
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            IoSessionConfig config = session.getConfig();
            if (config instanceof SocketSessionConfig) {
                SocketSessionConfig sessionConfig = (SocketSessionConfig) config;
                sessionConfig.setKeepAlive(true);// 长连接
            }
            System.out.println("Client session create");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            System.out.println("Client session create");
        }
    }

    public static void main(String[] args) {
        new MinaClient();
    }
}
View Code

 

 
 

Mina学习

标签:

原文地址:http://www.cnblogs.com/andyfengzp/p/5799743.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!