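```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Loads hbase-default.xml and hbase-site.xml from the classpath
Configuration conf = HBaseConfiguration.create();
TableName tn = TableName.valueOf("test010");
// Connection and Table are both AutoCloseable, so try-with-resources releases them
try (Connection connection = ConnectionFactory.createConnection(conf)) {
  try (Table table = connection.getTable(tn)) {
    Put put = new Put(Bytes.toBytes("ROW1"));
    // Same qualifier and value written into two column families, CF1 and CF2
    put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));
    put.addColumn(Bytes.toBytes("CF2"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));
    table.put(put);
    System.out.println("done!");
  }
}
```

This article looks at how a put is carried step by step to the server and how the server side then invokes it. First, it helps to recall the type hierarchy of Connection (shown as a figure in the original post). The class that actually handles communication with the server is HConnectionImplementation. To operate on a table's data, for example the put in the code above, we first need a Table instance, which is obtained from the connection. The corresponding method in HConnectionImplementation is: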
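```java
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
  if (managed) {
    throw new NeedUnmanagedConnectionException();
  }
  return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
}
```

Table is only an interface for operations; the actual implementation class is HTable. HTable performs data-level operations such as inserts and deletes against a single HBase table. The class is now intended for HBase-internal use only, and the public client-facing interface is Table. Once an HTable instance has been obtained, the operation itself is executed: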
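```java
/**
 * {@inheritDoc}
 * @throws IOException
 */
@Override
public void put(final Put put) throws IOException {
  getBufferedMutator().mutate(put);
  if (autoFlush) {
    flushCommits();
  }
}
```

This is HTable's put() method. It triggers a chain of calls, which we will examine one at a time. The first is getBufferedMutator(), which returns the BufferedMutator that the Put is handed to; the real work happens in its doMutate() method: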
```java
private void doMutate(Mutation m) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  if (closed) {
    throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
  }
  if (!(m instanceof Put) && !(m instanceof Delete)) {
    throw new IllegalArgumentException("Pass a Delete or a Put");
  }

  // This behavior is highly non-intuitive... it does not protect us against
  // 94-incompatible behavior, which is a timing issue because hasError, the below code
  // and setter of hasError are not synchronized. Perhaps it should be removed.
  if (ap.hasError()) {
    writeAsyncBuffer.add(m);
    backgroundFlushCommits(true);
  }

  if (m instanceof Put) {
    validatePut((Put) m);
  }

  currentWriteBufferSize += m.heapSize();
  writeAsyncBuffer.add(m);

  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
```

The line that matters is writeAsyncBuffer.add(m): the operation is simply appended to an asynchronous write buffer. The buffer is flushed once certain conditions are met (in the code above, when currentWriteBufferSize exceeds writeBufferSize), and only when that flush happens is the operation actually sent. Batching writes this way is mainly meant to improve throughput. Next, let's look at what the flush does internally.
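The buffering behaviour described above can be illustrated with a small single-threaded Java sketch. The pieces it models are appending to a buffer, tracking its estimated size, flushing once the size passes a threshold, and the autoFlush check in HTable.put(). `SketchWriteBuffer` and `FakeMutation` are hypothetical names, not HBase classes, and the "flush" here only records the batch locally instead of sending any RPC.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HBase Mutation: only the estimated heap size matters here.
class FakeMutation {
  final String row;
  final long heapSize;

  FakeMutation(String row, long heapSize) {
    this.row = row;
    this.heapSize = heapSize;
  }
}

// Illustrative sketch of "buffer, then flush past a threshold"; not an HBase API.
class SketchWriteBuffer {
  private final long writeBufferSize;   // flush threshold, in estimated bytes
  private final boolean autoFlush;      // mimics HTable's autoFlush flag
  private final List<FakeMutation> buffer = new ArrayList<>();
  private final List<List<FakeMutation>> sentBatches = new ArrayList<>(); // one entry per flush
  private long currentWriteBufferSize = 0;

  SketchWriteBuffer(long writeBufferSize, boolean autoFlush) {
    this.writeBufferSize = writeBufferSize;
    this.autoFlush = autoFlush;
  }

  // Analogue of HTable.put(): buffer first, flush right away only when autoFlush is on.
  void put(FakeMutation m) {
    doMutate(m);
    if (autoFlush) {
      flush();
    }
  }

  // Analogue of doMutate(): account for the size, append, flush once over the threshold.
  private void doMutate(FakeMutation m) {
    currentWriteBufferSize += m.heapSize;
    buffer.add(m);
    // flush() below always empties the buffer, so a single check is enough here
    if (currentWriteBufferSize > writeBufferSize) {
      flush();
    }
  }

  // Stand-in for backgroundFlushCommits(): "sends" the whole buffer as one batch.
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    sentBatches.add(new ArrayList<>(buffer));
    buffer.clear();
    currentWriteBufferSize = 0;
  }

  int bufferedCount() {
    return buffer.size();
  }

  List<List<FakeMutation>> sentBatches() {
    return sentBatches;
  }
}
```

With a threshold of 100 and three mutations of 40 bytes each, the first two stay in the buffer and the third pushes the total to 120, triggering one flush that carries all three. The HBase code uses a `while` loop rather than an `if`, since an asynchronous submit is not necessarily guaranteed to drain the whole buffer in one call; this sketch always empties it in one step.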
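```java
private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  try {
    if (!synchronous) {
      ap.submit(tableName, writeAsyncBuffer, true, null, false);
      if (ap.hasError()) {
        LOG.debug(tableName + ": One or more of the operations have failed -"
            + " waiting for all operation in progress to finish (successfully or not)");
      }
    }
    // ... (remainder of the method omitted in this excerpt)
```

The flush can be performed either asynchronously or synchronously. In doMutate() the normal path calls backgroundFlushCommits(false), an asynchronous flush; the synchronous variant is used only when ap.hasError() reports an earlier failure. Here ap is an instance of AsyncProcess, which uses multiple threads to send requests asynchronously and retrieves the results returned by the servers through Futures. This part of the process is fairly involved, so I will continue with it in the next article.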
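The general pattern attributed to AsyncProcess above, handing buffered operations to a thread pool and collecting the outcome through Futures, can be sketched with plain `java.util.concurrent`. This is an assumption-laden illustration, not AsyncProcess's actual logic. `AsyncSubmitSketch` is a hypothetical name. The `serverFor` routing function stands in for the real region lookup, and the stubbed `sendBatch` stands in for the RPC to a server.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of asynchronous batch submission; not AsyncProcess's real API.
class AsyncSubmitSketch {
  private final ExecutorService pool;
  // Assumed routing function: row key -> name of the server hosting that row.
  private final Function<String, String> serverFor;

  AsyncSubmitSketch(ExecutorService pool, Function<String, String> serverFor) {
    this.pool = pool;
    this.serverFor = serverFor;
  }

  // Groups buffered rows by target server, removes them from the buffer,
  // and sends each group on the thread pool. Returns one Future per server.
  List<Future<Integer>> submit(List<String> buffer) {
    Map<String, List<String>> byServer = new LinkedHashMap<>();
    for (String row : buffer) {
      byServer.computeIfAbsent(serverFor.apply(row), s -> new ArrayList<>()).add(row);
    }
    buffer.clear(); // this sketch always submits every buffered row
    List<Future<Integer>> futures = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : byServer.entrySet()) {
      String server = entry.getKey();
      List<String> batch = entry.getValue();
      Future<Integer> f = pool.submit(() -> sendBatch(server, batch));
      futures.add(f);
    }
    return futures;
  }

  // Stub for the per-server RPC: pretends every row succeeds and reports how many were "sent".
  private static int sendBatch(String server, List<String> batch) {
    return batch.size();
  }

  // Blocks until every in-flight batch has finished, summing the rows reported as sent.
  static int waitForAll(List<Future<Integer>> futures)
      throws InterruptedException, ExecutionException {
    int total = 0;
    for (Future<Integer> f : futures) {
      total += f.get();
    }
    return total;
  }
}
```

With a routing function that sends rows sorting before "m" to one server and the rest to another, submitting the rows a, b, x, y, z produces two Futures, and waitForAll returns 5. submit() returns as soon as the batches are queued, so the caller only blocks at the point where it actually needs the results.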
HBase 1.0.0 Source Code Analysis: The Request Processing Flow, Using the Put Operation as an Example (Part 1)
Original article: http://blog.csdn.net/youmengjiuzhuiba/article/details/45024679