5 基础构建模块
Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类(Synchronizer)。本章将介绍其中一些最有用的并发构建模块。
5.1同步容器类
同步容器类包括Vector和Hashtable,二者是早期JDK的一部分,此外还包括在JDK 1.2
中添加的一些功能相似的类,这些同步的封装器类是由Collections.synchronizedXxx等工厂方
法创建的。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
5.1.1同步容器类的问题
所以需要客户端加锁,而同步容器正好是支持的。例如下操作:
2 : 在调用size和相应的get之间,Vector的长度可能会发生变化。
我们可以通过在客户端加锁来解决不可靠迭代的问题,但要牺牲一些伸缩性:
5.1.2 迭代器与ConcurrentModificationException
Java 5.0人的for-each循环语法中,对容器类进行迭代的标准方式都是使用Iterator,也无法避免在迭代期间对容器加锁。因为它们表现出的行为是"及时失败"( fail-fast)的。它们采用的实现方式是,将计数器的变化与容器关联起来。如果在迭代期间计数器被修改,那么hasNext或next将抛出ConcurrentModificationException。
5.1.3 隐藏迭代器
在System.out.println()中迭代set的时候,可能会抛出ConcurrentModificationException。这里得到的教训是,如果状态与保护它的同步代码之间相隔越远,那么开发人员就越容易忘记在访问状态时使用正确的同步。如果Hiddenlterator用synchronizedSet来包装HashSet, 并且对同步代码进行封装,那么就不会发生这种错误。容器的hashCode和equals等方法也会间接地执行迭代操作
正如封装对象的状态有助于雄持不变性条件一样,封装对_象的同步机制同_样有助于确保实施同步策略。
5.2并发容器
同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,Java 5.0提供了多种并发容器类来改进同步容器的性能。
通过LinkedList来实现Queue。Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。BlockingQueu。扩展了Queue,增加了可阻塞的插入和获取等操作。
在Java 5.0中增加了Concurrent HashMap,用来替代同步且基于散列的Map。以及CopyOnWriteArrayList。
Java 6也引人了Concurrent SkipListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和S ortedS et的并发替代品。(以前用synchronizedMap包装TreeMap或TreeSet)
5.2.1 ConcurrentHashMap
采用分段锁。ConcurrentHashMap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException。
尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。虽然这看上去有些令人不安,但事实上size和isEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总在不断变化。因此,这些操作的需求被弱化了,以换取对其他更重要操作的性能优化,包括get, put, containsKey和remove等。
只有当应用程序需要加锁Map以进行独占访问.时,才应该放弃使用ConcurrentHashhiap 。
5.2.2 额外的原子Map操作
由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作,例如4.4.1节中对Vector增加原子操作"若没有则添加"。但是,一些常见的复合操作,例如"若没有则添加"、"若相等则移除(Remove-lf-Equal)"和"若相等则替换
( Replace-If-Equal)"等,都已经实现为原子操作并且在ConcurrentMap的接口中声明,如程序清单5-7所示。如果你需要在现有的同步Map中添加这样的功能,那么很可能就意味着应该考虑使用ConcurrentMap了。
5.2.3写入时复制容器:CopyOnWrite
CopyOnWriteArrayList用于替代同步List,类似地,CopyOnWriteArraySet替代同步Set。
只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。
"写入时复制"容器的迭代器保留一个指向底层基础数组的引用(所以不会被修改因此在对其进行同步时只需确保数组内容的可见性),这个数组当前位于迭代器的起始位置。当多个线程可以同时对这个容器进行迭代,不会彼此干扰(各自遍历各自的引用),迭代器不会抛出ConcurrentModificationException(引用不会被修改)。也不会与修改容器的线程相互干扰。返回的并且返回的元素与迭代器创建时的元素完全一致。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性(因为发布所以可见)。
每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大
时。仅当迭代操作远远多于修改操作时,才应该使用"写入时复制"容器。(事件通知系统)
5.3 阻塞队列和生产者——消费者模式
阻塞队列提供了可阻塞的put和take方法,以及支持定时的。ffer和poll方法。BlockingQueue简化了生产者一消费者设计的实现过程,它支持任意数量的生产者和消费者。最常见的就是线程池与队列的组合。
在构建高可靠的应用程序时,有界队列是一种强有力的资添管理工具,一仓们能抑制并防止产生过多的工作项,使程序更加健壮。
5.3.2 串行线程封闭
在java.util.coricurrent中实现的各种阻塞队列都包含了足够的内部同步机制,从而安全地将对象从生产者线程发布到消费者线程。这种安全的发布确保了对象状态对于新的所有者来说是可见的,并且由于最初的所有者不会再访问它,因此对象将被封闭在新的线程中。新的所有者线程可以有独占的访问权。
对象池利用了串行线程封闭,将对象"借给"一个请求线程。只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它(总之不会再让第三个人来操作此对象),那么就可以安全地在线程之间传递所有权。
我们也可以使用其他发布机制来传递可变对象的所有权,但必须确保只有一个线程能接受被转移的对象。阻塞队列简化了这项工作。除此之外,还可以通过ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet来完成这项工作。
5.3.3 双端队列与工作密取
工作密取非常适用于既是消费者也是生产者的场景。
Java 6增加了两种容器类型,Deque(发音为"deck")和BfockingDeque,它们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列,实现了在队列头和队列尾的高效插人和移除。具体实现包括ArrayDeque和LinkedBlockingDeque。
工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。
5.4阻塞方法与中断方法
Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。
当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:
传递InterruptedException 。避开这个异常通常是最明智的策略。只需把
InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断,如程序清单5-10所示。
在出现InterruptedException时不应该做的事情是,捕获它但不做出任何响应。这将使调用栈上更高层的代码无法对中断采取处理措施,因为线程被中断的证据已经丢失。
5.5同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量( Semaphore )、栅栏(Barrier )以及闭锁(Latch )。
所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。
5.5.1闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:
确保某个计算在其需要的所有资源都被初始化之后才继续执行。
确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
等待直到某个操作的所有参与者(在多玩家游戏中的所有玩家)都就绪再继续执行)
CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
在程序清单5-11的TestHarness中给出了闭锁的两种常见用法。TestHarness创建一定数
量的线程,利用它们并发地执行指定的任务。一它使用两个闭锁,分别表示"起始门(Starting
Gate )‘,和"结束门(Ending Gate ) "。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的值就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。
如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够问时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。
5.5.2 FutureTask
FutureTask也可以用做闭锁。( FutureTask实现了Future语义,表示一种抽象的可生成结果的计算[CPJ 4.3.3])o FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run ),正在运行< Running)和运行完成(Completed) o"执行完成"表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进人完成状态后,它会永远停止在这个状态上。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进人完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
FutureTask在Executo:框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。程序清单5-12中的Preloader就使用了FutureTask来执行一个高开销的计算,并且计算结果将在稍后使用。通过提前启动计算,可以减少在等待结果时需要的时间。
由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一
个start方法来启动线程。当程序随后需要ProductInfo时,可以调用get方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。
Callable表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在
Future.get中被重新抛出。这将使调用get的代码变得复杂,因为它不仅需要处理可能出现的ExecutionException(以及未检查的CancellationException ),而且还由于ExecutionException是作为一个Throwable类返回的,因此处理起来并不容易。在Preloader中,当get方法抛出ExecutionException时,可能是以下三种情况之一:Callable抛出的受检查异常,RuntimeException,以及Error。我们必须对每种情况进行单独处理,但我们将使用程序清单5-13中的launderThrowable辅助方法来封装一些复杂的异常处理逻辑。在调用launderThrowable之前,Preloader会首先检查已知的受检查异常,并重新抛出它们。剩下的是未检查异常,Preloader将调用launderThrowabl。并抛出结果。如果Throwable传递给launderThrowable的是一个Error,那么launderThrowable将直接再次抛出它;如果不是RuntimeException,那么将抛出一个I11ega1StateException表示这是一个逻辑错误。乘下的
RuntimeException, launderThrowable将把它们返回给调用者,而调用者通常会重新抛出
它们。
5.5.3 信号量
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,数信号量还可以用来实现某种资源池,或者对容器施加边界。Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。
计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重人的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
你可以使用Semaphore将任何一种容器变成有界阻塞容器。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将Semaphore的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。
5.5.4栅栏
栅栏(B arrier)类似于闭锁,它能阻塞一组线程直到某个事件发生【CPJ 4,4.3]。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:"所有人6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事情。
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调。用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
在程序清单5-15的CellularAutomata中给出了如何通过栅栏来计算细胞的自动化模拟,例如Conway的生命游戏。在把模拟过程并行化时,为每个元素(在这个示例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。合理的做法是,将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。CellularAutomata将问题分解为Ncpu个子问题,其中\P}:等于可用CPU的数量,并将每个子问题分配给一个线程。在每个步骤中,工作线程都为各自子问题中的所有细胞计算新值。当所有工作线程都到达栅栏时,栅栏会把这些新值提交给数据模型。在栅栏的操作执行完以后,工作线程将开始下一步的计算,包括调用isDone方法来判断是否需要进行下一次迭代。
另一种形式的栅栏是Exchanger,它是一种两方(Two-Party )栅栏,各方在栅栏位置上交
换数据[CPJ 3.4.3]。当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写人数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchangez来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。
数据交换的时机取决于应用程序的响应需求。最简草的方案是,当缓冲区被填满时,
由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。这样会把需要交换的次数
降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一个
方法是,不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持一定时间后,
也进行交换。
5.6 构建高效且可伸缩的结果缓存
我们将开发一个高效且可伸缩的缓存。我们首先从简单的HashMap开始,然后分析它的并发性缺陷,并讨论如何修复它们。
由于ConcurrentHashMap是线程安全的,因此在访问底层Map时就不需要进行同步。但它仍然存在一些不足。当两个线程同时调用compute时存在一个漏洞,如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复这个计算。因此在计算之间最好先判断是否有其他线程正在进行此计算。FutureTask类能实现这个功能。FutureTasko FutureTask表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么FutureTask.get将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。
用ConcurrentHashMap<A,Future<V>>,替换原来的ConcurrentHashMap<A, V>。首先检查某个相应的计算是否已经开始,如果还没有启动,那么就创建一个FutureTask,并注册到Map中,然后启动计算;如果已经启动,那么等待现有计算的结果。结果可能很快会得到,也可能还在运算过程中,但这对于Future.get的调用者来说是透明的。
ConcurrentMap中的原子方法putIfAbsent,避免了compute方法中的if代码块仍然是非原子(nonatomic)的"先检查再执行"操作。 (2个线程仍有可能在同一时间内调用compute来计算相同的值,使用put方法则不具备原子性,不安全)。