标签:ali 同步队列 需要 tle 有一个 状态 细节 operation lock
1、简介
AbstractQueuedSynchronizer队列同步器,用来实现锁或者其他同步组件的基础框架
AbstractQueuedSynchronizer使用int类型的volatile变量维护同步状态
一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用
注:AQS主要是怎么使用的呢?
在java的同步组件中,AQS的子类一般是同步组件的静态内部类。
AQS是实现同步组件的关键,它俩的关系可以这样描述:同步组件是面向使用者的,它定义了使用者与组件交互的接口,隐藏了具体的实现细节;而AQS面向的是同步组件的实现者,它简化了具体的实现方式,屏蔽了线程切换相关底层操作,它们俩一起很好的对使用者和实现者所关注的领域做了一个隔离。
AQS的实现分析
从实现的角度具体分析AQS是如何实现线程同步的
同步队列分析
AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点。
Node
static final class Node { volatile int waitStatus; volatile Node prev;//前驱节点 volatile Node next; //后继节点 volatile Thread thread; //进入队列的当前线程 Node nextWaiter;//存储condition队列中的后继节点 }
waitStatus的几种状态:
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1; //当前线程被取消 /** waitStatus value to indicate successor‘s thread needs unparking */ static final int SIGNAL = -1;//当前节点的后继节点需要运行 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; //当前节点在等待condition /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //当前场景下后续的acquireShared可以执行
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; //当前场景下后续的acquireShared可以执行
Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:
private transient volatile Node head; private transient volatile Node tail; private volatile int state;
对于锁的获取,请求形成节点将其挂在队列尾部,至于资源的转移,是从头到尾进行,队列的基本结构就出来了:
同步队列插入/删除节点:
1、节点插入
AQS提供基于CAS的设置尾节点的方法:
/** * CAS tail field. Used only by enq */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
需要传递当前线程认为的尾节点和当前节点,设置成功后,当前节点与尾节点建立关联
2、节点删除
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态之后将会唤醒后继节点,后继节点将会在获取同步状态成功的时候将自己设置为首节点。
注:设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。
//独占式获取同步状态,该方法的实现需要先查询当前的同步状态是否可以获取,如果可以获取再进行获取; protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//释放状态;
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
//共享式获取同步状态; protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
//共享式释放状态; protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //独占模式下,判断同步状态是否已经被占用; protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
AQS提供两种方式来操作同步状态,独占式与共享式,下面就针对性做一下源码分析
独占式同步状态获取 - acquire实现
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
具体执行流程如下:
tryAcquire
方法尝试获取同步状态;addWaiter的实现
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
- 分配引用T指向尾节点;
- 将待插入节点的prev指针指向尾节点;
- 如果尾节点还为T,将当前尾节点设置为带待插入节点;
- T的next指针指向待插入节点。
enq(Node node)
方法private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:
可以看出,整个enq方法通过“死循环”来保证节点的正确插入。
进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued
。对于同步队列中的线程,在同一时刻只能由队列首节点获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态:
从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的前驱是不是头结点,这样设计使得节点的释放符合FIFO,同时也避免了过早通知
注:过早通知是指前驱节点不是头结点的线程由于中断被唤醒。
acquire实现总结
release(int arg)
方法可以释放同步状态public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease
保证将状态重置回去,同样采用CAS来保证操作的原子性;unparkSuccessor
唤醒当前节点的后继节点线程。unparkSuccessor实现
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
取出当前节点的next节点,将该节点线程唤醒,被唤醒的线程获取同步状态。这里主要通过LockSupport
的unpark
方法唤醒线程。
共享式同步状态获取
共享式获取与独占式获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示:
AQS提供acquireShared
方法来支持共享式获取同步状态
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared(int arg)
方法尝试获取同步状态:tryAcquireShared
方法返回值 > 0时,表示能够获取到同步状态;doAcquireShared(int arg)
方法进入同步队列doAcquireShared实现
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)
方法可以释放同步状态
共享式同步状态释放 - releaseShared实现
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared
方法释放状态;doReleaseShared
方法唤醒后继节点;独占式超时获取 - doAcquireNanos
该方法提供了超时获取同步状态调用,假如在指定的时间段内可以获取到同步状态返回true,否则返回false。它是acquireInterruptibly(int arg)
的增强版
1、acquireInterruptibly实现
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
InterruptedException
异常并将中断标志位置为false;doAcquireInterruptibly(int arg)
排队等待doAcquireInterruptibly实现
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
InterruptedException
,退出循环。doAcquireNanos实现
该方法在支持中断响应的基础上,增加了超时获取的特性。针对超时获取,主要在于计算出需要睡眠的时间间隔nanosTimeout,如果nanosTimeout > 0表示当前线程还需要睡眠,反之返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } if (nanosTimeout <= 0) { cancelAcquire(node); return false; } if (nanosTimeout > spinForTimeoutThreshold && shouldParkAfterFailedAcquire(p, node)) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
System.nanoTime()
:当前时间):nanosTimeout <= 0
,返回超时未获取到同步状态;nanosTimeout > 0 && nanosTimeout <= 1000L
,线程快速自旋nanosTimeout > 1000L
,线程通过LockSupport.parkNanos
进入超时等待。
AbstractQueuedSynchronizer源码解析
标签:ali 同步队列 需要 tle 有一个 状态 细节 operation lock
原文地址:https://www.cnblogs.com/cherish010/p/8797022.html