标签:等待队列 adl until 组件 getname method notifyall api 提高
AQS,队列同步器AbstractQueuedSynchronizer的简写,JDK1.5引入的,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。AQS的作者Doug Lea大师期望它能够成为实现大部分同步需求的基础。
AbstractQueuedSynchronizer是一个抽象类,先看一下其类图。
AQS中里有一个volatile修饰int型的state来代表同步状态,使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来改变状态,因为它们能够保证状态的改变是安全的。
AQS使用的是模板方法模式,主要使用方式是继承,且通常将子类推荐定义为静态内部类,子类通过继承AQS并实现它的抽象方法来管理同步状态。AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。AQS是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
实现自定义同步组件时,将会调用AQS提供的模板方法,AQS的模板方法如下:
AQS提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。AQS中可重写的方法如下:
AQS中有一个内部类Node,用于构造一个队列来保存排队等待获取锁的线程。看一下Node的源码及其简单说明:
static final class Node { /**标记线程是因为获取共享资源失败被阻塞添加到队列中的*/ static final Node SHARED = new Node(); /**表示线程因为获取独占资源失败被阻塞添加到队列中的*/ static final Node EXCLUSIVE = null; /**表示线程因为中断或者等待超时,需要从等待队列中取消等待*/ static final int CANCELLED = 1; /**表示当前线程占有锁,队列中没有存放线程引用头结点的后继结点A处于等待状态, * 如果已占有锁的线程释放锁或被CANCEL之后就会通知结点A去获取锁。*/ static final int SIGNAL = -1; /**当持有锁的线程调用了Condition(下面会讲到具体运用)的signal()方法之后,处于同一condition下的等待线程会去竞争锁*/ static final int CONDITION = -2; /**表示把waitStatus的值,指示下一个acquireShared应该无条件传播*/ static final int PROPAGATE = -3; /**表示当前线程的等待状态*/ volatile int waitStatus; volatile Node prev; volatile Node next; /**表示进入AQS队列中的线程引用*/ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
AQS基础内容先了解这么多,后面会用AQS实现一个自己的可重入独占式锁。
与使用关键字synchronized相比,显式锁Lock提供了更广泛的加锁操作。 Lock获取锁的方法更加灵活,并且支持多个关联的Condition对象,先看一下Lock的常用API:
与关键字synchronized相比,Lock有以下几个优势:
(1)可以尝试非阻塞地获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁。
(2)获取锁过程中可以被中断。
(3)超时获取锁,可以指定一个时间,在指定的时间范围内获取锁,如果截止时间到了仍然无法获取锁,则返回,可以避免线程长时间阻塞。
Lock也有缺点,比如说必须手动的释放锁,所以在使用Lock时有一个范式,以ReentrantLock为例:
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
还有一个要注意的地方是,不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。
ReentrantLock是可重入的互斥锁,与使用synchronized修饰的方法和代码块具有相同的基本行为和语义,但具有扩展的功能。最明显的一个扩展功能是ReentrantLock可以定义为公平锁或非公平锁,synchronized内部实现使用的是非公平锁机制。从时间上来说,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之是不公平的。 ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。事实上,公平的锁机制往往没有非公平的效率高,原因如下:在激烈竞争的情况下,恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于这个锁已被线程A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此会再次尝试获取锁。与此同时,如果C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样的情况是一种“双赢”的局面,B获得锁的时刻并没有推迟,C更早地获得了锁,并且吞吐量也获得了提高,用一张图来说明。
我们可以去看下公平锁和非公平锁加锁的源码,区别其实非常小,先看非公平锁:
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 } 10 else if (current == getExclusiveOwnerThread()) { 11 int nextc = c + acquires; 12 if (nextc < 0) // overflow 13 throw new Error("Maximum lock count exceeded"); 14 setState(nextc); 15 return true; 16 } 17 return false; 18 }
再看公平锁:
1 protected final boolean tryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (!hasQueuedPredecessors() && 6 compareAndSetState(0, acquires)) { 7 setExclusiveOwnerThread(current); 8 return true; 9 } 10 } 11 else if (current == getExclusiveOwnerThread()) { 12 int nextc = c + acquires; 13 if (nextc < 0) 14 throw new Error("Maximum lock count exceeded"); 15 setState(nextc); 16 return true; 17 } 18 return false; 19 } 20 }
通过对比源码可以发现,公平锁在第5行的判断条件里多了一个!hasQueuedPredecessors(),这个的意思是查询是否有线程在排队等待获取锁,如果有线程在排队,则不去抢锁。而非公平锁才不管你有没有线程在排队等待,直接去抢一次再说,不管抢不抢的到。
隔壁老王在某宝买了一个FBB版的娃娃,假设娃娃从广东发出,目的是上海,距离大约1500公里。娃娃发出后,在离目的地小于100公里的时候给老王发短信说,你的娃娃快到了。在上海的快递员接到娃娃后,会给老王打电话让他来取娃娃。这是一个典型的等待/通知机制,在之前的篇幅中我们使用Object类中的wait()和notifyAll()等待通知机制实现了一个自己的数据库连接池,现在使用ReentrantLock来模拟刚刚老王买娃娃的场景。
业务实现代码:
1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 5 public class BuyFBBWawa { 6 public final static String DESTINATION = "Shanghai"; 7 /**娃娃剩余运输里程数*/ 8 private int km; 9 /**娃娃当前位置*/ 10 private String site; 11 private Lock lock = new ReentrantLock(); 12 /**距离Condition*/ 13 private Condition kmCondition = lock.newCondition(); 14 /**位置Condition*/ 15 private Condition siteCondition = lock.newCondition(); 16 17 public BuyFBBWawa() { 18 } 19 20 public BuyFBBWawa(int km, String site) { 21 this.km = km; 22 this.site = site; 23 } 24 25 /** 26 * 距离目的地小于100公里,通知处于wait状态并需要给老王发送短信的线程工作 27 */ 28 public void changeKm() { 29 lock.lock(); 30 try { 31 this.km = 99; 32 kmCondition.signal();//通知其他在kmCondition上等待的线程 33 } finally { 34 lock.unlock(); 35 } 36 } 37 38 /** 39 * 到达菜鸟驿站,通知处于wait状态并需要给老王打电话的线程工作 40 */ 41 public void changeSite() { 42 lock.lock(); 43 try { 44 this.site = "Shanghai"; 45 siteCondition.signal();//通知其他在siteCondition上等待的线程 46 } finally { 47 lock.unlock(); 48 } 49 } 50 51 /** 52 * 当娃娃的剩余里程数小于100时给老王发短信 53 */ 54 public void waitKm() { 55 lock.lock(); 56 try { 57 while (this.km >= 100) { 58 try { 59 kmCondition.await();//当前线程在kmCondition上进行等待 60 System.out.println("check km thread[" + Thread.currentThread().getName() 61 + "] is be notify"); 62 } catch (InterruptedException e) { 63 e.printStackTrace(); 64 } 65 } 66 } finally { 67 lock.unlock(); 68 } 69 70 System.out.println("娃娃离老王已经不足100公里,我给他发个短信"); 71 } 72 73 /**当娃娃到达目的地时给老王打电话*/ 74 public void waitSite() { 75 lock.lock(); 76 try { 77 while (!this.site.equals(DESTINATION)) { 78 try { 79 siteCondition.await();//当前线程在siteCondition上进行等待 80 System.out.println("check Site thread[" + Thread.currentThread().getName() 81 + "] is be notify"); 82 } catch (InterruptedException e) { 83 e.printStackTrace(); 84 } 85 } 86 } finally { 87 lock.unlock(); 88 } 89 System.out.println("娃娃已经到达目的地,我给他打个电话让他来取"); 90 } 91 }
测试代码:
public class TestBuyWawa { private static BuyFBBWawa fbbWawa = new BuyFBBWawa(1500, "Guangdong"); /**检查里程数变化的线程,不满足条件,线程一直等待*/ private static class CheckKm extends Thread { @Override public void run() { fbbWawa.waitKm(); } } /**检查地点变化的线程,不满足条件,线程一直等待*/ private static class CheckSite extends Thread { @Override public void run() { fbbWawa.waitSite(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new CheckSite().start(); } for (int i = 0; i < 3; i++) { new CheckKm().start(); } Thread.sleep(1000); fbbWawa.changeKm();//娃娃距离目的地小于100公里 Thread.sleep(2000); fbbWawa.changeSite();//娃娃到达目的地 } }
这段代码使用ReentrantLock和Condition模拟了老王买的娃娃的运输过程,从程序输出可以看到,通过不同的Condition实现了点对点的通知,这是与使用synchronized+wait()/notifyAll()最大的区别,如果对wait()/notifyAll()使用方法不熟悉的同学,欢迎阅读之前的《java线程间的协作》。使用synchronized+wait()/notifyAll()的时候,不能指定唤醒某类线程,只能唤醒等待在对象上的所有线程,故尽量使用notifyAll()而不是notify(),在使用Lock+Condition的时候,由于可以指定唤醒某类线程,所以尽量使用signal()而不是signalAll()。
之前提到的synchroniezd和ReentrantLock都是排它锁,这些锁在同一时刻只允许一个线程访问,而读写锁ReentrantReadWriteLock在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。如果不使用读写锁,完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠synchronized关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
我们来模拟一个读多写少的场景,分别使用synchroniezd和ReentrantReadWriteLock,看看效率的差异。假设某种商品,读写比列为1:10,我们写一段代码来模拟。
商品类:
public class GoodsInfo { /**总销售额*/ private double totalMoney; /**库存数*/ private int storeNumber; public GoodsInfo( int totalMoney, int storeNumber) { this.totalMoney = totalMoney; this.storeNumber = storeNumber; } public double getTotalMoney() { return totalMoney; } public int getStoreNumber() { return storeNumber; } public void changeNumber(int sellNumber) { this.totalMoney += sellNumber * 9.9; this.storeNumber -= sellNumber; } }
商品接口:
public interface GoodsService { GoodsInfo getNumber(); void setNumber(int number); }
使用读写锁来实现商品接口:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class UseRwLock implements GoodsService { private GoodsInfo goodsInfo; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock getLock = lock.readLock();//读锁 private final Lock setLock = lock.writeLock();//写锁 public UseRwLock(GoodsInfo goodsInfo) { this.goodsInfo = goodsInfo; } @Override public GoodsInfo getNumber() { getLock.lock(); try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { getLock.unlock(); } return this.goodsInfo; } @Override public void setNumber(int number) { setLock.lock(); try { Thread.sleep(5); goodsInfo.changeNumber(number); } catch (InterruptedException e) { e.printStackTrace(); } finally { setLock.unlock(); } } }
使用synchronized实现商品接口:
public class UseSynchronized implements GoodsService { private GoodsInfo goodsInfo; public UseSynchronized(GoodsInfo goodsInfo) { this.goodsInfo = goodsInfo; } @Override public synchronized GoodsInfo getNumber() { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return this.goodsInfo; } @Override public synchronized void setNumber(int number) { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } goodsInfo.changeNumber(number); } }
测试类,先使用synchronized实现:
1 import java.util.Random; 2 3 public class GoodsInfoTest { 4 static final int readWriteRatio = 10;//读写线程的比例 5 static final int writeThreadCount = 1;//写线程数量 6 7 /** 8 * 读线程 9 */ 10 private static class GetTask implements Runnable { 11 private GoodsService goodsService; 12 13 public GetTask(GoodsService goodsService) { 14 this.goodsService = goodsService; 15 } 16 17 @Override 18 public void run() { 19 long start = System.currentTimeMillis(); 20 for (int i = 0; i < 100; i++) {//每个读线程操作100次 21 goodsService.getNumber(); 22 } 23 System.out.println(Thread.currentThread().getName() + "读取商品数据耗时:" 24 + (System.currentTimeMillis() - start) + "ms"); 25 } 26 } 27 28 /** 29 * 写线程 30 */ 31 private static class SetTask implements Runnable { 32 private GoodsService goodsService; 33 34 public SetTask(GoodsService goodsService) { 35 this.goodsService = goodsService; 36 } 37 38 @Override 39 public void run() { 40 long start = System.currentTimeMillis(); 41 Random r = new Random(); 42 for (int i = 0; i < 10; i++) {//每个写线程操作10次 43 goodsService.setNumber(r.nextInt(10)); 44 } 45 System.out.println(Thread.currentThread().getName() 46 + "写商品数据耗时:" + (System.currentTimeMillis() - start) + "ms"); 47 } 48 } 49 50 public static void main(String[] args) throws InterruptedException { 51 GoodsInfo goodsInfo = new GoodsInfo(100000, 10000); 52 GoodsService goodsService = new UseSynchronized(goodsInfo); 53 54 for (int i = 0; i < writeThreadCount; i++) { //启动1个写线程 55 new Thread(new SetTask(goodsService)).start(); 56 for (int j = 0; j < readWriteRatio; j++) { //启动10个读线程 57 new Thread(new GetTask(goodsService)).start(); 58 } 59 Thread.sleep(10); 60 } 61 } 62 }
程序输出:
再把刚刚测试类修改一下,只需要把第52行修改成读写锁实现,即GoodsService goodsService = new UseRwLock(goodsInfo);程序输出:
对比可以看出,对于读多写少的场景,使用读写锁比使用独占锁效率高很多。
锁的重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,该特性的实现需要解决以下两个问题:
(1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
(2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
从上面ReentrantLock的公平锁和非公平锁加锁的源码也可以看出,getState()返回的是一个累计获取锁的次数。我们基于以上2点,利用AQS手写一个简易版本的可重入独占锁。
实现类:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class MyReentrantLock implements Lock { /*** * 内部类继承AQS */ static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) {//锁被第一次获取 setExclusiveOwnerThread(Thread.currentThread());//设置当前线程为锁独占线程 return true; } else if (Thread.currentThread() == getExclusiveOwnerThread()) {//锁被多次获取 setState(getState() + 1);//对获取锁的次数累加 return true; } return false; } @Override protected boolean tryRelease(int arg) { if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } if (getState() == 0) { throw new IllegalMonitorStateException(); } setState(getState() - 1); if (getState() == 0) { setExclusiveOwnerThread(null); } return true; } @Override protected boolean isHeldExclusively() { return getState() > 0; } /** * 返回一个Condition,每个condition都包含了一个condition队列 * 这是能够唤醒指定线程的关键 */ Condition newCondition() { return new ConditionObject(); } } /**仅需要将操作代理到Sync上,调用AQS模板方法*/ private final Sync sync = new Sync(); /*** * 调用AQS的模板方法acquire(int arg) */ public void lock() { System.out.println(Thread.currentThread().getName() + " 准备获取锁"); sync.acquire(1); System.out.println(Thread.currentThread().getName() + " 已经获取到锁"); } public boolean tryLock() { return sync.tryAcquire(1); } /*** * 调用AQS的模板方法release(int arg) */ public void unlock() { System.out.println(Thread.currentThread().getName() + " 准备释放锁"); sync.release(1); System.out.println(Thread.currentThread().getName() + " 已经释放锁"); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
测试类:
import java.util.concurrent.locks.Lock; public class Test { static final Lock lock = new MyReentrantLock(); /** * 递归获取锁 * * @param deep 递归深度 */ public static void reenter(int deep) { lock.lock(); try { System.out.println(Thread.currentThread().getName() + ":递归深度:" + deep); int currentDeep = deep - 1; if (currentDeep == 0) { return; } else { reenter(currentDeep); } } finally { lock.unlock(); } } static class WorkerThread extends Thread { public void run() { reenter(3); } } public static void main(String[] args) { // 启动2个子线程去争抢锁 for (int i = 0; i < 2; i++) { Thread thread = new WorkerThread(); thread.start(); } } }
从程序输出可以看到,利用AQS,我们自定义的MyReentrantLock实现了可重入独占锁的功能。
本次就分享这么多内容,希望大家看了有收获。下一篇内容中会介绍Java线程池相关知识点,阅读过程中如发现描述有误,请指出,谢谢。
标签:等待队列 adl until 组件 getname method notifyall api 提高
原文地址:https://www.cnblogs.com/hongshaodian/p/12452136.html