标签:假设 文章 hold @param 注意 ast max containe depends
@
前置知识,可以看AQS(一)独占锁(基于JDK 8)
ReentrantLock 是一个可重入的独占锁。
在独占模式中,isHeldExclusively 为 true 表示是对当前线程加锁,false 表示未加锁或者对其他线程加锁。
加锁状态由 state 标识,如果为0,则说明未加锁,如果大于0,则state标识已经重入的次数。
ReentrantLock 包含一个 Sync,Sync 是 AQS(AbstractQueuedSynchronizer) 的子类。Sync 又分为两种,公平锁 FairSync 和非公平锁 NonfairSync。
假设有很多线程在排队。如果一个线程会直接排到队伍末尾,就是公平锁;如果线程直接去抢锁,就是非公平锁。非公平锁可以减少线程切换,提高效率。
FairSync 和 NonfairSync 需要实现 AQS 的 tryAcquire,tryRelease,isHeldExclusively 。后两种方法没有区别,所以在 Sync 中实现,而 tryAcquire 在公平锁和非公平锁中不同,会被单独实现。
//ReentrantLock 实现了 Lock 接口
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
//Sync 是 AQS 的子类,是一个抽象类,具体由两个子类 FairSync/NonfairSync 实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 这个方法用于非公平锁,写在这里是方便 ReentrantLock.tryLock() 使用
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 是否已经释放,状态为0
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 判断独占的线程是否是当前线程
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don‘t need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
//获取一个 ConditionObject
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
//获取独占的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
//如果独占的线程是当前线程,返回持有的state,否则为0
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
//是否已锁
final boolean isLocked() {
return getState() != 0;
}
}
}
对于公平锁和非公平锁,lock 的处理方式不同,公平锁会执行 acquire,也就是正常排队;非公平锁直接去抢锁,如果失败,才执行 acquire 排队。
static final class FairSync extends Sync {
// 调用 aquire
final void lock() {
acquire(1);
}
}
static final class NonfairSync extends Sync {
final void lock() {
// 如果之前为0,说明没有被其他线程持有锁,尝试抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 否则,调用 aquire。
else
acquire(1);
}
}
acquire 是 AQS 的方法。在前文讲过,流程为:首先尝试获取(tryAcquire),失败后将当前线程包装到一个 Node 中,插入同步队列尾部(addWaiter),并在队列中不断自旋尝试(acquireQueued),满足堵塞条件(shouldParkAfterFailedAcquire)则会堵塞(parkAndCheckInterrupt)以减轻自旋的 CPU 消耗,如果堵塞后被唤醒会继续自旋尝试,直到成功后返回,并根据中断情况设置中断(selfInterrupt);或者因为异常抛出没有成功,则取消该线程所在的节点(cancelAcquire)。
对于公平锁和非公平锁,表现在不同的 tryAcquire。acquire 在最开始会使用一次 tryAcquire;acquireQueued 每次尝试时,也会 tryAcquire
// AbstractQueuedSynchronizer.java
// 初始使用一次 tryAcquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 每次尝试也会使用 tryAcquire
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 这里用 tryAcquire
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对于公平锁和非公平锁, tryAcquire 不同。公平锁执行 tryAcquire。如果 c 为0,说明当前没有线程持有锁,hasQueuedPredecessors() 看一下前面是否有其他线程,如果有,根据公平锁的原理,那就得排在后面,也就是失败,如果没有,那就可以尝试改变 state,成功的话就返回 true;否则会看一下当前线程是否是独占的线程,执行可重入的操作。
非公平锁执行 tryAcquire,通过 Sync 的 nonfairTryAcquire 实现。
非公平锁和公平锁的区别是没有 hasQueuedPredecessors(),也就是不排队,直接去抢锁。
// 公平锁部分
static final class FairSync extends Sync {
/**
* Fair version of tryAcquire. Don‘t grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果没有线程持有锁
if (c == 0) {
// 先看前面有没有排队的,没有的话,尝试获取锁
// 注意 hasQueuedPredecessors()
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}//当前线程是独占的线程,可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 未获取锁则失败
return false;
}
}
// 非公平锁部分
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 和公平锁的区别在于,不检查队列里面有没有等待的线程,
// 而是直接抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
下面的方法是检查队列前面是否有其他线程,等价于检查队列中是否有线程 以及 当前线程是否不是队列中第一个线程。
如果当前线程之前有一个队列中线程,则为true;如果当前线程位于队列头部或队列为空,则为false。
public abstract class AbstractQueuedSynchronizer
{
// 根据注释,该方法等价于
//getFirstQueuedThread() != Thread.currentThread() &&hasQueuedThreads()
// 也就是说队列中第一个线程不是当前线程,且队列中一定有线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// head == tail 说明队列为空
public final boolean hasQueuedThreads() {
return head != tail;
}
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
/**
* Version of getFirstQueuedThread called when fastpath fails
*/
private Thread fullGetFirstQueuedThread() {
/*
* The first node is normally head.next. Try to get its
* thread field, ensuring consistent reads: If thread
* field is nulled out or s.prev is no longer head, then
* some other thread(s) concurrently performed setHead in
* between some of our reads. We try this twice before
* resorting to traversal.
*/
// 第一个节点是 head.next。
// 如果在读的时候并发修改了头部,需要重新尝试;这里尝试两次
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
/*
* Head‘s next field might not have been set yet, or may have
* been unset after setHead. So we must check to see if tail
* is actually first node. If not, we continue on, safely
* traversing from tail back to head to find first,
* guaranteeing termination.
*/
// 由于尾分叉,从尾开始
// 是从尾到头遍历,找到最前面非空的线程
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
}
根据这篇文章https://blog.csdn.net/anlian523/article/details/106173860
解释下 hasQueuedPredecessors
首先看下 acquire,调用了 tryAcquire,addWaiter 会调用 enq 入队,acquireQueued 也用了 tryAcquire。顺序是先 tryAcquire,再 addWaiter,然后 acquireQueued。
如果 head != tail,可能是两个都是非 null,也可能是 head 为 null,tail 非 null,在 enq 处做了注释。
如果 head 为 null,tail 非 null,说明有个线程正在执行 enq,肯定有其他线程在入队,所以其他线程会排在当前线程前面,返回 true,即(s = h.next) == null
。
如果都非 null,说明至少有两个节点存在,此时 s!=null
,检查 head 的下一个节点的线程,也就是正在排队的第一个线程,和当前线程比较,即s.thread != Thread.currentThread()
;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
// 如果CAS成功,在这里停下,head != null
// tail = null
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
unlock 对于公平锁和非公平锁都一样,调用 release。release 首先 tryRelease,如果彻底释放 tryRelease 返回 true。
这是因为如果重入了多次,释放的不够,该线程仍然是持有锁的,不能唤醒后面的节点。
// ReentrantLock 的方法
public void unlock() {
sync.release(1);
}
// AQS 的方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantLock 中 Sync 的方法,尝试进行释放,彻底释放才是 true
// 由于是 protected,只是被 unlock 使用,看起来不会出现 c<0 的情况
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 先检查持有锁的是否是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c==0 表示彻底释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
ReentrantLock 的方法都是调用 Sync 和 AQS相应方法实现。
两种锁,公平锁和非公平锁,默认非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
为了方便在 ReentrantLock 中使用把非公平获取放在 Sync 中,
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
常规使用
lock.lock();
try{
//通常会使用 Condition 执行 await 或者 signal
}finally{
lock.unlock();
}
生产者消费者
最后是一个例子,使用 ReentrantLock 和 Condition 实现生产者消费者模型。下面有一个 Lock,对应两个 Condition,最多为100个。如果在生产时,队列已满,则放入 notFull 队列等待;如果在消费时,队列已空,则放入 notEmpty 队列等待。
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
// 加锁
lock.lock();
try {
// 队列已满,进入 notFull
// 如果被中断,while 会使得再次进入 notFull
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// 唤醒 notEmpty 中一个
notEmpty.signal();
} finally {
// 解锁
lock.unlock();
}
}
public Object take() throws InterruptedException {
// 加锁
lock.lock();
try {
// 队列已空,进入 notEmpty
// 如果被中断,while 会使得再次进入 notEmpty
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// 唤醒 notFull 中一个
notFull.signal();
return x;
} finally {
// 解锁
lock.unlock();
}
}
}
标签:假设 文章 hold @param 注意 ast max containe depends
原文地址:https://www.cnblogs.com/BigDataStudy/p/14608317.html