标签:读取数据 get 主线程 tle 阻塞 countdown 参数 之间 illegal
借用Java并发编程实践中的话”编写正确的程序并不容易,而编写正常的并发程序就更难了”,相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,本文算是对多线程情况下同步策略的一个简单介绍。
线程安全问题是指当多个线程同时读写一个状态变量,并且没有任何同步措施时候,导致脏数据或者其他不可预见的结果的问题。Java中首要的同步策略是使用Synchronized关键字,它提供了可重入的独占锁。
要谈可见性首先需要介绍下多线程处理共享变量时候的Java中内存模型。
Java内存模型规定了所有的变量都存放在主内存中,当线程使用变量时候都是把主内存里面的变量拷贝到了自己的工作空间或者叫做工作内存。
当线程操作一个共享变量时候操作流程为:
那么假如线程A和B同时去处理一个共享变量,会出现什么情况那?
首先他们都会去走上面的三个流程,假如线程A拷贝共享变量到了工作内存,并且已经对数据进行了更新但是还没有更新会主内存(结果可能目前存放在当前cpu的寄存器或者高速缓存),这时候线程B拷贝共享变量到了自己的工作内存进行处理,处理后,线程A才把自己的处理结果更更新到主内存或者缓存,可知 线程B处理的并不是线程A处理后的结果,也就是说线程A处理后的变量值对线程B不可见,这就是共享变量的不可见性问题。
构成共享变量内存不可见原因是因为三步流程不是原子性操作,下面知道使用恰当同步就可以解决这个问题。
我们知道ArrayList是线程不安全的,因为他的读写方法没有同步策略,会导致脏数据和不可预期的结果,下面我们就一一讲解如何解决。
这是线程不安全的 public class ArrayList<E> { public E get(int index) { rangeCheck(index); return elementData(index); } public E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } }
假设线程A执行操作Ao和线程B执行操作Bo ,那么从A看,当B线程执行Bo操作时候,那么Bo操作全部执行,要么全部不执行,我们称Ao和Bo操作互为原子性操作,在设计计数器时候一般都是先读取当前值,然后+1,然后更新会变量,是读-改-写的过程,这个过程必须是原子性的操作。
public class ThreadNotSafeCount { private Long value; public Long getCount() { return value; } public void inc() { ++value; } }
如上代码是线程不安全的,因为不能保证++value是原子性操作。方法一是使用Synchronized进行同步如下:
public class ThreadSafeCount { private Long value; public synchronized Long getCount() { return value; } public synchronized void inc() { ++value; } }
注意,这里不能简单的使用volatile修饰value进行同步,因为变量值依赖了当前值
使用Synchronized确实可以实现线程安全,即实现可见性和同步,但是Synchronized是独占锁,没有获取内部锁的线程会被阻塞掉,那么有没有刚好的实现那?答案是肯定的。
原子变量类比锁更轻巧,比如AtomicLong代表了一个Long值,并提供了get,set方法,get,set方法语义和volatile相同,因为AtomicLong内部就是使用了volatile修饰的真正的Long变量。另外提供了原子性的自增自减操作,所以计数器可以改下为:
public class ThreadSafeCount { private AtomicLong value = new AtomicLong(0L); public Long getCount() { return value.get(); } public void inc() { value.incrementAndGet(); } }
那么相比使用synchronized的好处在于原子类操作不会导致线程的挂起和重新调度
,因为他内部使用的是cas的非阻塞算法。
常用的原子类变量为:AtomicLong,AtomicInteger,AtomicBoolean,AtomicReference.
CAS 即CompareAndSet,也就是比较并设置,CAS有三个操作数分别为:内存位置,旧的预期值,新的值,操作含义是当内存位置的变量值为旧的预期值时候使用新的值替换旧的值。通俗的说就是看内存位置的变量值是不是我给的旧的预期值,如果是则使用我给的新的值替换他,如果不是返回给我旧值。这个是处理器提供的一个原子性指令。上面介绍的AtomicLong的自增就是使用这种方式实现:
public final long incrementAndGet() { for (;;) { long current = get();(1) long next = current + 1;(2) if (compareAndSet(current, next))(3) return next; } } public final boolean compareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); }
假如当前值为1,那么线程A和检查B同时执行到了(3)时候各自的next都是2,current=1,假如线程A先执行了3,那么这个是原子性操作,会把档期值更新为2并且返回1,if判断true所以incrementAndGet返回2.这时候线程B执行3,因为current=1而当前变量实际值为2,所以if判断为false,继续循环,如果没有其他线程去自增变量的话,这次线程B就会更新变量为3然后退出。
这里使用了无限循环使用CAS进行轮询检查,虽然一定程度浪费了cpu资源,但是相比锁来说避免的线程上下文切换和调度。
当一个线程要获取一个被其他线程占用的锁时候,该线程会被阻塞,那么当一个线程再次获取它自己已经获取的锁时候是否会被阻塞那?如果不需要阻塞那么我们说该锁是可重入锁,也就是说只要该线程获取了该锁,那么可以无限制次数进入被该锁锁住的代码。
先看一个例子如果锁不是可重入的,看看会出现什么问题。
public class Hello{ public Synchronized void helloA(){ System.out.println("hello"); } public Synchronized void helloB(){ System.out.println("hello B"); helloA(); } }
如上面代码当调用helloB函数前会先获取内置锁,然后打印输出,然后调用helloA方法,调用前会先去获取内置锁,如果内置锁不是可重入的那么该调用就会导致死锁了,因为线程持有并等待了锁。
实际上内部锁是可重入锁,例如synchronized关键字管理的方法,可重入锁的原理是在锁内部维护了一个线程标示,标示该锁目前被那个线程占用,然后关联一个计数器,一开始计数器值为0,说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器会变成1,其他线程在获取该锁时候发现锁的所有者不是自己所以被阻塞,但是当获取该锁的线程再次获取锁时候发现锁拥有者是自己会把计数器值+1, 当释放锁后计数器会-1,当计数器为0时候,锁里面的线程标示重置为null,这时候阻塞的线程会获取被唤醒来获取该锁。
synchronized块是Java提供的一种强制性内置锁,每个Java对象都可以隐式的充当一个用于同步的锁的功能,这些内置的锁被称为内部锁或者叫监视器锁,执行代码在进入synchronized代码块前会自动获取内部锁,这时候其他线程访问该同步代码块时候会阻塞掉。拿到内部锁的线程会在正常退出同步代码块或者异常抛出后释放内部锁,这时候阻塞掉的线程才能获取内部锁进入同步代码块。
内部锁是一种互斥锁,具体说是同时只有一个线程可以拿到该锁,当一个线程拿到该锁并且没有释放的情况下,其他线程只能等待。
对于上面说的ArrayList可以使用synchronized进行同步来处理可见性问题。
使用synchronized对方法进行同步 public class ArrayList<E> { public synchronized E get(int index) { rangeCheck(index); return elementData(index); } public synchronized E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } }
如图当线程A获取内部锁进入同步代码块后,线程B也准备要进入同步块,但是由于A还没释放锁,所以B现在进入等待,使用同步可以保证线程A获取锁到释放锁期间的变量值对B获取锁后都可见。也就是说当B开始执行A执行的代码同步块时候可以看到A操作的所有变量值,这里具体说是当线程B获取b的值时候能够保证获取的值是2。这时因为线程A进入同步块修改变量值后,会在退出同步块前把值刷新到主内存,而线程B在进入同步块前会首先清空本地内存内容,从主内存重新获取变量值,所以实现了可见性。但是要注意一点所有线程使用的是同一个锁。
注意 Synchronized关键字会引起线程上下文切换和线程调度
。
使用synchronized可以实现同步,但是缺点是同时只有一个线程可以访问共享变量,但是正常情况下,对于多个读操作操作共享变量时候是不需要同步的,synchronized时候无法实现多个读线程同时执行,而大部分情况下读操作次数多于写操作,所以这大大降低了并发性,所以出现了ReentrantReadWriteLock,它可以实现读写分离,多个线程同时进行读取,但是最多一个写线程存在。
对于上面的方法现在可以修改为:
public class ArrayList<E> { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public E get(int index) { Lock readLock = readWriteLock.readLock(); readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public E set(int index, E element) { Lock wirteLock = readWriteLock.writeLock(); wirteLock.lock(); try { return list.set(index, element); } finally { wirteLock.unlock(); } } }
如代码在get方法时候通过 readWriteLock.readLock()获取了读锁,多个线程可以同时获取这读锁,set方法通过readWriteLock.writeLock()获取了写锁,同时只有一个线程可以获取写锁,其他线程在获取写锁时候会阻塞直到写锁被释放。假如一个线程已经获取了读锁,这时候如果一个线程要获取写锁时候要等待直到释放了读锁,如果一个线程获取了写锁,那么所有获取读锁的线程需要等待直到写锁被释放。所以相比synchronized来说运行多个读者同时存在,所以提高了并发量。
注意 需要使用者显示调用Lock与unlock操作
对于避免不可见性问题,Java还提供了一种弱形式的同步,即使用了volatile关键字。该关键字确保了对一个变量的更新对其他线程可见。当一个变量被声明为volatile时候,线程写入时候不会把值缓存在寄存器或者或者在其他地方,当线程读取的时候会从主内存重新获取最新值,而不是使用当前线程的拷贝内存变量值。
volatile虽然提供了可见性保证,但是不能使用他来构建复合的原子性操作,也就是说当一个变量依赖其他变量或者更新变量值时候新值依赖当前老值时候不在适用。与synchronized相似之处在于如图
如图线程A修改了volatile变量b的值,然后线程B读取了改变量值,那么所有A线程在写入变量b值前可见的变量值,在B读取volatile变量b后对线程B都是可见的,图中线程B对A操作的变量a,b的值都可见的。volatile的内存语义和synchronized有类似之处,具体说是说当线程写入了volatile变量值就等价于线程退出synchronized同步块(会把写入到本地内存的变量值同步到主内存),读取volatile变量值就相当于进入同步块(会先清空本地内存变量值,从主内存获取最新值)。
下面的Integer也是线程不安全的,因为没有进行同步措施
public class ThreadNotSafeInteger { private int value; public int get() { return value; } public void set(int value) { this.value = value; } }
使用synchronized关键字进行同步如下:
public class ThreadSafeInteger { private int value; public synchronized int get() { return value; } public synchronized void set(int value) { this.value = value; } }
等价于使用volatile进行同步如下:
public class ThreadSafeInteger { private volatile int value; public int get() { return value; } public void set(int value) { this.value = value; } }
这里使用synchronized和使用volatile是等价的,但是并不是所有情况下都是等价,一般只有满足下面所有条件才能使用volatile
另外 加锁可以同时保证可见性和原子性,而volatile只保证变量值的可见性。
注意 volatile关键字不会引起线程上下文切换和线程调度
。另外volatile还用来解决重排序问题,后面会讲到。
悲观锁,指数据被外界修改持保守态度(悲观),在整个数据处理过程中,将数据处于锁定状态。 悲观锁的实现,往往依靠数据库提供的锁机制 。数据库中实现是对数据记录进行操作前,先给记录加排它锁,如果获取锁失败,则说明数据正在被其他线程修改,则等待或者抛出异常。如果加锁成功,则获取记录,对其修改,然后事务提交后释放排它锁。
一个例子:select * from 表 where .. for update;
悲观锁是先加锁再访问策略,处理加锁会让数据库产生额外的开销,还有增加产生死锁的机会,另外在多个线程只读情况下不会产生数据不一致行问题,没必要使用锁,只会增加系统负载,降低并发性,因为当一个事务锁定了该条记录,其他读该记录的事务只能等待。
乐观锁是相对悲观锁来说的,它认为数据一般情况下不会造成冲突,所以在访问记录前不会加排他锁,而是在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,具体说根据update返回的行数让用户决定如何去做。乐观锁并不会使用数据库提供的锁机制,一般在表添加version字段或者使用业务状态来做。
具体可以参考:https://www.atatech.org/articles/79240
乐观锁直到提交的时候才去锁定,所以不会产生任何锁和死锁。
根据锁能够被单个线程还是多个线程共同持有,锁又分为独占锁和共享锁。独占锁保证任何时候都只有一个线程能读写权限,ReentrantLock就是以独占方式实现的互斥锁。共享锁则可以同时有多个读线程,但最多只能有一个写线程,读和写是互斥的,例如ReadWriteLock读写锁,它允许一个资源可以被多线程同时进行读操作,或者被一个线程 写操作,但两者不能同时进行。
独占锁是一种悲观锁,每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据一致性,而独占锁只允许同时一个线程读取数据,其他线程必须等待当前线程释放锁才能进行读取。
共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作。
根据线程获取锁的抢占机制锁可以分为公平锁和非公平锁,公平锁表示线程获取锁的顺序是按照线程加锁的时间多少来决定的,也就是最早加锁的线程将最早获取锁,也就是先来先得的FIFO顺序。而非公平锁则运行闯入,也就是先来不一定先得。
ReentrantLock提供了公平和非公平锁的实现:
公平锁ReentrantLock pairLock = new ReentrantLock(true);
非公平锁 ReentrantLock pairLock = new ReentrantLock(false);
如果构造函数不传递参数,则默认是非公平锁。
在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销。
假设线程A已经持有了锁,这时候线程B请求该锁将会被挂起,当线程A释放锁后,假如当前有线程C也需要获取该锁,如果采用非公平锁方式,则根据线程调度策略线程B和C两者之一可能获取锁,这时候不需要任何其他干涉,如果使用公平锁则需要把C挂起,让B获取当前锁。
AbstractQueuedSynchronizer提供了一个队列,大多数开发者可能从来不会直接用到AQS,AQS有个变量用来存放状态信息 state,可以通过protected的getState,setState,compareAndSetState函数进行调用。对于ReentrantLock来说,state可以用来表示该线程获可重入锁的次数,semaphore来说state用来表示当前可用信号的个数,FutuerTask用来表示任务状态(例如还没开始,运行,完成,取消)。
public class Test { private static final int ThreadNum = 10; public static void main(String[] args) { //创建一个CountDownLatch实例,管理计数为ThreadNum CountDownLatch countDownLatch = new CountDownLatch(ThreadNum); //创建一个固定大小的线程池 ExecutorService executor = Executors.newFixedThreadPool(ThreadNum); //添加线程到线程池 for(int i =0;i<ThreadNum;++i){ executor.execute(new Person(countDownLatch, i+1)); } System.out.println("开始等待全员签到..."); try { //等待所有线程执行完毕 countDownLatch.await(); System.out.println("签到完毕,开始吃饭"); } catch (InterruptedException e) { e.printStackTrace(); }finally { executor.shutdown(); } } static class Person implements Runnable{ private CountDownLatch countDownLatch; private int index; public Person(CountDownLatch cdl,int index){ this.countDownLatch = cdl; this.index = index; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("person " + index +"签到"); //线程执行完毕,计数器减一 countDownLatch.countDown(); } } }
如上代码,创建一个线程池和CountDownLatch实例,每个线程通过构造函数传入CountDownLatch的实例,主线程通过await等待线程池里面线程任务全部执行完毕,子线程则执行完毕后调用countDown计数器减一,等所有子线程执行完毕后,主线程的await才会返回。
先看下类图:
可知CountDownLatch内部还是使用AQS实现的。
首先通过构造函数初始化AQS的状态值
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
然后看下await方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //如果线程被中断则抛异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试看当前是否计数值为0,为0则直接返回,否者进入队列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
如果tryAcquireShared返回-1则 进入doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入队列状态为共享节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //如果多个线程调用了await被放入队列则一个个返回。 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //shouldParkAfterFailedAcquire会把当前节点状态变为SIGNAL类型,然后调用park方法把当先线程挂起, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
调用await后,当前线程会被阻塞,直到所有子线程调用了countdown方法,并在计数为0时候调用该线程unpark方法激活线程,然后该线程重新tryAcquireShared会返回1。
然后看下 countDown方法:
//委托给sync public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
首先看下tryReleaseShared
protected boolean tryReleaseShared(int releases) { //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一更新到state for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
该函数一直返回false直到当前计数器为0时候才返回true。
返回true后会调用doReleaseShared,该函数主要作用是调用uppark方法激活调用await的线程,代码如下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //节点类型为SIGNAL,把类型在通过cas设置回去,然后调用unpark激活调用await的线程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // loop on failed CAS continue; } // loop if head changed if (h == head) break; } }
激活主线程后,主线程会在调用tryAcquireShared获取锁。
先上类图:
可知ReentrantLock最终还是使用AQS来实现,并且根据参数决定内部是公平还是非公平锁,默认是非公平锁
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
加锁代码:
public void lock() { sync.lock(); }
先看Lock方法:
lock方法最终调用FairSync重写的tryAcquire方法
protected final boolean tryAcquire(int acquires) { //获取当前线程和状态值 final Thread current = Thread.currentThread(); int c = getState(); //状态为0说明该锁未被任何线程持有 if (c == 0) { //为了实现公平,首先看队列里面是否有节点,有的话再看节点所属线程是不是当前线程,是的话 //hasQueuedPredecessors返回false,然后使用原子操作compareAndSetState保证一个线程更新状态为1, //设置排他锁归属为当前线程。其他线程通过cass则返回false. if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //状态不为0说明该锁已经被线程持有,则看是否是当前线程持有,是则重入锁次数+1. else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平性保证代码:
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
再看看unLock方法,最终调用了Sync的tryRelease方法:
protected final boolean tryRelease(int releases) { //如果不是锁持有者调用UNlock则抛出异常。 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果当前可重入次数为0,则清空锁持有线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置可重入次数为原始值-1 setState(c); return free; }
final void lock() { //如果当前锁空闲0,则设置状态为1,并且设置当前线程为锁持有者 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //调用重写的tryAcquire方法->nonfairTryAcquire方法 acquire(1); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//状态为0说明没有线程持有该锁 if (compareAndSetState(0, acquires)) {//cas原子性操作,保证只有一个线程可以设置状态 setExclusiveOwnerThread(current);//设置锁所有者 return true; } }//如果当前线程是锁持有者则可重入锁计数+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可知公平与非公平都是先执行tryAcquire尝试获取锁,如果成功则直接获取锁,如果不成功则把当前线程放入队列。对于放入队列里面的第一个线程A在unpark后会进行自旋调用tryAcquire尝试获取锁,假如这时候有一个线程B执行了lock操作,那么也会调用tryAcquire方法尝试获取锁,但是线程B并不在队列里面,但是线程B有可能比线程A优先获取到锁,也就是说虽然线程A先请求的锁,但是却有可能没有B先获取锁,这是非公平锁实现。而公平锁要保证线程A要比线程B先获取锁。所以公平锁相比非公平锁在tryAcquire里面添加了hasQueuedPredecessors方法用来保证公平性。
如图读写锁内部维护了一个ReadLock和WriteLock,并且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁实现。我们知道AQS里面只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state是无法表示写和读状态的。所以ReentrantReadWriteLock使用state的高16位表示读状态也就是读线程的个数,低16位表示写锁可重入量。
static final int SHARED_SHIFT = 16; 共享锁(读锁)状态单位值65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); 共享锁线程最大个数65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 排它锁(写锁)掩码 二进制 15个1 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 返回读锁线程数 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回写锁可重入个数 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
lock 获取锁 对应写锁只需要分析下Sync的tryAcquire和tryRelease
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //c!=0说明读锁或者写锁已经被某线程获取 if (c != 0) { //w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //说明某线程获取了写锁,判断可重入个数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 设置可重入数量(1) setState(c + acquires); return true; } //写线程获取写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
unlock 释放锁\
protected final boolean tryRelease(int releases) { // 看是否是写锁拥有者调用的unlock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //获取可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //如果写锁可重入值为0则释放锁,否者只是简单更新状态值。 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
对应读锁只需要分析下Sync的tryAcquireShared和tryReleaseShared
lock 获取锁
protected final int tryAcquireShared(int unused) { //获取当前状态值 Thread current = Thread.currentThread(); int c = getState(); //如果写锁计数不为0说明已经有线程获取了写锁,然后看是不是当前线程获取的写锁。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读锁计数 int r = sharedCount(c); //尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
unlock 释放锁
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //循环直到自己的读计数-1 cas更新成功 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序可以保证最终执行的结果是与程序顺序执行的结果一致,并且只会对不存在数据依赖性的指令进行重排序,这个重排序在单线程下对最终执行结果是没有影响的,
但是在多线程下就会存在问题。
一个例子
int a = 1;(1) int b = 2;(2) int c= a + b;(3)
如上c的值依赖a和b的值,所以重排序后能够保证(3)的操作在(2)(1)之后,但是(1)(2)谁先执行就不一定了,这在单线程下不会存在问题,因为并不影响最终结果。
一个多线程例子
public static class ReadThread extends Thread { public void run() { while(!Thread.currentThread().isInterrupted()){ if(ready){(1) System.out.println(num+num);(2) } System.out.println("read thread...."); } } } public static class Writethread extends Thread { public void run() { num = 2;(3) ready = true;(4) System.out.println("writeThread set over..."); } } private static int num =0; private static boolean ready = false; public static void main(String[] args) throws InterruptedException { ReadThread rt = new ReadThread(); rt.start(); Writethread wt = new Writethread(); wt.start(); Thread.sleep(10); rt.interrupt(); System.out.println("main exit"); }
如代码由于(1)(2)(3)(4) 之间不存在依赖,所以写线程(3)(4)可能被重排序为先执行(4)在执行(3),那么执行(4)后,读线程可能已经执行了(1)操作,并且在(3)执行前开始执行(2)操作,这时候打印结果为0而不是4.
解决:使用volatile 修饰ready可以避免重排序。
Java中断机制是一种线程间协作模式,通过中断并不能直接终止另一个线程,而是需要被中断的线程根据中断状态自行处理。
例如当线程A运行时,线程B可以调用A的 interrupt()方法来设置中断标志为true,并立即返回。设置标志仅仅是设置标志,线程A并没有实际被中断,会继续往下执行的,然后线程A可以调用isInterrupted方法来看自己是不是被中断了,返回true说明自己被别的线程中断了,然后根据状态来决定是否终止自己活或者干些其他事情。
Interrupted经典使用代码
public void run(){ try{ //线程退出条件 while(!Thread.currentThread().isInterrupted()&& more work to do){ // do more work; } } catch(InterruptedException e){ // thread was interrupted during sleep or wait } finally{ // cleanup, if required } }
使用场景:
public static class MyThread extends Thread { public void run() { while (!Thread.currentThread().isInterrupted()) { System.out.println("do Someing...."); } } } public static void main(String[] args) throws InterruptedException { MyThread t = new MyThread(); t.start(); Thread.sleep(1000); t.interrupt(); }
public static class SleepInterrupt extends Object implements Runnable{ public void run(){ try{ System.out.println("thread-sleep for 2000 seconds"); Thread.sleep(2000000); System.out.println("thread -waked up"); }catch(InterruptedException e){ System.out.println("thread-interrupted while sleeping"); return; } System.out.println("thread-leaving normally"); } } public static void main(String[] args) throws InterruptedException { SleepInterrupt si = new SleepInterrupt(); Thread t = new Thread(si); t.start(); //主线程休眠2秒,从而确保刚才启动的线程有机会执行一段时间 try { Thread.sleep(2000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("main() - interrupting other thread"); //中断线程t t.interrupt(); System.out.println("main() - leaving"); }
InterruptedException的处理
如果抛出 InterruptedException那么就意味着抛出异常的方法是阻塞方法,比如Thread.sleep,wait,join。
那么接受到异常后如何处理的,醉简单的是直接catch掉,不做任何处理,但是中断发生一般是为了取消任务或者退出线程来使用的,所以如果直接catch掉那么就会失去做这些处理的时机,出发你能确定不需要根据中断条件做其他事情。
static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(1000); int sum = 0; for (int i = 0; i < 100; i++) sum += i; return sum; } } public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); System.out.println("主线程在执行任务"); try { System.out.println("task运行结果" + futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); executor.shutdown(); }
如上代码主线程会在futureTask.get()出阻塞直到task任务执行完毕,并且会返回结果。
先看下类图结构
FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的:
/** Possible state transitions: * NEW -> COMPLETING -> NORMAL 正常的状态转移 * NEW -> COMPLETING -> EXCEPTIONAL 异常 * NEW -> CANCELLED 取消 * NEW -> INTERRUPTING -> INTERRUPTED 中断 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
其中构造FutureTask实例时候状态为new
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }
把FutureTask提交到线程池或者线程执行start时候会调用run方法:
public void run() { //如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //当前状态为new 则调用任务的call方法执行任务 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移 } //执行任务成功则保存结果更新状态,unpark所有等待线程。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //状态从new->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //状态从COMPLETING-》NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //unpark所有等待线程。 finishCompletion(); } }
任务提交后,会调用 get方法获取结果,这个get方法是阻塞的。
public V get() throws InterruptedException, ExecutionException { int s = state; //如果当前状态是new或者COMPLETING则等待,因为位normal或者exceptional时候才说明数据计算完成了。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //如果被中断,则抛异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //组建单列表 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //超时则返回 if (nanos <= 0L) { removeWaiter(q); return state; } //否者设置park超时时间 LockSupport.parkNanos(this, nanos); } else //直接挂起当前线程 LockSupport.park(this); } }
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
在submit任务后还可以调用futuretask的cancel来取消任务:
public boolean cancel(boolean mayInterruptIfRunning) { //只有任务是new的才能取消 if (state != NEW) return false; //运行时允许中断 if (mayInterruptIfRunning) { //完成new->INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //完成INTERRUPTING->INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //不允许中断则直接new->CANCELLED else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
翻看ConcurrentHashMap的源码知道ConcurrentHashMap使用分离锁,整个map分段segment,每个segments是继承了ReentrantLock,使用ReentrantLock的独占锁用来控制同一个段只能有一个线程进行写,但是不同段可以多个线程同时写。另外无论是段内还是段外多个线程都可以同时读取,因为他使用了volatile语义的读,并没加锁。并且当前段有写线程时候,该段也允许多个读线程存在。
put的大概逻辑,首先计算key的hash值,然后根据一定算法(位移和与操作)计算出该元素应该放到那个segment,然后调用segment.put方法,该方法里面使用ReentrantLock进行写控制,第一个线程tryLock获取锁进行写入,其他写线程则自旋调用tryLock 循环尝试。
get的大概逻辑,使用UNSAFE.getObjectVolatile 在不加锁情况下获取volatile语义的值。
标签:读取数据 get 主线程 tle 阻塞 countdown 参数 之间 illegal
原文地址:http://www.cnblogs.com/leemz-coding/p/7457258.html