粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:
发送端需要等缓冲区满才发送出去,造成粘包
接收方不及时接收缓冲区的包,造成多个包接收
断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。
消息长度+消息头+消息体 即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。
消息头+消息体 即固定长度的消息,前几个字节为消息头,后面的是消息头。
在MINA中用的是
消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。
在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。
编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。
解码:即把我们收到的字节解码成我们代码中的对象。
在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。
消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。
//class AbstractPollingIoProcessor private void read(S session) { IoSessionConfig config = session.getConfig(); int bufferSize = config.getReadBufferSize(); IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { //是否有分片 tcp传输会有分片,即把大消息分片成多个小消息再传输 if (hasFragmentation) { //read方法非阻塞,没有读到数据的时候返回0 while ((ret = read(session, buf)) > 0) { readBytes += ret; //buffer 满了 if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); //处理消息 filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { scheduleRemove(session); } } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }
//class AbstractIoBuffer public Object getObject(final ClassLoader classLoader) throws ClassNotFoundException { //首先判断当前buffer中消息长度是否完整,不完整的话直接返回 if (!prefixedDataAvailable(4)) { throw new BufferUnderflowException(); } //消息长度 int length = getInt(); if (length <= 4) { throw new BufferDataException("Object length should be greater than 4: " + length); } int oldLimit = limit(); //limit到消息结尾处 limit(position() + length); try { ObjectInputStream in = new ObjectInputStream(asInputStream()) { @Override protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException { int type = read(); if (type < 0) { throw new EOFException(); } switch (type) { case 0: // NON-Serializable class or Primitive types return super.readClassDescriptor(); case 1: // Serializable class String className = readUTF(); Class<?> clazz = Class.forName(className, true, classLoader); return ObjectStreamClass.lookup(clazz); default: throw new StreamCorruptedException("Unexpected class descriptor type: " + type); } } @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { String name = desc.getName(); try { return Class.forName(name, false, classLoader); } catch (ClassNotFoundException ex) { return super.resolveClass(desc); } } }; return in.readObject(); } catch (IOException e) { throw new BufferDataException(e); } finally { limit(oldLimit); } } //判断当前消息是否完整 public boolean prefixedDataAvailable(int prefixLength, int maxDataLength) { if (remaining() < prefixLength) { return false; } int dataLength; switch (prefixLength) { case 1: dataLength = getUnsigned(position()); break; case 2: dataLength = getUnsignedShort(position()); break; case 4: dataLength = getInt(position()); break; default: throw new IllegalArgumentException("prefixLength: " + prefixLength); } if (dataLength < 0 || dataLength > maxDataLength) { throw new BufferDataException("dataLength: " + dataLength); } //判断当前消息是否完整 return remaining() - prefixLength >= dataLength; } //编码 public IoBuffer putObject(Object o) { int oldPos = position(); skip(4); // Make a room for the length field.预留4个字节用于存储消息长度 try { ObjectOutputStream out = new ObjectOutputStream(asOutputStream()) { @Override protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException { try { Class<?> clz = Class.forName(desc.getName()); if (!Serializable.class.isAssignableFrom(clz)) { // NON-Serializable class write(0); super.writeClassDescriptor(desc); } else { // Serializable class write(1); writeUTF(desc.getName()); } } catch (ClassNotFoundException ex) { // Primitive types write(0); super.writeClassDescriptor(desc); } } }; out.writeObject(o); out.flush(); } catch (IOException e) { throw new BufferDataException(e); } // Fill the length field int newPos = position(); position(oldPos); //存储消息长度 putInt(newPos - oldPos - 4); position(newPos); return this; }
// class CumulativeProtocolDecoder public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //是否有分片,tcp 有分片 if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { if (!doDecode(session, in, out)) { break; } } return; } // 1、断包处理 // 2、处理粘包 boolean usingSessionBuffer = true; //session中是否有断包情况(上次处理后),断包保存在session中 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER); // If we have a session buffer, append data to that; otherwise // use the buffer read from the network directly. if (buf != null) {//有断包,则把当前包拼接到断包里面 boolean appended = false; // Make sure that the buffer is auto-expanded. if (buf.isAutoExpand()) { try { buf.put(in); appended = true; } catch (IllegalStateException e) { // A user called derivation method (e.g. slice()), // which disables auto-expansion of the parent buffer. } catch (IndexOutOfBoundsException e) { // A user disabled auto-expansion. } } if (appended) { buf.flip(); } else { // Reallocate the buffer if append operation failed due to // derivation or disabled auto-expansion. buf.flip(); IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true); newBuf.order(buf.order()); newBuf.put(buf); newBuf.put(in); newBuf.flip(); buf = newBuf; // Update the session attribute. session.setAttribute(BUFFER, buf); } } else { buf = in; usingSessionBuffer = false; } //2 粘包处理,可能buffer中有多个消息,需要多次处理(解码)每个消息,直到消息处理完,或者剩下的消息不是一个完整的消息或者buffer没有数据了 for (;;) { int oldPos = buf.position(); boolean decoded = doDecode(session, buf, out); if (decoded) {//解码 成功 if (buf.position() == oldPos) { throw new IllegalStateException("doDecode() can‘t return true when buffer is not consumed."); } //buffer空了 if (!buf.hasRemaining()) {//buffer没有数据了 break; } } else {//剩下的消息不是一个完整的消息,断包出现了 break; } } // if there is any data left that cannot be decoded, we store // it in a buffer in the session and next time this decoder is // invoked the session buffer gets appended to if (buf.hasRemaining()) {//剩下的消息不是一个完整的消息,断包出现了 //如果断包已经保存在session中,则更新buffer,没有的话,就把剩下的断包保存在session中 if (usingSessionBuffer && buf.isAutoExpand()) { buf.compact(); } else { storeRemainingInSession(buf, session); } } else { if (usingSessionBuffer) { removeSessionBuffer(session); } } }
//class ObjectSerializationDecoder protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //首先判断当前buffer中消息长度是否完整,不完整的话直接返回 if (!in.prefixedDataAvailable(4, maxObjectSize)) { return false; } out.write(in.getObject(classLoader)); return true; }
NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码
原文地址:http://blog.csdn.net/chaofanwei/article/details/38928409