标签:style blog http java color 使用
HBase提供基于单行数据操作的原子性保证
/**
 * Applies one {@code Put} to this region while the lock of the affected row is held, so
 * that all edits for that row (across all column families) happen atomically.
 *
 * @param put the mutation to apply; its row determines which row lock is used
 * @param lockid id of a row lock the caller already holds, or {@code null} to have a lock
 *     obtained for this call and released again before returning
 * @param writeToWAL passed through unchanged to {@code internalPut}
 * @throws IOException declared for the helpers invoked here; their bodies are not part of
 *     this excerpt
 * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
 */
public void put(Put put, Integer lockid, boolean writeToWAL) throws IOException {
  checkReadOnly();
  // Only a coarse capacity check: resources may still run out between this check and the
  // moment the lock is obtained. That window is accepted as extremely rare.
  checkResources();
  startRegionOperation();
  this.writeRequestsCount.increment();
  this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
  try {
    // A lock handed in by the caller stays with the caller; only a lock taken here is released.
    final boolean lockOwnedByCaller = (lockid != null);
    final byte[] rowKey = put.getRow();
    // Obtains a new row lock when no existing one was passed in.
    final Integer heldLockId = getLock(lockid, rowKey, true);
    try {
      internalPut(put, put.getClusterId(), writeToWAL);
    } finally {
      if (!lockOwnedByCaller) {
        releaseRowLock(heldLockId);
      }
    }
  } finally {
    closeRegionOperation();
  }
}
getLock调用了internalObtainRowLock:
/**
 * Obtains the lock on {@code rowKey} and registers a new lock id for it.
 *
 * <p>The row is claimed by inserting a fresh latch into {@code lockedRows}. While another
 * latch is already present, the call waits on it (up to {@code rowLockWaitDuration}
 * milliseconds per attempt) and then retries. Presumably the current holder counts that
 * latch down when it releases the row -- the release path is not part of this excerpt.
 *
 * @param rowKey row to lock
 * @param waitForLock if {@code false}, give up immediately when the row is already locked
 * @return the id under which the lock was stored in {@code lockIds}, or {@code null} when
 *     the row is already locked and {@code waitForLock} is {@code false}
 * @throws IOException if a wait on the existing latch times out, or (as
 *     {@link java.io.InterruptedIOException}) if the thread is interrupted while waiting
 */
private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock)
    throws IOException {
  checkRow(rowKey.getBytes(), "row lock");
  startRegionOperation();
  try {
    CountDownLatch rowLatch = new CountDownLatch(1);
    // loop until we acquire the row lock (unless !waitForLock)
    while (true) {
      CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
      if (existingLatch == null) {
        // Our latch is now the one stored for this row: the lock is ours.
        break;
      }
      // row already locked
      if (!waitForLock) {
        return null;
      }
      try {
        if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
          throw new IOException("Timed out on getting lock for row=" + rowKey);
        }
      } catch (InterruptedException ie) {
        // Do not swallow the interrupt: silently retrying would discard the cancellation
        // request and start another full-length wait. Restore the flag and surface the
        // interruption as an IOException subtype so the declared contract is unchanged.
        Thread.currentThread().interrupt();
        java.io.InterruptedIOException iie = new java.io.InterruptedIOException(
            "Interrupted while waiting for lock on row=" + rowKey);
        iie.initCause(ie);
        throw iie;
      }
    }
    // loop until we generate an unused lock id
    while (true) {
      Integer lockId = lockIdGenerator.incrementAndGet();
      HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
      if (existingRowKey == null) {
        return lockId;
      }
      // lockId already in use, jump generator to a new spot
      lockIdGenerator.set(rand.nextInt());
    }
  } finally {
    closeRegionOperation();
  }
}
HBase行锁的实现细节推荐下:hbase源码解析之行锁
// Lock row
// Excerpt of the check-then-write path: lock the row, read the current value, compare,
// then apply the Put or Delete. Lines consisting of "......" mark code the excerpt omits.
// Obtains a new row lock unless the caller passed in an existing one via lockId.
Integer lid = getLock(lockId, get.getRow(), true);
......
// Read the current value while the row lock is held; the comparison that sets
// `matches` is in the omitted part below.
try {
result = get(get, false);
......
// On a match, apply the mutation: a Put goes through internalPut, anything else is
// cast to Delete, prepared, and applied through internalDelete.
if (matches) {
if (isPut) {
internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
} else {
Delete d = (Delete)w;
prepareDelete(d);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
}
return true;
}
// No match: nothing is written in this excerpt and the caller is told so.
return false;
} finally {
// Release the lock only when it was obtained here (no lockId was supplied by the caller).
if(lockId == null) releaseRowLock(lid);
}
实现逻辑:加锁 => get => 比较 => put/delete。
checkAndPut在实际应用中非常有价值。例如我们线上生成DPID的项目:多个客户端会并行生成DPID,如果有一个客户端已经生成了一个DPID,则其他客户端不能再生成新的DPID,只能获取该DPID。
代码片段:
// Try to store this client's dpid for the MAC in one atomic check-and-put. The null
// expected value presumably means "only if no dpid is stored yet" -- confirm against
// hbaseUse.checkAndPut.
ret = hbaseUse.checkAndPut("bi.dpdim_mac_dpid_mapping", mac, "dim", "dpid", null, dpid);
if (ret) {
  // The put went through: append the mac column name and value to the pending lists.
  columnList.add("mac");
  valueList.add(mac);
} else {
  // The put was rejected, so read back the dpid that is already stored and return it,
  // unless the query reports the ABNORMAL marker.
  String retDpid = hbaseUse.query("bi.dpdim_mac_dpid_mapping", mac, "dim", "dpid");
  if (!retDpid.equals(ABNORMAL)) {
    return retDpid;
  }
}
checkAndPut的详细使用可以参考:HBaseEveryDay_Atomic_compare_and_set
Reference:
HBase - Apache HBase (TM) ACID Properties
hbase源码解析之行锁
HBase权威指南
HBaseEveryDay_Atomic_compare_and_set
标签:style blog http java color 使用
原文地址:http://blog.csdn.net/yfkiss/article/details/36438855