HBase guarantees atomicity for operations on a single row. The put method below acquires a row lock before writing:

    /**
     * @param put
     * @param lockid
     * @param writeToWAL
     * @throws IOException
     * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
     */
    public void put(Put put, Integer lockid, boolean writeToWAL)
        throws IOException {
      checkReadOnly();

      // Do a rough check that we have resources to accept a write. The check is
      // 'rough' in that between the resource check and the call to obtain a
      // read lock, resources may run out. For now, the thought is that this
      // will be extremely rare; we'll deal with it when it happens.
      checkResources();
      startRegionOperation();
      this.writeRequestsCount.increment();
      this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
      try {
        // We obtain a per-row lock, so other clients will block while one client
        // performs an update. The read lock is released by the client calling
        // #commit or #abort or if the HRegionServer lease on the lock expires.
        // See HRegionServer#RegionListener for how the expire on HRegionServer
        // invokes a HRegion#abort.
        byte [] row = put.getRow();
        // If we did not pass an existing row lock, obtain a new one
        Integer lid = getLock(lockid, row, true);

        try {
          // All edits for the given row (across all column families) must happen atomically.
          internalPut(put, put.getClusterId(), writeToWAL);
        } finally {
          if(lockid == null) releaseRowLock(lid);
        }
      } finally {
        closeRegionOperation();
      }
    }

getLock calls internalObtainRowLock:
    private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock)
        throws IOException {
      checkRow(rowKey.getBytes(), "row lock");
      startRegionOperation();
      try {
        CountDownLatch rowLatch = new CountDownLatch(1);

        // loop until we acquire the row lock (unless !waitForLock)
        while (true) {
          CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
          if (existingLatch == null) {
            break;
          } else {
            // row already locked
            if (!waitForLock) {
              return null;
            }
            try {
              if (!existingLatch.await(this.rowLockWaitDuration,
                              TimeUnit.MILLISECONDS)) {
                throw new IOException("Timed out on getting lock for row=" + rowKey);
              }
            } catch (InterruptedException ie) {
              // Empty
            }
          }
        }

        // loop until we generate an unused lock id
        while (true) {
          Integer lockId = lockIdGenerator.incrementAndGet();
          HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
          if (existingRowKey == null) {
            return lockId;
          } else {
            // lockId already in use, jump generator to a new spot
            lockIdGenerator.set(rand.nextInt());
          }
        }
      } finally {
        closeRegionOperation();
      }
    }

For the implementation details of HBase row locks, the following post is recommended: hbase源码解析之行锁 (HBase source code analysis: row locks)
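The core of the row lock above is a `putIfAbsent` on a concurrent map plus a `CountDownLatch` that waiters block on. The following is a minimal, self-contained sketch of that idea only. The class `RowLockSketch` and its methods are hypothetical names introduced for illustration, not HBase code, and it simplifies two things: a timeout returns `false` instead of throwing `IOException`, and no lock ids are generated.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified row-lock table (illustration only, not HBase code). */
class RowLockSketch {
    // One latch per locked row; waiters block on the latch of the current holder.
    private final ConcurrentHashMap<String, CountDownLatch> lockedRows = new ConcurrentHashMap<>();
    private final long waitMillis;

    RowLockSketch(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    /** Returns true once the lock is held; false if the row is taken and we do not wait, or on timeout. */
    boolean lock(String rowKey, boolean waitForLock) {
        CountDownLatch mine = new CountDownLatch(1);
        while (true) {
            CountDownLatch existing = lockedRows.putIfAbsent(rowKey, mine);
            if (existing == null) {
                return true;              // our latch was installed: lock acquired
            }
            if (!waitForLock) {
                return false;             // row already locked, caller does not want to wait
            }
            try {
                if (!existing.await(waitMillis, TimeUnit.MILLISECONDS)) {
                    return false;         // simplification: HBase throws IOException here
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
            // the holder released the row; loop and try to install our latch
        }
    }

    /** Releases the row and wakes every thread waiting on it. */
    void unlock(String rowKey) {
        CountDownLatch latch = lockedRows.remove(rowKey);
        if (latch != null) {
            latch.countDown();
        }
    }

    public static void main(String[] args) {
        RowLockSketch locks = new RowLockSketch(100);
        System.out.println(locks.lock("row1", true));   // true
        System.out.println(locks.lock("row1", false));  // false
        locks.unlock("row1");
        System.out.println(locks.lock("row1", false));  // true
    }
}
```

Note that this sketch does not track which caller owns a lock, so any caller can unlock any row; the lock ids in the HBase code serve that purpose.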
checkAndPut and checkAndDelete are built on the same row lock:

    // Lock row
    Integer lid = getLock(lockId, get.getRow(), true);
    ......
    // get and compare
    try {
      result = get(get, false);
      ......
      //If matches put the new put or delete the new delete
      if (matches) {
        if (isPut) {
          internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
        } else {
          Delete d = (Delete)w;
          prepareDelete(d);
          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
        }
        return true;
      }
      return false;
    } finally {
      // release lock
      if(lockId == null) releaseRowLock(lid);
    }

Implementation logic: lock => get => compare => put/delete
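The lock => get => compare => put sequence can be reproduced in a few lines against an in-memory map. This is only a sketch under stated assumptions: `CheckAndPutSketch` is a hypothetical class with one value per row, it uses a `ReentrantLock` per row instead of HBase's latch-based lock, and an expected value of `null` means "the cell must not exist yet".

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory stand-in for a one-column table (illustration only, not HBase code). */
class CheckAndPutSketch {
    private final ConcurrentHashMap<String, String> cells = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();

    /** Writes newValue only if the current value equals expected (null = cell absent). */
    boolean checkAndPut(String row, String expected, String newValue) {
        ReentrantLock lock = rowLocks.computeIfAbsent(row, k -> new ReentrantLock());
        lock.lock();                                    // 1. lock the row
        try {
            String current = cells.get(row);            // 2. get
            if (!Objects.equals(current, expected)) {   // 3. compare
                return false;
            }
            cells.put(row, newValue);                   // 4. put
            return true;
        } finally {
            lock.unlock();                              // always release the row lock
        }
    }

    String get(String row) {
        return cells.get(row);
    }

    public static void main(String[] args) {
        CheckAndPutSketch t = new CheckAndPutSketch();
        System.out.println(t.checkAndPut("mac-1", null, "dpid-A")); // true: cell was absent
        System.out.println(t.checkAndPut("mac-1", null, "dpid-B")); // false: cell already holds dpid-A
        System.out.println(t.get("mac-1"));                         // dpid-A
    }
}
```

Because the get, the comparison, and the put all happen while the row lock is held, no other writer to the same row can slip in between the check and the write.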
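checkAndPut is very useful in practice. In our production project that generates DPIDs, multiple clients generate DPIDs in parallel. Once one client has stored a DPID, the other clients must not generate a new one; they can only fetch the DPID that already exists.
Code snippet: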
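    // Store dpid only if the dim:dpid cell for this mac does not exist yet (expected value null).
    ret = hbaseUse.checkAndPut("bi.dpdim_mac_dpid_mapping", mac, "dim", "dpid", null, dpid);
    if (!ret) {
        // Another client stored a DPID first: read it back and use it.
        String retDpid = hbaseUse.query("bi.dpdim_mac_dpid_mapping", mac, "dim", "dpid");
        if (!retDpid.equals(ABNORMAL)) {
            return retDpid;
        }
    } else {
        columnList.add("mac");
        valueList.add(mac);
    }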
For detailed usage of checkAndPut, see: HBaseEveryDay_Atomic_compare_and_set
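The first-writer-wins behavior of the DPID snippet above can be simulated without a cluster. In this sketch, `DpidRaceSketch` and its methods are hypothetical, a `ConcurrentHashMap` stands in for the mapping table, and `putIfAbsent` plays the role of a checkAndPut whose expected value is null. Several threads race to assign a DPID to the same MAC, and all of them should end up with the same value.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical simulation of racing DPID generators (illustration only, not HBase code). */
class DpidRaceSketch {
    // Stand-in for the dim:dpid column, keyed by MAC address.
    private final ConcurrentHashMap<String, String> table = new ConcurrentHashMap<>();

    /** Emulates checkAndPut with expected value null: succeeds only if no DPID is stored yet. */
    private boolean checkAndPutIfAbsent(String mac, String dpid) {
        return table.putIfAbsent(mac, dpid) == null;
    }

    /** Returns the DPID for mac, creating one only if this caller wins the race. */
    String getOrCreateDpid(String mac) {
        String candidate = UUID.randomUUID().toString();
        if (checkAndPutIfAbsent(mac, candidate)) {
            return candidate;          // this client's DPID was stored
        }
        return table.get(mac);         // another client won; use its DPID
    }

    /** Runs the given number of concurrent clients and returns the distinct DPIDs they observed. */
    static Set<String> race(int clients) {
        DpidRaceSketch service = new DpidRaceSketch();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        try {
            CompletableFuture<?>[] tasks = new CompletableFuture<?>[clients];
            for (int i = 0; i < clients; i++) {
                tasks[i] = CompletableFuture.runAsync(
                    () -> seen.add(service.getOrCreateDpid("00:11:22:33:44:55")), pool);
            }
            CompletableFuture.allOf(tasks).join();   // wait for every client to finish
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(race(8).size());   // 1: every client saw the same DPID
    }
}
```

The design point is that the loser never overwrites: it discards its own candidate and reads back the stored value, which is what the `if (!ret)` branch in the snippet does.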
Reference:
HBase - Apache HBase (TM) ACID Properties
hbase源码解析之行锁 (HBase source code analysis: row locks)
HBase: The Definitive Guide
HBaseEveryDay_Atomic_compare_and_set
Original article: http://blog.csdn.net/yfkiss/article/details/36438855