码迷,mamicode.com
首页 > 其他好文 > 详细

Thrift源码分析(五)-- FrameBuffer类分析

时间:2014-09-30 16:21:19      阅读:366      评论:0      收藏:0      [点我收藏+]

标签:thrift   rpc   

FrameBuffer是Thrift NIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责。


bubuko.com,布布扣


FrameBufferState定义了FrameBuffer作为缓冲区的读写状态

private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    // 读Frame消息头,实际是4字节表示Frame长度
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    // 读Frame消息体
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    // 读满包 
    READ_FRAME_COMPLETE,
    // waiting to get switched to listening for write events
    // 等待注册写
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    // 写半包 
    WRITING,
    // another thread wants this framebuffer to go back to reading
    // 等待注册读
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector
    // thread
    // 等待关闭 
    AWAITING_CLOSE
  }

值得注意的是,FrameBuffer读数据时,

1. 先读4字节的Frame消息头,

2. 然后改变FrameBufferState,从READING_FRMAE_SIZE到READING_FRAME,并根据读到的Frame长度修改Buffer的长度

3. 再次读Frame消息体,如果读完就修改状态到READ_FRAME_COMPLETE,否则还是把FrameBuffer绑定到SelectionKey,下次继续读

public boolean read() {
      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
        // try to read the frame size completely
        if (!internalRead()) {
          return false;
        }

        // if the frame size has been read completely, then prepare to read the
        // actual frame.
        if (buffer_.remaining() == 0) {
          // pull out the frame size as an integer.
          int frameSize = buffer_.getInt(0);
          if (frameSize <= 0) {
            LOGGER.error("Read an invalid frame size of " + frameSize
                + ". Are you using TFramedTransport on the client side?");
            return false;
          }

          // if this frame will always be too large for this server, log the
          // error and close the connection.
          if (frameSize > MAX_READ_BUFFER_BYTES) {
            LOGGER.error("Read a frame size of " + frameSize
                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
            return false;
          }

          // if this frame will push us over the memory limit, then return.
          // with luck, more memory will free up the next time around.
          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
            return true;
          }

          // increment the amount of memory allocated to read buffers
          readBufferBytesAllocated.addAndGet(frameSize);

          // reallocate the readbuffer as a frame-sized buffer
          buffer_ = ByteBuffer.allocate(frameSize);

          state_ = FrameBufferState.READING_FRAME;
        } else {
          // this skips the check of READING_FRAME state below, since we can't
          // possibly go on to that state if there's data left to be read at
          // this one.
          return true;
        }
      }

      // it is possible to fall through from the READING_FRAME_SIZE section
      // to READING_FRAME if there's already some frame data available once
      // READING_FRAME_SIZE is complete.

      if (state_ == FrameBufferState.READING_FRAME) {
        if (!internalRead()) {
          return false;
        }

        // since we're already in the select loop here for sure, we can just
        // modify our selection key directly.
        if (buffer_.remaining() == 0) {
          // get rid of the read select interests
          selectionKey_.interestOps(0);
          state_ = FrameBufferState.READ_FRAME_COMPLETE;
        }

        return true;
      }

      // if we fall through to this point, then the state must be invalid.
      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
      return false;
    }

internalRead方法实际调用了SocketChannel来读数据。注意SocketChannel返回值小于0的情况:

n 有数据的时候返回读取到的字节数。
0 没有数据并且没有达到流的末端时返回0。
-1 当达到流末端的时候返回-1。

当Channel有数据时并且是最后的数据 时,实际会读两次,第一次返回字节数,第二次返回-1。这个是底层Selector实现的。

 private boolean internalRead() {
      try {
        if (trans_.read(buffer_) < 0) {
          return false;
        }
        return true;
      } catch (IOException e) {
        LOGGER.warn("Got an IOException in internalRead!", e);
        return false;
      }
    }

在看写缓冲时的情况

1. 写之前必须把FrameBuffer的状态改成WRITING,后面会有具体例子

2. 如果没写任何数据,就返回false

3. 如果写完了,就需要把SelectionKey注册的写事件取消。Thrift是直接把SelectionKey注册事件改成读了,而常用的做法一般是把写事件取消就行了。关于更多NIO写事件的注册问题,看这篇: http://blog.csdn.net/iter_zc/article/details/39291129

  public boolean write() {
      if (state_ == FrameBufferState.WRITING) {
        try {
          if (trans_.write(buffer_) < 0) {
            return false;
          }
        } catch (IOException e) {
          LOGGER.warn("Got an IOException during write!", e);
          return false;
        }

        // we're done writing. now we need to switch back to reading.
        if (buffer_.remaining() == 0) {
          prepareRead();
        }
        return true;
      }

      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
      return false;
    }

FrameBuffer可以根据SelectionKey的状态来切换自身状态,也可以根据自身状态来选择注册的Channel事件

public void changeSelectInterests() {
      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
        // set the OP_WRITE interest
        selectionKey_.interestOps(SelectionKey.OP_WRITE);
        state_ = FrameBufferState.WRITING;
      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
        prepareRead();
      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
        close();
        selectionKey_.cancel();
      } else {
        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
      }
    }

说完了FrameBuffer作为NIO缓冲区的功能,再看看它作为RPC方法调用模型的重要组件的功能。


FrameBuffer提供了invoker方法,当读满包时,从消息头拿到要调用的方法,然后通过它管理的Processor来完成实际方法调用。然后切换到写模式来写消息体

具体的调用模型看这篇: http://blog.csdn.net/iter_zc/article/details/39692951

public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

 public void responseReady() {
      // the read buffer is definitely no longer in use, so we will decrement
      // our read buffer count. we do this here as well as in close because
      // we'd like to free this read memory up as quickly as possible for other
      // clients.
      readBufferBytesAllocated.addAndGet(-buffer_.array().length);

      if (response_.len() == 0) {
        // go straight to reading again. this was probably an oneway method
        state_ = FrameBufferState.AWAITING_REGISTER_READ;
        buffer_ = null;
      } else {
        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());

        // set state that we're waiting to be switched to write. we do this
        // asynchronously through requestSelectInterestChange() because there is
        // a possibility that we're not in the main thread, and thus currently
        // blocked in select(). (this functionality is in place for the sake of
        // the HsHa server.)
        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
      }
      requestSelectInterestChange();
    }

写消息体responseReday()方法时,我们看到Thrift是如何处理写的

1. 创建ByteBuffer

2. 修改状态到AWAITING_REGISTER_WRITE

3. 调用requestSelecInteresetChange()方法来注册Channel的写事件

4. 当Selector根据isWriteable状态来调用要写的Channel时,会调用FrameBuffer的write方法,上面说了write方法写满包后,会取消注册的写事件。

Thrift源码分析(五)-- FrameBuffer类分析

标签:thrift   rpc   

原文地址:http://blog.csdn.net/iter_zc/article/details/39694129

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!