标签:
由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。
可重入读写锁时基于AQS实现的,典型的使用方法如JDK1.7中的示例:
class RWDictionary { private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }}
读锁使用的是AQS的共享模式,不会阻塞读锁,但是会阻塞写锁;写锁使用的是AQS的独占模式,读写锁都会被阻塞。读写锁是共用了一个Sync(AQS类),也就是说AQS中独占模式和共享模式是并存的。
Sync有两种实现方式,公平(FairSync)和非公平(NonfairSync)。公平的含义是指如果AQS的队列中有等待线程,则当前线程直接就放弃尝试获取锁,自觉的排队了;而非公平方式不一样,当前线程还是要尝试一下(仅仅一下,和AQS队列中第一个结点竞争获取锁),如果成功了,相当于插队成功了,但是如果失败了(就是tryAcquire或tryAcquireShared失败),还是要乖乖的排到最后去。
下面的是整个类的结构图:
Sync是个AQS类,它有两个子类FairSync和NonfairSync。ReadLock和WriteLock里有个成员变量sync(指向同个变量,FaireSync或NonfaireSync类型)。(UML图不是很熟,就这样文字描述了)
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; /** * false,默认锁是非公平的 */ public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
主类有三个重要的成员变量:读锁、写锁和同步器。从构造函数可以看出读写锁的同步器默认是非公平的(NonfaireSync)。
实现了Lock接口,并使用sync成员变量实现加锁、解锁功能
public void lock() { sync.acquireShared(1); }
以AQS共享模式获取锁,参数值为1。如果系统中没有线程占有写锁,那么这个函数很快就会返回;否则,当前线程会一直阻塞,直至获取到锁。
public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
以AQS共享模式获取锁,参数值为1。如果系统中没有线程占有写锁,那么这个函数很快就会返回;否则,当前线程会一直阻塞,直至获取到锁或被其他线程中断。
public boolean tryLock() { return sync.tryReadLock(); }
直接调用了sync的tryReadLock方法,这个方法和sync. tryAcquireShared基本一直(少了一个readerShouldBlock判断)。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
tryLock函数尝试获取锁,直到获取到或超时或被中断。如果当前没有线程占用写锁,会立即返回成功。和3.3的tryLock不一样,这个公平模式下会排队的。如果你不想排队又想支持超时,可以这么写代码:if(lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }。
public Condition newCondition() { throw new UnsupportedOperationException(); }
读锁不支持条件队列的。
实现了Lock接口,并使用sync成员变量实现加锁、解锁功能
public void lock() { sync.acquire(1); }
以AQS独占方式获取写锁,如果当前没有线程占有读锁和写锁,该函数会立即返回;如果当前线程已经获取写锁了,holdCount的值会加1;否则,会阻塞直至获取到锁。注意:如果当前线程已经获取到读锁了,紧接着就获取写锁就会死锁了。
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
以AQS独占方式 获取写锁直到被中断。其他跟lock一致。
public boolean tryLock( ) { return sync.tryWriteLock(); }
直接调用了sync的tryWriteLock方法,这个方法和sync. tryAcquire基本一直(少了一个writerShouldBlock判断)。不管在公平模式还是非公平模式下,都不用排队。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
tryLock函数尝试获取锁,直到获取到或超时或被中断。如果没有其他线程占有读锁和写锁,立马返回true。和4.3不一样,公平模式下会排队的。如果你又想插队又想支持超时,可以这么写:if(lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }。
public Condition newCondition() { return sync.newCondition(); }
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
当前线程是否占有写锁。
public int getHoldCount() { return sync.getWriteHoldCount(); }
当前线程占有几个写锁。
读锁和写锁公用的同步器,有两个版本:公平的和非公平的。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; /* * AQS的state字段被拆成两部分了:高16位表示获取读锁的次数,低16位表示 * 获取写锁的次数 */ static final int SHARED_SHIFT = 16; // 每次线程获取读锁成功就会执行state+=SHARED_UNIT操作,不是+1因为 // 高16位表示获取读锁的次数。 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 允许读或写获取锁的最大次数,都是65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 获取当前读锁的总数 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 获取当前写锁的总数 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } /** * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */ static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = Thread.currentThread().getId(); } /** * 每个线程都绑定一个HoldCounter对象 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * The number of reentrant read locks held by current thread. * Initialized only in constructor and readObject. * Removed whenever a thread's read hold count drops to 0. */ // 第一个获取读锁线程的HoldCounter没有在里面管理,而是通过firstReader // 和firstReaderHoldCount两个变量维护的。 private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } /* * Acquires and releases use the same code for fair and * nonfair locks, but differ in whether/how they allow barging * when queues are non-empty. */ /** * Returns true if the current thread, when trying to acquire * the read lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean readerShouldBlock(); /** * Returns true if the current thread, when trying to acquire * the write lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean writerShouldBlock();
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // c!=0&&w==0说明有线程(包括当前线程)持有读锁,直接返回false // c!=0&&w!=0&¤t!=当前持有锁的线程,直接返回false // 注意:如果一个线程先获取读锁,紧接着获取写锁时会死锁的 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 写锁数量超过65535,直接抛异常了 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 走到这边说明这个锁是重入了的,不需要CAS了,直接设置state setState(c + acquires); return true; } // c==0或者重入的,如果写需要阻塞或者CAS设置state失败,直接返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置独占线程标识 setExclusiveOwnerThread(current); return true; }
tryAcquire 函数是尝试获取写锁:1.如果有读线程或者写线程且不是当前线程,直接失败;2.如果写锁的count超过了65535,直接失败;3.否则,这个线程能够拥有锁(eligible),队列策略允许(writerShouldBlock返回false)或者是重入的锁。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // state的高16位和低16位肯定都是大于releases的,可以直接相减 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; // 如果当前写锁释放后没有写锁了,置空独占标识 if (free) setExclusiveOwnerThread(null); // 设置新的state setState(nextc); return free; }
tryRelease函数一般情况下就是释放写锁,但是也有不一般的情况,就是在Condition中调用tryRelease,因此,releases参数可能会包含读和写两个锁的信息。
protected final int tryAcquireShared(int unused) { // 获取当前线程和state值 Thread current = Thread.currentThread(); int c = getState(); // 如果有线程持有写锁且该线程不是当前线程,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); // 为啥等于0的时候要set下?因为release时,count=0会执行readHolds.remove // 方法,但是不会清空cachedHoldCounter。此时,同个线程再成功获取读锁时count // 的值是0且readHolds已经为空了,因此要重新set下。 else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
tryAcquireShared函数的参数(unused)没有用,主要是尝试获取读锁:
1.如果有线程持有写锁,直接返回失败;
2.否则,继续判断,如果队列策略允许(readerShouldBlock返回false)获取锁且CAS设置state成功,则设置读锁count的值。这一步并没有检查读锁重入的情况,被延迟到fullTryAcquireShared里了,因为大多数情况下不是重入的;
3.如果步骤2失败了,或许是队列策略返回false或许是CAS设置失败了等,则执行fullTryAcquireShared。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 是一个循环 for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 有线程持有写锁且不是当前线程,直接失败 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. // 如果队列策略不允许,需要检查是否是读锁重入的情况。队列策略是否允许,分两种情况: // 1.公平模式:如果当前AQS队列前面有等待的结点,返回false;2.非公平模式:如果 // AQS前面有线程在等待写锁,返回false(这样做的原因是为了防止写饥饿)。 } else if (readerShouldBlock()) { // 如果当前线程是第一个获取读锁的线程,则有资格获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { // 优先赋值成上一次获取读锁成功的cache,如果发现线程tid和当前线程不相等,在从 // ThreadLocal里获取 rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) // 帮助GC readHolds.remove(); } } // 说明不是读锁重入的情况,直接返回失败了 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 设置当前线程为第一个获取读锁的线程 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; // 说明当前线程为第一个获取读锁的线程 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 其他获取读锁成功的情况 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
fullTryAcquireShared函数处理上面tryAcquireShared中没有处理的读锁重入的问题或者CAS设置失败。其实,这函数代码和tryAcquireShared有些重复,但是把处理读锁重入的问题从tryAcquireShared中分离出来了。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 如果当前线程是第一个获取读锁的,需要特殊处理(第一个reader不是用 // HoldCounter类型变量保存的) if (firstReader == current) { // assert firstReaderHoldCount > 0; // 如果firstReaderHoldCount<=0也没报错啊(不会出现这种情况的)。 // 因为firstReaderHoldCount==1时,firstReader就是null了,条件 // firstReader==current就不会成立了 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果当前线程是最后一个获取锁的,就是cachedHoldCounter了 // 如果也不是最后一个获取锁的,就要从threadlocal里取了 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; // count==1的话直接执行readHolds.remove;count<=0就报错 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. // mark:当且仅当所有的读和写线程都释放锁了,才会返回true return nextc == 0; } }
tryReleaseShared函数的作用就是释放读锁,需要注意两点:1、如果一个线程没有获取过读锁,执行release方法会报异常;2、当且仅当所有的读和写线程都释放了,这个函数才会返回true。
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); // 如果有线程持有读锁(自己也不行)直接返回false // 如果不是重入获取写锁的直接返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // c==0或者写锁重入 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
tryWriteLock函数会被写锁调用,和tryAcquire基本一致除了少调用一个writerShouldBlock函数,公平和非公平两种模式都允许插队(barging in)相当于是非公平模式了。
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); // 如果有线程持有写锁且该线程不是当前线程,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
tryReadLock函数被读锁调用(tryLock),和tryAcquireShared函数基本一致,除了少调用一个readerShouldBlock方法,公平和非公平两种模式都允许插队(barging in)相当于是非公平模式了。
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; // 写线程无条件插队 final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { // 为了方式写线程饥饿的情况,如果AQS等待队里的第一个线程是独占的, // 当前读线程就阻塞。 return apparentlyFirstQueuedIsExclusive(); } }
非公平锁的实现类。
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; // 如果AQS队列里有等待的线程,当前线程就阻塞 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } // 如果AQS队列里有等待的线程,当前线程就阻塞 final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
公平锁的实现类writerShouldBlock和readerShouldBlock实现方式完全一样,只要队列里有其他线程在等待,当前线程就阻塞,FIFO模式。
Java 1.7 ReentrantReadWriteLock源码解析
标签:
原文地址:http://blog.csdn.net/yuenkin/article/details/51020362