标签:after 后继节点 可见性 dex 返回 max cached 信息 loading
特点:
功能:
class CachedData{
Object data; // 缓存的数据
volatile boolean cacheValid; // 缓存有效性标识
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData(){
rwl.readLock().lock(); // 先获取读锁
if(!cacheValid){ // 若缓存过期,释放读锁,获取写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(!cacheValid){ // 对缓存进行更新
// 数据更新操作
cacheValid = true;
}
rwl.readLock().lock(); // 获得读锁
} finally{
rwl.writeLock().unlock(); // 释放写锁
}
}
try{
// 使用缓存数据
} finally{
rwl.readLock().unlock(); // 获取读锁
}
}
}
在使用集合的情况下,当集合很大,并且读线程远多于写线程时.
class RWDictoary{
private final Map<String,Data> m = new TreeMap<String,Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key){
r.lock();
try{ return m.get(key);}
finally{r.unlock();}
}
public String[] allKeys(){
r.lock();
try{ return m.keySet().toArray();}
finally { r.unlock();}
}
public Data put(String key, Data value){
w.lock();
try{return m.put(key,value);}
finally{w.unlock();}
}
public void clear(){
w.lock();
try{m.clear();}
finally{w.unlock();}
}
}
state
)维护多个读线程和一个写线程的状态.state
的高16位表示读,低16位表示写.包含两把锁:
readerLock
:读锁writerLock
:写锁// ReentrantReadWriteLock中的内部类中的lock()
public void lock() {
sync.acquire(1);
}
// AbstractQueuedSynchronizer中的acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantReadWriteLock中内部类Sync的tryAcquire()
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
流程:
获取写锁:
注:
对于writeShouldBlock()
表示是否需要阻塞,NofairSync
和FairSync
的实现不同:
NofairSync
:直接返回false,不需要阻塞,可以插队.FairSync
:若有前驱节点,则返回true,需要阻塞,否则返回false.// ReentrantReadWriteLock中WriteLock的unlock()
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer中的release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantReadWriteLock中的内部类Sync中的tryRelease()
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// ReentrantReadWriteLock中内部类Sync中的unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
流程:
尝试释放锁:
尝试释放锁:
state
:唤醒后继线程:
释放写锁:
// ReentrantReadWriteLock中内部类ReadLock中的lock()
public void lock() {
sync.acquireShared(1);
}
// AQS的acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// ReentrantReadWriteLock的内部类Sync的tryAcquireShared()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we‘re not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
// AQS的doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
流程:
首先试图获取读锁:
firstReader
.若读锁获取失败,则:
// ReentrantReadWriteLock的内部类ReadLock中的unlock()
public void unlock() {
sync.releaseShared(1);
}
// AQS中的releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// ReentrantReadWriteLock的内部类Sync中的tryReleaseShared()
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁对于读线程没有影响,但是若读写锁均被释放,则写线程可以得到处理
return nextc == 0;
}
}
// AQS的doReleaseShared()
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
流程:
state
,更新成功则返回。锁降级:写锁降级为读锁。
流程:线程持有写锁的同时,获取到读锁,获取成功后释放写锁。
必要性:为了保证数据的可见性。若当前线程不获取读锁而是直接释放写锁,则其他线程获取写锁并修改数据时,当前线程无法感知到数据的修改。而使用锁降级,则其他线程会被阻塞,直到当前线程已经获取读锁后才能有可能获得写锁。
注:ReentrantReadWriteLock不支持锁升级(读锁==>写锁),为了保证数据的可见性。
参考:
标签:after 后继节点 可见性 dex 返回 max cached 信息 loading
原文地址:https://www.cnblogs.com/truestoriesavici01/p/13235964.html