NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码
之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪。当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题。
JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现。看如下代码:
public class NIOTest { @Test public void test1(){ final int MAXSIZE=1000; Selector [] sels = new Selector[ MAXSIZE]; try{ for( int i = 0 ;i< MAXSIZE ;++i ) { sels[i] = Selector.open(); Thread.sleep(1000*10); } }catch( Exception ex ){ ex.printStackTrace(); } } }
也就是说每调用一次Selector.open(),就会占用两个随机可用端口,相互通信--这就是问题的根源。
当然对于我们的项目来说,这个问题的解决需要分两步,首先限制端口数到用户可接受范围内,这个比较容易,我们可用只调用一次Selector.open()方法即可,使用单利模式;第二步,因为Selector.open()方法每次都是使用的系统当前可用的随机端口,所以就有可能导致占用客户业务端口的情况,因此我们必须把Selector.open()所开的端口限制在一定范围内,最好可以通过代码指定使用哪几个端口,但是直到现在我们都没有找到,如果亲们有方法的话,很感谢能够告诉我....,。
Selector总结如下:
Windows下,Selector.open()会自己和自己建立两条TCP链接。不但消耗了两个TCP连接和端口,同时也消耗了文件描述符。
Linux下,Selector.open()会自己和自己建两条管道。同样消耗了两个系统的文件描述符。
来源于:http://blog.csdn.net/haoel/article/details/2224055
所以,现在我们现在就干脆用传统IO与MINA server通信吧。
其实非常简单,唯一的难点就在于使用同步IO与MINA通信时,怎么编码和解码,还是看下面代码吧。
server 端(用的MINA)
/** * 初始化设置,启动服务 */ public void startServer() { logger.debug("mina server start"); int port = SysEvnVar.managerPort; logger.debug("manager端口号:" + port); IoAcceptor acceptor = new NioSocketAcceptor(); /** 日志设置 */ acceptor.getFilterChain().addLast("logger", new LoggingFilter()); //编码与解码工厂,使用的技术就是JDK提供的ObjectOutStream ObjectSerializationCodecFactory objsCodec=new ObjectSerializationCodecFactory(); objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER); objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER); /** 数据转换,编码设置 */ acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(objsCodec)); /** 设置业务处理类 */ acceptor.setHandler(serverHandler); /** 设置buffer容量 */ acceptor.getSessionConfig().setReadBufferSize(DEFAULTBUFFERSIZE); /** 设置空闲时间 */ acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, DEFAULTIDLETIME); try { /** 绑定端口 */ acceptor.bind(new InetSocketAddress(port)); logger.info("mina server bind port ("+port+") sucess"); } catch (IOException e) { logger.error("mina server bind port ("+port+") fail:" + e.getMessage(),e); } }
public void sendMessageToManager(HyRequest request) { Socket socket = null; BufferedInputStream inBuff = null; OutputStream outBuff = null; try { Integer port = Integer.parseInt(hyGlobalConfigureCacheImpl.get( "managerPort").toString()); String host = hyGlobalConfigureCacheImpl.get("managerIp") .toString(); socket = new Socket(host, port); inBuff = new BufferedInputStream(new DataInputStream( socket.getInputStream())); outBuff = socket.getOutputStream(); String json = HyJsonUtil.reqeustToJsonStr(request); //编码开始,使MINA能够解析 IoBuffer buf = IoBuffer.allocate(64); buf.setAutoExpand(true); buf.putObject(json); buf.flip(); byte[] bytes =new byte[buf.limit()]; buf.get(bytes); //编码结束,直接输出到客户端 outBuff.write(bytes); outBuff.flush(); if(request.getOperation() != null && !"quit".equals(request.getOperation())){ String allreadstr = new String(); byte[] b = new byte[512 * 1024]; int len; byte[] tmp = new byte[0]; if ((len = inBuff.read(b)) != -1) { tmp = new byte[len]; System.arraycopy(b, 0, tmp, 0, len); } loger.debug("len:"+len); //解码开始,byte[]为MINA传过来的数据 IoBuffer in = IoBuffer.wrap(tmp); Object obj = null; try { loger.debug("in:"+in); obj = in.getObject(); //解码结束 loger.debug("parse success"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); loger.debug("go to exception"); loger.warn("nio parse exception",e); obj = "exception"; } allreadstr = (String) obj; loger.info("receive message from " + socket.getRemoteSocketAddress().toString() + ",message:" + allreadstr); // 请求消息转换为HyResponse对象 HyResponse response = HyJsonUtil.getHyResponse(allreadstr); HyRequest hyrequest = new HyRequest(); hyResponseDispatcher = hyClientHandler.getHyResponseDispatcher(); hyResponseDispatcher.responseProcess(hyrequest, response); if (hyrequest.getOperation() != null) { sendMessageToManager2(hyrequest); } } } catch (Exception e) { e.printStackTrace(); } finally { try { if(null != outBuff) outBuff.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } try { if(null != inBuff) inBuff.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } if (socket != null) { try { socket.close(); } catch (IOException e) { } } } }
看代码里面的注释,怎么编码和解码,其实我是直接看MINA源码,把MINA的编码和解码方法工具类直接拷到了我们的client端,这样就轻松的实现使用同步IO和MINA进行通信。
NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信
原文地址:http://blog.csdn.net/chaofanwei/article/details/38983113