标签:upd 缓冲区 tcl make plain corrupted corrupt 源码解析 字节
1、粘包与段包
粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:
发送端需要等缓冲区满才发送出去,造成粘包
接收方不及时接收缓冲区的包,造成多个包接收
断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。
2、消息传输的格式
消息长度+消息头+消息体 即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。
消息头+消息体 即固定长度的消息,前几个字节为消息头,后面的是消息头。
在MINA中用的是
消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。
3、编码与解码
在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。
编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。
解码:即把我们收到的字节解码成我们代码中的对象。
在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。
4、MINA中消息的处理实现
消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。
4.1、消息的接收
- private void read(S session) {
- IoSessionConfig config = session.getConfig();
- int bufferSize = config.getReadBufferSize();
- IoBuffer buf = IoBuffer.allocate(bufferSize);
-
- final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
-
- try {
- int readBytes = 0;
- int ret;
-
- try {
-
- if (hasFragmentation) {
-
- while ((ret = read(session, buf)) > 0) {
- readBytes += ret;
-
- if (!buf.hasRemaining()) {
- break;
- }
- }
- } else {
- ret = read(session, buf);
-
- if (ret > 0) {
- readBytes = ret;
- }
- }
- } finally {
- buf.flip();
- }
-
- if (readBytes > 0) {
- IoFilterChain filterChain = session.getFilterChain();
-
- filterChain.fireMessageReceived(buf);
- buf = null;
-
- if (hasFragmentation) {
- if (readBytes << 1 < config.getReadBufferSize()) {
- session.decreaseReadBufferSize();
- } else if (readBytes == config.getReadBufferSize()) {
- session.increaseReadBufferSize();
- }
- }
- }
-
- if (ret < 0) {
- scheduleRemove(session);
- }
- } catch (Throwable e) {
- if (e instanceof IOException) {
- if (!(e instanceof PortUnreachableException)
- || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
- || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
- scheduleRemove(session);
- }
- }
-
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- }
- }
4.2、解码与编码
4.3、断包与粘包处理
- public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
-
- if (!session.getTransportMetadata().hasFragmentation()) {
- while (in.hasRemaining()) {
- if (!doDecode(session, in, out)) {
- break;
- }
- }
-
- return;
- }
-
-
-
- boolean usingSessionBuffer = true;
-
- IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
-
-
- if (buf != null) {
- boolean appended = false;
-
- if (buf.isAutoExpand()) {
- try {
- buf.put(in);
- appended = true;
- } catch (IllegalStateException e) {
-
-
- } catch (IndexOutOfBoundsException e) {
-
- }
- }
-
- if (appended) {
- buf.flip();
- } else {
-
-
- buf.flip();
- IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
- newBuf.order(buf.order());
- newBuf.put(buf);
- newBuf.put(in);
- newBuf.flip();
- buf = newBuf;
-
-
- session.setAttribute(BUFFER, buf);
- }
- } else {
- buf = in;
- usingSessionBuffer = false;
- }
-
-
-
- for (;;) {
- int oldPos = buf.position();
- boolean decoded = doDecode(session, buf, out);
- if (decoded) {
- if (buf.position() == oldPos) {
- throw new IllegalStateException("doDecode() can‘t return true when buffer is not consumed.");
- }
-
- if (!buf.hasRemaining()) {
- break;
- }
- } else {
- break;
- }
- }
-
-
-
-
- if (buf.hasRemaining()) {
-
- if (usingSessionBuffer && buf.isAutoExpand()) {
- buf.compact();
- } else {
- storeRemainingInSession(buf, session);
- }
- } else {
- if (usingSessionBuffer) {
- removeSessionBuffer(session);
- }
- }
- }
- protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
-
- if (!in.prefixedDataAvailable(4, maxObjectSize)) {
- return false;
- }
-
- out.write(in.getObject(classLoader));
- return true;
- }
NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码
标签:upd 缓冲区 tcl make plain corrupted corrupt 源码解析 字节
原文地址:http://www.cnblogs.com/duanxz/p/6754610.html