标签:mina android protobuf 粘包 断包
一、前述:
近期做项目用到了MINA,其中遇到了一个断包与粘包的问题,困扰了我一天一夜,经过一天一夜的思索与查看其他大牛分享的资料,现将我在解决这一问题过程中的一些心得与解决问题的方法记录下来,供广大IT兄弟姐妹们参考,如有不对或欠妥之处,请指证。请不要吝惜分享您的技术,作为中国IT软件工程师,一定要想到多一个人掌握IT技术,不会给你增加一个竞争对手,如果认为会给你增加竞争对手,这种想法是非常狭隘的,自私自利的。只有分享,大家共同的技术提高了,才能激发出更多的思维解决更加棘手的技术难点,希望大家永远Open,为咱们中国软件技术的振兴尽一分心力。
二、概念
1、什么是粘包,粘包是如何产生的?
指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:
(1)发送端需要等缓冲区满才发送出去,造成粘包。
(2)接收方不及时接收缓冲区的包,造成多个包接收。
2、什么是断包,断包是如何产生的?
也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。
三、在protobuf中客户端如何接收服务器端响应的数据?
客户端在接收数据时需要考虑以下几种情况:
1. 一个ip包中只包含一个完整消息/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.mina.filter.codec; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.TransportMetadata; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; /** * A {@link ProtocolDecoder} that cumulates the content of received * buffers to a <em>cumulative buffer</em> to help users implement decoders. * <p> * If the received {@link IoBuffer} is only a part of a message. * decoders should cumulate received buffers to make a message complete or * to postpone decoding until more buffers arrive. * <p> * Here is an example decoder that decodes CRLF terminated lines into * <code>Command</code> objects: * <pre> * public class CrLfTerminatedCommandLineDecoder * extends CumulativeProtocolDecoder { * * private Command parseCommand(IoBuffer in) { * // Convert the bytes in the specified buffer to a * // Command object. * ... * } * * protected boolean doDecode( * IoSession session, IoBuffer in, ProtocolDecoderOutput out) * throws Exception { * * // Remember the initial position. * int start = in.position(); * * // Now find the first CRLF in the buffer. * byte previous = 0; * while (in.hasRemaining()) { * byte current = in.get(); * * if (previous == '\r' && current == '\n') { * // Remember the current position and limit. * int position = in.position(); * int limit = in.limit(); * try { * in.position(start); * in.limit(position); * // The bytes between in.position() and in.limit() * // now contain a full CRLF terminated line. * out.write(parseCommand(in.slice())); * } finally { * // Set the position to point right after the * // detected line and set the limit to the old * // one. * in.position(position); * in.limit(limit); * } * // Decoded one line; CumulativeProtocolDecoder will * // call me again until I return false. So just * // return true until there are no more lines in the * // buffer. * return true; * } * * previous = current; * } * * // Could not find CRLF in the buffer. Reset the initial * // position to the one we recorded above. * in.position(start); * * return false; * } * } * </pre> * <p> * Please note that this decoder simply forward the call to * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the * underlying transport doesn't have a packet fragmentation. Whether the * transport has fragmentation or not is determined by querying * {@link TransportMetadata}. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter { private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer"); /** * Creates a new instance. */ protected CumulativeProtocolDecoder() { // Do nothing } /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}. * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> * and the cumulative buffer is compacted after decoding ends. * * @throws IllegalStateException if your <tt>doDecode()</tt> returned * <tt>true</tt> not consuming the cumulative buffer. */ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { // 判断是否符合解码要求,不符合则中断并返回 if (!doDecode(session, in, out)) { break; } } return; } boolean usingSessionBuffer = true; // 取得上次断包数据 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER); // If we have a session buffer, append data to that; otherwise // use the buffer read from the network directly. if (buf != null) {// 如果有断包数据 boolean appended = false; // Make sure that the buffer is auto-expanded. if (buf.isAutoExpand()) { try { // 将断包数据和当前传入的数据进行拼接 buf.put(in); appended = true; } catch (IllegalStateException e) { // A user called derivation method (e.g. slice()), // which disables auto-expansion of the parent buffer. } catch (IndexOutOfBoundsException e) { // A user disabled auto-expansion. } } if (appended) { buf.flip();// 如果是拼接的数据,将buf置为读模式 } else { // Reallocate the buffer if append operation failed due to // derivation or disabled auto-expansion. //如果buf不是可自动扩展的buffer,则通过数据拷贝的方式将断包数据和当前数据进行拼接 buf.flip(); IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true); newBuf.order(buf.order()); newBuf.put(buf); newBuf.put(in); newBuf.flip(); buf = newBuf; // Update the session attribute. session.setAttribute(BUFFER, buf); } } else { buf = in; usingSessionBuffer = false; } for (;;) { int oldPos = buf.position(); boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作 if (decoded) { // 如果符合解码要求并进行了解码操作,则当前position和解码前的position不可能一样 if (buf.position() == oldPos) { throw new IllegalStateException("doDecode() can't return true when buffer is not consumed."); } // 如果已经没有数据,则退出循环 if (!buf.hasRemaining()) { break; } } else { // 如果不符合解码要求,则退出循环 break; } } // if there is any data left that cannot be decoded, we store // it in a buffer in the session and next time this decoder is // invoked the session buffer gets appended to if (buf.hasRemaining()) { if (usingSessionBuffer && buf.isAutoExpand()) { buf.compact(); } else { //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。 storeRemainingInSession(buf, session); } } else { if (usingSessionBuffer) { removeSessionBuffer(session); } } } /** * Implement this method to consume the specified cumulative buffer and * decode its content into message(s). * * @param in the cumulative buffer * @return <tt>true</tt> if and only if there's more to decode in the buffer * and you want to have <tt>doDecode</tt> method invoked again. * Return <tt>false</tt> if remaining data is not enough to decode, * then this method will be invoked again when more data is cumulated. * @throws Exception if cannot decode <tt>in</tt>. */ protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; /** * Releases the cumulative buffer used by the specified <tt>session</tt>. * Please don't forget to call <tt>super.dispose( session )</tt> when * you override this method. */ @Override public void dispose(IoSession session) throws Exception { removeSessionBuffer(session); } private void removeSessionBuffer(IoSession session) { session.removeAttribute(BUFFER); } private void storeRemainingInSession(IoBuffer buf, IoSession session) { final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true); remainingBuf.order(buf.order()); remainingBuf.put(buf); session.setAttribute(BUFFER, remainingBuf); } }
package com.goodwin.finance.net.socket; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.ArrayList; import java.util.List; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import android.util.Log; import com.goodwin.finance.protobuf.HqProtos.tagAnsHead; import com.goodwin.finance.protobuf.HqProtos.tagCommon; import com.goodwin.finance.protobuf.HqProtos.tagCommon.MF; import com.goodwin.finance.protobuf.HqProtos.tagCommon.SF; import com.goodwin.finance.protobuf.HqProtos.tagResponseDataArchitecture; import com.goodwin.finance.protobuf.HqProtos.tagResponseHeartbeat; import com.goodwin.finance.protobuf.HqProtos.tagResponseICSortFastData; import com.goodwin.finance.protobuf.HqProtos.tagResponseL1StockMinuteData; import com.goodwin.finance.util.DataUtil; import com.google.protobuf.InvalidProtocolBufferException; /** * 将行情服务器返回的数据进行解包处理,并重新编码 * @author Administrator */ public class SocketDecoder extends CumulativeProtocolDecoder { // 日志输出标记 private static final String TAG = "SocketDecoder"; // 字符流转码对象 private final CharsetDecoder charsetDecoder; // IP包包头的字节数 public static final int MSG_HEADER_LENGTH = 4; // IP包中CRC的长度 public static final int CRC_LENGTH = 2; public SocketDecoder(String charset) { this.charsetDecoder = Charset.forName(charset).newDecoder(); } @Override protected synchronized boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //解析数据包头部信息,获取到该响应数据的总长度 if (HqClient.time == -1) { deCodeHeader(session, in); HqClient.time = 1; } //1、开始接收第一个数据包 int dataLength = in.remaining(); if (dataLength >= MSG_HEADER_LENGTH + CRC_LENGTH) { //如果当前读取的缓冲区中的数据,小于响应数据的总长度,则让缓冲区继续累积数据,一直累积到数据接收完成,即缓冲区中数据的长度等于响应数据的总长度时,则开始解析. if (dataLength < HqClient.contentSize) { Log.i(TAG, "=======================缓冲数据区缓冲大小:" + dataLength + "====================="); return false;//接收下一数据包,累积至缓冲区,直至接收完成. } else {//如果当前缓冲区的数据等于 发送内容的长度,则开始解析数据,否则继续缓冲 byte[] dataB = new byte[dataLength]; in.get(dataB); HqClient.dataBuf.put(dataB); HqClient.dataBuf.flip(); byte[] headSizeB = new byte[4]; HqClient.dataBuf.get(headSizeB); byte[] crcB = new byte[2]; HqClient.dataBuf.get(crcB); int headsize = DataUtil.byteArray2int(headSizeB); byte[] headBytes = new byte[headsize];// 请求头tagReqHead HqClient.dataBuf.get(headBytes); tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes); tagCommon comm = reqHead.getComm(); int contentsize = comm.getContentsize(); byte[] contentBytes = new byte[contentsize];// 请求内容 HqClient.dataBuf.get(contentBytes); List<Object> hqDatas = new ArrayList<Object>(); if (comm.getMainFunction().equals(MF.Stock)) { if (comm.getSubFunction().equals(SF.dat_architecture)) {// 数据框架 tagResponseDataArchitecture dataArchitecture = tagResponseDataArchitecture.parseFrom(contentBytes); hqDatas.add(dataArchitecture); } else if (comm.getSubFunction().equals(SF.baojialiebiao)) {//报价列表 tagResponseICSortFastData icSortFastData = tagResponseICSortFastData.parseFrom(contentBytes); hqDatas.add(icSortFastData); } else if (comm.getSubFunction().equals(SF.l1RealTime)) {//多个股票分时线 tagResponseL1StockMinuteData l1StockMinuteData = tagResponseL1StockMinuteData.parseFrom(contentBytes); hqDatas.add(l1StockMinuteData); } else if (comm.getSubFunction().equals(SF.heartbeat)) { tagResponseHeartbeat heartbeat = tagResponseHeartbeat.parseFrom(contentBytes); hqDatas.add(heartbeat); } } else if (comm.getMainFunction().equals(MF.Notify)) { hqDatas.add(reqHead); } HqClient.totalLength = -1; HqClient.contentSize = -1; HqClient.time = -1;// HqClient.dataBuf.flip();//极限设为位置,位置设为0。 HqClient.dataBuf.clear();//不改变极限,位置设为0。 out.write(hqDatas); Log.i(TAG, "=======================缓冲区数据解析完毕:" + dataLength + "====================="); return true; } } return false; } /** * 解析包头 * @throws InvalidProtocolBufferException */ private void deCodeHeader(IoSession session, IoBuffer in) throws InvalidProtocolBufferException { if (in.remaining() >= MSG_HEADER_LENGTH + CRC_LENGTH) { int totalLength = HqClient.totalLength; byte[] headSizeB = new byte[4]; in.get(headSizeB); HqClient.dataBuf.put(headSizeB); byte[] crcB = new byte[2]; in.get(crcB); HqClient.dataBuf.put(crcB); int headsize = DataUtil.byteArray2int(headSizeB); byte[] headBytes = new byte[headsize]; in.get(headBytes); HqClient.dataBuf.put(headBytes); tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes); tagCommon comm = reqHead.getComm(); int contentsize = comm.getContentsize(); totalLength = 4 + 2 + headsize + contentsize; HqClient.contentSize = contentsize; HqClient.totalLength = totalLength; Log.i(TAG, "===================================信息头长度为:" + headsize + "==================================="); Log.i(TAG, "===================================信息内容长度为:" + contentsize + "==================================="); Log.i(TAG, "===================================信息总长度为:" + totalLength + "==================================="); } } }
标签:mina android protobuf 粘包 断包
原文地址:http://blog.csdn.net/qinyunying/article/details/41351899