一、基础知识
并发工具定义了一些核心特征,用于以其他方式实现同步和线程间通信。
- 同步器:提供了同步多线程间交互的高级方法。
- 执行器:管理线程的执行。
- 并发集合:提供了由集合框架定义的相关类的并发替代版本。
- Fork/Join框架:支持并行编程。
二、同步对象使用
Semaphore实现了经典的信号量,信号量通过计数器控制对共享资源的访问。如果计数器大于0则允许访问,如果计数器为0则拒绝访问。希望获得共享资源的线程尝试获得许可证,若允许访问则线程可得到许可证,若不允许访问则线程阻塞直至得到许可证为止。
import java.util.concurrent.Semaphore; class Solution { public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); Integer i = 0; new IncThread(semaphore); new DecThread(semaphore); } } class Shared { static int integer = 0; } class IncThread implements Runnable { private Semaphore semaphore; IncThread(Semaphore semaphore) { this.semaphore = semaphore; new Thread(this).start(); } @Override public void run() { try { semaphore.acquire();//阻塞直至得到许可 for (int i = 0; i < 5; i++) { Shared.integer++; System.out.println(Shared.integer); Thread.sleep(500); } } catch (InterruptedException exc) { System.out.println(exc.getMessage()); } semaphore.release();//释放许可 } } class DecThread implements Runnable { private Semaphore semaphore; DecThread(Semaphore semaphore) { this.semaphore = semaphore; new Thread(this).start(); } @Override public void run() { try { semaphore.acquire(); for (int i = 0; i < 5; i++) { Shared.integer--; System.out.println(Shared.integer); Thread.sleep(500); } } catch (InterruptedException exc) { System.out.println(exc.getMessage()); } semaphore.release(); } }
如果希望线程进行等待,直到发生一个或多个事件为止。CountDownLatch可用于处理这类情况,计数器必须从指定事件数量归零,锁存器才会被释放。
import java.util.concurrent.CountDownLatch; class Solution { public static void main(String[] args) { CountDownLatch cdl = new CountDownLatch(5); new Thread(() -> { for (int i = 0; i < 5; i++) { System.out.println(i); cdl.countDown(); } }).start(); try { cdl.await();//暂停主线程直至计数器递减5次 } catch (InterruptedException e) { System.out.println(e.getMessage()); } System.out.println("Hello"); } }
如果由多个线程组成的线程组必须在某处进行等待,直到线程组中所有线程都到达执行点。CyclicBarrier可用于处理这类情况,同步对象会被挂起直至指定数量的线程都到达预定位置为止。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; class Solution { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(10, () -> System.out.println("Hello")); for (int i = 0; i < 10; i++) { new Thread(() -> { try { int t = (int) (Math.random() * 5000); Thread.sleep(t); System.out.println(t); barrier.await();//指定数量的线程调用await后释放被挂起的线程 } catch (InterruptedException | BrokenBarrierException e) { System.out.println(e.getMessage()); } }).start(); } } }
Exchanger用于简化两个线程之间的数据交换。简单的进行等待,直到两个独立的线程均调用exchange方法为止,之后交换线程所提供的数据。
import java.util.Arrays; import java.util.concurrent.Exchanger; class Solution { public static void main(String[] args) { Exchanger<int[]> arrayExchanger = new Exchanger<>(); Runnable consumer = () -> { try { for (int i = 0; i < 3; i++) { int[] arr = arrayExchanger.exchange(new int[5]); System.out.println(Arrays.toString(arr)); } } catch (InterruptedException e) { System.out.println(e.getMessage()); } }; Runnable producer = () -> { int[] arr = new int[5]; try { for (int i = 0; i < 15; i++) { arr[i % 5] = i; if ((i + 1) % 5 == 0) arr = arrayExchanger.exchange(arr); } } catch (InterruptedException e) { System.out.println(e.getMessage()); } }; new Thread(consumer).start(); new Thread(producer).start(); } }
Phaser允许多个线程进行同步。Phaser对象会等待所有party(等价于线程)完成当前阶段后才会进入下阶段,通过调用arrive方法或其变体通知当前阶段完成。
import java.util.concurrent.Phaser; class MyPhaser extends Phaser { private int total; MyPhaser(int parties, int total) { super(parties); this.total = total - 1; } @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed"); return phase == total || registeredParties == 0;//返回true则结束Phaser } } class Solution { public static void main(String[] args) { Phaser phaser = new MyPhaser(0, 3); Runnable runnable = () -> { phaser.register(); while (!phaser.isTerminated()) { System.out.println("Phase " + phaser.getPhase() + " started"); phaser.arriveAndAwaitAdvance(); } }; for (int i = 0; i < 3; i++) new Thread(runnable).start(); } }
三、执行器
执行器用于启动并控制线程的执行,因此执行器为通过Thread管理线程提供了一种代替方案。执行器的核心是Executor接口,ExecutorService接口扩展了Executor接口。
线程池提供了用于执行各种任务的一组线程,每个任务不是使用独立的线程而是线程池中的线程。线程池减轻了创建许多独立线程所带来的负担,通过少量线程执行大量任务。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class Solution { public static void main(String[] args) { CountDownLatch[] cdls = new CountDownLatch[5]; for (int i = 0; i < cdls.length; i++) cdls[i] = new CountDownLatch(5); ExecutorService pool = Executors.newFixedThreadPool(3);//创建线程池 for (CountDownLatch cdl : cdls) pool.execute(() -> { for (int j = 0; j < 5; j++) { System.out.println(j); cdl.countDown(); } }); try { for (CountDownLatch cdl : cdls) cdl.await(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } pool.shutdown();//关闭线程池 } }
泛型接口Callable表示返回值的线程。应用程序可以使用Callable对象计算结果后,将结果返回给调用线程。call方法定义希望执行的任务,在任务完成后返回结果。Callable任务通过调用ExecutorService对象的submit方法执行。
泛型接口Future表示将由Callable对象返回的值。
import java.util.concurrent.*; class Add implements Callable<Integer> { private int[] arr; Add(int... arr) { this.arr = arr; } @Override public Integer call() { int sum = 0; for (int i : arr) sum += i; return sum; } } class Solution { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(1); Future<Integer> future = pool.submit(new Add(1, 2, 3)); try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { System.out.println(e.getMessage()); } pool.shutdown(); } }
四、时间
TimeUnit枚举用于指定时间单位。
import java.util.concurrent.TimeUnit; class Solution { public static void main(String[] args) { try { TimeUnit.SECONDS.sleep(3);//暂停3秒 System.out.println(TimeUnit.SECONDS.toMillis(1));//单位转换 } catch (InterruptedException e) { System.out.println(e.getMessage()); } } }
五、锁
锁为使用synchronized控制共享资源的访问提供了替代技术。在访问共享资源自谦,申请用于保护资源的锁,当资源访问完成后释放锁。当某线程正在使用锁时,其他尝试申请锁的线程会被挂起。
所有锁都实现了Lock接口,申请锁时调用lock方法,如果不可得就会进行等待。如果不希望进行等待就使用tryLock方法。使用newCondition方法获取与锁关联的Condition对象之后可通过await或signal等方法控制锁,类似于wait和notify方法。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; class Shared { static int data = 0; } class Solution { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Runnable runnable = () -> { lock.lock();//申请锁 Shared.data++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException exc) { System.out.println(exc.getMessage()); } finally { System.out.println(Shared.data); lock.unlock();//释放锁 } }; for (int i = 0; i < 10; i++) new Thread(runnable).start(); } }
六、原子操作
当读写某些类型的变量时,原子操作提供了一种不可中断的方案。这意味着不再需要锁以及其他同步机制。
import java.util.concurrent.atomic.AtomicInteger; class Shared { static AtomicInteger integer = new AtomicInteger(0); } class Solution { public static void main(String[] args) { Runnable runnable = () -> { for (int i = 0; i < 3; i++) System.out.println(Shared.integer.getAndAdd(1)); }; for (int i = 0; i < 5; i++) new Thread(runnable).start(); } }
七、并行编程
Fork/Join框架通过简化多线程的创建使用和自动使用多处理器,增强了多线程编程。
ForkJoinTask用于定义能够被ForkJoinPool管理的任务,泛型参数指定了任务结果的类型。fork方法为调用任务的异步执行提交任务,join方法等待调用该方法的任务中止,invoke和invokeAll方法则将fork和join合并到单个调用中。
RecursiveAction和RecursiveTask均为ForkJoinTask的子类。前者用于封装不返回结果的任务,后者用于封装返回结果的任务。
ForkJoinPool用于管理ForkJoinTask。若调用需等待的任务则使用线程池的invoke方法(同步),若不需要等待任务完成则调用execute方法(异步)。
import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; class Sqrt extends RecursiveAction {//分治策略 double[] arr; int front, rear; Sqrt(double[] arr, int front, int rear) { this.arr = arr; this.front = front; this.rear = rear; } @Override protected void compute() { if (rear - front < 100) { for (int i = front; i <= rear; i++) arr[i] = Math.sqrt(arr[i]); } else { int middle = (front + rear) / 2; invokeAll(new Sqrt(arr, front, middle), new Sqrt(arr, middle + 1, rear)); } } } class Solution { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); double[] arr = new double[10000]; for (int i = 0; i < arr.length; i++) arr[i] = (double) i; System.out.println(Arrays.toString(arr)); Sqrt sqrt = new Sqrt(arr, 0, arr.length - 1); pool.invoke(sqrt); System.out.println(Arrays.toString(arr)); } }
若没有显示声明ForkJoinPool,则会自动使用公共池,使用commonPool可获取公共池的引用。
import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; class Sum extends RecursiveTask<Integer> { int[] arr; int front, rear; Sum(int[] arr, int front, int rear) { this.arr = arr; this.front = front; this.rear = rear; } @Override protected Integer compute() { int sum = 0; if (rear - front < 100) { for (int i = front; i <= rear; i++) sum += arr[i]; } else { int middle = (front + rear) / 2; Sum taskA = new Sum(arr, front, middle); Sum taskB = new Sum(arr, middle + 1, rear); taskA.fork(); taskB.fork(); sum = taskA.join() + taskB.join(); } return sum; } } class Solution { public static void main(String[] args) { ForkJoinPool pool = ForkJoinPool.commonPool(); int[] arr = new int[10000]; for (int i = 0; i < arr.length; i++) arr[i] = i; System.out.println(pool.invoke(new Sum(arr, 0, arr.length - 1))); } }