标签:
在Java中,可以通过配合使用Object对象的wait()方法,notify()方法和notifyAll()方法来实现线程间的通信。当在线程中调用wait()方法,将阻塞等待其他线程的通知(notify或notifyAll)或被中断。
Object是所有类的超类,它有5个方法组成等待/通知机制的核心:notify(),notifyAll(), wait(), wait(long), wait(long, int);这5个方法都被声明为final,因此在子类中不能覆写任何一个方法。
1、wait() //只能在同步块或同步方法中调用
public final void wait() throws InterruptedException, IllegalMonitorStateException
该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用wait()之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。进入wait()方法后,当前线程释放锁。在从wait()返回前,线程与其他线程竞争重新获得锁。如果调用wait()时,没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类,因此,不需要try-catch结构。2、notify()
public final native void notify() throws IllegalMonitorStateException
该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用notify()时没有持有适当的锁,也会抛出IllegalMonitorStateException。
该方法用来通知那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器任意挑选出其中一个wait()状态的线程来发出通知,并使它等待获取该对象的对象锁(notify后,当前线程不会马上释放该对象锁,wait所在的线程并不能马上获取该对象锁,要等到程序退出synchronized代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁),但不惊动其他同样在等待被该对象notify的线程们。当第一个获得了该对象锁的wait线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,会继续阻塞在wait状态,直到这个对象发出一个notify或notifyAll。这里需要注意:它们等待的是被notify或notifyAll,而不是锁。这与下面的notifyAll()方法执行后的情况不同。3、notifyAll()
public final native void notifyAll() throws IllegalMonitorStateException
该方法与notify()方法的工作方式相同,重要的一点差异是:
notifyAll使所有原来在该对象上wait的线程统统退出wait的状态(即全部被唤醒,不再等待notify或notifyAll,但由于此时还没有获取到该对象锁,因此还不能继续往下执行),变成等待获取该对象上的锁,一旦该对象锁被释放(notifyAll线程退出调用了notifyAll的synchronized代码块的时候),他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。4、wait(long)和wait(long,int)
显然,这两个方法是设置等待超时时间的,后者在超值时间上加上ns,精度也难以达到,因此,该方法很少使用。对于前者,如果在等待线程接到通知或被中断之前,已经超过了指定的毫秒数,则它通过竞争重新获得锁,并从wait(long)返回。另外,需要知道,如果设置了超时时间,
当wait()返回时,我们不能确定它是因为接到了通知还是因为超时而返回的,因为wait()方法不会返回任何相关的信息。但一般可以通过设置标志位来判断,在notify之前改变标志位的值,在wait()方法后读取该标志位的值来判断,当然为了保证notify不被遗漏(还没有wait的时候,就已经notify啦),我们还需要另外一个标志位来循环判断是否调用wait()方法。
notify通知的遗漏:即threadA还没开始wait的时候,threadB已经notify了。
当调用wait时,如果线程调用了interrupt方法时,会抛出InterruptedException。
解决办法:
1. 需要添加一个boolean指示变量,该变量只能在同步代码块内部访问和修改。
public class MissedNotifyFix {
private Object proceedLock;
//该标志位用来指示线程是否需要等待
private boolean okToProceed;
public MissedNotifyFix() {
print("in MissedNotify()");
proceedLock = new Object();
//先设置为false
okToProceed = false;
}
public void waitToProceed() throws InterruptedException {
print("in waitToProceed() - entered");
synchronized ( proceedLock ) {
print("in waitToProceed() - entered sync block");
//while循环判断,这里不用if的原因是为了防止早期通知
while (okToProceed == false) {
print("in waitToProceed() - about to wait()");
proceedLock.wait();
print("in waitToProceed() - back from wait()");
}
print("in waitToProceed() - leaving sync block");
}
print("in waitToProceed() - leaving");
}
public void proceed() {
print("in proceed() - entered");
synchronized ( proceedLock ) {
print("in proceed() - entered sync block");
//通知之前,将其设置为true,这样即使出现通知遗漏的情况,也不会使线程在wait出阻塞
okToProceed = true;
print("in proceed() - changed okToProceed to true");
proceedLock.notifyAll();
print("in proceed() - just did notifyAll()");
print("in proceed() - leaving sync block");
}
print("in proceed() - leaving");
}
private static void print(String msg) {
String name = Thread.currentThread().getName();
System.out.println(name + ": " + msg);
}
public static void main(String[] args) {
final MissedNotifyFix mnf = new MissedNotifyFix();
Runnable runA = new Runnable() {
public void run() {
try {
//休眠1000ms,大于runB中的500ms,
//是为了后调用waitToProceed,从而先notifyAll,后wait,
Thread.sleep(1000);
mnf.waitToProceed();
} catch ( InterruptedException x ) {
x.printStackTrace();
}
}
};
Thread threadA = new Thread(runA, "threadA");
threadA.start();
Runnable runB = new Runnable() {
public void run() {
try {
//休眠500ms,小于runA中的1000ms,
//是为了先调用proceed,从而先notifyAll,后wait,
Thread.sleep(500);
mnf.proceed();
} catch ( InterruptedException x ) {
x.printStackTrace();
}
}
};
Thread threadB = new Thread(runB, "threadB");
threadB.start();
try {
Thread.sleep(10000);
} catch ( InterruptedException x ) {}
print("about to invoke interrupt() on threadA");
threadA.interrupt();
}
}
import java.util.*;
public class EarlyNotify {
private List list;
public EarlyNotify() {
list = Collections.synchronizedList(new LinkedList());
}
public String removeItem() throws InterruptedException {
print("in removeItem() - entering");
synchronized ( list ) {
while ( list.isEmpty() ) { //这里用if语句会发生危险
print("in removeItem() - about to wait()");
list.wait();
print("in removeItem() - done with wait()");
}
//删除元素
String item = (String) list.remove(0);
print("in removeItem() - leaving");
return item;
}
}
public void addItem(String item) {
print("in addItem() - entering");
synchronized ( list ) {
//添加元素
list.add(item);
print("in addItem() - just added: ‘" + item + "‘");
//添加后,通知所有线程
list.notifyAll();
print("in addItem() - just notified");
}
print("in addItem() - leaving");
}
private static void print(String msg) {
String name = Thread.currentThread().getName();
System.out.println(name + ": " + msg);
}
public static void main(String[] args) {
final EarlyNotify en = new EarlyNotify();
Runnable runA = new Runnable() {
public void run() {
try {
String item = en.removeItem();
print("in run() - returned: ‘" +
item + "‘");
} catch ( InterruptedException ix ) {
print("interrupted!");
} catch ( Exception x ) {
print("threw an Exception!!!\n" + x);
}
}
};
Runnable runB = new Runnable() {
public void run() {
en.addItem("Hello!");
}
};
try {
//启动第一个删除元素的线程
Thread threadA1 = new Thread(runA, "threadA1");
threadA1.start();
Thread.sleep(500);
//启动第二个删除元素的线程
Thread threadA2 = new Thread(runA, "threadA2");
threadA2.start();
Thread.sleep(500);
//启动增加元素的线程
Thread threadB = new Thread(runB, "threadB");
threadB.start();
Thread.sleep(10000); // wait 10 seconds
threadA1.interrupt();
threadA2.interrupt();
} catch ( InterruptedException x ) {}
}
}
生产者和消费者在同一时间段内共用同一存储空间,生产者向空间生产数据,而消费者取走数据。
这个模型主要涉及两个知识点:1.同步 2.线程间的通信
class Info{ // 定义信息类
private String name = "name";//定义name属性,为了与下面set的name属性区别开
private String content = "content" ;// 定义content属性,为了与下面set的content属性区别开
private boolean flag = true ; // 设置标志位,初始时先生产
public synchronized void set(String name,String content){
while(!flag){
try{
super.wait() ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
}
this.setName(name) ; // 设置名称
try{
Thread.sleep(300) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
this.setContent(content) ; // 设置内容
flag = false ; // 改变标志位,表示可以取走
super.notifyAll();
}
public synchronized void get(){
while(flag){
try{
super.wait() ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
}
try{
Thread.sleep(300) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
System.out.println(this.getName() +
" --> " + this.getContent()) ;
flag = true ; // 改变标志位,表示可以生产
super.notifyAll();
}
public void setName(String name){
this.name = name ;
}
public void setContent(String content){
this.content = content ;
}
public String getName(){
return this.name ;
}
public String getContent(){
return this.content ;
}
}
class Producer implements Runnable{ // 通过Runnable实现多线程
private Info info = null ; // 保存Info引用
public Producer(Info info){
this.info = info ;
}
public void run(){
boolean flag = true ; // 定义标记位
for(int i=0;i<10;i++){
if(flag){
this.info.set("姓名--1","内容--1") ; // 设置名称
flag = false ;
}else{
this.info.set("姓名--2","内容--2") ; // 设置名称
flag = true ;
}
}
}
}
class Consumer implements Runnable{
private Info info = null ;
public Consumer(Info info){
this.info = info ;
}
public void run(){
for(int i=0;i<10;i++){
this.info.get() ;
}
}
}
public class ThreadCaseDemo03{
public static void main(String args[]){
Info info = new Info(); // 实例化Info对象
Producer pro = new Producer(info) ; // 生产者
Consumer con = new Consumer(info) ; // 消费者
new Thread(pro).start() ;
//启动了生产者线程后,再启动消费者线程
try{
Thread.sleep(500) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
new Thread(con).start() ;
}
}
Executor框架是在Java5中引入的,其内部使用了线程池机制,通过该框架来控制线程的启动,执行和关闭,可以简化并发编程的操作。通过Executor来启动线程比使用Thread的start方法,除了更易管理,效率更好。
Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。
ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当所有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。
Executors:Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public class TestCachedThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName() + "线程被调用了。");
}
}
任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable task) 方法来执行,并且返回一个 Future,是表示任务等待完成的 Future。
Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。
当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//创建10个任务并执行
for (int i = 0; i < 10; i++){
//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//将任务执行结果存储到List中
resultList.add(future);
}
//遍历任务的结果
for (Future<String> fs : resultList){
try{
while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成
/*需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用isDone()方法判断Future是否完成了返回。*/
System.out.println(fs.get()); //打印各个线程(任务)执行的结果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
System.out.println("call()方法被自动调用!!! " + Thread.currentThread().getName());
//该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName();
}
}
自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池
构造函数说明:
corePoolSize:线程池中所保存的核心线程数,包括空闲线程。
maximumPoolSize:池中允许的最大线程数。
keepAliveTime:线程池中的空闲线程所能持续的最长时间。
unit:持续时间的单位。
workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。
我们可以看出,当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:
1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
4、如果线程池中的线程数量等于了maximumPoolSize,有4种处理方式。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。
另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。
public class ThreadPoolTest{
public static void main(String[] args){
//创建等待队列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//创建线程池,池中保存的线程数为3,允许的最大线程数为5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//创建七个任务
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每个任务会在一个线程上执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//关闭线程池
pool.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
几种排队的策略:
下面说说几种排队的策略:
1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。
2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列
。newFixedThreadPool采用的便是这种策略。
3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。
标签:
原文地址:http://blog.csdn.net/jiangxishidayuan/article/details/51706393