1. Semaphore
Semaphore和ReentrantReadWriteLock.ReadLock(读锁)都采用AbstractOwnableSynchronizer中共享排队的方式实现
关于AbstractQueuedSynchronizer中的独占锁和共享锁,请参考ReentrantLock(http://www.cnblogs.com/bjorney/p/8040085.html)和ReentrantReadWriteLock(http://www.cnblogs.com/bjorney/p/8064268.html)
问题:Semaphore在acquire时不检查传入的参数是否超过state最大值??? 例如,new Semaphore(5, ture || false) -> acquire(10),则调用acquire的线程将被阻塞
1)在公平模式下,之后所有调用acquire的线程都将在SyncQueue中排队而永远阻塞???
2)在非公平模式下,之后所有acquire失败而参与排队的线程都将永远阻塞???
public class Semaphore implements java.io.Serializable { private final Sync sync; public Semaphore(int permits) { sync = new NonfairSync(permits); // 公平竞争 } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); // 公平竞争 || 非公平竞争 } public void acquire() throws InterruptedException; public void acquire(int permits) throws InterruptedException { // permits必须>=0 if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(); public void acquireUninterruptibly(int permits) { // permits必须>=0 if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } public boolean tryAcquire(); public boolean tryAcquire(int permits) { // permits必须>=0 if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit); public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { // permits必须>=0 if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } public void release(); public void release(int permits) { // permits必须>=0 if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } ... ... }
2. Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } final int nonfairTryAcquireShared(int acquires) { // 尝试非公平取锁 for (;;) { // CAS(state)失败将回到此处 int available = getState(); /*记录state*/ int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //remaining >= 0时/*CAS设置state -= acquires*/ return remaining; // remaining < 0:SyncQueue中排队 } } protected final boolean tryReleaseShared(int releases) { for (;;) { // CAS(state)失败将回到此处 int current = getState(); /*记录state*/ int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) /*CAS设置state += releases*/ return true; } } ... ... } static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { // 尝试非公平取锁 return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { // 尝试公平取锁 for (;;) { // CAS(state)失败将回到此处 if (hasQueuedPredecessors()) // SyncQueue为空 || SyncQueue中下个待唤醒节点对应当前线程 return -1; // int available = getState(); /*记录state*/ int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //remaining >= 0时/*CAS设置state -= acquires*/ return remaining; } } }