标签:
java.util.concurrent包的类都来自于JSR-166:Concurrent Utilities,官方的描述叫做“The JSR proposes a set of medium-level utilities that provide functionality commonly needed in concurrent programs. ”。作者是大名鼎鼎的Doug Lea,这个包的前身可以在这里找到,它最好的文档就是系统的API手册。
当然,这里参考的concurrent包来自JDK7,比最初JDK1.5的版本有了不少改进。我曾经在《Java多线程发展简史》提到过,对于Java并发本身,在基础的并发模型建立以后,JSR-133和JSR-166是贡献最大的两个,如觉必要,在阅读这篇文章之前,你可以先移步阅读这篇文章,能帮助在脑子里建立起最基础的Java多线程知识模型;此外,还有一篇是《从DCL的对象安全发布谈起》,这篇文章相当于是对JSR-133规范的阅读理解。
这篇文章中,我只是简要地记录类的功能和使用,希望可以帮助大家全面掌握或回顾Java的并发包。当然,任何不清楚的接口和功能,JDK的API手册是最好的参考材料,如果想更进一步,参透至少大部分类的实现代码,这会非常非常辛苦。
并发容器
这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区别,
阻塞队列:
非阻塞队列:
转移队列:
其它容器:
同步设备
这些类大部分都是帮助做线程之间同步的,简单描述,就像是提供了一个篱笆,线程执行到这个篱笆的时候都得等一等,等到条件满足以后再往后走。
给出一个Phaser使用的最简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class T { public static void main(String args[]) { final int count = 3 ; final Phaser phaser = new Phaser(count); // 总共有3个registered parties for ( int i = 0 ; i < count; i++) { final Thread thread = new Thread( new Task(phaser)); thread.start(); } } public static class Task implements Runnable { private final Phaser phaser; public Task(Phaser phaser) { this .phaser = phaser; } @Override public void run() { phaser.arriveAndAwaitAdvance(); // 每执行到这里,都会有一个party arrive,如果arrived parties等于registered parties,就往下继续执行,否则等待 } } } |
原子对象
这些对象都的行为在不使用同步的情况下保证了原子性。值得一提的有两点:
锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[ 100 ]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0 ; ++count; notEmpty.signal(); // 既然已经放进了元素,肯定不空了,唤醒“notEmpty” } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0 ) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0 ; --count; notFull.signal(); // 既然已经拿走了元素,肯定不满了,唤醒“notFull” return x; } finally { lock.unlock(); } } } |
Fork-join框架
这是一个JDK7引入的并行框架,它把流程划分成fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,经常需要花费大量的时间去等待其他线程执行任务的完成,但是fork-join框架使用work stealing技术缓解了这个问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
class SortTask extends RecursiveAction { final long [] array; final int lo; final int hi; private int THRESHOLD = 30 ; public SortTask( long [] array) { this .array = array; this .lo = 0 ; this .hi = array.length - 1 ; } public SortTask( long [] array, int lo, int hi) { this .array = array; this .lo = lo; this .hi = hi; } @Override protected void compute() { if (hi - lo < THRESHOLD) sequentiallySort(array, lo, hi); else { int pivot = partition(array, lo, hi); coInvoke( new SortTask(array, lo, pivot - 1 ), new SortTask(array, pivot + 1 , hi)); } } private int partition( long [] array, int lo, int hi) { long x = array[hi]; int i = lo - 1 ; for ( int j = lo; j < hi; j++) { if (array[j] <= x) { i++; swap(array, i, j); } } swap(array, i + 1 , hi); return i + 1 ; } private void swap( long [] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } private void sequentiallySort( long [] array, int lo, int hi) { Arrays.sort(array, lo, hi + 1 ); } } |
测试的调用代码:
1
2
3
4
5
6
7
8
9
10
11
|
@Test public void testSort() throws Exception { ForkJoinTask sort = new SortTask(array); ForkJoinPool fjpool = new ForkJoinPool(); fjpool.submit(sort); fjpool.shutdown(); fjpool.awaitTermination( 30 , TimeUnit.SECONDS); assertTrue(checkSorted(array)); } |
RecursiveTask和RecursiveAction的区别在于它的compute是可以有返回值的,子任务的计算使用fork()方法,结果的获取使用join()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class Fibonacci extends RecursiveTask { final int n; Fibonacci( int n) { this .n = n; } private int compute( int small) { final int [] results = { 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , 55 , 89 }; return results[small]; } public Integer compute() { if (n <= 10 ) { return compute(n); } Fibonacci f1 = new Fibonacci(n - 1 ); Fibonacci f2 = new Fibonacci(n - 2 ); f1.fork(); f2.fork(); return f1.join() + f2.join(); } } |
执行器和线程池
这个是我曾经举过的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public class FutureUsage { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { Thread.sleep( 4000 ); Object result = "finished" ; return result; } }; Future<Object> future = executor.submit(task); System.out.println( "task submitted" ); try { System.out.println(future.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } // Thread won‘t be destroyed. } } |
线程池具备这样的优先级处理策略:
对于大于coreSize而小于maxSize的那些线程,空闲了keepAliveTime后,会被销毁。观察上面说的优先级顺序可以看到,假如说给ExecutorService一个无限长的队列,比如LinkedBlockingQueue,那么maxSize>coreSize就是没有意义的。
ExecutorService:
CompletionService:
其它:
标签:
原文地址:http://www.cnblogs.com/zxf330301/p/5634987.html