标签:exception 实现 style 双链表 wait ESS load perm html
Semaphore构造方法
public Semaphore(int permits) {------permits 表示能同时有多少个线程访问我们的资源
sync = new NonfairSync(permits); -------------默认创建的是非公平锁。
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);-------传入的permits做i为了state的值,作为资源总数
}
semaphore.acquire();获取资源,源码实现
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);---------每次申请一次资源
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();-----------------线程无效直接抛异常
if (tryAcquireShared(arg) < 0) --------------------拿不到资源,需要进行入队操作
doAcquireSharedInterruptibly(arg); ---------入队操作
}
final int nonfairTryAcquireShared(int acquires) { --------获取资源的操作
for (;;) {
int available = getState(); --------------拿到现有的资源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) -----------原子操作,多线程情况下会可能失败,所以无线循环自旋下去,直到成功;
return remaining;---------------------如果大于等于0那么就是拿到了资源,如果小于0,那么线程就要进入等待队列
}
}
为什么要用死循环----compareAndSetState这个是cas原子操作,失败之后要循环重复继续操作,直到成功。死循环也就结束了。
private void doAcquireSharedInterruptibly(int arg)-------------线程入队操作
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);---------------注意这里是以共享的方式入队
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { --------新入队的节点会判断他上一个节点是不是头节点,如果是头节点会再次尝试获取资源,
int r = tryAcquireShared(arg);
if (r >= 0) { -------------如果获取到资源,那么这个阻塞队列就要清空了,里面没有在等待的线程了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && ----------如果获取不到资源,那么就要线程阻塞了
parkAndCheckInterrupt()) -----------parkAndCheckInterrupt这个方法会将线程阻塞(挂起),线程都阻塞了,这个死循环就不会执行了,这也就是为什么juc源码写了很多
死循环都没问题地原因,我们可以借鉴。当线程被唤醒之后又开始这个死循环,尝试拿资源(非公平锁有可能拿不到),
拿不到再次被阻塞挂起。
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { ---------------判断线程能否被正常阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) -----------------如果上一个节点是有效的在等待的线程,那么该线程就可以插入到队列后面
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { -----------如果上一个节点是无效的,那就查找上上个节点是不是有效的,直到找到那个有效的节点,然后将该节点插入到那个有效节点后面,中间的无效节点从链表中删除,后面的节点要找前面
的节点这也就说明了为什么我们地等待队列要设计成双链表,不光有next。next这种找后驱节点地操作还有pre .pre这样前驱节点。所以需要双链表。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don‘t park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases; -----------获取当前的资源然后给资源加回去
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) -----------------CAS算法还资源,死循环,直到成功还回去,死循环结束。
return true;
}
}
资源还回去之后执行doReleaseShared方法唤醒其他线程抢资源
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) { --------发现阻塞队列有阻塞线程
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); ---------跳过头节点,唤醒下一个节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; ------循环找到waitStatus<0能唤醒的节点调用unpark方法唤醒线程。
}
if (s != null)
LockSupport.unpark(s.thread);
}
标签:exception 实现 style 双链表 wait ESS load perm html
原文地址:https://www.cnblogs.com/jihuifeng/p/13336869.html