标签:add before ati cat sar head 先来 应该 业务
程序运行过程中,两个或多个线程(thread)并发执行并共享某个资源时,可能对共享资源不同步地修改,造成数据错误(所谓错误,就是修改后的数据不符合预期),为了避免数据错误,普遍采用了线程同步技术,所谓同步,就是避免多个线程毫无规则地征用资源,而是使这些线程看起来像是步调一致、有序地使用共享资源。比如,有线程A,B和C, 有一个可共享的资源R,为了避免多线程并发造成对R资源的冲突,必须同步线程,即当A正在对R进行操作的时候,B和C就在这一块操作代码之外等待A完成操作。
Java中,同步的范围主要有两种,一种是同步方法,对于同一个对象,用synchronized修饰的所有方法,同一时刻至多只能有一个线程进入其中之一。第二种是同步代码块,可用synchronize包围起来,也可以用锁Lock对象,而后者,最常用的就是ReentrantLock,即重入锁。
synchronized关键字的是操作系统底层的互斥锁(mutex lock)来实现的,跟操作系统有关。而ReentrantLock的实现原理是CAS+自旋算法,CAS(Compare and save)就是实现更改数据原子性的算法,底层是JNI调用C代码实现的, 自旋算法就是在一个不设循环次数的循环中不断地循环,直到满足一定条件为止。持有ReentrantLock的线程执行被锁住的代码块,新来的线程尝试去获取ReentrantLock,如果获取到了,就进入代码块执行,如果没有获取到,就会阻塞在代码块之外。
使用Lock的优点在于可以更加精细地控制线程执行锁操作和释放锁,比如可以使用tryLock()的重载方法指定尝试执行锁操作的时间,如果在指定时间内没有锁成功,就可以根据业务需要做其它处理。Lock还可以指定线程是否“公平地”争取锁,是先来后到还是后来的也能挣先。
本篇随笔,只学习和分析ReentrantLock的源码和实现原理。
ReentrantLock是最常用的锁,每个线程就像一个人,一个人(线程)执行任务(程序)走到了一座房子(需要同步执行的代码块)的门前(代码块入口),推门并从内上锁这一过程,就是ReentrantLock.lock()操作,当然,如果推门推不动,说明屋子里有别人呢,得等他完成任务后把锁打开(unlock)。
学习任何源码,最好是写一个简单的demo,作为一个入口而又能对整体执行有个了解才好入手。
现有两个账户,一个from,一个to,初始额度都是100元, from向to中转账,应该两个账户总金额恒定为200元才对。试用多线程,模拟多个人同时使用该账户转账。Account是账户对象,ReentrantLockDemo负责模拟多线程转账操作。首先,不用锁,开启5个线程去跑转账的方法,打印出每个线程转账完成后的结果。
1 public class ReentrantLockDemo { 2 // 转出账户 3 private Account from; 4 // 转入账户 5 private Account to; 6 // 持有重入锁 7 private Lock lock = new ReentrantLock(); 8 9 public ReentrantLockDemo(Account from, Account to) { 10 this.from = from; 11 this.to = to; 12 } 13 14 /** 15 * 转账操作,从from账户转money额度到to账户 16 */ 17 public void transform(int money) { 18 // lock.lock(); 19 try { 20 from.minus(money); 21 Thread.yield(); // 为了更大几率让其它线程进入 22 to.add(money); 23 System.out.println(Thread.currentThread() + " from.money = " + from.money + "; to.money = " + to.money); 24 } finally { 25 // lock.unlock(); 26 } 27 } 28 29 public static void multiThreadTransform(ReentrantLockDemo demo) { 30 ExecutorService exec = Executors.newFixedThreadPool(5); 31 for (int i = 0; i < 5; i++) { 32 exec.execute(() -> { 33 demo.transform(10); 34 }); 35 } 36 exec.shutdown(); 37 } 38 39 public static void main(String[] args) { 40 Account from = new Account(), 41 to = new Account(); 42 ReentrantLockDemo demo = new ReentrantLockDemo(from, to); 43 multiThreadTransform(demo); 44 45 } 46 }
1 class Account { 2 // 账号初始额度为100元 3 int money = 100; 4 5 void add(int m) { 6 money += m; 7 } 8 void minus(int m) { 9 money -= m; 10 } 11 }
结果:
Thread[pool-1-thread-2,5,main] from.money = 80; to.money = 130
Thread[pool-1-thread-4,5,main] from.money = 60; to.money = 140
Thread[pool-1-thread-1,5,main] from.money = 80; to.money = 130
Thread[pool-1-thread-3,5,main] from.money = 70; to.money = 130
Thread[pool-1-thread-5,5,main] from.money = 50; to.money = 150
可以从上面结果看到,线程2转账完成后,from账户为80元,to账户为130元,相加为210元,在这个线程之中的人,读出了错误的数据。而且90,110的组合也不一定出现。
加上锁之后,结果就符合预期了。
1 /** 2 * 转账操作,从from账户转money额度到to账户 3 */ 4 public void transform(int money) { 5 lock.lock(); // 锁操作,实现线程同步 6 try { 7 from.minus(money); 8 Thread.yield(); // 为了更大几率让其它线程进入 9 to.add(money); 10 System.out.println(Thread.currentThread() + " from.money = " + from.money + "; to.money = " + to.money); 11 } finally { 12 lock.unlock(); // 释放锁,以便于接下来的线程进入代码块 13 } 14 }
Thread[pool-1-thread-1,5,main] from.money = 90; to.money = 110 Thread[pool-1-thread-5,5,main] from.money = 80; to.money = 120 Thread[pool-1-thread-2,5,main] from.money = 70; to.money = 130 Thread[pool-1-thread-3,5,main] from.money = 60; to.money = 140 Thread[pool-1-thread-4,5,main] from.money = 50; to.money = 150
进入ReentrantLock.lock()方法后,看到源码是这样的:
1 public void lock() { 2 sync.lock(); 3 }
方法中,调用的是sync.lock(),Sync是ReentrantLock的内嵌类,它继承了AbstractQueuedSynchronizer,是抽象队列同步器,所以ReentrantLock底层的线程同步实际是由队列同步器实现的。ReentrantLock, Sync的依赖关系如下图所示:
1) ReentrantLock依赖于队列同步器Sync对象;
2) Sync对象有两个子类,分别是FairSync和NonfairSync,从名称来看应该是公平锁和非公平锁的实现;
3)ReentrantLock同步由Sync.lock()实现;
4) 使用ReentrantLock默认构造器,创建的是非公平队列同步器NonfairSync。
那么Sync对象是如何实现线程同步的,接着往下看。
1 /** 2 * Performs lock. Try immediate barge, backing up to normal 3 * acquire on failure. 4 */ 5 final void lock() { 6 if (compareAndSetState(0, 1)) 7 setExclusiveOwnerThread(Thread.currentThread()); 8 else 9 acquire(1); 10 }
1 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this 2 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 3 }
进入到NonfairSync.lock()方法,通过注释内容可知,该方法执行锁操作,一个新来的线程,不分先来后到立即先尝试去加锁,如果失败,就执行acquire(1)方法。 compareAndSetState(0, 1)就是CAS操作,即对比-更新,这一步是原子操作,第一个参数是期望值,第二个参数是更新值,意思是:如果现在的值是0,那么就更新为1, 其原子性是底层C代码实现的。这里要更新的值是RenentrantLock对象所持有的抽象队列同步器的状态,代码如下。 当stateOffset的值是0时,更新为1。假设有两个线程同时运行到了下面代码的第3行,由于是原子操作,同一时刻,只可能有一个线程能成功改变stateOffset的值。 假设当前线程成功了,则在AQS中,把当前排他模式的线程持有者设置为当前线程setExclusiveOwnerThread(),如果没有上锁成功,那么就执行acquire()方法.
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
该方法,以独占模式获取锁,忽略中断。这个方法的原理就是一个线程不断地去尝试锁操作,直到成功,否则就一直阻塞。没有立即成功执行锁操作的线程,会放入一个FIFO队列,队列中的线程会根据相应策略对竞争锁。这个方法做了这么几件事:
1). 先尝试去执行锁才注意tryAquire(arg),如果成功了,就退出方法,线程继续执行下去;
2). 创建一个Node节点,封装当前线程并入AQS队列,即执行addWaiter()方法;
3). 入队后,就会循环地去尝试锁,如果锁成功了就继续执行,没有获取到就阻塞,然后继续尝试锁操作,即执行acquireQueued()方法;
4). 第3)步中,如果返回了true,则表示阻塞的线程被中断了,那么selfInterrupt()的作用是标记当前线程被终端,为什么在这里标记,这是因为线程被中断后,标记会被清除,所以这里重新标记一下。
5). 为什么第3)步中,线程会被中断,这个问题留在之后进入acquireQueued()方法解答
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); // 获取当前线程 3 int c = getState(); // 获得当前AQS同步器状态 4 if (c == 0) { // 0表示当前锁是释放状态,可以上锁 5 if (compareAndSetState(0, acquires)) { // 使用CAS原子操作更新AQS的state,使AQS对象的状态变成上锁状态 6 setExclusiveOwnerThread(current); // 把当前线程设置为独占线程 7 return true; 8 } 9 } 10 else if (current == getExclusiveOwnerThread()) { // AQS是上锁状态,且当前线程就是AQS中的独占线程 11 int nextc = c + acquires; 12 if (nextc < 0) // overflow 13 throw new Error("Maximum lock count exceeded"); 14 setState(nextc); // 更新AQS状态, 无需用CAS操作,因为只有当前独占线程能进入到这里 15 return true; 16 } 17 return false; 18 }
调用方法tryAcquire(),会调用NonfairSync.nonfairTryAcquire()方法, 返回true的话,表示执行锁操作成功了,返回false,表示执行锁操作失败了。上面的方法每个步骤的作用已经注释了,流程非常清晰,上锁的过程,实际上就是更新AQS的state的过程,另外,注意这里默认是独占锁,所以CAS更新操作成功之后,还会设置当前线程为独占线程,而state,独占线程的引用变量等参数,都是AQS对象持有的,可以看到RenetrantLock的核心就是AQS,而NonfairSync是AQS的子类。
上面执行锁操作,如果执行失败了,那么就会执行addWaiter()方法,这个方法是讲当前线程封装成一个Node,然后将这个Node存入一个FIFO队列:
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); // 封装当前线程为Node对象 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail; // tail是当前队列的队尾节点 5 if (pred != null) { 6 node.prev = pred; // 如果tail节点不是null,就把当前Node放到tail节点后面 7 if (compareAndSetTail(pred, node)) { // 原子操作,更新当前队列的队尾节点为新封装的Node节点 8 pred.next = node; 9 return node; 10 } 11 } 12 enq(node); // 如果tail节点是null,就会调用enq方法,进行必要的队列初始化,然后再把node节点入队操作 13 return node; 14 }
然后看enq(node)方法:
1 /** 2 * Inserts node into queue, initializing if necessary. See picture above. 3 * @param node the node to insert 4 * @return node‘s predecessor 5 */ 6 private Node enq(final Node node) { 7 for (;;) { // 自旋 8 Node t = tail; 9 if (t == null) { // Must initialize 10 if (compareAndSetHead(new Node())) // 如果队列尾节点是null,就初始化队列:1.创建头节点并,此时只有一个节点,头节点也是尾节点,注意,由于这是在一个自旋循环中的操作,所以这一步防止死循环 11 tail = head; 12 } else { // 当自旋操作确保了tail不为null以后,就会在这一步,把封装有当前线程的node节点入队 13 node.prev = t; 14 if (compareAndSetTail(t, node)) { 15 t.next = node; 16 return t; 17 } 18 } 19 } 20 }
以上方法通过自旋+CAS,将封装有当前线程对象的Node节点对象存入AQS对象维护的队列中,必须再次强调,CAS操作是JNI调用底层C代码保证更新操作的原子性,以保证同一时刻最多只能有一个线程成功更新目标值。
上述方法中,compareAndSetHead(new Node())采用原子操作创建的head节点,是一个没有封装任何thread的占位节点,因为这个时候还没有封装有线程的节点占有锁。
此前的种种操作,总结起来其实就是尝试几次上锁,没上锁成功的话,就把当前线程排队。那么线程是怎么实现阻塞等待过程的呢,就是这个方法实现的,具体来说,这里的实现原理是线程每隔一段时间就会去尝试上锁操作,如果上锁成功的话就会继续执行被锁住的代码块,如果上锁失败会继续 阻塞--尝试上锁--阻塞--尝试上锁 这一过程,显然,从这里的描述来看,肯定又用到了自旋+CAS,所以说,ReentrantLock的最终要的算法就是自旋+CAS。
1 /** 2 * Acquires in exclusive uninterruptible mode for thread already in 3 * queue. Used by condition wait methods as well as acquire. 4 * 5 * @param node the node 6 * @param arg the acquire argument 7 * @return {@code true} if interrupted while waiting 8 */ 9 final boolean acquireQueued(final Node node, int arg) { 10 boolean failed = true; 11 try { 12 boolean interrupted = false; 13 for (;;) { 14 final Node p = node.predecessor(); // 当前节点的前置节点 15 if (p == head && tryAcquire(arg)) { // 只有前置节点是head节点的线程,才有资格去竞争锁操作 16 setHead(node); // 当前节点封装的线程成功加锁,就设置为head节点 17 p.next = null; // help GC 18 failed = false; 19 return interrupted; 20 } 21 if (shouldParkAfterFailedAcquire(p, node) && 22 parkAndCheckInterrupt()) // 上一步没有获取锁,就阻塞一会。 23 interrupted = true; 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }
这是lock()方法的核心,注意,只有封装了当前线程的节点的前置节点是头节点, 才能去争取锁,即tryAcquire(),这个方法在上面已经学习过了,其内部自旋+CAS的实现,保证只能有一个线程成功争取到锁操作。假设争取到了锁,就会把当前线程的节点设置为head节点。
每尝试一次锁操作,如果不成功,该线程便会阻塞一会,这个阻塞功能是在第二个if中的方法实现的shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() , 用自然语言描述就是:
if(要不要阻塞 && 阻塞一会) { 设置中断标志 }
1) 要不要阻塞shouldParkAfterFailedAcquire()
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus; // Node对象的属性,表示当前节点代表的线程的等待状态 3 if (ws == Node.SIGNAL) // 前置节点状态为SIGNAL,表示它获取了同步状态,那么节点就应该阻塞 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true; 9 if (ws > 0) { // 前置节点被cancel了 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; // 连续赋值操作,将pred.prev分别赋值给pred和node.prev,这个操作循环进行,直到找到不是取消状态的前置节点 16 } while (pred.waitStatus > 0); 17 pred.next = node; // 剔除掉取消状态的节点 18 } else { 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don‘t park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 设置前任节点为SIGNAL,设置成功,在下一次循环就会阻塞当前节点 25 } 26 return false; 27 }
以上方法主要是判断当前节点是否应该阻塞,如果返回了true,那么就会执行下面的方法实现阻塞
1 /** 2 * Convenience method to park and then check if interrupted 3 * 4 * @return {@code true} if interrupted 5 */ 6 private final boolean parkAndCheckInterrupt() { 7 LockSupport.park(this); // 将线程挂起从而实现当前线程阻塞 8 return Thread.interrupted(); // 返回线程中断状态 9 }
这个方法调用了底层代码实现了线程挂起,具体细节就不再深究了。值得注意的是返回线程状态,在2.4节中我提了一个问题,就是在acquire()方法中,当成功执行了锁操作后,为什么要执行selfInterrupt()操作,这里就可以解答了,LockSupport.part()将线程挂起,支持两种结束方式,一种是前置节点释放锁之后,当前节点正常调用LockSupport.unPark()来取消阻塞,这里不涉及到中断,return Thread.interrupted()就为false;
第二种是LockSupport.park()支持其它线程中断当前线程的阻塞状态,这样返回Thread.interrupted()为true,但是Thread.interrupted()方法,是会清除线程的中断状态标记的,为什么这么做?这样做是为了使这个被中断的线程在acquireQueued方法中,下次阻塞时,忽略线程中断状态。 如果当前线程的前置节点是head节点,那么它成功执行了锁操作之后,返回结果就应该是true ,然后调用selfInterrupt()方法中断当前线程。
1 static void selfInterrupt() { 2 Thread.currentThread().interrupt(); 3 }
ReentrantLock释放锁与上锁是相反的过程,而这一过程的底层,都是调用了AQS的方法。
1 public void unlock() { 2 sync.release(1); 3 }
实际释放锁调用的是AQS的release方法,这个方法如下:
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { // 尝试释放锁 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); // 锁释放成功,会将head节点的watistatus设置为0,这样才可以使head节点的后继节点有资格去竞争锁,回忆上面的shouldParkAfterFailedAcquire()方法 6 return true; 7 } 8 return false; 9 }
释放锁过程很简单,就是:
1) 先尝试释放锁,如果成功了,
2) 取到head节点,如果head节点的waitStatus不是0(SIGNAL),
3) 在unparkSuccessor方法中,就是将head节点的waitStatus状态设置为0,以同步AQS状态,使后继节点可以争取锁,并且在这步操作中结束后置节点的阻塞状态
1 protected final boolean tryRelease(int releases) { 2 int c = getState() - releases; // getState()获取到AQS的状态,是一个int值,代表一个线程上锁的次数,减去release代表要释放的锁的个数,这里假设为1 3 if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程如果不是独占的,就抛出异常 4 throw new IllegalMonitorStateException(); 5 boolean free = false; 6 if (c == 0) { // c等于0表示AQS同步器没有线程独占,其它线程可以准备竞争上锁了 7 free = true; 8 setExclusiveOwnerThread(null); 9 } 10 setState(c); 11 return free; 12 }
这步操作也比较明确,就是去释放锁,并且更新AQS的状态,如果AQS的state==0,就返回true,以便于通知其它线程准备抢锁。
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 int ws = node.waitStatus; 8 if (ws < 0) 9 compareAndSetWaitStatus(node, ws, 0); 10 11 /* 12 * Thread to unpark is held in successor, which is normally 13 * just the next node. But if cancelled or apparently null, 14 * traverse backwards from tail to find the actual 15 * non-cancelled successor. 16 */ 17 Node s = node.next; 18 if (s == null || s.waitStatus > 0) { 19 s = null; 20 for (Node t = tail; t != null && t != node; t = t.prev) 21 if (t.waitStatus <= 0) 22 s = t; 23 } 24 if (s != null) 25 LockSupport.unpark(s.thread); 26 }
这个方法中传入的node是head节点,进行以下操作:
1) 将head节点状态设置为0,但是如果这个节点的线程是1(CANCELED),就不去设置
2) 这个head节点所持有的线程既然释放了锁,那么它的后继节点线程就应该解除阻塞状态去获取锁了
3) 该方法中使用for循环找到一个距离当前head节点最近的一个符合条件(不为null,且状态不是CANCELED)的节点
4) 调用LockSupport.unpark()方法,解除该节点线程的阻塞状态
在这里,LockSupport.unpark(s.thread)方法,使用JNI调用了底层代码实现的,不做解释。以上全部内容就结束了,鄙人才疏学浅,是鄙人学习Java过程中记录自己点点滴滴的理解过程,如果有任何错误和疏漏,希望能得到各位的赐教。
标签:add before ati cat sar head 先来 应该 业务
原文地址:https://www.cnblogs.com/yxlaisj/p/12071873.html