标签:
好久没写这个mina了,为了对之前的一篇博文Mina传输大数组,多路解码,粘包问题的处理 进行更进一步的补充,特此再来补说明。特别解决三个问题:
1,大数组粘包 在上篇的博文中提到用累积性解码器解决传输大数组的问题,还有可能出现粘包,解决方法是对decode方法进行了改进:
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx =getContext(session);//获取session 的context long matchCount=ctx.getMatchLength();//目前已获取的数据 long length=ctx.getLength();//数据总长度 IoBuffer buffer=ctx.getBuffer();//数据存入buffer //第一次取数据 if(length==0){ length=in.getLong(); //保存第一次获取的长度 ctx.setLength(length); matchCount=in.remaining(); } else{ matchCount+=in.remaining(); } ctx.setMatchLength(matchCount); if (in.hasRemaining()) {// 如果buff中还有数
///////////////////改进的部分//////////////////////////////
if(matchCount< length) { buffer.put(in);// 添加到保存数据的buffer中 } if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码 final byte[] b = new byte[(int) length]; byte[] temp = new byte[(int) length]; in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的 buffer.put(temp); ///////////////////////////////////////////////////////// // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始 buffer.flip(); buffer.get(b); <span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span><span class="comment" style="margin: 0px; padding: 0px; border: none; color: rgb(0, 130, 0); font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">////自己解码的部分///////</span><span style="margin: 0px; padding: 0px; border: none; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span>
ctx.reset();//清空 return MessageDecoderResult.OK; } else { ctx.setBuffer(buffer); return MessageDecoderResult.NEED_DATA; } } return MessageDecoderResult.NEED_DATA; }2,如果上传的数据有两种类型,也就是说有两个大数组,两个数组不停地轮流向通道中发数据,处理过程中也出现了不少问题。
首先,毋庸置疑肯定要用累积性解码器,在通信网速良好,信息无误的情况下,一切是美好的;
但是,在实际中,项目的要求是要针对两种数据的看重程度不一样的,将两种数组分别说成是数组A,数组B,看重程度如下:
数组A必须准确无误的收到。若成功收到回复一个成功应答标志,否则发一个失败标志;同时,服务器若收到成功标志发下一个数据组B,若收到失败标志则将当前数组再发一次;
而数组B,不管是否有没有成功收到,都回复一种标志,服务器收到该标志后直接发数组A;
在这种模式下,难点就在于数组B 的接收有丢失,这时还停留在B的解码器中,而这是服务器直接发来了数组A;在保证数据不丢失的情况下,如何实现数据的跳转?
具体实现是借助全局变量和通道空闲来实现,直接贴代码说明:
数组A的传输解码器:
public class ADecoder implements MessageDecoder { private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); private int k; private ReceiveRight mReceiveRight = new ReceiveRight(); private ReceiveWrong mReceiveWrong = new ReceiveWrong(); @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if(Constants.flag ) {//如果在B解码器中,收到了一部分A数据,该部分数据有区分解码器的标志 Constants.buffer.flip(); Constants.buffer.limit(Constants.positionValue); byte headtail = Constants.buffer.get(); byte functionCode = Constants.buffer.get(); if (functionCode == 0x51 || functionCode == 0x53) { Constants.Isstop=false; return MessageDecoderResult.OK; } else { Constants.Isstop=true; return MessageDecoderResult.NOT_OK; } }else { if (in.remaining() < 2) { return MessageDecoderResult.NEED_DATA; } else { byte functionCode = in.get(); if (functionCode == 0x51 || functionCode == 0x53) { Constants.Isstop=false; return MessageDecoderResult.OK; } else { Constants.Isstop=true; return MessageDecoderResult.NOT_OK; } } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, final ProtocolDecoderOutput out) throws Exception { Constants.ctx = getContext(session);//获取session 的context long matchCount = Constants.ctx.getMatchLength();//目前已获取的数据 long length = Constants.ctx.getLength();//数据总长度 IoBuffer buffer = Constants.ctx.getBuffer();//数据存入buffer if (Constants.flag) {//<span style="font-family: Arial, Helvetica, sans-serif;">如果在B解码器中,收到了一部分A数据,要先将这部分数据存入累积性解码器的buffer中,以拼凑完整</span> Constants.buffer.flip(); Constants.buffer.limit(Constants.positionValue); buffer.put(Constants.buffer); matchCount=Constants.positionValue; Constants.buffer.clear(); Constants.flag=false; Constants.positionValue=0; } /////////////////////////////////////////////////// matchCount += in.remaining(); Log.d("abcd", "共收到字节:" + String.valueOf(matchCount)); Constants.ctx.setMatchLength(matchCount); if (in.hasRemaining()) {// 如果in中还有数据 if(matchCount< length) { buffer.put(in);// 添加到保存数据的buffer中 } if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码 final byte[] b = new byte[1614]; byte[] temp = new byte[1614]; in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的 buffer.put(temp); // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始 buffer.flip(); buffer.get(b); //解码部分
Constants.NotFill = false;//收成功,NotFill表示没满的变量 k++; Log.d("sucess", "成功次数:" + String.valueOf(k)); } Constants.ctx.reset(); return MessageDecoderResult.OK; } else { Constants.ctx.setBuffer(buffer); Constants.NotFill = true; return MessageDecoderResult.NEED_DATA; } } return MessageDecoderResult.NEED_DATA; } @Override public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { }
数组B的解码器
public class BDecoder implements MessageDecoder { private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); private ComputePara computePara=new ComputePara(); private ReceiveRight mReceiveRight = new ReceiveRight(); private ReceiveWrong mReceiveWrong = new ReceiveWrong(); @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if(Constants.flag ){//区分解码器的标志出现在上一次剩余数据中 Constants.buffer.limit(Constants.positionValue); Constants.buffer.flip(); byte functionCode = Constants.buffer.get(); if (functionCode == 0x52) { Constants.Isstop=false; return MessageDecoderResult.OK; } else { Constants.Isstop=true; return MessageDecoderResult.NOT_OK; } }else { if (in.remaining() < 2) { return MessageDecoderResult.NEED_DATA; } else { byte functionCode = in.get(); if (functionCode == 0x52) { Constants.Isstop=false; return MessageDecoderResult.OK; } else { Constants.Isstop=true; return MessageDecoderResult.NOT_OK; } } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, final ProtocolDecoderOutput out) throws Exception { if(Constants.IsJump&&(!Constants.Backfail)){ Constants.IsJump=false; Constants.flag = true; Constants.positionValue=in.limit(); Constants.buffer.clear(); Constants.buffer.put(in); Log.d("back", "jump"); return MessageDecoderResult.OK; } Constants.ctxBack = getContext(session);//获取session 的context long matchCount = Constants.ctxBack .getMatchLength();//目前已获取的数据 long length = Constants.ctxBack .getLength();//数据总长度 IoBuffer buffer = Constants.ctxBack .getBuffer();//数据存入buffer /////////////////////////////////////////////////// matchCount += in.remaining(); Log.d("back", "共收到字节:" + String.valueOf(matchCount)); Constants.ctxBack .setMatchLength(matchCount); if (in.hasRemaining()) {// 如果in中还有数据 if(matchCount< length) { buffer.put(in);// 添加到保存数据的buffer中 } if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码 final byte[] b = new byte[1561]; byte[] temp = new byte[1561]; in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的 buffer.put(temp); // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始 buffer.flip(); buffer.get(b); //解码过程 Constants.ctxBack .reset(); return MessageDecoderResult.OK; } else { Constants.ctxBack .setBuffer(buffer); Constants.Backfail=true; return MessageDecoderResult.NEED_DATA; } } return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { }
/**
<pre name="code" class="java">public static boolean NotFill=false;//A数据没有收满 public static boolean Backfail=false;//B数据接收失败 public static boolean IsJump=false;/B谱数据接收失败h后让解码器跳转到A频谱 public static int positionValue=0; public static boolean flag=false; public static boolean Isstop=false;//判断是否有解码器
*/ <pre name="code" class="java">@Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { super.sessionIdle(session, status); final ReceiveWrong mReceiveWrong = new ReceiveWrong(); final ReceiveRight mReceiveRight = new ReceiveRight(); //A数据超时重传 if (Constants.NotFill) { Constants.FPGAsession.write(mReceiveWrong); Constants.NotFill = false; Constants.ctx.reset(); Constants.failCount++; Log.d("trans", "重传次数:" + Constants.failCount); }
//B数据传输失败 if (Constants.Backfail) { Constants.FPGAsession.write(mReceiveWrong); Constants.Backfail = false; Constants.IsJump = true; Constants.ctxBack.reset(); } if (Constants.Isstop) { Constants.FPGAsession.write(mReceiveWrong); Constants.Isstop = false; } }
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_13_6359721" name="code" class="java">在处理上述问题的时候,又相继出现了一个问题,那就是找不到解码器,导致IObuffer中出现了大量垃圾数据,解码器的不断循环,于是专门写了一个用来清除垃圾书据的解码器,
<pre name="code" class="java">public class ClearDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) { if(Constants.Isstop) {//<span style="font-family: Arial, Helvetica, sans-serif;">Isstop=false;//判断是否有解码器,该标志时在找不到解码器时置为true</span> ioBuffer.sweep(); int n=ioBuffer.remaining(); Constants.buffer.sweep();//buffer中的是错误帧,此时也找不到解码器 Constants.flag=false; Log.d("abcd", "clear解码器清空数据"); } return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { return null; } @Override public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { } }
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_16_3786210" name="code" class="java">
标签:
原文地址:http://blog.csdn.net/sangsa/article/details/51471859