标签:
一. 从 Demo 解析源码
package com.wenniuwuren.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Demo: ten tasks are submitted to a cached thread pool and compete for a
 * {@link Semaphore} created with five permits. Each task acquires one permit,
 * prints its loop index and the number of permits still available, sleeps for
 * ten seconds, and then releases the permit.
 *
 * Created by zhuyb on 16/5/1.
 */
public class SemaphoreTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(5);

        for (int i = 0; i < 10; i++) {
            int count = i; // effectively-final copy so the anonymous class can capture it
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); // acquire a permit
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                    try {
                        System.out.println("当前循环:" + count);
                        System.out.println("当前还剩多少许可数量:" + semaphore.availablePermits());
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } finally {
                        // Always give the permit back, even if the sleep was interrupted;
                        // otherwise the permit would be lost for the remaining tasks.
                        semaphore.release(); // release the held permit
                    }
                }
            };
            executorService.execute(runnable);
        }

        // Stop accepting new tasks; already-submitted tasks still run to completion.
        executorService.shutdown();
    }
}
/**
 * Creates a semaphore whose synchronizer is a {@code NonfairSync}
 * constructed with the given permit count.
 *
 * @param permits the value forwarded to the {@code NonfairSync} constructor
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
 * {@code Sync} subclass whose shared-acquire attempt delegates to
 * {@code nonfairTryAcquireShared}.
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    /** Passes {@code permits} straight to the {@code Sync} constructor. */
    NonfairSync(int permits) {
        super(permits);
    }

    /** Returns whatever {@code nonfairTryAcquireShared(acquires)} returns. */
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
Semaphore 的内部类 Sync(继承自 AQS)的构造函数:把许可数量设置为 AQS 的同步状态 state
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } // 省略无用代码 }
/**
 * Assigns {@code newState} to the {@code state} field.
 *
 * @param newState the new value of {@code state}
 */
protected final void setState(int newState) {
    state = newState;
}
(2)semaphore.acquire(); // 获取许可
/**
 * Requests one permit by calling {@code sync.acquireSharedInterruptibly(1)}.
 *
 * @throws InterruptedException propagated from {@code acquireSharedInterruptibly}
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
 * Shared-mode acquire that fails fast on interruption.
 *
 * @param arg the acquire argument passed to {@code tryAcquireShared}
 * @throws InterruptedException if {@code Thread.interrupted()} reports true on entry
 *         (or, presumably, from {@code doAcquireSharedInterruptibly} — not shown here)
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // A negative result means the attempt failed; fall through to the slower path.
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
/**
 * Default implementation: always throws, so a subclass that wants
 * shared-mode acquisition has to override this method.
 *
 * @param arg the acquire argument (unused here)
 * @throws UnsupportedOperationException always
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
/** Returns whatever {@code nonfairTryAcquireShared(acquires)} returns. */
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

/**
 * Tries to subtract {@code acquires} from the current state in a retry loop.
 *
 * @param acquires the amount to subtract from the state
 * @return the computed remainder: negative if the state was too small
 *         (in which case the state is left unmodified), otherwise the
 *         value that was successfully stored by the compare-and-set
 */
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        // Short-circuit: when remaining is negative the CAS is never attempted.
        // When the CAS fails, neither condition holds and the loop re-reads the state.
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}
/** Returns one permit by calling {@code sync.releaseShared(1)}. */
public void release() {
    sync.releaseShared(1);
}

/**
 * Shared-mode release.
 *
 * @param arg the release argument passed to {@code tryReleaseShared}
 * @return {@code true} if {@code tryReleaseShared(arg)} succeeded (after which
 *         {@code doReleaseShared()} has been invoked), {@code false} otherwise
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
接下来看 tryReleaseShared 返回 true 后进入的 doReleaseShared():与互斥锁不同的是,共享锁在释放时会把"锁已释放"这一信息沿 AQS 的等待队列向后传播,这样等待这个共享锁的线程就能较快地脱离 AQS 的等待队列。ws == Node.SIGNAL 分支执行的就是这一传播动作:先用 CAS 把头节点的状态从 SIGNAL 置为 0,再调用 unparkSuccessor(h) 唤醒后继节点。最外层的 for(;;) 是为了保证释放操作能够正常完成:CAS 失败或者 head 在此期间发生了变化时,就循环重试
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
标签:
原文地址:http://blog.csdn.net/wenniuwuren/article/details/51302745