When interacting with an HBase cluster through the Java API, you construct an HTable object and use the methods it provides for inserts, deletes, queries, and so on. To create an HTable, you first need a Configuration object conf that carries the cluster's connection information. It is typically created like this:
```java
Configuration conf = HBaseConfiguration.create();
// Point the client at the HBase cluster's ZooKeeper quorum
conf.set("hbase.zookeeper.quorum", "10.172.1.61");
conf.set("hbase.zookeeper.property.clientPort", "2181");
```
Once you have conf, HTable offers two construction routes:
(1) Create the HTable object directly from conf, using this constructor:
```java
public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
  if (conf == null) {
    this.connection = null;
    return;
  }
  this.connection = HConnectionManager.getConnection(conf);
  this.configuration = conf;
  this.pool = getDefaultExecutor(conf);
  this.finishSetup();
}
```
Note the call to HConnectionManager.getConnection (highlighted in the original post). This constructor obtains an HConnection object from HConnectionManager. When doing database work through a Java API, one usually creates a similar connection object to hold connection-related state (if you are familiar with ODBC or JDBC, this will feel natural). getConnection is implemented as follows:
```java
public static HConnection getConnection(final Configuration conf) throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      HConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    connection.incCount();
    return connection;
  }
}
```
Here CONNECTION_INSTANCES is a LinkedHashMap<HConnectionKey, HConnectionImplementation>. Three lines deserve attention (highlighted in the original post). First, an HConnectionKey object is built from conf. Second, CONNECTION_INSTANCES is probed for that key. Third, if no entry exists, createConnection builds a new HConnection; otherwise the HConnection found in the map is returned directly.
To be thorough, look at HConnectionKey's constructor and its overridden hashCode function:
```java
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    HConnectionManager.LOG.warn(
        "Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
```
```java
public int hashCode() {
  final int prime = 31;
  int result = 1;
  if (username != null) {
    result = username.hashCode();
  }
  for (String property : CONNECTION_PROPERTIES) {
    String value = properties.get(property);
    if (value != null) {
      result = prime * result + value.hashCode();
    }
  }
  return result;
}
```
As the code shows, the overridden hashCode starts from the hash of username and then folds in the values of the CONNECTION_PROPERTIES taken from conf. The username comes from currentUser, currentUser comes from provider, and provider is instantiated from conf. So two Configuration objects with the same content produce the same username and the same property values, which means their HConnectionKeys hash (and compare) equal. Since CONNECTION_INSTANCES is a LinkedHashMap, its get uses HConnectionKey's hashCode and equals to decide whether a matching key already exists. In essence, getConnection returns a connection keyed by the content of conf: for every conf with identical settings, one and the same connection object is returned.
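The caching behavior described above can be illustrated with a toy version. This is a minimal sketch, not HBase code: a plain `Map<String, String>` stands in for Hadoop's `Configuration`, and the empty `Conn` class stands in for `HConnectionImplementation`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of HConnectionManager's connection caching:
// connections are cached by the property values that identify a cluster,
// so two configs with identical content share one connection object.
public class ConnCacheSketch {
    // Stand-in for HConnectionKey: keeps only the identifying properties.
    static final class Key {
        final Map<String, String> props = new HashMap<>();
        Key(Map<String, String> conf, String... properties) {
            for (String p : properties) {
                if (conf.containsKey(p)) props.put(p, conf.get(p));
            }
        }
        @Override public int hashCode() { return props.hashCode(); }
        @Override public boolean equals(Object o) {
            return o instanceof Key && props.equals(((Key) o).props);
        }
    }

    static final class Conn {} // stand-in for HConnectionImplementation

    private static final Map<Key, Conn> CACHE = new HashMap<>();

    // Same role as HConnectionManager.getConnection: one Conn per distinct key.
    static synchronized Conn getConnection(Map<String, String> conf) {
        Key key = new Key(conf, "hbase.zookeeper.quorum",
                                "hbase.zookeeper.property.clientPort");
        return CACHE.computeIfAbsent(key, k -> new Conn());
    }

    public static void main(String[] args) {
        Map<String, String> c1 = new HashMap<>();
        c1.put("hbase.zookeeper.quorum", "10.172.1.61");
        c1.put("hbase.zookeeper.property.clientPort", "2181");
        Map<String, String> c2 = new HashMap<>(c1); // same content, new object

        // Identical settings -> the very same cached connection.
        System.out.println(getConnection(c1) == getConnection(c2)); // prints true
    }
}
```

The key point is that equality is decided by the *content* of the configuration, not by object identity, which is exactly why approach (1) shares a connection across HTables built from equivalent conf objects.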
(2) Call createConnection to create a connection explicitly, then use that connection to create the HTable object. createConnection and the corresponding HTable constructor look like this:
```java
public static HConnection createConnection(Configuration conf) throws IOException {
  UserProvider provider = UserProvider.instantiate(conf);
  return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user) throws IOException {
  String className = conf.get("hbase.client.connection.impl",
      HConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor = clazz.getDeclaredConstructor(
        Configuration.class, boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (HConnection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
```
```java
public HTable(TableName tableName, HConnection connection) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = true;
  this.cleanupConnectionOnClose = false;
  this.connection = connection;
  this.configuration = connection.getConfiguration();
  this.pool = getDefaultExecutor(this.configuration);
  this.finishSetup();
}
```
Clearly, with this approach every HTable object comes with a freshly created HConnection, instead of sharing one HConnection as in approach (1).
So how do the two approaches perform when executing inserts, deletes, and lookups? Let's start from the code. For simplicity, consider what HTable actually does during a put (insert).
HTable's put function looks like this:
```java
public void put(final Put put)
    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  doPut(put);
  if (autoFlush) {
    flushCommits();
  }
}

private void doPut(Put put)
    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  if (ap.hasError()) {
    writeAsyncBuffer.add(put);
    backgroundFlushCommits(true);
  }
  validatePut(put);
  currentWriteBufferSize += put.heapSize();
  writeAsyncBuffer.add(put);
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}

private void backgroundFlushCommits(boolean synchronous)
    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  try {
    do {
      ap.submit(writeAsyncBuffer, true);
    } while (synchronous && !writeAsyncBuffer.isEmpty());
    if (synchronous) {
      ap.waitUntilDone();
    }
    if (ap.hasError()) {
      LOG.debug(tableName + ": One or more of the operations have failed -"
          + " waiting for all operation in progress to finish (successfully or not)");
      while (!writeAsyncBuffer.isEmpty()) {
        ap.submit(writeAsyncBuffer, true);
      }
      ap.waitUntilDone();
      if (!clearBufferOnFail) {
        // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
        // write buffer. This is a questionable feature kept here for backward compatibility
        writeAsyncBuffer.addAll(ap.getFailedOperations());
      }
      RetriesExhaustedWithDetailsException e = ap.getErrors();
      ap.clearErrors();
      throw e;
    }
  } finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
      if (mut instanceof Mutation) {
        currentWriteBufferSize += ((Mutation) mut).heapSize();
      }
    }
  }
}
```
As the code shows (highlighted in the original post), the call chain is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of AsyncProcess. Following the trail into the AsyncProcess class:
```java
public void submit(List<? extends Row> rows, boolean atLeastOne)
    throws InterruptedIOException {
  submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne,
    boolean isLowPripority) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return;
  }
  // This looks like we are keying by region but HRegionLocation has a comparator that compares
  // on the server portion only (hostname + port) so this Map collects regions by server.
  Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
  long currentTaskCnt = tasksDone.get();
  boolean alreadyLooped = false;
  NonceGenerator ng = this.hConnection.getNonceGenerator();
  do {
    if (alreadyLooped) {
      // if, for whatever reason, we looped, we want to be sure that something has changed.
      waitForNextTaskDone(currentTaskCnt);
      currentTaskCnt = tasksDone.get();
    } else {
      alreadyLooped = true;
    }
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc = findDestLocation(r, posInList);
      if (loc == null) {
        // loc is null if there is an error such as meta not available.
        it.remove();
      } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer, ng);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && !hasError());
  HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
  sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id
          + ", no location found, aborting submit for"
          + " tableName=" + tableName
          + " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }
  return loc;
}
```
Now the picture is clear. During HTable's put, the HConnection is used only to locate the RegionServer; after that, the client talks to the RegionServer directly. So as long as the client does not switch regions very frequently, the number of HConnection calls should be far smaller than the number of put operations. The same conclusion holds for inserts, lookups, and deletes.
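The batching that submitLowPriority performs can be sketched as follows. This is a minimal sketch, not HBase code: the `locator` function stands in for `hConnection.locateRegion`, and plain strings stand in for rows and server names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of AsyncProcess's actionsByServer grouping: each row is resolved
// to a location once, then rows are batched per destination server, so the
// connection is only consulted for region lookup, not for each write.
public class GroupByServerSketch {
    static Map<String, List<String>> groupByServer(
            List<String> rows, Function<String, String> locator) {
        Map<String, List<String>> actionsByServer = new HashMap<>();
        for (String row : rows) {
            String server = locator.apply(row); // find destination (locateRegion)
            actionsByServer.computeIfAbsent(server, s -> new ArrayList<>())
                           .add(row);           // batch per server
        }
        return actionsByServer;
    }

    public static void main(String[] args) {
        // Hypothetical locator: rows starting with "a" live on rs1, others on rs2.
        Function<String, String> locator =
                row -> row.startsWith("a") ? "rs1:60020" : "rs2:60020";
        Map<String, List<String>> batches =
                groupByServer(Arrays.asList("a1", "a2", "b1"), locator);
        System.out.println(batches.get("rs1:60020")); // prints [a1, a2]
    }
}
```

Once the batches are formed, one multi-request goes to each server, which is why the per-put cost of the connection stays low regardless of how many puts are buffered.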
With the code analysis done, here is a simple experiment to verify the claim.
Environment: an HBase cluster of four Linux servers with 64 GB of RAM each; average connection latency around 5 ms.
The test code:
```java
public class TestHbaseConection {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.16");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    // Parameters for the HBase test tables
    String tableNamePrefix = "testTable";
    String[] colNames = new String[2];
    colNames[0] = "grad";
    colNames[1] = "course";
    for (int i = 0; i < 100; i++) {
      createTable(tableNamePrefix + i, colNames, conf);
    }
    for (int i = 0; i < 100; i++) {
      // Insert through the shared connection
      new Thread(new WriteThread(conf, "CREATEWITHCONF", 60000L,
          tableNamePrefix + i, colNames)).start();
      // Insert through a separately created connection
      //new Thread(new WriteThread(conf, "CREATEWITHCONN", 60000L,
      //    tableNamePrefix + i, colNames)).start();
    }
  }

  public static void createTable(String tableName, String[] colNames, Configuration conf) {
    System.out.println("start create table " + tableName);
    try {
      HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
      if (hBaseAdmin.tableExists(tableName)) { // if the table already exists, drop it first
        hBaseAdmin.disableTable(tableName);
        hBaseAdmin.deleteTable(tableName);
        System.out.println(tableName + " is exist");
        return;
      }
      HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
      for (int i = 0; i < colNames.length; i++) {
        tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
      }
      hBaseAdmin.createTable(tableDescriptor);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    System.out.println("end create table " + tableName);
  }
}

class WriteThread implements Runnable {
  private Configuration conf;
  private String type;
  private long lifeTime;
  private String tableName;
  private String[] colNames;
  private String threadName;

  public WriteThread(Configuration conf, String type, long lifeTime,
      String tableName, String[] colNames) {
    this.conf = conf;
    this.type = type;
    this.lifeTime = lifeTime;
    this.tableName = tableName;
    this.colNames = colNames;
  }

  @Override
  public void run() {
    threadName = Thread.currentThread().getName();
    int count = 0;
    System.out.println(threadName + ": started");
    try {
      if (type.equals("CREATEWITHCONN")) {
        // create a connection for this thread and build the HTable from it directly
        HConnection conn = HConnectionManager.createConnection(conf);
        HTable table = new HTable(TableName.valueOf(tableName), conn);
        HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis();
        while (end - start <= lifeTime) {
          Put put = generatePut(threadName, columnFamilies, count);
          table.put(put);
          count++;
          end = System.currentTimeMillis();
        }
        conn.close();
      } else if (type.equals("CREATEWITHCONF")) {
        // create the HTable from conf, sharing the cached connection
        HTable table = new HTable(conf, tableName);
        HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis();
        while (end - start <= lifeTime) {
          Put put = generatePut(threadName, columnFamilies, count);
          table.put(put);
          count++;
          end = System.currentTimeMillis();
        }
      } else {
        return;
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    System.out.println(threadName + ": ended with operation num:" + count);
  }

  private Put generatePut(String threadName, HColumnDescriptor[] columnFamilies, int count) {
    Put put = new Put(Bytes.toBytes(threadName + "_" + count));
    for (int i = 0; i < columnFamilies.length; i++) {
      String familyName = columnFamilies[i].getNameAsString();
      for (int j = 0; j < colNames.length; j++) {
        if (familyName.equals(colNames[j])) {
          // put a random column/value into this column family
          String columnName = familyName + (int) (Math.floor(Math.random() * 5 + 10 * j));
          String val = "" + columnName.hashCode() % 100;
          put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(val));
        }
      }
    }
    return put;
  }
}
```
In short: first create 100 HBase tables with two column families each, then write data for one minute using the getConnection strategy and the createConnection strategy respectively. The number of tables, the duration, and the payload are all adjustable.
Over several runs, the getConnection strategy wrote roughly 2400-2800 rows per thread per minute, while the createConnection strategy managed roughly 1200-1800. Note that in this setup, to keep the threads from competing for resources, each thread operated on its own region (in fact, its own table). If all threads operate on the same region (a small change to the test code), the gap widens: getConnection reaches 3500-5000 rows per thread per minute versus 1000-1200 for createConnection. In general, the fewer the regions and the more the threads, the more the getConnection strategy wins. A plausible explanation is that too many createConnection connections overload the server side; even with multiple RegionServers handling the actual writes, performance still degrades. Also note that with the createConnection strategy each connection must be closed explicitly, or it keeps holding resources and can even leak memory. So when using the Java API with HBase, prefer the getConnection route for creating HTable objects and avoid wasting resources.
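The close discipline mentioned above can be sketched generically. This is a minimal sketch, assuming nothing HBase-specific: `ToyConnection` is a hypothetical stand-in for any explicitly created connection, and the try/finally pattern is what keeps it from leaking when an operation throws.

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of why createConnection needs an explicit close: a connection you
// create yourself holds resources until close() is called, so close it in
// a finally block (or try-with-resources) no matter what happens in between.
public class CloseSketch {
    static class ToyConnection implements Closeable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) throws IOException {
        ToyConnection leaked = new ToyConnection();
        // ... used and forgotten: leaked.closed stays false, resources held.

        ToyConnection conn = new ToyConnection();
        try {
            // ... put/get/delete through tables built on conn ...
        } finally {
            conn.close(); // always released, even if an operation throws
        }
        System.out.println(leaked.closed + " " + conn.closed); // prints false true
    }
}
```

With the getConnection strategy this burden largely disappears, since the connection is shared and reference-counted by HConnectionManager.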
Original post: http://www.cnblogs.com/xczyd/p/5577124.html