码迷,mamicode.com
首页 > 其他好文 > 详细

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)

时间:2015-04-13 19:02:42      阅读:120      评论:0      收藏:0      [点我收藏+]

标签:hbase   源码   

如下面的代码所示,是HBase Put操作的简单代码实例,关于代码中的Connection connection = ConnectionFactory.createConnection(conf),已近在前一篇博 HBase1.0.0源码分析之Client启动连接流程,中介绍了链接的相关流程以及所启动的服务信息。
            TableName tn = TableName.valueOf("test010");
            try (Connection connection = ConnectionFactory.createConnection(conf)) {
                try (Table table = connection.getTable(tn)) {
                    Put put = new Put("ROW1".getBytes());
                    put.addColumn("CF1".getBytes(),"column1".getBytes(),"value1".getBytes());
                    put.addColumn("CF2".getBytes(),"column1".getBytes(),"value1".getBytes());
                    table.put(put);
                    System.out.println("done!");

                }
            }
本文着重解析put是如何被一步步的传送到服务器端以及被服务器端调用的。首先我们有必要回顾一下关于Connection的类型结构,如下图所示:HConnectionImplementation 类是实际负责和服务器连接的,要想对表的数据操作,例如例子中的put我们首选需要获取一个Table的的实例,这个可以从connection中拿到,
技术分享
        public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
            if (managed) {
                throw new NeedUnmanagedConnectionException();
            }
            return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
        }
Table其实就是一个操作接口,真正的实现类是HTable,HTable可以负责对单一的HBase的数据表进行数据的插入删除等数据层次的操作,该类目前只是HBase Internal 的,对外的接口是Table,获取HTable实例之后就是对操作进行执行了,
  /**
   * {@inheritDoc}
   * @throws IOException
   */
  @Override
  public void put(final Put put) throws IOException {
    getBufferedMutator().mutate(put);
    if (autoFlush) {
      flushCommits();
    }
  }
以上的代码就是HTable操作的原型,这里进行了一系列的调用,我们一一分析,首先是getBufferedMutator()函数,
该函数返回一个实现的实例BufferedMutatorImpl,该类和HTable类似,负责和单个HBase的table通信,但是他对put的操作是batch的,并且具有异步执行的能力
mutate在内部会调用doMutate的方法:
  private void doMutate(Mutation m) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    if (closed) {
      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
    }
    if (!(m instanceof Put) && !(m instanceof Delete)) {
      throw new IllegalArgumentException("Pass a Delete or a Put");
    }

    // This behavior is highly non-intuitive... it does not protect us against
    // 94-incompatible behavior, which is a timing issue because hasError, the below code
    // and setter of hasError are not synchronized. Perhaps it should be removed.
    if (ap.hasError()) {
      writeAsyncBuffer.add(m);
      backgroundFlushCommits(true);
    }

    if (m instanceof Put) {
      validatePut((Put) m);
    }

    currentWriteBufferSize += m.heapSize();
    writeAsyncBuffer.add(m);

    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
  }

有效代码也就是这一句:writeAsyncBuffer.add(m);其实也就是向一个异步缓冲区添加该操作,单后当一定条件的时候进行flash,当发生flash操作的时候,才会真正的去执行该操作,这主要是提高系统的吞吐率,接下来我们去看看这个flush的操作内部吧。
  private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    try {
      if (!synchronous) {
        ap.submit(tableName, writeAsyncBuffer, true, null, false);
        if (ap.hasError()) {
          LOG.debug(tableName + ": One or more of the operations have failed -"
              + " waiting for all operation in progress to finish (successfully or not)");
        }
      }
这个刷新操作可以是制定异步提交还是同步提交,从doMutate中来看默认是以异步的方式进行,这里的ap是AsyncProcess类的一个实例,该类使用多线程的来实现异步的请求,通过Future进行线程中服务器端数据的获取。这里的过程也比较复杂,我将在下一篇文章中继续。

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)

标签:hbase   源码   

原文地址:http://blog.csdn.net/youmengjiuzhuiba/article/details/45024679

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!