资源池内部维护一个下标,获取资源时,下标对资源数量(size())求余并自增一,返回对应的资源(资源并没有被从资源池中移除)。使用轮询的方式可能会导致某一资源被外部(线程)同时使用,需要注意资源线程安全问题。
该类继承自ThreadLocal,顾名思义,该资源池与线程有着紧密的联系,资源池中的资源数目取决于使用线程往资源池中添加资源的线程数目,每个线程仅可以使用属于自己线程的那个资源。
private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
poolSizes 用来记录各个资源池实例中各自的资源数目(同一程序中可以有多个不同的资源池)。
资源的获取与归还主要利用ThreadLocal中的get、set方法,代码如下:
@Override
public R put(R resource) {
R previousResource = get();
if (previousResource == null) {
AtomicInteger poolSize = poolSizes.get(this);
if (poolSize == null) {
poolSizes.put(this, poolSize = new AtomicInteger(0));
}
poolSize.incrementAndGet();
}
this.set(resource);
return previousResource;
}
get、set中间的代码个人认为仅仅完成资源池审计功能。
PoolMap
HTablePool的资源池设计是以表名称(String、byte[])为单位的,即对HBase中的每一张表维护着各自的连接池,因此在Pool之上有了PoolMap,实际使用中PoolMap的Key即为表名称。
PoolMap内部维护着一个名为pools的变量,
private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
其中,Key为表名称,Value即为对应着的资源池实例。
请求资源代码如下:
@Override
public V get(Object key) {
Pool<V> pool = pools.get(key);
return pool != null ? pool.get() : null;
}
public V put(K key, V value) {
Pool<V> pool = pools.get(key);
if (pool == null) {
pools.put(key, pool = createPool());
}
return pool != null ? pool.put(value) : null;
}
首先根据key(表名称)找到对应的资源池实例pool,然后通过该资源池实例pool添加资源,如果相应的资源池实例不存在,则创建并维护对应关系,创建代码如下:
protected Pool<V> createPool() {
switch (poolType) {
case Reusable:
return new ReusablePool<V>(poolMaxSize);
case RoundRobin:
return new RoundRobinPool<V>(poolMaxSize);
case ThreadLocal:
return new ThreadLocalPool<V>();
}
return null;
}
根据poolType(ReusablePool、RoundRobinPool、ThreadLocalPool)创建相应类型的资源池。
在Pool、PoolMap的基础上,我们可以开始研究HTablePool的实现原理。
HTablePool
该类内部维护着两个重要变量:
private final PoolMap<String, HTableInterface> tables;
......
private final HTableInterfaceFactory tableFactory;
其中,tables维护着某表对应的连接资源(即HTable实例),tableFactory用以创建、释放HTable实例。
HTableInterfaceFactory拥有一个实例类HTableFactory,代码如下:
public class HTableFactory implements HTableInterfaceFactory {
@Override
public HTableInterface createHTableInterface(Configuration config,
byte[] tableName) {
try {
return new HTable(config, tableName);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public void releaseHTableInterface(HTableInterface table) throws IOException {
table.close();
}
}
HTableFactory工作过程比较简单,创建、释放(关闭)HTable实例。
请求某表连接资源(HTable实例)代码如下:
public HTableInterface getTable(String tableName) {
// call the old getTable implementation renamed to findOrCreateTable
HTableInterface table = findOrCreateTable(tableName);
// return a proxy table so when user closes the proxy, the actual table
// will be returned to the pool
return new PooledHTable(table);
}
private HTableInterface findOrCreateTable(String tableName) {
HTableInterface table = tables.get(tableName);
if (table == null) {
table = createHTable(tableName);
}
return table;
}
protected HTableInterface createHTable(String tableName) {
return this.tableFactory.createHTableInterface(config,Bytes.toBytes(tableName));
}
(1)根据表名称查询或创建HTableInterface(HTable实现该接口)实例,由方法findOrCreateTable完成;
(2)tables中含有该表名称对应的实例,则直接返回,否则通过方法createHTable创建(即通过tableFactory创建)后返回;
(3)使用代理模式对HTableInterface实例进行包装,将包装后的实例(即PooledHTable实例)返回给调用者。
PooledHTable通过代理模式将请求全部发送至内部HTable实例,但有一个方法请求例外,即close方法,
class PooledHTable implements HTableInterface {
private HTableInterface table; // actual table implementation
public PooledHTable(HTableInterface table) {
this.table = table;
}
/**
* Returns the actual table back to the pool
*
* @throws IOException
*/
public void close() throws IOException {
returnTable(table);
}
@Override
public RowLock lockRow(byte[] row) throws IOException {
return table.lockRow(row);
}
}
该方法不再通过HTableInterface close方法执行关闭操作,而是将实例table返回至相应的资源池中,代码如下:
private void returnTable(HTableInterface table) throws IOException {
// this is the old putTable method renamed and made private
String tableName = Bytes.toString(table.getTableName());
if (tables.size(tableName) >= maxSize) {
// release table instance since we‘re not reusing it
this.tables.remove(tableName, table);
this.tableFactory.releaseHTableInterface(table);
return;
}
tables.put(tableName, table);
}
如果相应的表名称的资源池中的资源数目已经达到配额限制,则将该资源从资源池中移除(某些资源池中实现并没有将资源实际移除资源池,参考前面分析),否则根据对应关系将资源归还即可。