标签:
ReentrantLock能够实现共享资源的互斥访问,但是它在某些条件下效率比较低下。比如,多个线程要查询(或者说读取)某列车的余票数,如果使用ReentrantLock,那么多个线程的查询操作只能互斥,也就是说一个线程查询完成下一个线程才能查询。显然这时如果并发的访问,既可以得到正确的结果也能提高效率,因为读取操作并不会改变数据的值。当某个线程要进行购票或者退票操作时(也就是写操作),这个时候才需要线程间的互斥,购票或者退票操作必须等待所有查询操作完成以后才能执行(同理,如果正在进行退票或者购票,那么其它线程查询操作也必须等待直到退票或者购票完成才能查询到正确的结果),而各个线程的退票和购票操作也必须互斥的进行才能保证余票的正确性。在此种需求下就需要通过ReadWriteReentrantLock来实现。
总结ReadWriteReentrantLock的三种情况
(1)读取操作是并发的
(2)不同线程之间的读取和写入是互斥的
(3)不同线程之间的写入和写入是互斥的
如果一个线程既要读取又要写入,那么应该直接获取写锁即可(或者先获取写锁,然后获取读锁,反之,若先获取读锁,再去获取写锁就会造成死锁现象)。
当一个线程先获取了写锁,然后获取了读锁,并在释放读锁前释放了写锁,那么该线程在释放写锁后就由写锁降级为读锁。
写锁才对应有Condition队列,而读锁没有。这就意味着,获取写锁后可以使用await和signal方方法,而获取读锁后不可以使用await和signal方法。
为了书写方便,在本博客中ReadWriteReentrantLock会简写成RWL。
下面的示例中定义了一个RWL_Demo类,它有三个数据成员,rwl、data、runTimes分别表示读写锁、共享数据、写操作的执行次数。在RWL_Demo的内部又定义两个内部类WriteProcess和ReadProcess,这两个内部类分别实现了Runnalbe接口,并在run方法内进行写入和读取操作。在main方法中创建了一个具有6个Worker的线程池对象,并向其中添加了4个ReadProcess对象和2个WriteProcess对象,通过运行结果我们可以发现读操作的并发性以及读写操作以及写写操作的互斥性。
package javaleanning; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; public class RWL_Demo { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private volatile Integer data; private volatile int runTimes = 10; public class ReadProcess implements Runnable{ private String id; Random rd = new Random(); public ReadProcess(String id){ this.id = id; } @Override public void run() { while(runTimes > 0){ if(data != null){ rwl.readLock().lock(); System.out.println(id + " read data = " + data); try { Thread.sleep(rd.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(id + " read over"); rwl.readLock().unlock(); } try { Thread.sleep(rd.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class WriteProcess implements Runnable{ private String id; Random rd = new Random(); public WriteProcess(String id){ this.id = id; } @Override public void run() { while(runTimes > 0){ rwl.writeLock().lock(); data = new Integer(rd.nextInt(20)); System.out.println(id + " write data = " + data); try { Thread.sleep(rd.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } runTimes--; System.out.println(id + " write over"); rwl.writeLock().unlock(); try { Thread.sleep(rd.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { RWL_Demo rwl_d = new RWL_Demo(); ExecutorService pool = Executors.newFixedThreadPool(6); pool.submit(rwl_d.new ReadProcess("R1")); pool.submit(rwl_d.new ReadProcess("R2")); pool.submit(rwl_d.new ReadProcess("R3")); pool.submit(rwl_d.new ReadProcess("R4")); pool.submit(rwl_d.new WriteProcess("W1")); pool.submit(rwl_d.new WriteProcess("W2")); pool.shutdown(); } }
运行结果
Code
情况1:写锁被占有时,队列中的前n个节点都是因需要加读锁处于等待状态的节点
当写锁被释放时,会唤醒第一个节点(即头节点的下一个节点)表示的线程,第一个节点获取读锁以后(注意是获取以后,而非是释放以后),会将头节点设置成第一个节点(并将前任头节点出列),然后检查它后面的节点是否是需要获取读锁,如果是继续将其唤醒。第二个节点获取读锁后(注意是获取以后,而非是释放以后),会将头节点设置成为第二个节点(并将前任头节点出列),然后检查它后面的节点是否是需要获取读锁……直到头节点的下一个节点是因为获取写锁而位于等待队列中才停止唤醒。
可以看出,读操作的节点是一个一个被唤醒的,第一个节点获取读锁后就会立刻唤醒第二个节点,而不是在释放读锁后,以此类推,,这样就形成了读操作的并发执行。
情况2:写锁被占有时,队列中头节点的下一个节点是写锁
这个情况和ReentrantLock的执行过程是一样的,在此不作赘述。
情况3:读锁被占有时,又有线程来获取读锁(非公平条件下)
这时要分两种情况来讨论
(1)头节点的下一个节点需要获取写锁
创建一个新的节点,并入列。这样做的目的是防止需要获取写锁的线程因为不断有线程获取读锁而被饿死。写线程需要正在运行的全部的读锁都被释放才能被唤醒。
(2) 除(1)以外的其它情况(比如队列为空或者队列中头节点的下一个节点需要获取读锁)
成功获取读锁,此时不会创建新的节点,当然也就不会有入列操作了。
情况4:读锁被占有时,有线程尝试获取写锁
没的商量,直接入列等待。
情况5:一个同时占有读写锁的线程释放了写锁
将头节点的下一个节点唤醒,如果这个节点需要获取读锁,那么获取成功,将自己设置为新的头节点;如果新的头节点的下一个节点也需要获取读锁,则将其唤醒,以此类推,直到碰到一个需要获取写锁的节点为止;如果这个节点需要获取写锁,那么被唤醒后这个线程又会将自身阻塞。
情况6:一个同时占有读写的锁线程释放了读锁
将读锁的计数器减1,然后判断读写锁的计数器是否都为0(即state的值是否为0),如果是,将头节点的下一个节点唤醒,然后从读锁的unlock方法中返回。
AQS重要数据成员
在ReentrantLock的源码分析中,AQS章节中介绍到AQS中有一个state字段(int类型,32位)用来描述是否有线程占有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数)。ReadWriteLock的读、写锁是相关但是又不一致的,所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量。显然现在一个state就不够用了。于是在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者独占锁的重入数量)。2^16-1=65536,这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了。
那么我们为什么不使用,两个变量来描述来分别描述读线程的可重入数和写线程的可重入数呢?我们举个例子来说明,如果当前用writeState和readState两个变量的值来分别表示写锁和读锁的数量。假设当前writestate和readstate的值都为0,一个线程来获取写锁,它成功获取写锁的条件是:没有其它线程来获取读锁,并且自身在获取写锁的竞争中获胜。可用下面的示意代码实现。
if(readState == 0){ int c = writeState; if(c == 0 && compareAndSetState(c, c + 1)){ System.out.println("successfully get writeLock"); } }
但上面的代码有个致命的问题,就是判断readState == 0是成立的,但在成功执行compareAndSetState(c, c + 1)之前,readState已被获取读锁的线程修改为1,这就导致了两个线程,一个成功获取了读锁,一个成功获取了写锁,这显然会导致逻辑上的错误。如果由一个变量表示(分为高16位和低16位),那么在上述条件下,一个线程读锁的获取成功必然导致另一个线程写锁的获取失败,反之亦然。
RWL重要数据成员
private final Sync sync; private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock;
如果构造的是公平锁,sync就引用FairSync的对象,如果构造的是非公平锁sync就是NonfairSync对象的引用。
在RWL中,readerLock和writerLock是对两个内部类读锁(ReadLock)和写锁(WriteLock)的引用,虽然是两个不同的锁,但是对应了同一个AQS队列。
RWL内部类
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; /* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } /** * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */ static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } /** * ThreadLocal subclass. Easiest to explicitly define for sake * of deserialization mechanics. */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } ……//省略 …… }
内部类中的sharedCount和exclusiveCount是用来获取state中的读锁的数量和写锁的数量。读写锁为每个线程都分配了一个HoldCounter对象,它用来记录当前线程获取的读锁的数量。线程可以通过RWL的getReadHoldCount方法获取当前线程获取读锁的计数值。具体实现原理查阅ThreadLocal<T>类。
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; /** * Constructor for use by subclasses * * @param lock the outer lock object * @throws NullPointerException if the lock is null */ protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } …… }
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; /** * Constructor for use by subclasses * * @param lock the outer lock object * @throws NullPointerException if the lock is null */ protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } …… }
通过读锁和写锁的构造函数可以看出,同一个RWL对象的读锁和写锁使用的是同一个AQS对象
构造方法
public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
默认情况下构造非公平锁,由于非公平锁相对于公平锁吞吐量更大,在实际中运用较多,本文主要分析非公平读写锁的代码。
public void lock() { sync.acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
state的值不为0可以分为四种情况讨论,写锁数量不为0,读锁数量为0;读锁数量不为0,写锁数量为0;读写锁锁数量都不为0;读写锁数量都为0。写锁的数量通过exclusiveCount(c)得到。仅当读写锁数量都为0或者写锁数量不为0,并且当前已占有该锁时才可以去尝试获取锁。对于非公平锁,上述代码中writerShouldBlock方法总是返回假。
除了tryAcquire方法,整个过程和ReentrantLock方法时一样的,就不在赘述。
通过这段代码我们发现,不同的同步工具实际上仅仅是对状态值所表示的含义有不同的定义,基于这种定义,这个同步工具就需要决定什么时候可以去更新状态,而更新成功或者失败后都的操作实际上都交由AQS完成。
如果我们想要自定义一个独占模式的同步工具,我们只需要覆盖tryAcquire方法和tryRelease方法,并在该方法中决定什么情况设定状态成功,什么情况设定状态失败;什么情况释放状态成功,什么情况释放状态失败。
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
除了tryAcquire方法,真个过程和ReentrantLock方法时一样的,就不在赘述。
在该方法中,首先将state值减1,然后获取它的低16位值。如果该值为0,说明可以成功释放写锁了。该函数返回true使得,unparkSuccessor方法得以执行。如果当前前线程是同时占有读写锁的情况下,先释放的写锁,被唤醒后的节点需要获取的是写锁,那么被唤醒的节点又会将自身阻塞。
public void lock() { sync.acquireShared(1); }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
对于非公平锁,readerShouldBlock会调用apparentlyFirstQueuedIsExclusive。apparentlyFirstQueuedIsExclusive方法主要检查队列中头节点的下一个节点是否是独占模式(对于读写锁而言可以理解为需要获取写锁的节点)如果是,读线程获取读锁失败,如果不是记录下当前线程获取的读锁的数量。
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we‘re not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
fullTryAcquireShared 的代码相与tryAcquireShared相比有些雷同,tryAcquireShared中没有考虑到增加读锁数量的CAS操作compareAndSetState(c, c + SHARED_UNIT)失败的情况,而在fullTryAcquireShared中,如果操作compareAndSetState(c, c + SHARED_UNIT)失败的话,会不断尝试,直到成功(但是为什么不直接使用 fullTryAcquireShared 暂时还无法理解)。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
我们仅仅分析一下setHeadAndPropagate方法,其它的代码和ReentrantLock中代码高度一致,此处不在分析。setHeadAndPropagate它的作用是,对于那些已经获取读锁并且已入列的节点,将自身设置新的头节点,然后判断新的头结点的下一个节点是否需要获取读锁,如果是,将其唤醒。代码中通过s.isShared()判断s是否是需要获取读锁,如果是,则唤醒它,唤醒操作封装在了doReleaseShared方法中。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don‘t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
唤醒的线程又会从doAcquireShared中被阻塞的地方(parkAndCheckInterrupt中的park方法)的下一条语句开始执行,这就使得获取读锁的节点逐个的被唤醒(直到队列为空,或者碰到需要获取写锁的节点),被唤醒的线程获取了读锁后,并发的执行读操作。
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
我们现在来解释一下Node.PROPAGATE状态所表示的含义。当需要唤醒头节点的下一个节点时,如果头节点的waitStatus的值为0,说明头结点的下一个节点还没处于阻塞态(阻塞自己要分为四步,1让自己的prev指向前一个节点。2让前一个节点的next指向自身。3将前一个节点的waitStatus的值修由0改为Node.SIGNAL 4检查自身不是头节点的下一个节点,是的话阻塞自己)。所以此时我们就不必唤醒头节点的下一个节点,因为它可以成功获取锁而不会阻塞自己。我们需要做的就是将当前节点的waitStatus的值由0修改为Node.PROPAGATE,用它来告诉下一个节点,我不用来唤醒你了,你自己处理的来。
public void unlock() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
doReleaseShared方法的代码在锁释放的时候已经分析过,这里就不在展示它的代码
将读锁的计数器减1,然后判断读写锁的计数器是否都为0(即state的值是否为0),如果是,将头节点的下一个节点唤醒;否则不进行唤醒操作。然后从读锁的unlock方法中返回。
[1] http://my.oschina.net/adan1/blog/158107
[2] http://www.tuicool.com/articles/RJ3Eza2
通过ReadWriteReentrantLock源代码分析AbstractQueuedSynchronizer共享模式
标签:
原文地址:http://www.cnblogs.com/nullzx/p/5114009.html