标签:
显示锁
Lock接口是Java 5.0新增的接口,该接口的定义如下:
1
2
3
4
5
6
7
8
|
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time , TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); } |
与内置加锁机制不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显示的。ReentrantLock实现了Lock接口,与内置锁相比,ReentrantLock有以下优势:可以中断获取锁操作,获取锁时候可以设置超时时间。以下代码给出了Lock接口的标准使用形式:
1
2
3
4
5
6
7
|
Lock lock = new ReentrantLock(); ... lock.lock(); try{ ... } finally { lock.unlock(); |
1.1、轮询锁与定时锁
可定时的与可轮询的锁获取方式是由tryLock方法实现的,与无条件的锁获取方式相比,它具有跟完善的错误回复机制。tryLock方法的说明如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
boolean tryLock():仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true 。如果锁不可用,则此方法将立即返回值 false 。 boolean tryLock(long time , TimeUnit unit) throws InterruptedException: 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。 如果锁可用,则此方法将立即返回值 true 。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下三种情况之一前,该线程将一直处于休眠状态: 锁由当前线程获得;或者 其他某个线程中断当前线程,并且支持对锁获取的中断;或者 已超过指定的等待时间 如果获得了锁,则返回值 true 。 如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者 在获取锁时被中断,并且支持对锁获取的中断, 则将抛出 InterruptedException,并会清除当前线程的已中断状态。 如果超过了指定的等待时间,则将返回值 false 。如果 time 小于等于 0,该方法将完全不等待。 |
在内置锁中,死锁是一个严重的问题,恢复程序的唯一方法是重新启动程序,而防止死锁的唯一方法就是在构造程序时避免出现不一致的锁顺序,可定时的与可轮询的锁提供了另一种选择:先用tryLock()尝试获取所有的锁,如果不能获取所有需要的锁,那么释放已经获取的锁,然后重新尝试获取所有的锁,以下例子演示了使用tryLock避免死锁的方法:先用tryLock来获取两个锁,如果不能同时获取,那么就回退并重新尝试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException { long fixedDelay = 1; long randMod = 2; long stopTime = System.nanoTime() + unit.toNanos(timeout); while ( true ) { if (fromAcct.lock.tryLock()) { try { if (toAcct.lock.tryLock()) { try { if (fromAcct.getBalance().compareTo(amount) < 0) throw new InsufficientFundsException(); else { fromAcct.debit(amount); toAcct.credit(amount); return true ; } } finally { toAcct.lock.unlock(); } } } finally { fromAcct.lock.unlock(); } } if (System.nanoTime() < stopTime) return false ; NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod); } } |
1.2、可中断的锁获取操作
lockInterruptibly方法能够在获得锁的同时保持对中断的响应,该方法说明如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
void lockInterruptibly() throws InterruptedException: 如果当前线程未被中断,则获取锁。 如果锁可用,则获取锁,并立即返回。 如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态: 锁由当前线程获得;或者 其他某个线程中断当前线程,并且支持对锁获取的中断。 如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者 在获取锁时被中断,并且支持对锁获取的中断, 则将抛出 InterruptedException,并清除当前线程的已中断状态。 |
1.3、读-写锁
Java 5除了增加了Lock接口,还增加了ReadWriteLock接口,即读写锁,该接口定义如下:
1
2
3
4
|
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); } |
读写锁允许多个读线程并发执行,但是不允许写线程与读线程并发执行,也不允许写线程与写线程并发执行。下面的例子使用了ReentrantReadWriteLock包装Map,从而使他能够在多个线程之间安全的共享:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
public class ReadWriteMap <K,V> { private final Map<K, V> map; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock r = lock.readLock(); private final Lock w = lock.writeLock(); public ReadWriteMap(Map<K, V> map) { this .map = map; } public V put(K key, V value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } public V remove(Object key) { w.lock(); try { return map.remove(key); } finally { w.unlock(); } } public void putAll(Map<? extends K, ? extends V> m) { w.lock(); try { map.putAll(m); } finally { w.unlock(); } } public void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } public V get(Object key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } public int size() { r.lock(); try { return map.size(); } finally { r.unlock(); } } public boolean isEmpty() { r.lock(); try { return map.isEmpty(); } finally { r.unlock(); } } public boolean containsKey(Object key) { r.lock(); try { return map.containsKey(key); } finally { r.unlock(); } } public boolean containsValue(Object value) { r.lock(); try { return map.containsValue(value); } finally { r.unlock(); } } } |
同步工具类
2.1、闭锁
闭锁是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
下例给出了闭锁的常见用法,TestHarness创建一定数量的线程,利用它们并发的执行指定的任务,它使用两个闭锁,分别表示"起始门"和"结束门"。每个线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行,而每个线程要做的最后一件事是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完毕,因此可以统计所消耗的时间:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class TestHarness { public long timeTasks( int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch( 1 ); final CountDownLatch endGate = new CountDownLatch(nThreads); for ( int i = 0 ; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } } |
2.2、FutureTask
FutureTask表示可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。FutureTask的方法摘要如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
boolean cancel( boolean mayInterruptIfRunning) 试图取消对此任务的执行。 protected void done() 当此任务转换到状态 isDone(不管是正常地还是通过取消)时,调用受保护的方法。 V get() throws InterruptedException, ExecutionException 如有必要,等待计算完成,然后获取其结果。 V get( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 boolean isCancelled() 如果在任务正常完成前将其取消,则返回 true 。 boolean isDone() 如果任务已完成,则返回 true 。 void run() 除非已将此 Future 取消,否则将其设置为其计算的结果。 protected boolean runAndReset() 执行计算而不设置其结果,然后将此 Future 重置为初始状态,如果计算遇到异常或已取消,则该操作失败。 protected void set(V v) 除非已经设置了此 Future 或已将其取消,否则将其结果设置为给定的值。 protected void setException(Throwable t) 除非已经设置了此 Future 或已将其取消,否则它将报告一个 ExecutionException,并将给定的 throwable 作为其原因。 |
FutureTask可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动,以下代码就是模拟一个高开销的计算,我们可以先调用start()方法开始计算,然后在需要结果时,再调用get得到结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public class Preloader { ProductInfo loadProductInfo() throws DataLoadException { return null ; } private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>() { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw new RuntimeException(e); } } interface ProductInfo { } } class DataLoadException extends Exception { } |
2.3、信号量
从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后等待获取许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
class Pool { private static final int MAX_AVAILABLE = 100 ; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true ); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean [] used = new boolean [MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for ( int i = 0 ; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true ; return items[i]; } } return null ; // not reached } protected synchronized boolean markAsUnused(Object item) { for ( int i = 0 ; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false ; return true ; } else return false ; } } return false ; } } |
获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
Semaphore的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。还要注意,非同步的tryAcquire 方法不使用公平设置,而是使用任意可用的许可。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
Semaphore还提供便捷的方法来同时 acquire 和释放多个许可。小心,在未将公平设置为 true 时使用这些方法会增加不确定延期的风险。
内存一致性效果:线程中调用“释放”方法(比如 release())之前的操作 happen-before 另一线程中紧跟在成功的“获取”方法(比如 acquire())之后的操作。
2.4、栅栏
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier很有用。因为该 barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
示例用法:下面是一个在并行分解设计中使用barrier的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
class Solver { final int N; final float [][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker( int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return ; } catch (BrokenBarrierException ex) { return ; } } } } public Solver( float [][] matrix) { data = matrix; N = matrix.length; barrier = new CyclicBarrier(N, new Runnable() { public void run() { //mergeRows(...); } }); for ( int i = 0 ; i < N; ++i) new Thread( new Worker(i)).start(); waitUntilDone(); } } |
在这个例子中,每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。
如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作.
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException以反常的方式离开。
内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。
标签:
原文地址:http://www.cnblogs.com/prctice/p/4850244.html