标签:this map ref base The run reference diff final
hbase mutation操作,比如delete put等,都需要先获取行锁,然后再进行操作,在获取行锁时,是通过HRegion.getRowLockInternal(byte[] row, boolean waitForLock)进行的,因此,我们先大体浏览一下这个方法的流程,如下。可以看到,该方法中主要涉及到行锁相关的内容为RowLock和RowLockContext两个类。这两个都是HRegion的内部类,下面详细看一下这两个类是咋实现的。
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
HashedBytes rowKey = new HashedBytes(row);
RowLockContext rowLockContext = new RowLockContext(rowKey);
// loop until we acquire the row lock (unless !waitForLock)
while (true) {
RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
if (existingContext == null) {
// Row is not already locked by any thread, use newly created context.
break;
} else if (existingContext.ownedByCurrentThread()) {
// Row is already locked by current thread, reuse existing context instead.
rowLockContext = existingContext;
break;
} else {
if (!waitForLock) {
return null;
}
try {
// Row is already locked by some other thread, give up or wait for it
if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for lock for row: " + rowKey);
}
} catch (InterruptedException ie) {
LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
}
}
// allocate new lock for this thread
return rowLockContext.newLock();
}
首先看RowLock类,该类主要逻辑是release方法,是用来释放行锁的。同时有一个布尔类型参数release,默认为false,代表该行锁是否被释放掉了。
public static class RowLock {
@VisibleForTesting final RowLockContext context;
private boolean released = false;
@VisibleForTesting RowLock(RowLockContext context) {
this.context = context;
}
/**
* Release the given lock. If there are no remaining locks held by the current thread
* then unlock the row and allow other threads to acquire the lock.
* @throws IllegalArgumentException if called by a different thread than the lock owning thread
*/
public void release() {
if (!released) {
context.releaseLock();
released = true;
}
}
}
但是在RowLock中,并没有看到实际涉及到锁的信息,这是咋回事呢,别急,细细看下release方法,里面有一个context,是RowLockContext类型。同时其构造方法中也传了一个context对象,因此怀疑是在RowLockContext中new出了一个rowlock,进RowLockContext中看下:
@VisibleForTesting class RowLockContext {
private final HashedBytes row;
//通过计数以及CountDownLatch实现对行锁的condition。这里之所以将countdownlatch设置为一,是因为hbase自己也不知道到底有多少condition来竞争锁,所以加一个计数lockCount,
//当lockCount为零时,再把latch.coutDown。否则会在getRowLockInternal中await。
private final CountDownLatch latch = new CountDownLatch(1);
private final Thread thread;
private int lockCount = 0;
RowLockContext(HashedBytes row) {
this.row = row;
this.thread = Thread.currentThread();
}
boolean ownedByCurrentThread() {
return thread == Thread.currentThread();
}
RowLock newLock() {
lockCount++;
return new RowLock(this);
}
void releaseLock() {
if (!ownedByCurrentThread()) {
throw new IllegalArgumentException("Lock held by thread: " + thread
+ " cannot be released by different thread: " + Thread.currentThread());
}
lockCount--;
if (lockCount == 0) {
// no remaining locks by the thread, unlock and allow other threads to access
RowLockContext existingContext = lockedRows.remove(row);
if (existingContext != this) {
throw new RuntimeException(
"Internal row lock state inconsistent, should not happen, row: " + row);
}
latch.countDown();
}
}
}
通过计数以及CountDownLatch实现对行锁的condition。这里之所以将countdownlatch设置为一,是因为hbase自己也不知道到底有多少condition来竞争锁,所以加一个计数lockCount,
当lockCount为零时,再把latch.coutDown。否则会在getRowLockInternal中await。
在HRegion中还有一个关键的成员变量: lockedrows,用来存储当前已经获取了行锁的所有行信息,key为rowkey,value为RowLockContext。
// map from a locked row to the context for that lock including:
// - CountDownLatch for threads waiting on that row
// - the thread that owns the lock (allow reentrancy)
// - reference count of (reentrant) locks held by the thread
// - the row itself
private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
new ConcurrentHashMap<HashedBytes, RowLockContext>();
好啦,行锁涉及到的内容,我们都大体浏览了,再从getRowLockInternal中开始通一遍逻辑:
上面是获取行锁的流程,释放行锁呢,是通过HRegion的releaseRowLocks方式实现,我们看下代码:
/**
* If the given list of row locks is not null, releases all locks.
*/
public void releaseRowLocks(List<RowLock> rowLocks) {
if (rowLocks != null) {
for (RowLock rowLock : rowLocks) {
rowLock.release();
}
rowLocks.clear();
}
}
可见是调用RowLock.release实现,该方法代码在上面有,具体的逻辑如下:
在lockedrows中将该行锁删除。
判断release是否为false,如果为false,则调用context.releaseLock,context.releaseLock逻辑如下
首先判断释放该行锁的线程是否是该行锁的持有者,若不是则抛出异常
将count--;
如果count==0了,则直接调用latch.countDown,这个方法会触发其他线程去获取行锁。当count==0了也就是说该线程已经不需要改行锁,已经释放
将release设置为true。
注意:
这里在getRowLockInternal中,只要lockedRows.putIfAbsent(rowKey, rowLockContext)成功,其他线程将不会获取成功,由concurrentMap保证。
标签:this map ref base The run reference diff final
原文地址:https://www.cnblogs.com/Evil-Rebe/p/11323010.html