标签:场景 线程等待 通过 rlock 实现 个数 override tee cat
java.util.concurrent
包是JDK5开始提供的一套并发编程包,其中包含了大量和多线程开发相关的工具类,大大简化了Java的多线程开发,在高并发分布式场景下应用广泛。
要求:
本身是一种队列数据结构,比普通队列多了阻塞机制,所以称之为阻塞式队列。当队列满时,调用put向队列写入数据,操作会被阻塞,直到队列有空间;当队列空时,调用take从队列读取数据,操作会被阻塞,直到队列有数据。通过上面的阻塞机制可以在多个线程之间进行存取队列的操作,而不会有线程并发安全问题,可以简理解为阻塞式队列是专门设计用来在多个线程间共享数据。BlockingQueue是个接口,其实现类有基于数组的实现类ArrayBlockingQueue
ArrayBlockingQueue:基于数组实现的有界的阻塞队列,实例化时必须指定容量。
LinkedBlockingQueue:基于链表,容量可以不指定,默认容量为Integer.MAX_VALUE
插入操作:
boolean add(E e)
:当队列满时,会抛出运行时异常IllegalStateExceptionvoid put(E e) throws InterruptedException
:当队列满时,会阻塞直到队列有空间,需要处理非运行时异常InterruptedExceptionboolean offer(E e)
:插入成功返回true,队列满插入失败直接返回false,不会抛异常boolean offer(E e, long timeout, TimeUnit unit)
:指定超时时间,在超时时间内插入成功返回true,否则返回flase移除操作:
E remove()
:当队列空时,会抛出运行时异常NoSuchElementExceptionE take() throws InterruptedException
:当队列空时,会阻塞,需要处理非运行时异常InterruptedExceptionE poll()
:当队列空时,返回null,不会阻塞也不会抛出异常E poll(long timeout, TimeUnit unit) throws InterruptedException
:当队列空时,返回null,不会阻塞但需要处理非运行时异常InterruptedException检查操作:
E element()
:当队列空时,会抛出运行时异常NoSuchElementExceptionE peek()
:当队列空时,直接返回null生产者消费者示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
new Thread(new Customer(queue)).start();
new Thread(new Customer(queue)).start();
new Thread(new Producer(queue)).start();
}
}
class Customer implements Runnable {
private BlockingQueue<Integer> queue;
public Customer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer msg = queue.take();
System.out.println("消费:" + msg);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
int count = 0;
while (true) {
Thread.sleep(1000);
System.out.println("生产:" + count);
queue.put(count++);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ConcurrentMap是一个线程安全的Map,可以防止多线程并发安全问题,Hashtable也是线程安全的,但ConcurrentMap性能更好,更推荐ConcurrentMap。ConcurrentMap是一个接口,Java提供了两个实现类ConcurrentHashMap和ConcurrentSkipListMap。
ConcurrentMap性能好的原因:
CountDownLatch是Councurrent包提供的一种新的并发构造,可以协调线程的执行过程,实现协调某个线程阻塞直到其它若干线程执行达到一定条件才放开阻塞继续执行的效果(一个线程等待其它多个线程达到指定条件,才会继续执行)。
API:
CountDownLatch(int count)
:构造方法,需要传入一个数字作为闭锁的计数器初始值来构造闭锁;void await()
:调用此方法可以阻塞当前线程,直到闭锁中的计数器count值为0,才会自动放开阻塞;void countDown()
:调用此方法可以将闭锁中的计数器自减1;做饭的示例代码:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo1 {
public static void main(String[] args) {
// 创建闭锁,计数器初始值设置为3
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(new ZuoFan(countDownLatch)).start();
new Thread(new MaiGuo(countDownLatch)).start();
new Thread(new MaiCai(countDownLatch)).start();
new Thread(new MaiMi(countDownLatch)).start();
}
}
class ZuoFan implements Runnable {
private CountDownLatch cdl;
public ZuoFan(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
try {
System.out.println("准备做饭...");
// 等待计数器变为0
cdl.await();
System.out.println("开始做饭...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("饭已经做好!");
}
}
class MaiGuo implements Runnable {
private CountDownLatch cdl;
public MaiGuo(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
System.out.println("去买锅...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("锅已经买回!");
// 计数器自减1
cdl.countDown();
}
}
class MaiMi implements Runnable {
private CountDownLatch cdl;
public MaiMi(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
System.out.println("去买米...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("米已经买回!");
// 计数器自减1
cdl.countDown();
}
}
class MaiCai implements Runnable {
private CountDownLatch cdl;
public MaiCai(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
System.out.println("去买菜...");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜已经买回!");
// 计数器自减1
cdl.countDown();
}
}
输出:
准备做饭...
去买锅...
去买菜...
去买米...
锅已经买回!
米已经买回!
菜已经买回!
开始做饭...
饭已经做好!
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。即:N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。CyclicBarrier是当await的数量到达了设置的数量的时候,才会继续往下面执行,CyclicBarrier计数达到指定值时,计数置为0重新开始。
API:
-public CyclicBarrier(int parties)
:构造方法,接收一个初始值,指定栅栏要等待的线程数量;
-public int await()
:当线程到达指定位置时,可以调用此方法进入阻塞等待状态,直到等待的线程数达到了栅栏设定的线程数时,所以线程的阻塞同时被放开,一起继续执行;
赛马示例代码:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo1 {
public static void main(String[] args) {
// 指定计数器值为5,当5个线程全部进行就绪后,所有线程才会继续向下执行
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
new Thread(new Horse(cyclicBarrier)).start();
new Thread(new Horse(cyclicBarrier)).start();
new Thread(new Horse(cyclicBarrier)).start();
new Thread(new Horse(cyclicBarrier)).start();
new Thread(new Horse(cyclicBarrier)).start();
}
}
class Horse implements Runnable {
private CyclicBarrier cb;
public Horse(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " - 已经到栅栏位置...");
// 等待所有线程就绪
cb.await();
System.out.println(Thread.currentThread().getName() + " - 开始赛马!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
输出:
Thread-1 - 已经到栅栏位置...
Thread-3 - 已经到栅栏位置...
Thread-0 - 已经到栅栏位置...
Thread-4 - 已经到栅栏位置...
Thread-2 - 已经到栅栏位置...
Thread-2 - 开始赛马!
Thread-0 - 开始赛马!
Thread-4 - 开始赛马!
Thread-3 - 开始赛马!
Thread-1 - 开始赛马!
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
实现两个线程交换对象的效果,先到达的线程会产生阻塞,等待后续到来的线程,直到两个线程都到达交换机互换对象后,两个线程才会放开阻塞继续执行。
API:
public Exchanger()
:构造方法;public void exchange(V x)
:交换机交换对象的方法,先到的线程调用此方法会进入阻塞状态,直到另一个线程到达也调用此方法,交换对象,两个线程阻塞才会放开。示例代码:
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
new Thread(new A(exchanger)).start();
new Thread(new B(exchanger)).start();
}
}
class A implements Runnable {
private Exchanger<String> exchanger;
public A(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("A启动...");
Thread.sleep(2000);
System.out.println("A交换对象");
String msg = exchanger.exchange("北冥有鱼,其名为鲲。鲲之大,一锅钝不下!");
System.out.println("接到B的消息:" + msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class B implements Runnable {
private Exchanger<String> exchanger;
public B(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("B启动...");
Thread.sleep(1000);
System.out.println("B交换对象");
String msg = exchanger.exchange("化而为鸟,其名为鹏。鹏之大,需要两个烧烤架!");
System.out.println("接到A的消息:" + msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Concurrent包中提供的一个并发构造,可以在创建信号量时指定信号量的初始数量,后续可以调用acquire()来获取信号量,通过release()释放信号量,如果某一个时刻,信号量被取完,再调用acquire()方法时,该方法将会产生阻塞,直到有其它线程调用release()将信号量释放回来。
使用场景:
API:
Semaphore(int permits)
:构造方法,传入信号量初始值Semaphore(int permits, boolean fair)
:构造方法,可选传入一个布尔值,可设定当前信号量是否启用公平策略,默认不启用void acquire()
:获取信号量,如果信号量为0,则些方法将会阻塞,直到其它线程释放信号量,信号量计数器自减1void release()
:释放信号量,信号量计数器自增1限制同时只能5个线程执行代码:
import java.util.concurrent.Semaphore;
public class SemaphoreDemo1 {
public static void main(String[] args) {
// 创建信号量,同时允许5个线程执行
Semaphore semaphore = new Semaphore(5);
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
new Thread(new Cust(semaphore)).start();
}
}
class Cust implements Runnable {
private Semaphore semaphore;
public Cust(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "等待执行...");
// 获取信号量,如果获取不到信号量,发生阻塞,直到获得信号量
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获得信号量,开始执行...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "执行完成...");
// 执行完成,释放信号量
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出:
Thread-0等待执行...
Thread-0获得信号量,开始执行...
Thread-1等待执行...
Thread-1获得信号量,开始执行...
Thread-2等待执行...
Thread-2获得信号量,开始执行...
Thread-4等待执行...
Thread-4获得信号量,开始执行...
Thread-3等待执行...
Thread-3获得信号量,开始执行...
Thread-5等待执行...
Thread-6等待执行...
Thread-0执行完成...
Thread-5获得信号量,开始执行...
Thread-4执行完成...
Thread-6获得信号量,开始执行...
Thread-3执行完成...
Thread-2执行完成...
Thread-1执行完成...
Thread-5执行完成...
Thread-6执行完成...
两个线程之间发送信号:
import java.util.concurrent.Semaphore;
public class SemaphoreDemo2 {
public static void main(String[] args) {
// 创建信号量,同时允许1个线程执行
Semaphore semaphore = new Semaphore(1);
new Thread(new Mast(semaphore)).start();
new Thread(new Slave(semaphore)).start();
}
}
class Mast implements Runnable {
private Semaphore semaphore;
public Mast(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println(this.getClass().getSimpleName() + "等待执行...");
// 获取信号量,如果获取不到信号量,发生阻塞,直到获得信号量
semaphore.acquire();
System.out.println(this.getClass().getSimpleName() + "获得信号量,开始执行...");
Thread.sleep(2000);
System.out.println(this.getClass().getSimpleName() + "执行完成...");
// 执行完成,释放信号量
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Slave implements Runnable {
private Semaphore semaphore;
public Slave(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println(this.getClass().getSimpleName() + "等待执行...");
// 获取信号量,如果获取不到信号量,发生阻塞,直到获得信号量
semaphore.acquire();
System.out.println(this.getClass().getSimpleName() + "获得信号量,开始执行...");
Thread.sleep(2000);
System.out.println(this.getClass().getSimpleName() + "执行完成...");
// 执行完成,释放信号量
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出:
Mast等待执行...
Slave等待执行...
Mast获得信号量,开始执行...
Mast执行完成...
Slave获得信号量,开始执行...
Slave执行完成...
concurrent包下提供的一个接口ExcecutorService
,主要用来实现线程池。所谓池就是用来重用对象的一个集合,可以减少对象的创建和销毁,提高效率。而线程本身就是一个重量级的对象,线程的创建和销毁都是非常耗费资源和时间,所以如果需要频繁使用大量线程,不建议每次都创建线程销毁线程,而应该利用线程池的机制,实现线程对象的共享,提升程序的效率。Java提供了ExcecutorService
接口的实现类ThreadPoolExecutor
,和创建线程池的工具类Executors
。
API:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
:构造方法
int corePoolSize
:线程池核心线程数量int maximumPoolSize
:线程池最大线程数量long keepAliveTime
:大于核心线程数量的线程缓存时间,超过缓时间会自动销毁TimeUnit unit
:缓存时间单位BlockingQueue<Runnable> workQueue
:一个任务队列,协调多线程任务RejectedExecutionHandler handler
:拒绝任务处理器,当线程池达到最大线程,且任务队列满时,调用拒绝处理拒绝处理新任务void execute(Runnable command)
:向线程池提交普通任务,传入一个Runnable接口的实现类对象,这种方法提交的任务无法监控线程的执行,也无法在线程内向调用者返回处理结果;Future<?> submit(Runnable task)
:通过传入一个Runnable接口的实现类对象的方式向线程池提交任务,此方法会返回一个Future对象,通过Future对象的get的方法来检测线程是否执行结束,如果线程未执行结束get方法将会阻塞;<T> Future<T> submit(Callable<T> task)
:通过传入一个Callable接口的实现类对象的方式向线程池提交任务,此方法会返回一个Future对象,通过Future对象的get的方法来获取任务的处理的返回值,如果线程未执行结束get方法将会阻塞,直到线程执行成完,返回结果;<T> T invokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
:提交一个任务集合,当其中任意一个任务执行完成,将其结果返回,同时将其余任务中止<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:提交一个任务集合,返回保存任务状态和结果的Future集合对象void shutdown()
:将正在执行的任务和正在等待执行的任务全部执行完成后,关闭线程池List<Runnable> shutdownNow()
:尝试停止所有正在执行的任务,停止等待执行的任务,并返回等待执行的任务列表。通过Executors工具类的静态方法创建线程池:
Executors.newCachedThreadPool();
Executors.newSingleThreadExecutor();
Executors.newFixedThreadPool(10);
线程池的工作方式:
测试代码:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 1000;
TimeUnit unit = TimeUnit.MILLISECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
// 手动创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, workQueue, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("线程不够用了,无法处理此任务!");
}
});
// 使用工具类Executors创建线程池
/*
* int corePoolSize = 0; int maximumPoolSize = Integer.MAX_VALUE; long
* keepAliveTime = 60; TimeUnit unit = TimeUnit.SECONDS;
*/
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
/*
* 创建一个线程的线程池 int corePoolSize = 1; int maximumPoolSize = 1; long
* keepAliveTime = 0; TimeUnit unit = TimeUnit.MILLISECONDS;
* BlockingQueue<Runnable> workQueue = new
* LinkedBlockingQueue<Runnable>()
*/
ExecutorService newSingleThreadPool = Executors.newSingleThreadExecutor();
/*
* 创建固定大小线程池 int corePoolSize = n; int maximumPoolSize = n; long
* keepAliveTime = 0; TimeUnit unit = TimeUnit.MILLISECONDS;
* BlockingQueue<Runnable> workQueue = new
* LinkedBlockingQueue<Runnable>()
*/
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
System.out.println("---------- 使用execute方法执行任务,适用于执行无返回值的任务 ----------");
// 使用execute方法执行任务,适用于执行无返回值的任务
newFixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("无返回值任务开始执行...");
Thread.sleep(2000);
System.out.println("无返回值任务执行完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("---------- 使用submit方法执行任务,适用于执行自定义返回值的任务 ----------");
// 使用submit方法执行任务,适用于执行自定义返回值的任务
Future<?> futureForCustomResult = newFixedThreadPool.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("自定义返回值任务开始执行...");
Thread.sleep(2000);
System.out.println("自定义返回值任务执行完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, /* 自定义的返回值 */ "自定义的返回值");
Object result;
try {
result = futureForCustomResult.get();
System.out.println("自定义返回值任务返回任务处理结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("---------- 使用submit方法执行任务,适用于执行有返回值的任务 ----------");
// 使用submit方法执行任务,用于任务有返回值的场景
Future<String> futureForSubmitRunnable = newFixedThreadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
System.out.println("有返回值任务开始执行...");
Thread.sleep(2000);
System.out.println("有返回值任务执行完毕!");
return "线程返回的数据...";
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
});
try {
String rs = futureForSubmitRunnable.get();
System.out.println("有返回值任务返回结果:" + rs);
} catch (Exception e) {
e.printStackTrace();
}
// 构建批量任务
ArrayList<Callable<String>> taskList = new ArrayList<>();
taskList.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("aaa start...");
Thread.sleep(2000);
System.out.println("aaa end...");
return "aaa";
}
});
taskList.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("bbb start...");
Thread.sleep(1000);
System.out.println("bbb end...");
return "bbb";
}
});
taskList.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("ccc start...");
Thread.sleep(3000);
System.out.println("ccc end...");
return "ccc";
}
});
System.out.println("---------- 执行批量任务,当其一个任务执行完成后,将其结果返回值,同时调用interrupt方法中断其它任务 ----------");
// invokeAny 当其一个任务执行完成后,将其结果返回值,同时调用interrupt方法中断其它任务。
try {
String anyResult = newFixedThreadPool.invokeAny(taskList);
System.out.println("第一个任务返回的结果:" + anyResult);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("---------- 执行批量任务 ----------");
// invokeAll等线程任务执行完毕后,取得全部任务的结果值
try {
List<Future<String>> futureList = newFixedThreadPool.invokeAll(taskList);
for (Future<String> future : futureList) {
String rs = future.get();
System.out.println("批量执行任务结果:" + rs);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("--------- 关闭线程池 ----------");
// 线程池不再接收新的任务,线程池中的任务全部执行完成后关闭线程池
newFixedThreadPool.shutdown();
// 立即关闭线程池,正在执行的线程有可以被意外中断
newCachedThreadPool.shutdownNow();
newSingleThreadPool.shutdownNow();
}
}
java.util.concurrent.locks.Lock是一个类似于synchronized块的线程同步机制,但是Lock比synchronized块更灵活、更精细、更简单。Java提供了实现类ReentrantLock、ReentrantReadWriteLock。
API:
ReentrantLock()
:无参构造ReentrantLock(boolean fair)
:传入一个true,可以创建一个公平策略的ReentrantLockvoid lock()
:获取锁,阻塞直到成功拿到锁boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
:获取锁,不阻塞,拿到锁返回true,没有拿到返回falsevoid unlock()
:试图释放当前锁Lock和syncronized的区别:
读写锁(ReentrantReadWriteLock):对于多线程并发安全问题,其实只在涉及到并发写的时候才会发生,多个并发的读并不会有线程安全问题,所以在concurrent包中提供了读写锁的机制,可以实现区分读锁和写锁来进行并发控制,多个读锁可以共存,而写锁和任意锁都不可共存,从而实现多个并发读并行执行提升效率,而任意时刻写操作都进行隔离,保证安全,这是一种非常高效而精细的锁机制。
示例代码:
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockDemo {
public static String name = "Sherlock";
public static String gender = "male";
public static void main(String[] args) {
// 实例化读写锁
ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
new Thread(new ChangeThread(reentrantLock)).start();
new Thread(new PrintThread(reentrantLock)).start();
}
}
class ChangeThread implements Runnable {
private ReentrantReadWriteLock lock;
public ChangeThread(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
// 获取写锁,如果获取不到锁,将会阻塞,直到得到锁,写锁不能与其它锁共存
lock.writeLock().lock();
if ("Sherlock".equals(LockDemo.name)) {
LockDemo.name = "Watson";
LockDemo.gender = "female";
} else {
LockDemo.name = "Sherlock";
LockDemo.gender = "male";
}
// 释放写锁
lock.writeLock().unlock();
}
}
}
class PrintThread implements Runnable {
private ReentrantReadWriteLock lock;
public PrintThread(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
// 获取读锁,如果获取不到锁,将会阻塞,读锁可以与读锁共存
lock.readLock().lock();
System.out.println(LockDemo.name);
System.out.println(LockDemo.gender);
// 释放读锁
lock.readLock().unlock();
}
}
}
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换。Java中一般事务管理里面用到原子操作。
原子操作可以是一个步骤,也可以是多个操作步骤,但其顺序不可被打乱,也不可以被切割,而只执行其中的一部分,将整个操作视作一个整体是原子性的核心特征
原子操作的好处:
问题代码:将total值用两个线程同时自增10000次
import java.util.concurrent.CountDownLatch;
public class AtomicDemo {
public static int total = 0;
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new AddThread(countDownLatch)).start();
new Thread(new AddThread(countDownLatch)).start();
countDownLatch.await();
System.out.println("total = " + total);
}
}
class AddThread implements Runnable {
private CountDownLatch countDownLatch;
public AddThread(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
++AtomicDemo.total;
}
countDownLatch.countDown();
}
}
背景:上面代码运行结果可能不是我们预想的20000,其原因在于Java中最基本的运算都是非原子性的操作,在底层存在多个步骤,所有在多线程并发操作的过程中可能有多线程并发安全问题。
解决方案1:使用同步代码块进行同步,可以解决问题,但是如果在多线程并发场景下,最基本的运算都要同步的话,代码会被大量的同步代码块包裹,导致代码混乱,效率低下。
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
// 使用synchronized块包裹
synchronized (AtomicDemo.class) {
++AtomicDemo.total;
}
}
countDownLatch.countDown();
}
解决方案2:使用concurrent包中提供的Atomic原子性操作。
java.util.concurrent.atomic包下提供了与包装类对应的原子性操作类,如AtomicInteger、AtomicBoolean等,另外提供一个引用类型的原子性操作类AtomicReference
API:
public AtomicInteger()
:无参数构造方法,默认初始值为0public AtomicInteger(int initialValue)
:有参构造方法,传入初始值public final int getAndSet(int newValue)
:设置给定的值并返回旧值public final int addAndGet(int delta)
:将给定的值与当前值相加,并返回更新后的值public final int getAndIncrement()
:自增并返回旧值public final int getAndDecrement()
:自减并返回旧值public final int incrementAndGet()
:自增并返回新值public final int decrementAndGet()
:自减并返回新值public final void set(int newValue)
:设置新值public final int get()
:获取当前值改进代码:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static AtomicInteger total = new AtomicInteger();
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new AddThread(countDownLatch)).start();
new Thread(new AddThread(countDownLatch)).start();
countDownLatch.await();
System.out.println("total = " + total);
}
}
class AddThread implements Runnable {
private CountDownLatch countDownLatch;
public AddThread(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
AtomicDemo.total.incrementAndGet();
}
countDownLatch.countDown();
}
}
标签:场景 线程等待 通过 rlock 实现 个数 override tee cat
原文地址:https://www.cnblogs.com/sea9001/p/9940995.html