Java并发API提供了大量接口和类来实现并发应用程序。这些接口和类既包含了底层机制,如Thread类、Runnable接口或Callable接口、synchronized关键字,也包含了高层机制,如在Java 7中增加的Executor框架和Fork/Join框架。尽管如此,在开发应用程序时,仍会发现己有的Java类无法满足需求。
这时,我们就需要基于Java提供类和接口来实现自己的定制并发工具。一般来说,我们可以:
? 实现一个接口以拥有接口定义的功能,例如,ThreadFactory接口;
? 覆盖类的一些方法,改变这些方法的行为,来满足需求,例如,覆盖Thread类的 run()方法,它默认什么都不做,可以被用来覆盖以提供预期的功能。
在本章中,我们将学习如何改变一些Java并发API的行为,而不需要从头设计一个并发框架。并且在今后的应用程序开发中,可以使用本章中的代码作为定制并发类的起始点。
Executor框架是一种将线程的创建和执行分离的机制。它基于Executor和 ExecutorService 接口,及这两个接口的实现类 ThreadPoolExecutor 展开。Executor 有一个内部线程池,并提供了将任务传递到池中线程以获得执行的方法。可传递的任务有如下两种:
?通过Runnable接口实现的任务,它不返回结果;
?通过Callable接口实现的任务,它返回结果。
在这两种情况下,只需要传递任务到执行器,执行器即可使用线程池中的线程或新创建的线程来执行任务。执行器也决定了任务执行的时间。
本节将学习如何覆盖ThreadPoolExecirtor类,通过范例来计算任务在执行器中执行的时间,执行结束后在控制台输出有关执行器的统计信息。
package com.concurrency.executor;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 扩展的线程池执行器类,实现定制执行器功能。
*/
public class MyExecutor extends ThreadPoolExecutor {
/**
* 用户存存储任务的名称(线程的hashCode字符形式)和它开始执行的时间
*/
private ConcurrentHashMap<String, Date> startTimes;
/**
* 构造函数
*
* @param corePoolSize 线程池最小的线程数目
* @param maximumPoolSize 线程池最大的线程数目
* @param keepAliveTime 线程最大的空闲时间
* @param unit 空闲时间的时间单位
* @param workQueue 提交的任务所使用的队列
*/
public MyExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
startTimes = new ConcurrentHashMap<>();
}
/**
* 这个方法被用来结束执行器的执行,输出线程池的相关信息
*/
@Override
public void shutdown() {
System.out.printf("MyExecutor: Going to shutdown.\n");
System.out.printf("MyExecutor: Executed tasks: %d\n", getCompletedTaskCount());
System.out.printf("MyExecutor: Running tasks: %d\n", getActiveCount());
System.out.printf("MyExecutor: Pending tasks: %d\n", getQueue().size());
super.shutdown();
}
/**
* 这个方法被用来立即结束线程池的执行, 输出线程池的相关信息
*/
@Override
public List<Runnable> shutdownNow() {
System.out.printf("MyExecutor: Going to immediately shutdown.\n");
System.out.printf("MyExecutor: Executed tasks: %d\n", getCompletedTaskCount());
System.out.printf("MyExecutor: Running tasks: %d\n", getActiveCount());
System.out.printf("MyExecutor: Pending tasks: %d\n", getQueue().size());
return super.shutdownNow();
}
/**
* 线程执行之前所调用的方法,在这时是打印线程信息,和存储线程信息
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.printf("MyExecutor: A task is beginning: %s : %s\n", t.getName(), r.hashCode());
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/**
* 在线程执行完成后执行的方法,在这里是输出线程的信息和它执行的时间
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Future<?> result = (Future<?>) r;
try {
System.out.printf("*********************************\n");
System.out.printf("MyExecutor: A task is finishing.\n");
System.out.printf("MyExecutor: Result: %s\n", result.get());
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
System.out.printf("MyExecutor: Duration: %d\n", diff);
System.out.printf("*********************************\n");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
package com.concurrency.task;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* 休眠两秒的任务类
*/
public class SleepTwoSecondsTask implements Callable<String> {
/**
* 主方法,返回休眠两秒后的时间字符串
*/
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return new Date().toString();
}
}
package com.concurrency.core;
import com.concurrency.executor.MyExecutor;
import com.concurrency.task.SleepTwoSecondsTask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
// 创建一个定制的线程执行器
MyExecutor myExecutor = new MyExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
// 创建一个队列来存存储任务的执行结果
List<Future<String>> results = new ArrayList<>();
// 创建并且提交10个任务
for (int i = 0; i < 10; i++) {
SleepTwoSecondsTask task = new SleepTwoSecondsTask();
Future<String> result = myExecutor.submit(task);
results.add(result);
}
// 获取前5个的执行结果
for (int i = 0; i < 5; i++) {
try {
String result = results.get(i).get();
System.out.printf("Main: Result for Task %d : %s\n", i, result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 关闭线执行器
myExecutor.shutdown();
// 获取后5个的执行结果
for (int i = 5; i < 10; i++) {
try {
String result = results.get(i).get();
System.out.printf("Main: Result for Task %d : %s\n", i, result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 等待执行器执行完
try {
myExecutor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出信息,表示整个程序运行结束
System.out.printf("Main: End of the program.\n");
}
}
图7.2-1 运行结果
本节实现了一个定制的执行器,它继承了 ThreadPoolExecutor类并覆盖了它的4个 方法。beforeExecute()和afterExecute()方法被用来计算任务的运行时间。beforeExecute()方法在任务开始前执行。本例中,使用HashMap存放任务的开始时间。afterExecute()方法在任务结束后执行。通过这两个方法分别获得了任务的开始时间和结束时间,它们的时间间隔就是当前任务的执行时间。我们也覆盖了shutdown()和shutdownNow()方法,将执行器执行的任务的统计信息输出到控制台:
?通过调用getCompletedTaskCount()方法获得己执行过的任务数;
?通过调用getArtiveCount()方法获得正在执行的任务数,
对于等待执行的任务,执行器将它们存放在阻塞队列中,调用阻塞队列的size()方法 就可以获得等待执行的任务数。SleepTwoSecondsTask类实现了 Callable接口,它让执行线程休眠2s。Main主类传递了10个任务到执行器中,并使用它和其他类来演示它们的 特性。
运行程序,我们将看到每个任务的执行时间,同时也会看到通过调用shutdown()方法而输出统计的信息。
在Java并发API的第一个版本中,我们必须创建并运行应用程序中的所有线程。在 Java5中,伴随Executor框架的出现,引入了一种新的并发任务机制。
使用Executor框架,只需要实现任务并将它们传递到执行器中,然后执行器将负责创建执行任务的线程,并执行这些线程。
执行器内部使用一个阻塞式队列存放等待执行的任务,并按任务到达执行器时的顺序进行存放。另一个可行的替代方案是使用优先级队列存放新的任务,这样,如果有髙优先级的新任务到达执行器,它将在其他正在等待的低优先级的线程之前被执行。
本节将学习如何实现一个执行器,范例使用优先级队列存放传递给它的任务。
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
/**
* 具有指定优先级的任务
*/
public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {
/**
* 任务优先级
*/
private int priority;
/**
* 任务名称
*/
private String name;
/**
* 构造函数
*
* @param name 任务名称
* @param priority 任务优先级
*/
public MyPriorityTask(String name, int priority) {
this.name = name;
this.priority = priority;
}
/**
* 获取任务优先级
*
* @return 任务优先级
*/
public int getPriority() {
return priority;
}
/**
* 比较方法,与另一个任务对象进行比较
*/
@Override
public int compareTo(MyPriorityTask o) {
if (this.getPriority() < o.getPriority()) {
return 1;
}
if (this.getPriority() > o.getPriority()) {
return -1;
}
return 0;
}
/**
* 主方法,运行两秒钟(休息两秒钟)
*/
@Override
public void run() {
System.out.printf("MyPriorityTask: %s Priority : %d\n", name, priority);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.concurrency.core;
import com.concurrency.task.MyPriorityTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
// 线程执行器
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
// 向执行器中添加4个任务
for (int i = 0; i < 4; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, i);
executor.execute(task);
}
// 主线程序休眠1S
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 再向执行器中添加4个任务
for (int i = 4; i < 8; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, i);
executor.execute(task);
}
// 关闭执行器
executor.shutdown();
// 等所有的任务运行结束
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出信息表示任务运行结束
System.out.printf("Main: End of the program.\n");
}
}
图7.3-1 运行结果
把一个普通的执行器转换为基于优先级的执行器是非常简单的,只需要把 PriorityBlockingQueue对象作为其中一个传入参数,并且要求它的泛型参数是Runnable接口即可。使用这种执行器的时候,存放在优先队列中的所有对象必须实现Comparable接口。
任务类MyPriorityTask实现了 Runnable接口以成为执行任务,也实现了 Comparable接口以被存放在优先队列中。这个类有一个priority属性用来存放任务的优先级。一个任务的优先级属性值越高,它越早被执行。compareTo()方法决定了优先队列中的任务顺序。 Main主类传递了8个不同优先级的任务到执行器。发送到的第一个任务是第一个被执行的任务。当执行器空闲并等待任务时,第一批任务到达,它们将立即被执行。这里我们只创建了两个线程执行器,所以前两个任务将被首批执行。接下来,剩余任务基于它们优先级被依次执行。
可以配置Executor使用BlockingQueue接口的任意实现,比较有趣的一个是 DelayQueue。这个类用来存放带有延迟激活的元素,提供了只返回活动对象的方法。可以 使用ScheduledThreadPoolExecutor类定制自己的类。
工厂模式(Factory Patten)在面向对象编程中是一个应用广泛的设计模式。它是一种创建模式(Creational Pattern),目标是创建一个类并通过这个类创建一个或多个类的对象。当创建一个类的对象时,使用工厂类而不是new操作符。
通过工厂模式,我们能够将对象创建集中化,这样做的好处是:改变对象的创建方式将会变得很容易,并且针对限定资源还可以限制创建对象的数量。例如,通过工厂模式生 成了一个类型的N个对象,就很容易获得创建这些对象的统计数据。
Java提供了 ThreadFactory接口来实现Thread对象工厂。Java并发API的一些高级辅助类,像Executor框架或Fork/Join框架,都使用了线程工厂来创建线程。
线程工厂在Java并发API中的另一个应用是Executors类。它提供了大量方法来创建不同类型的Executor对象。
本节的范例将通过继承Thread类并增加一些新的功能来实现一个新的线程类,同时实现一个线程工厂来生成这个新的线程类对象。
package com.concurrency.task;
import java.util.Date;
/**
* 自定义线程类
*/
public class MyThread extends Thread {
// 线程创建的时间
private Date creationDate;
// 线开始执行的时间
private Date startDate;
// 线程完成执行的时间
private Date finishDate;
/**
* 构造函数
*
* @param target 执行的任务
* @param name 线程的名称
*/
public MyThread(Runnable target, String name) {
super(target, name);
setCreationDate();
}
/**
* 主方法,记录线程运行的开始和结束时间
*/
@Override
public void run() {
setStartDate();
super.run();
setFinishDate();
}
/**
* 设置线程创建的时间
*/
public void setCreationDate() {
creationDate = new Date();
}
/**
* 设置线程开始执行的时间
*/
public void setStartDate() {
startDate = new Date();
}
/**
* 设置线程结束执行的时间
*/
public void setFinishDate() {
finishDate = new Date();
}
/**
* 计算线程执行的时间
*
* @return 线程执行的时间
*/
public long getExecutionTime() {
return finishDate.getTime() - startDate.getTime();
}
/**
* 输出线程相关信息
*/
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(getName());
buffer.append(": ");
buffer.append(" Creation Date: ");
buffer.append(creationDate);
buffer.append(" : Running time: ");
buffer.append(getExecutionTime());
buffer.append(" Milliseconds.");
return buffer.toString();
}
}
package com.concurrency.task;
import java.util.concurrent.ThreadFactory;
/**
* 自定义线程工厂类
*/
public class MyThreadFactory implements ThreadFactory {
// 记录工厂中的线程数目
private int counter;
// 工厂中线程的名称前缀
private String prefix;
/**
* 构造函数
*
* @param prefix 工厂中线程名称前缀
*/
public MyThreadFactory(String prefix) {
this.prefix = prefix;
counter = 1;
}
/**
* 创建线程的工厂
*/
@Override
public Thread newThread(Runnable r) {
MyThread myThread = new MyThread(r, prefix + "-" + counter);
counter++;
return myThread;
}
}
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
/**
* 自定义的任务类
*/
public class MyTask implements Runnable {
/**
* 主方法,休眠两秒钟
*/
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.concurrency.core;
import com.concurrency.task.MyTask;
import com.concurrency.task.MyThreadFactory;
public class Main {
public static void main(String[] args) throws Exception {
// 创建一个线程工厂
MyThreadFactory myFactory = new MyThreadFactory("MyThreadFactory");
// 创建一个任务
MyTask task = new MyTask();
// 使用自定义的线程工厂创建一个新的线程
Thread thread = myFactory.newThread(task);
// 开始执行线程
thread.start();
// 等待线程执行结束
thread.join();
// 输出线程信息
System.out.printf("Main: Thread information.\n");
System.out.printf("%s\n", thread);
System.out.printf("Main: End of the example.\n");
}
}
图7.4-1 运行结果
本节实现了一个定制的MyThread类,它继承了 Thread类。MyThread类有三个属性分别存放它的创建时间,执行起始时间和执行结束时间。getExecutioTime()方法使用了执行起始时间和结束时间两个属性,返回线程执行的时间。最后,覆盖了toString()方法 来生成有关线程的信息。
在实现了自己的线程类之后,我们通过实现ThreadFactory接口来完成工厂类,用来创建刚实现的线程类对象。如果使用工厂作为独立对象那么就可以不必实现 ThreadFartory接口,但是如果这个工厂要与:Java并发API的其他类一起使用,就必须实现ThreadFactory接口。实现ThreadFactory接口只有一个方法,即newThead()方法,它接收一个Runnable对象作为参数并返回一个执行Runnable对象的Thread对象。在这个 例子中,返回一个MyThread对象。
为了检查这两个类,我们需要实现MyTask类,用其实现Runnable接口。这是在 MyThread线程中被执行的任务。MyTask对象能让执行线程休眠2s。
在main()方法中,通过MyThreadFactory工厂创建了 MyThread对象来执行任务。运行这个程序,在控制台能看到执行线程的创建时间和执行时间的消息。
Java并发API提供了 Executor类来生成执行线程,生成的执行线程通常是ThreadPoolExecutor类的对象。也可以使用这个类的defaultThreadFactory()方法获取ThreadFactory接口的最基本实现,这个工厂能够生成基本的线程对象,并且生成的线程都属于同一个线 程组对象。
当然,你可以在程序中自由使用ThreadFactory接口,而不必拘泥于Executor框架。
Executor框架是一种分离线程的创建和执行的机制。它是基于Executor和 ExecutorService接口,以及这两个接口的实现类ThreadPoolExecutor展开的。它有一个内部线程池,并提供了将任务传递到池中线程以获得执行的方法。可传递的任务有如下两种:
? 通过Runnable接口实现的任务,它不返回结果:
? 通过Callable接口实现的任务,它返回结果。
Executor框架内部使用了 ThreadFactory接口来生成新的线程。本节,我们将学习如何实现自己的线程类,以及用来生成这个线程类对象的线程工厂类,同时也将学习如何在执行器中使用工厂类,使执行器将执行创建的线程。
package com.concurrency.task;
import java.util.Date;
/**
* 自定义线程类
*/
public class MyThread extends Thread {
// 线程创建的时间
private Date creationDate;
// 线开始执行的时间
private Date startDate;
// 线程完成执行的时间
private Date finishDate;
/**
* 构造函数
*
* @param target 执行的任务
* @param name 线程的名称
*/
public MyThread(Runnable target, String name) {
super(target, name);
setCreationDate();
}
/**
* 主方法,记录线程运行的开始和结束时间
*/
@Override
public void run() {
setStartDate();
super.run();
setFinishDate();
}
/**
* 设置线程创建的时间
*/
public void setCreationDate() {
creationDate = new Date();
}
/**
* 设置线程开始执行的时间
*/
public void setStartDate() {
startDate = new Date();
}
/**
* 设置线程结束执行的时间
*/
public void setFinishDate() {
finishDate = new Date();
}
/**
* 计算线程执行的时间
*
* @return 线程执行的时间
*/
public long getExecutionTime() {
return finishDate.getTime() - startDate.getTime();
}
/**
* 输出线程相关信息
*/
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(getName());
buffer.append(": ");
buffer.append(" Creation Date: ");
buffer.append(creationDate);
buffer.append(" : Running time: ");
buffer.append(getExecutionTime());
buffer.append(" Milliseconds.");
return buffer.toString();
}
}
package com.concurrency.task;
import java.util.concurrent.ThreadFactory;
/**
* 自定义线程工厂类
*/
public class MyThreadFactory implements ThreadFactory {
// 记录工厂中的线程数目
private int counter;
// 工厂中线程的名称前缀
private String prefix;
/**
* 构造函数
*
* @param prefix 工厂中线程名称前缀
*/
public MyThreadFactory(String prefix) {
this.prefix = prefix;
counter = 1;
}
/**
* 创建线程的工厂
*/
@Override
public Thread newThread(Runnable r) {
MyThread myThread = new MyThread(r, prefix + "-" + counter);
counter++;
return myThread;
}
}
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
/**
* 自定义的任务类
*/
public class MyTask implements Runnable {
/**
* 主方法,休眠两秒钟
*/
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.concurrency.core;
import com.concurrency.task.MyTask;
import com.concurrency.task.MyThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws Exception {
// 创建一个自定义的线程工厂类
MyThreadFactory threadFactory=new MyThreadFactory("MyThreadFactory");
// 创建一个线程缓冲池执行器对象,它的参数是线程工厂类
ExecutorService executor=Executors.newCachedThreadPool(threadFactory);
// 创建一个自定义任务
MyTask task=new MyTask();
// 任务提交到执行器
executor.submit(task);
// 关闭执行器对象
executor.shutdown();
// 等待执行器中的任务运行结束
executor.awaitTermination(1, TimeUnit.DAYS);
// 输出信息表明程序已经结束
System.out.printf("Main: End of the program.\n");
}
}
图7.5-1 运行结果
本例的main()方法使用newCachedThreadPool()方法创建了一个Executor对象,并以创建的工厂对象作为传入参数,所以创建的Executor对象将使用这个工厂创建它需要的线程,并且将执行MyThread类的线程对象。
运行这个程序,会看到关于线程开始时间和执行时间的信息。
定时线程池(Scheduled Thread Pool)是Executor框架基本线程池的扩展,允许在一段时间后定时执行任务.ScheduledThreadPoolExecutor类不仅实现了这个功能,还允许执行下列两类任务。
? 延迟任务(Delayed Task):这类任务在一段时间后仅执行一次。
? 周期性任务(PeriodicTask):这类任务在一段延迟时间后周期性地执行。
延迟任务能够执行实现Callable和Runnable接口的两类对象,周期性任务仅能执行实现Runnable接口的对象。所有由定时线程池执行的任务都必须实现RunnabelScheduledFuture接口。本节将学习如何实现RunnabelScheduledFuture接口来执行延迟任务和周期性任务。
package com.concurrency.task;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 自定义任务高度类,它有一个参数化类型V
*
* @param <V>
*/
public class MyScheduledTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 用于存储可调度的任务对象
private RunnableScheduledFuture<V> task;
// 可调度的线程池执行器
private ScheduledThreadPoolExecutor executor;
// 任务两次执行的时间间隔
private long period;
// 任务开始执行的时间
private long startDate;
/**
* 构造函数
*
* @param runnable 任务提交的可执行的任务对象
* @param result 任务返回的结果
* @param task 执行runnable对象的任务
* @param executor 执行task对象的执行器
*/
public MyScheduledTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, ScheduledThreadPoolExecutor executor) {
super(runnable, result);
this.task = task;
this.executor = executor;
}
/**
* 返回下一次要执行的剩余时间,如是延迟任务就返回最初任务的延迟时间,
* 如果是周期任务,返回开始时间和当前时间的差值
*
* @param unit 延迟的时间单位
*/
@Override
public long getDelay(TimeUnit unit) {
if (!isPeriodic()) {
return task.getDelay(unit);
} else {
if (startDate == 0) {
return task.getDelay(unit);
} else {
Date now = new Date();
long delay = startDate - now.getTime();
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
}
}
/**
* 比较方法
*/
@Override
public int compareTo(Delayed o) {
return task.compareTo(o);
}
/**
* 判断是否是周期任务
*/
@Override
public boolean isPeriodic() {
return task.isPeriodic();
}
/**
* 主方法
*/
@Override
public void run() {
// 如果是周期任务,并且执行器没有关闭
if (isPeriodic() && (!executor.isShutdown())) {
// 更新开始时间,同时将本任务再次入队
Date now = new Date();
startDate = now.getTime() + period;
executor.getQueue().add(this);
}
// 输出任务相关信息
System.out.printf("Pre-MyScheduledTask: %s\n", new Date());
System.out.printf("MyScheduledTask: Is Periodic: %s\n", isPeriodic());
super.runAndReset();
System.out.printf("Post-MyScheduledTask: %s\n", new Date());
}
/**
* 设置周期任务的时间间隔
*
* @param period 时间间隔
*/
public void setPeriod(long period) {
this.period = period;
}
}
package com.concurrency.task;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 自定义任务线程池调度类
*/
public class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
/**
* 构造函数
*
* @param corePoolSize 线程池是至少保留的任务数
*/
public MyScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
/**
* 装饰方法,将一个RunnableScheduledFuture任务转换成MyScheduledTask任务
*/
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
RunnableScheduledFuture<V> task) {
MyScheduledTask<V> myTask = new MyScheduledTask<V>(runnable, null, task, this);
return myTask;
}
/**
* 执行器周期期调度的方法
*/
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
// 使用超类的方法去完成任务
ScheduledFuture<?> task = super.scheduleAtFixedRate(command, initialDelay, period, unit);
MyScheduledTask<?> myTask = (MyScheduledTask<?>) task;
myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period, unit));
return task;
}
}
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
/**
* 自定义任务类
*/
public class Task implements Runnable {
/**
* 主方法,运行两秒钟
*/
@Override
public void run() {
System.out.printf("Task: Begin.\n");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Task: End.\n");
}
}
package com.concurrency.core;
import com.concurrency.task.MyScheduledThreadPoolExecutor;
import com.concurrency.task.Task;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws Exception {
// 创建一个自定义的调度线程池执行器框架
MyScheduledThreadPoolExecutor executor = new MyScheduledThreadPoolExecutor(2);
// 创建一个任务
Task task = new Task();
// 输出开始执行的时间
System.out.printf("Main: %s\n", new Date());
// 向执行器中发送一个任务,它在一秒后执行
executor.schedule(task, 1, TimeUnit.SECONDS);
// 主线程休眠3秒
TimeUnit.SECONDS.sleep(3);
// 创建一个任务
task = new Task();
// 输出开始执行的时间
System.out.printf("Main: %s\n", new Date());
/*
* Send to the executor a delayed task. It will begin its execution after 1 second of dealy
* and then it will be executed every three seconds
*/
// 送一个任务到执行器,它在一秒后执行,并且分每隔三秒执行一次
executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
// 主线程休眠10秒
TimeUnit.SECONDS.sleep(10);
// 关闭执行器
executor.shutdown();
// 待待执行器运行结束
executor.awaitTermination(1, TimeUnit.DAYS);
// 输出信息,通知程序进行结束
System.out.printf("Main: End of the program.\n");
}
}
图7.6-1 运行结果
在本节中,我们实现了定制的任务类MyScheduledTask,它能在MyScheduledTaskPoolExeculor 对象里执行。该类继承了 FutureTask 类并实现了 RunnableScheduledFuture 接口。之所以实现RnnnableSchedutedFuture接口是因为所有在定时执行器中执行的任务都必须实现这个接口;之所以继承FutureTask类是因为该类提供了 RunnableScheduledFuture接口中声明的方法的有效实现。所有上面提到的接口和类都使用了泛型参数化,泛型参数是任务返回的数据类型。
为了在定时执行器中使用MyScheduledTask任务,我们必须在MyScheduledThreadPoolExecutor 类中覆盖 decorateTask()方法。MyScheduledThreadPoolExecutor 类继承了 ScheduledThreadPoolExecutor, decorateTask()方法默认返回的是 ScheduledThreadPoolExecutor实现的缺省定时任务,覆盖这个方法会将缺省定时任务替换为MyScheduledTask 任务.所以,当实现自定义的定时任务时,必须实现自定义的定时执行器。
decorateTask()方法只是使用传入参数来创建MyScheduledTask对象;传入的Runnable对象将在任务中执行,执行结果也将被任务返回。在本例中,任务不会返回一个结果,所以结果参数使用了 null作为返回值。这个方法默认返回的任务将执行Runnable 对象.覆盖后新返回的对象将在池中替换默认的任务对象,并传入参数执行器用来执行这个任务。在本例中,使用关键字this来引用创建任务的执行器。
MyScheduledTask类既可以执行延迟任务也可以执行周期性任务。我们己经实现了 getDelay()和run()方法,它们具有执行这两种任务的所有必需逻辑。
对于getDelay()方法,定时执行器调用它来确定是否必须执行一个任务。这个方法在延迟任务和周期性任务中表现不一样。像之前提到的,MyScheduledTask类构造器接收的是ScheduledRunnableFuture默认实现的对象,用来执行传入的Runnable对象,并把它保存为MyScheduledTask类的属性,以供MyScheduledTask类的其他方法和数据进行访问。如果执行的是一个延迟任务,getDelay()方法返回传入任务的延迟值,如果执行的是周期性任务,getDelay()方法返回startDate属性与当前时间的时间差。
run()方法,即执行任务的方法。周期性任务的一个特殊性是,它必须添加到执行器的队列中作为新任务,才能被再次执行。所以,如果正在执行一个周期性任务,需用当前时间与任务执行周期的和重置startDate属性值,并将这个任务再次加入到执行器队列中,startDate属性存放任务下一次执行的时间。接下来,使用FutureTask类提供的 runAndResrt()方法来执行方法。在延迟任务中,不需要再次把任务放入执行器队列,因为它仅执行一次。
备注:必须考虑执行器己关闭时的情况。在这种情况下,周期性任务不需要再加入到执行器队列中。
最后,在 MyScheduledThreadPoolExecutor 类中,我们覆盖了scheduleAtFixedRate()方法。如前所述,对于周期性任务,startDate属性必须被重置,而这需要用到任务的周期, 此时我们并没有初始化它。所以我们必须覆盖scheduleAtfixedRate()方法,因为它接收任务的周期值,并传入到MyScheduledTask类里。
Task类使得范例更完整,它实现了Runnable接口,也是在定时执行器中执行的任务。Main主类创建了一个MyScheduledThreadPoolExecutor执行器对象并发送给它下面的两个任务;
? 一个延迟任务,在当前时间1秒后执行;
? 一个周期性任务,第一次在当前时间1秒后执行,接下来每3秒执行一次。
ScheduledThreadPoolExecutor 类提供了另外一种 decorateTask()方法,它接收一个Callable对象(而不是Ruunable对象)作为参数。
Java7中最有趣特的性之一是Fork/Join框架。它是Executor和ExecutorService接口的实现,这两个接口能够允许我们执行Callable和Runnable任务,而不需要去关注执行这些任务的具体线程。
这个执行器用于执行可以分拆成更小任务体的任务,它的主要组件如下。
? 一种特殊类型任务,由ForkJoinTask类来实现。
? 两种操作,其中通过fork操作将一个任务分拆为多个子任务,而通过join操作等待这些子任务结束。
?工作窃取算法(Work-Stealing Algorithm)用来对线程池的使用进行优化。当一个任务等待它的子任务时,执行这个任务的线程可以被用来执行其他任务。
Fork/Join框架的主类是ForkJoinPool类。从内部实现来说,它有下面两个元素:
? 一个任务队列,存放的是等待被执行的任务;
?一个执行这些任务的线程池。
在本节,我们将学习如何实现一个定制的工作线程(Worker Thread),它被 ForkJoinPool类使用,此外我们还将学习如何通过工厂模式来使用它。
package com.concurrency.task;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
/**
* 自定义工作线程
*/
public class MyWorkerThread extends ForkJoinWorkerThread {
// 每个线程所执行的任务数
private static ThreadLocal<Integer> taskCounter = new ThreadLocal<>();
/**
* 构造函数
*
* @param pool 分合池对象
*/
protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}
/**
* 当一个Fork/Join框架的线程开始执行任务的时候进行调用,它初始化任务计数器
*/
@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task counter.\n", getId());
taskCounter.set(0);
}
/**
* 当一个Fork/Join框架的线程结束执行任务的时候进行调用,输出线程执行的任务数
*/
@Override
protected void onTermination(Throwable exception) {
System.out.printf("MyWorkerThread %d: %d\n", getId(), taskCounter.get());
super.onTermination(exception);
}
/**
* 增加任务计数
*/
public void addTask() {
int counter = taskCounter.get().intValue();
counter++;
taskCounter.set(counter);
}
}
package com.concurrency.task;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
/**
* 自定义工作线程
*/
public class MyWorkerThread extends ForkJoinWorkerThread {
// 每个线程所执行的任务数
private static ThreadLocal<Integer> taskCounter = new ThreadLocal<>();
/**
* 构造函数
*
* @param pool 分合池对象
*/
protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}
/**
* 当一个Fork/Join框架的线程开始执行任务的时候进行调用,它初始化任务计数器
*/
@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task counter.\n", getId());
taskCounter.set(0);
}
/**
* 当一个Fork/Join框架的线程结束执行任务的时候进行调用,输出线程执行的任务数
*/
@Override
protected void onTermination(Throwable exception) {
System.out.printf("MyWorkerThread %d: %d\n", getId(), taskCounter.get());
super.onTermination(exception);
}
/**
* 增加任务计数
*/
public void addTask() {
int counter = taskCounter.get().intValue();
counter++;
taskCounter.set(counter);
}
}
package com.concurrency.task;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
/**
* 自定义Fork/Join工作线程工厂
*/
public class MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
/**
* 为Fork/Join框架创建一个工厂线程
*
* @param pool 线程将要被执行的线程池
* @return 一个自定义的工作线程对象
*/
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}
}
package com.concurrency.core;
import com.concurrency.task.MyRecursiveTask;
import com.concurrency.task.MyWorkerThreadFactory;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws Exception {
// 创建一个工厂线程工厂
MyWorkerThreadFactory factory = new MyWorkerThreadFactory();
// 创建一个Fork/Join池
ForkJoinPool pool = new ForkJoinPool(4, factory, null, false);
// 初始化一个待计算的数组
int array[] = new int[100000];
for (int i = 0; i < array.length; i++) {
array[i] = 1;
}
// 创建一个执行计算的任务对象
MyRecursiveTask task = new MyRecursiveTask(array, 0, array.length);
// 任务提交到fork/join池中
pool.execute(task);
// 待待任务执行束
task.join();
// 关闭Fork/Join池
pool.shutdown();
// 待待池中的所有任务运行结束
pool.awaitTermination(1, TimeUnit.DAYS);
// 输出任务执行的结果
System.out.printf("Main: Result: %d\n", task.get());
// 输出消息表示程序运行结束
System.out.printf("Main: End of the program\n");
}
}
图7.7-1 运行结果
在Fork/Join框架里使用的线程被称为工作线程(WorkerThread)。 Java提供了 ForkJoinWorkerThread类,它继承了 Thread类并实现了可在ForkJoin框架里使用的工作线程。
本节范例实现了 MyWorkerThread类,它继承了ForkJoinWorkerThread类,并覆盖了类的两个方法。这里我们在每个工作线程中实现一个计数器,以统计一个工作线程已执行了多少任务,然后我们使用了ThreadLocal类型的计数器。这样每个线程都有自己的计数器,这对于程序员来讲是透明的。
覆盖ForkJoinWorkerThread类的onStart()方法,以初始化任务计数器。当工作线程开始执行这个方法时会被自动调用。也覆盖了 onTerroination()方法,用来输出任务计数器中的值到控制台。当工作线程完成执行这个方法时也会被自动调用。我们也实现了一个addTask()方法,用来增加每个线程的任务计数器。
与Java并发API中的所有执行器一样,ForkJoinPool类也使用工厂模式来创建它的线程,所以如果想在ForkJoinPool类中使用MyWorkertThread线程,必须实现自己的线程工厂。在Fork/Join框架里,这个工厂必须实现ForkJoinPool.ForkJoinWorkerThreadFactory接口。针对这个目的,我们实现了 MyWorkerThreadFactory类。它只有 一个newThread()方法,用来创建一个新的MyWorkerThread对象,
最后,我们需要使用刚创建的工厂来初始化ForkJoinPool类,这是在Main主类中使用ForkJoinPool类的构造器来实现的。
当一个线程正常结束或抛出Exception异常时,ForkJoinWorkerThread类提供的 onTermination()方法都会被自动调用这个方法接收一个Throwable对象作为参数。如果参数为null值,工作线程正常结束;如果参数有一个值,线程将抛出异常。所以我们必须编写相应的代码来处理这种异常情况。
Executor框架分离了任务的创建和执行。在这个框架下,只需要实现Runnable对象和Executor对象,并将Runnable对象发送给执行器即可,这样,执行器将创建执行这些任务的线程,并对其进行管理直到线程终止。
Java7在Fork/Join框架中提供了一个特殊形式的执行器。这个框架旨在通过分拆和合并技术,把任务分拆为更小的任务。在一个任务中,我们需要检査待解决问题的规模。如果问题的规模比制定的规模大,则需要把问题分拆为两个或多个任务,并使用Fork/Join 框架执行这些任务。如果问题的规模小于己制定的规模,直接在任务里解决问题即可。另外,可以选择是否返回结果。Fork/Join框架通过工作窃取算法(Work-Stealing Algorithm), 提升了这类问题的整体性能。
Fork/Join框架的主类是ForkJoinPool类。从内部来讲,它有下面两个元素:
? 一个任务队列,存放的是等待被执行的任务;
?一个执行这些任务的线程池。
默认情况下,ForkJoinPool类执行的任务是ForkJoinTask类的对象。我们也可以传递Runnable和Callable对象到ForkJoinPool类中,但是它们不会利用Fork/Join框架的优势。一般来说,我们将ForkJoinTask的两种子类传递给ForkJoinPool对象。
? RecursiveAction:用于任务不返回结果的情况-
? RecursiveTask:用于任务返回结果的情况.
在本节,我们将学习如何通过继承ForkJoinTask类来实现Fork/Join框架下的定制任务。这个任务实现计算自身的执行时间并在控制台输出,所以我们能够控制它的进一步演变。也可以实现自己的Fork/Join任务来输出日志信息,并获得任务中使用到的资源, 或对任务结果做进一步的处理。
package com.concurrency.task;
import java.util.Date;
import java.util.concurrent.ForkJoinTask;
/**
* 自定义工作任务,无返回值
*/
public abstract class MyWorkerTask extends ForkJoinTask<Void> {
private static final long serialVersionUID = 1L;
// 任务名称
private String name;
/**
* 构造函数,初始化属性
*
* @param name 任务名称
*/
public MyWorkerTask(String name) {
this.name = name;
}
/**
* 获取执行结果,(此任务执行无结果返回)
*/
@Override
public Void getRawResult() {
return null;
}
/**
* 设置结果
*/
@Override
protected void setRawResult(Void value) {
}
/**
* 主方法,执行任务
*/
@Override
protected boolean exec() {
Date startDate = new Date();
compute();
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete.\n", name, diff);
return true;
}
/**
* 获取任务的名字
*
* @return 任务的名字
*/
public String getName() {
return name;
}
/**
* 模板方法,等待子类实现
*/
protected abstract void compute();
}
package com.concurrency.task;
/**
* 自定义任务类
*/
public class Task extends MyWorkerTask {
private static final long serialVersionUID = 1L;
// 任务要处理的数组
private int array[];
// 开始做处理的位置
private int start;
// 处理结束的位置(不包含)
private int end;
/**
* 构造函数,初始化属性
*
* @param name 任务的名称
* @param array 处理的数组
* @param start 开始处理的位置
* @param end 处理结束的位置
*/
public Task(String name, int array[], int start, int end) {
super(name);
this.array = array;
this.start = start;
this.end = end;
}
/**
* 主访方法,如果要处理的元素个娄大于100个就分成两个子任务进行处理
*/
@Override
protected void compute() {
if (end - start > 100) {
int mid = (end + start) / 2;
Task task1 = new Task(this.getName() + "1", array, start, mid);
Task task2 = new Task(this.getName() + "2", array, mid, end);
invokeAll(task1, task2);
} else {
for (int i = start; i < end; i++) {
array[i]++;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.concurrency.core;
import com.concurrency.task.Task;
import java.util.concurrent.ForkJoinPool;
public class Main {
public static void main(String[] args) throws Exception {
int array[] = new int[10000];
// 创建一个执行池
ForkJoinPool pool = new ForkJoinPool();
// 创建一个任务对象
Task task = new Task("Task", array, 0, array.length);
// 把任务提交到执行池
pool.invoke(task);
// 关闭执行池
pool.shutdown();
// 输出信息,表明程序执行结束
System.out.printf("Main: End of the program.\n");
}
}
图7.8-1 部分运行结果
在本节,我们继承ForkJoinTask类而实现了 MyWorkerTask类,这是我们的任务基 类,它能在ForkJoinPool中执行,能使用执行器提供的所有优点(如工作窃取算法)。这 个类与 RecuresiveAction 和 RecursiveTask 类相当。
当继承ForkJoinTask类时,必须实现下列三个方法,
? setRawResult():被用来设置任务的结果。当任务不返回任何结果时,方法为空。
? getRawResult():被用来获取任务的结果。当任务不返回任何结果时,方法必须返冋null值。
? exec():实现任务的逻辑。本例将逻辑委托到了抽象方法compute()中(跟 RecursiveAction和RecursiveTask类一样),在exec()方法中测量这个方法执行的时间,并输出到控制台。
最后,在本例的main主类中,创建了有10,000个元素的数组、一个ForkJoinPool执行器和一个Task对象,用来处理整个数组。运行程序,会发现不同的任务分别在控制台输出了各自的执行时间。
锁是Java并发API提供的最基本的同步机制之一。程序员用锁来保护代码的临界区 (Critical Sertion),所以同一时间只有一个线程能执行临界区代码。它提供了下列两种操作:
? lock():当要访问临界区代码时调用这个操作,如果另一个线程正在运行临界区代码,其他线程将被阻塞直到被访问临界区的锁唤醒。
? unlock():在临界区代码结尾调用这个操作,以允许其他线程来访问这部分临界区代码。
在Java并发API中,锁是使用Lock接口来声明的,并且有一些类实现了Lock接口, 如 ReentrantLock 类。
本节将学习如何实现自定义Lock对象,通过实现Lock接口,来保护临界区代码。
package com.concurrency.task;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 自定义抽象队列同步器
*/
public class MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
// 原子变量,存储锁的状态,0闲,1忙
private AtomicInteger state;
/**
* 构造函数
*/
public MyAbstractQueuedSynchronizer() {
state = new AtomicInteger(0);
}
/**
* 获取锁
*
* @param arg (在本方法中不使用)
* @return true获取,false未获取
*/
@Override
protected boolean tryAcquire(int arg) {
return state.compareAndSet(0, 1);
}
/**
* 释放锁
*
* @param arg 释放量(本方法中不使用)
* @return true成功,false失败
*/
@Override
protected boolean tryRelease(int arg) {
return state.compareAndSet(1, 0);
}
}
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自定义锁实现类
*/
public class MyLock implements Lock {
/**
* 用来实现锁的同步器
*/
private AbstractQueuedSynchronizer sync;
/**
* 构造函数,初始化属性
*/
public MyLock() {
sync = new MyAbstractQueuedSynchronizer();
}
/**
* 获取锁
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 获取锁,如果获取不锁线程会阻塞到锁释放,阻塞的线程可以被中断
*/
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 获取锁,如果获取到就返回true,如果获取不到就返回false
*/
@Override
public boolean tryLock() {
try {
return sync.tryAcquireNanos(1, 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
/**
* 在指定的时间办获取锁,如果获取到就返回true,否则就返回false
*
* @param time 时间量
* @param unit 时间单位
* @return 如果获取到就返回true,否则就返回false
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, TimeUnit.NANOSECONDS.convert(time, unit));
}
/**
* 释放锁
*/
@Override
public void unlock() {
sync.release(1);
}
/**
* 创建一个新的条件变量
*/
@Override
public Condition newCondition() {
return sync.new ConditionObject();
}
}
package com.concurrency.task;
import java.util.concurrent.TimeUnit;
/**
* 自定义任务类
*/
public class Task implements Runnable {
// 使用自定义锁对象
private MyLock lock;
// 任务名称
private String name;
/**
* 构造函数
*
* @param name 任务名称
* @param lock 使用的锁
*/
public Task(String name, MyLock lock) {
this.lock = lock;
this.name = name;
}
/**
* 主方法,运行两秒种(其实就是休眠)
*/
@Override
public void run() {
lock.lock();
System.out.printf("Task: %s: Take the lock\n", name);
try {
TimeUnit.SECONDS.sleep(2);
System.out.printf("Task: %s: Free the lock\n", name);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
package com.concurrency.core;
import com.concurrency.task.MyLock;
import com.concurrency.task.Task;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
// 创建一个自定义的锁对象
MyLock lock = new MyLock();
// 创建10个运行任务对象
for (int i = 0; i < 10; i++) {
Task task = new Task("Task-" + i, lock);
Thread thread = new Thread(task);
thread.start();
}
// 主线程试图获取锁
boolean value;
do {
try {
value = lock.tryLock(1, TimeUnit.SECONDS);
if (!value) {
System.out.printf("Main: Trying to get the Lock\n");
}
} catch (InterruptedException e) {
e.printStackTrace();
value = false;
}
} while (!value);
// 主线程释放锁
System.out.printf("Main: Got the lock\n");
lock.unlock();
// 输出信息,表明程序运行结束
System.out.printf("Main: End of the program\n");
}
}
图7.9-1 运行结果
Java并发API提供了 AbstractQueuedSynchronizer类,它用来实现带有锁或信号特性 的同步机制。顾名思义,AbstractQueuedSynchronizer是一个抽象类。它提供操作来对临界区代码的访问进行控制,并对等待访问临界区代码的阻塞线程队列进行管理.它有下面两个方法。
? tryAcquire():当访问临界区代码时调用这个方法。如果访问成功,返回true值; 否则返回false值。
? tryRelease():当释放对临界区代码的访问时调用这个方法。如果释放成功,返回 true值;否则返回false值。
在这两个方法中,我们需要实现对临界区代码的并发访问控制。在例子中,我们实现了继承自 AbstractQueuedSyncrhcmizer 类的 MyQueuedSynchonizer 类,并覆盖了这两个抽象方法,在覆盖的时候使用了 AtomicInteger变量对临界区代码的访问进行控制。锁可以被获取的时候,变量值为0,这时允许一个线程访问临界区代码:锁在不可用的时候,变量值为1,这时不允许任何线程访问临界区代码。
我们使用AtomicInteger类的compareAndSet()方法试图把第一个参数的值设置为第二个参数指定值。在实现tryAequire()方法的时候,又使用compareAndSet()方法试图把原子变量从0设置为1。同样,在实现tryRelease()方法的时候,我们使用compareAndSet()方法试图把原子变量从1设置为0。
必须实现AbstractQueuedSynchronteer抽象类,因为这个类的其他实现(如在ReentrantLock中使用的)被实现为私有内部类,所以无法访问到这些内部类。
接下来,我们实现了 MyLock类,它实现了 Lock接口,并且有一个MyQueuedSyndironizer 属性。在实现Lock接口的所有方法中,我们具体使用了 MyQueuedSynchronizer对象的方法。
最后,我们应用Task类,实现了 Runnable接口,并使用一个MyLock对象访问临界区。临界区将正在访问它的休眠线程2秒钟。Main主类创建了一个MyLock对象,并运行了10个Task对象,这10个Task对象共享MyLock对象。Main主类也使用了 trylock()方法来尝试获得这个锁。
运行范例,我们可以看到只有一个线程能够访问临界区代码,当这个线程结束时,另一个线程才能访问临界区代码.
通过定制锁,能够获得它的使用情况、控制临界区的锁定时间,还可以实现高级同步机制。例如,对仅在特定时间内才可用的资源进行访问控制。
AbstractQueuedSynchronized抽象类提供了两个方法用来管理锁状态。明getState()和setState()这两个方法接收并返回锁状态的整型值。你可能已经使用这些方法而不是AtomicInteger 属性来存储锁状态。
Java并发API提供了另一个类来实现同步机制,即AbstractQueuedLongSynchronizer抽象类,它与AbstractQueuedSynchronizer抽象类是一样的,只是使用了一个long属性来存储线程的状态而已。
Java 7 API提供了几种用于并发应用程序的数据结构。我们要重点关注以下两种结构.
? LinkedTransferQueue:适用于拥有生产者一消费者结构的程序中。在这些应用程序中,有一个或多个数据生产者和数据消费者,然后这些生产者/消费者却共享着一个数据结构。生产者将数据存放到数据结构中,消费者则从数据结构中取出数据。如果数据结构为空,消费者被阻塞直到数据结构中有可用的数据。如果数据结构已满,则生产者被阻塞直到数据结构有可用的空间可以存放生产者将要存放进来的数据。
? PriorityBlockingQueue:在这个数据结构中,元素按顺序存储。这些元素必须实现Comparable接口,并实现接口中定义的compareTo()方法。当插入一个元素时,它会与已有元素进行比较直至找到它的位置。
LinkedTransferQueue中的元素按到达的先后顺序进行存储,所以早到的被优先消费。你可能面临这样的一个场景:你想开发一个生产者/消费者程序,数据是按某种优先级(而不是按到达的时间顺序)被消费。本节将学习如何实现一个数据结构,范例用来解决数据结构中的元素是按优先级排序的生产者/消费者问题,优先级高的先被处理。
package com.concurrency.task;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* 自定义线程优先级类
*
* @param <E> 泛型参数
*/
public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
private static final long serialVersionUID = 1L;
// 等待的消费者数目
private AtomicInteger counter;
// 存储传输元素的队列
private LinkedBlockingQueue<E> transfered;
// 锁对象
private ReentrantLock lock;
/**
* 构造函数
*/
public MyPriorityTransferQueue() {
counter = new AtomicInteger(0);
lock = new ReentrantLock();
transfered = new LinkedBlockingQueue<>();
}
/**
* 将元素发送到一个正在等待的消费者,如查没有等待中的消费者,就返回false
*/
@Override
public boolean tryTransfer(E e) {
lock.lock();
boolean value;
if (counter.get() == 0) {
value = false;
} else {
put(e);
value = true;
}
lock.unlock();
return value;
}
/**
* 将元素发送到一个正在等待的消费者,如查没有等待中的消费者,将元素存储到transfer队列中,
* 并且等待出现试图获取元素的第一个消费者,这在这前线程被阻塞
*/
@Override
public void transfer(E e) throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
put(e);
lock.unlock();
} else {
transfered.add(e);
lock.unlock();
synchronized (e) {
e.wait();
}
}
}
/**
* 第一个参数用以表示生产和消费的元素, 第二个参数表示如果没有消费者则等待一个消费者的时间,
* 第三个参数表示等待时间的单 位。如果有消费者在等待,它就立即发送元素。否则,将参数指定的
* 时间转换为毫秒并使 用wait()方法让线程休眠。当消费者取元素时,如果线程仍在wait()方法中
* 休眠,将使用notify()方法去唤醒它
*/
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
put(e);
lock.unlock();
return true;
} else {
transfered.add(e);
long newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.unlock();
e.wait(newTimeout);
lock.lock();
if (transfered.contains(e)) {
transfered.remove(e);
lock.unlock();
return false;
} else {
lock.unlock();
return true;
}
}
}
/**
* 使用counter属性的值来计算该方法的返回值,不为0返回true,否则返回false
*/
@Override
public boolean hasWaitingConsumer() {
return (counter.get() != 0);
}
/**
* 返回 counter 属性的值
*/
@Override
public int getWaitingConsumerCount() {
return counter.get();
}
/**
* 如果在transfered队列中没有元素,则释放锁并尝试使用toke()方法从队列中取
* 得一个元素并再次获取锁.如果队列中没有元素,该方法将让线程休眠直至有元素可被消费
*/
@Override
public E take() throws InterruptedException {
lock.lock();
counter.incrementAndGet();
E value = transfered.poll();
if (value == null) {
lock.unlock();
value = super.take();
lock.lock();
} else {
synchronized (value) {
value.notify();
}
}
counter.decrementAndGet();
lock.unlock();
return value;
}
}
package com.concurrency.task;
/**
* 产生者类
*/
public class Producer implements Runnable {
// // 用来存储由这个类生成的事件
private MyPriorityTransferQueue<Event> buffer;
public Producer(MyPriorityTransferQueue<Event> buffer) {
this.buffer = buffer;
}
/**
* 创建100个Event对象,使用创建序号作为优先级(事件创 建的越晚优先级越高),
* 并使用put()方法将它们插入到队列中。
*/
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Event event = new Event(Thread.currentThread().getName(), i);
buffer.put(event);
}
}
}
package com.concurrency.task;
/**
* 消费者类
*/
public class Consumer implements Runnable {
private MyPriorityTransferQueue<Event> buffer;
public Consumer(MyPriorityTransferQueue<Event> buffer) {
this.buffer = buffer;
}
/**
* 消费1002个Event(例子中所有产生的事件), 并在控制台输出产生事件的线程的名称以及事件的优先级priority。
*/
@Override
public void run() {
for (int i = 0; i < 1002; i++) {
try {
Event value = buffer.take();
System.out.printf("Consumer: %s: %d\n", value.getThread(), value.getPriority());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.concurrency.task;
/**
* 自定义事件对象
*/
public class Event implements Comparable<Event> {
// 产生事件的线程名
private String thread;
// 线程优先级
private int priority;
/**
* 构造函数
*
* @param thread 产生事件的线程名
* @param priority 线程优先级
*/
public Event(String thread, int priority) {
this.thread = thread;
this.priority = priority;
}
public String getThread() {
return thread;
}
public int getPriority() {
return priority;
}
@Override
public int compareTo(Event e) {
if (this.priority > e.getPriority()) {
return -1;
} else if (this.priority < e.getPriority()) {
return 1;
} else {
return 0;
}
}
}
package com.concurrency.core;
import com.concurrency.task.Consumer;
import com.concurrency.task.Event;
import com.concurrency.task.MyPriorityTransferQueue;
import com.concurrency.task.Producer;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws Exception {
MyPriorityTransferQueue<Event> buffer = new MyPriorityTransferQueue<>();
Producer producer = new Producer(buffer);
Thread producerThreads[] = new Thread[10];
for (int i = 0; i < producerThreads.length; i++) {
producerThreads[i] = new Thread(producer);
producerThreads[i].start();
}
Consumer consumer = new Consumer(buffer);
Thread consumerThread = new Thread(consumer);
consumerThread.start();
System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount());
Event myEvent = new Event("Core Event", 0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered.\n");
for (int i = 0; i < producerThreads.length; i++) {
producerThreads[i].join();
}
TimeUnit.SECONDS.sleep(1);
System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount());
myEvent = new Event("Core Event 2", 0);
buffer.transfer(myEvent);
consumerThread.join();
System.out.printf("Main: End of the program\n");
}
}
图7.10-1 部分运行结果
图7.10-2 部分运行结果
本节实现了MyPriorityTransferQueue数据结构。该结构用在生产者一消费者问题中, 但是里面的元素按优先级进行排序而不是按到达的先后顺序排序。因为Java不允许多继承, 所以第一考虑是使用基类。MyPriorityTransferQueue类继承了PriorityBlockingQueue 类,并实现了把元素按优先级插入到结构中的操作。MyPriorityTransferQueue类也实现了 TransterQueue接口,用来增加与生产者一消费者相关的操作方法。
MyPriorityTransferQueue 类有下面3个属性。
? 一个名为counter的AtomicInteger属性:用来存储等待从数据结构中读取元素的消费者的数量。当消费者调用take()方法从数据结构中取元素时,counter属性增加1。当消费者完成take()方法的执行,counter属性减1。counter用在hasWaitingConsumer()和 getWaitingConsumerCount()方法的实现中。
? 一个名为lock的ReentrantLock属性:用来控制访问上述实现的生产者一消费者的操作,实现仅有一个线程可操作数据结构的目标。
? 一个LinkedBlockingQueue列表:存储己传递(尚未被消费)的元素。
我们在MyPriorityTransferQueue类中实现了几个方法,这些方法都在TransferQueue 接口中进行声明的,而take()方法在所继承的PriorityBlockingQueue类中则有默认实现。
这些方法中的两个已在之前部分描述过,下面是其他方法的介绍。
? tryTransfer(E e):这个方法尝试直接发送一个元素到消费者。如果有消费者正在 等待,这个方法将把元素存入到可被立即消费的优先级队列中,然后返回true,如果没有消费者在等待,则返回false。
? transfer(E e):这个方法直接传递一个元素到一个消费者。如果有消费者正在等待, 这个方法将把元素存入到可被立即消费的优先级队列中。否则,元素被存储在己转移(尚未被消费)的元素列表中,线程将被阻塞直至元素被消费。当线程休眠时,必须释放锁,否则队列被阻塞。
? tryTransfer(E e, long timeout,TimeUnit unit):这个方法与 transfer()方法相似,但是线程阻塞的时间由参数决定。当线程休眠时,必须释放锁,否则队列被阻塞。
? take():这个方法返回下一个要被消费的元素。如果在转移列表transfered中有元素,那么就从这个列表中获取将要被消费的元素。否则,就从优先队列中获取。
在这个数据结构之后,我们实现了 Event类,它是存储在数据结构中的元素类。Event 类有两个属性存储生产者ID(线程的名称)和事件的优先级priority,它实现了 Comparable 接口,这是数据结构的一个需求。
接下来,实现了生产者Producer和消费者Consumer类。在本例中,有10个生产者和消费者,它们共享相同的缓冲区。每个生产者生成100个带有递增优先级的事件,所以有更髙优先级的事件最后被生成。
本例的Main主类创建了 MyPriorityTransferQueue对象、10个生产者和1个消费者, 并且使用MyPriorityTransferQueue类对象buffer的transfer()方法传递两个事件到缓冲区。
我们可以看到,有更髙优先级的事件先被消费,当transfered转移列表中有元素的时候,消费者会先消费高优先级的事件。
从Java 5开始就已经引入了原子变量(Atomic Variable),它提供对单个变量的原子操作。当线程使用原子变量执行操作时,类的实现包括了一个机制来检査操作是否在单步内结束。简单来讲,就是操作获取变量值,然后通过本地变量来改变值,接着尝试改旧值为新值。如果旧值未变,则执行改变。否则,方法重新执行。
本节将学习如何继承一个原子对象和如何实现两个遵守原子对象机制保证所有操作在单步内结束的方法。
package com.concurrency.task;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义原子类,进行停车计数
*/
public class ParkingCounter extends AtomicInteger {
private static final long serialVersionUID = 1L;
// 最多可以停车的数目
private int maxNumber;
/**
* 构造函数
* @param maxNumber 最多可以停车的数目
*/
public ParkingCounter(int maxNumber) {
set(0);
this.maxNumber = maxNumber;
}
/**
* 停车操作
*
* @return 停车成功,返回true,否则返回false
*/
public boolean carIn() {
for (; ; ) {
int value = get();
if (value == maxNumber) {
System.out.printf("ParkingCounter: The parking is full.\n");
return false;
} else {
int newValue = value + 1;
boolean changed = compareAndSet(value, newValue);
if (changed) {
System.out.printf("ParkingCounter: A car has entered.\n");
return true;
}
}
}
}
/**
* 离开操作
* @return 成功,返回true,否则返回false
*/
public boolean carOut() {
for (; ; ) {
int value = get();
if (value == 0) {
System.out.printf("ParkingCounter: The parking is empty.\n");
return false;
} else {
int newValue = value - 1;
boolean changed = compareAndSet(value, newValue);
if (changed) {
System.out.printf("ParkingCounter: A car has gone out.\n");
return true;
}
}
}
}
}
package com.concurrency.task;
/**
* 场景类,模拟停车操作
*/
public class Sensor1 implements Runnable {
// 停车场计数器
private ParkingCounter counter;
/**
* 构造函数
*
* @param counter 停车场计数器
*/
public Sensor1(ParkingCounter counter) {
this.counter = counter;
}
/**
* 主方法,进行停车场场景模拟
*/
@Override
public void run() {
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carOut();
counter.carOut();
counter.carOut();
counter.carIn();
counter.carIn();
counter.carIn();
}
}
package com.concurrency.task;
/**
* 场景类,模拟停车操作
*/
public class Sensor2 implements Runnable {
// 停车场计数器
private ParkingCounter counter;
/**
* 构造函数
*
* @param counter 停车场计数器
*/
public Sensor2(ParkingCounter counter) {
this.counter=counter;
}
/**
* 主方法,进行停车场场景模拟
*/
@Override
public void run() {
counter.carIn();
counter.carOut();
counter.carOut();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
}
}
package com.concurrency.core;
import com.concurrency.task.ParkingCounter;
import com.concurrency.task.Sensor1;
import com.concurrency.task.Sensor2;
public class Main {
public static void main(String[] args) throws Exception {
// 停车计数器
ParkingCounter counter = new ParkingCounter(5);
// 创建两个场景对象
Sensor1 sensor1 = new Sensor1(counter);
Sensor2 sensor2 = new Sensor2(counter);
Thread thread1 = new Thread(sensor1);
Thread thread2 = new Thread(sensor2);
thread1.start();
thread2.start();
// 等待两个线运行结束
thread1.join();
thread2.join();
// 停车场中的车的数目
System.out.printf("Main: Number of cars: %d\n", counter.get());
// 输出信息,表明程序运行结束
System.out.printf("Main: End of the program.\n");
}
}
图7.11-1 运行结果
ParkingCounter类继承白AtomicInteger类,它带有两个原子操作carIn()和carOut()。本例模拟一个系统来控制停车场内的汽车数量。停车场能接纳的汽车数由maxNumber属性表示。
carIn()方法将停车场现有汽车数与最大停车数相比较。如果相等,则汽车不可以进入停车场,并返回false。否则,使用下面的原子操作指令。
1. 将原子对象的值赋给一个本地变量。
2. 将本地变量值增加1作为新值,并把这个新值赋给另一个不同的变量。
3. 使用compareAndSet()方法尝试使用新值替换旧值。如果返回true,作为参数的旧值就是当前内部计数器的值,所以计数器值将发生改变。这个操作是以原子方式执行的,将返回true,如carIn()方法。如果compareAndSet()返回false,作为参数的旧值已不是当前内部计数器的值(另外一个线程已修改过它),所以这个操作就不是以原子方式执行的。操作将重新开始直到它能够以原子方式完成。
carOut()方法与carIn()相似。我们还实现了两个Runnable对象,这两个对象使用carIn()和carOut()方法模拟停车场的活动。当运行这个程序时,能够看到停车场永不会超出设定的最大停车数。
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/derrantcm/article/details/48214759