标签:
线程作为操作系统调度的最小单元,多个线程能够同时执行,这将显著提升程序性能,在多核环境中表现得更加明显。但是,过多地创建线程和对线程的不当管理也容易造成问。
public class Priority { private static volatile boolean notStart=true; private static volatile boolean notEnd=true; static class Job implements Runnable{ private int priority; private long jobCount; public Job(int priority){ this.priority=priority; } @Override public void run() { while(notStart){ Thread.yield(); } while(notEnd){ Thread.yield(); jobCount++; } } } public static void main(String[] args) throws InterruptedException{ List<Job> jobs=new ArrayList<Job>(); for(int i=0;i<10;i++){ int priority=i<5?Thread.MIN_PRIORITY:Thread.MAX_PRIORITY; Job job=new Job(priority); jobs.add(job); Thread thread=new Thread(job,"Thread:"+i); thread.setPriority(priority); thread.start(); } notStart=false; Thread.sleep(1000); notEnd=false; for(Job job:jobs){ System.out.println("Job Priority:"+job.priority+" Count:"+job.jobCount); } } }我们开始10个工作线程,在特定时间内进行计数。在本机上运行结果如下:
public class ThreadState { //该线程不断进行睡眠,进行有限时间等待 static class TimeWaiting implements Runnable{ @Override public void run() { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } //该线程在Waiting.class实例上wait()进入等待状态,知道其他线程调用Waiting.class实例上的notify()和notifyAll() static class Waiting implements Runnable{ @Override public void run() { while(true){ synchronized(Waiting.class){ try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } //该线程在Blocked.class实例上加锁后,过一段时间再释放锁,因此下一个线程申请锁将会阻塞住 static class Blocked implements Runnable{ @Override public void run() { synchronized(Blocked.class){ while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args){ new Thread(new TimeWaiting(),"TimeWaitingThread").start(); new Thread(new Waiting(),"WaitingThread").start(); //使用两个Blocked线程,一个获取锁成功,另一个将会被阻塞 new Thread(new Blocked(),"BlockedThread-1").start(); new Thread(new Blocked(),"BlockedThread-2").start(); } }1、执行上述代码。
public class Daemon { static class DaemonRunner implements Runnable{ @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("DeamonThread finally run."); } } } public static void main(String[] args){ Thread thread=new Thread(new DaemonRunner(),"DaemonRunner"); thread.setDaemon(true); thread.start(); } }运行Daemon程序,可以看到在终端或者命令提示符上没有任何输出。main线程(非Daemon线程)在启动了线程DaemonRunner之后随着main方法执行完毕而终止,而此时Java虚拟机中已经没有非Daemon线程,虚拟机需要退出。Java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner立即终止,但是DaemonRunner中的finally块并没有执行。
在运行线程之前,首先要构造一个线程对象,线程对象的构造过程中我们可以根据需要设置特定的线程属性,例如线程名称、线程优先级、是否为Daemon线程等。
public class Interrupted { static class SleepRunner implements Runnable{ @Override public void run() { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class BusyRunner implements Runnable{ @Override public void run() { while(true){ } } } public static void main(String[] args) throws InterruptedException{ //sleepRunner不停的尝试睡眠 Thread sleepRunner=new Thread(new SleepRunner(),"SleepThread"); sleepRunner.setDaemon(true); //busyRunner不停的运转 Thread busyRunner=new Thread(new BusyRunner(),"BusyThread"); busyRunner.setDaemon(true); sleepRunner.start(); busyRunner.start(); //休眠5秒,让sleepRunner和busyRunner充分运行 Thread.sleep(1000*5); sleepRunner.interrupt(); busyRunner.interrupt(); System.out.println("SleepThread interrupted is "+sleepRunner.isInterrupted()); System.out.println("BusyThread interrupted is "+busyRunner.isInterrupted()); //防止sleepRunner和busyRunner立刻退出 Thread.sleep(1000*2); } }输出:
public class Shutdown { static class Runner implements Runnable{ private long i; private volatile boolean on=true; @Override public void run() { while(on&&!Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i="+i); } public void cancel(){ on=false; } } public static void main(String[] args) throws InterruptedException{ Thread countThread=new Thread(new Runner(),"CountThread"); countThread.start(); //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束 Thread.sleep(1000*1); countThread.interrupt();//使用中断来终止线程 //重新开启一个计数线程 Runner second=new Runner(); countThread=new Thread(second,"CountThread"); countThread.start(); //睡眠1秒,main线程对second进行取消,使CountThread能够感知on为false而结束 Thread.sleep(1000*1); second.cancel(); } }示例在执行过程中,main线程通过中断操作和cancel()方法均可使CountThread得以终止。这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。
while (value != desire) { Thread.sleep(1000); } doSomething();
public class WaitNotify { static boolean flag=true; static Object lock=new Object(); static class Wait implements Runnable{ @Override public void run() { //加锁拥有lock的Monitor synchronized(lock){ //当条件不满足时, 继续wait,同时释放了lock的锁 while(flag){ try { System.out.println("flag is true.wait! "+System.currentTimeMillis()); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //条件满足时,完成工作 System.out.println("flag is false.running "+System.currentTimeMillis()); } } } static class Notify implements Runnable{ @Override public void run() { //加锁,拥有lock的Monitor synchronized(lock){ //发送通知,通知时不会释放lock的锁 //直到当前线程释放了lock后,WaitThread才能从wait方法中返回 System.out.println("hold lock.notify "+System.currentTimeMillis()); lock.notifyAll(); flag=false; try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } } //再次加锁 synchronized(lock){ System.out.println("hold lock.again "+System.currentTimeMillis()); try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) throws InterruptedException{ Thread waitThread=new Thread(new Wait(),"WaitThread"); waitThread.start(); Thread.sleep(1000); Thread notifyThread=new Thread(new Notify(),"NotifyThread"); notifyThread.start(); } }输出:
synchronized(对象){ //当条件不满足时, 继续wait,同时释放了lock的锁 while(条件不满足){ lock.wait(); } //条件满足时,完成对应的处理逻辑 }
synchronized(对象){ //改变条件 对象.notifyAll(); //通知等待线程 }
public class SingleProduceConsumer { static boolean empty=true; static Object plate=new Object();//盘子 static class Consumer implements Runnable{ @Override public void run() { //加锁 synchronized(plate){ //当条件不满足时,继续wait while(empty){ try { plate.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //条件满足时,完成工作 System.out.println("从盘子里拿一个苹果!"); } } } static class Produce implements Runnable{ @Override public void run() { //加锁 synchronized(plate){ //改变条件 System.out.println("向盘子里放入一个苹果!"); empty=false; plate.notifyAll(); } } } public static void main(String[] args) throws InterruptedException{ Thread consumerThread=new Thread(new Consumer()); consumerThread.start(); Thread produceThread=new Thread(new Produce()); produceThread.start(); Thread.sleep(100); consumerThread.interrupt(); produceThread.interrupt(); } }输出:
public class ProduceConsumerOne { static AtomicInteger plate=new AtomicInteger(0);//盘子 static class Consumer implements Runnable{ @Override public void run() { try{ while(true){ //加锁 synchronized(plate){ //当条件不满足时,继续wait while(plate.intValue()==0){ plate.wait(); } //条件满足时,完成工作 plate.set(0); System.out.println("从盘子里拿一个苹果!"); plate.notifyAll(); } } }catch (InterruptedException e) { return; } } } static class Produce implements Runnable{ @Override public void run() { try { while(true){ //加锁 synchronized(plate){ while(plate.intValue()==1){ plate.wait(); } plate.set(1); System.out.println("向盘子里放入一个苹果!"); plate.notifyAll(); } } } catch (InterruptedException e) { return; } } } public static void main(String[] args) throws InterruptedException{ Thread consumerThread=new Thread(new Consumer()); consumerThread.start(); Thread.sleep(10); Thread produceThread=new Thread(new Produce()); produceThread.start(); Thread.sleep(100); produceThread.interrupt(); Thread.sleep(10); consumerThread.interrupt(); } }
public class ProduceConsumerN { static final int MaxSize=10; static AtomicInteger basket=new AtomicInteger(0);//篮子初始为空 static class Consumer implements Runnable{ @Override public void run() { try{ while(true){ //加锁 synchronized(basket){ //当条件不满足时,继续wait while(basket.intValue()==0){ basket.wait(); } //条件满足时,完成工作 int i=basket.intValue(); basket.set(basket.intValue()-1); System.out.println("从篮子里拿苹果 "+i+" !"); basket.notifyAll(); } } }catch (InterruptedException e) { return; } } } static class Produce implements Runnable{ @Override public void run() { try { while(true){ //加锁 synchronized(basket){ //当条件不满足时,继续wait while(basket.intValue()==MaxSize){ basket.wait(); } //当条件满足时,完成工作 int i=basket.intValue(); basket.set(basket.intValue()+1); System.out.println("向盘子里放入苹果"+(i+1)+"!"); basket.notifyAll(); } } } catch (InterruptedException e) { return; } } } public static void main(String[] args) throws InterruptedException{ Thread consumerThread=new Thread(new Consumer()); consumerThread.start(); Thread.sleep(10); Thread produceThread=new Thread(new Produce()); produceThread.start(); Thread.sleep(100); consumerThread.interrupt(); produceThread.interrupt(); } }
public class MultiProduceConsumerN { static final int MaxSize=10; static Object mutexProduce=new Object(); static Object mutexConsumer=new Object(); static AtomicInteger basket=new AtomicInteger(0);//篮子初始为空 static class Consumer implements Runnable{ @Override public void run() { try{ while(true){ synchronized(mutexConsumer){ //加锁 synchronized(basket){ //当条件不满足时,继续wait while(basket.intValue()==0){ basket.wait(); } //条件满足时,完成工作 int i=basket.intValue(); basket.set(basket.intValue()-1); System.out.println(Thread.currentThread().getName()+"从篮子里拿苹果 "+i+" !"); basket.notifyAll(); } } } }catch (InterruptedException e) { return; } } } static class Produce implements Runnable{ @Override public void run() { try { while(true){ synchronized(mutexProduce){ //加锁 synchronized(basket){ //当条件不满足时,继续wait while(basket.intValue()==MaxSize){ basket.wait(); } //当条件满足时,完成工作 int i=basket.intValue(); basket.set(basket.intValue()+1); System.out.println(Thread.currentThread().getName()+"向盘子里放入苹果"+(i+1)+"!"); basket.notifyAll(); } } } } catch (InterruptedException e) { return; } } } public static void main(String[] args) throws InterruptedException{ Thread consumer1=new Thread(new Consumer(),"Consumer1"); consumer1.start(); Thread consumer2=new Thread(new Consumer(),"Consumer2"); consumer2.start(); Thread.sleep(10); Thread produce1=new Thread(new Produce(),"Produce1"); produce1.start(); Thread produce2=new Thread(new Produce(),"Produce2"); produce2.start(); Thread.sleep(100); produce1.interrupt(); produce2.interrupt(); Thread.sleep(10);//消费完剩余产品 consumer1.interrupt(); consumer2.interrupt(); } }
public class Piped { static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in){ this.in=in; } @Override public void run() { int receive=0; try{ while((receive=in.read())!=-1){ System.out.print((char)receive); } }catch(IOException e){ } } } public static void main(String[] args) throws IOException { PipedWriter out=new PipedWriter(); PipedReader in =new PipedReader(); //将输入流和输出流进行连接,否则在使用时会抛出IOException out.connect(in); Thread printThread=new Thread(new Print(in),"PrintThread"); printThread.start(); int receive=0; try{ while((receive=System.in.read())!=-1){ out.write(receive); } }finally{ out.close(); } } }运行程序,在控制台下输入的字符将会原样输出。
public class Join { static class Domino implements Runnable{ private Thread preThread; public Domino(Thread preThread){ this.preThread=preThread; } @Override public void run() { try { preThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" terminate."); } } public static void main(String[] args) throws InterruptedException{ Thread preThread=Thread.currentThread(); for(int i=0;i<10;i++){ //每个线程都拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回 Thread thread=new Thread(new Domino(preThread),"Thread"+String.valueOf(i)); thread.start(); preThread=thread; } Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+" terminate."); } }输出:
// 加锁当前线程对象 public final synchronized void join() throws InterruptedException { // 条件不满足,继续等待 while (isAlive()) { wait(0); } // 条件符合,方法返回 }当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。可以看到join()方法的逻辑结构与等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤。
public T get() { } public void set(T value) { } public void remove() { } protected T initialValue() { }get()方法是用来获取ThreadLocal在当前线程中保存的变量副本,set()用来设置当前线程中变量的副本,remove()用来移除当前线程中变量的副本,initialValue()是一个protected方法,用来给ThreadLocal变量提供初始值,每个线程都会获取这个初始值的一个副本。
public class ThreadLocalTest { //创建一个Integer型的线程本地变量 public static final ThreadLocal<Integer> local = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 0; } }; //计数 static class Counter implements Runnable{ @Override public void run() { //获取当前线程的本地变量,然后累加5次 int num = local.get(); for (int i = 0; i < 100; i++) { num++; } //重新设置累加后的本地变量 local.set(num); System.out.println(Thread.currentThread().getName() + " : "+ local.get()); } } public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { threads[i] = new Thread(new Counter() ,"CounterThread-[" + i+"]"); threads[i].start(); } } }输出:
public class ThreadLocalMisunderstand { static class Index { private int num; public void increase() { num++; } public int getValue() { return num; } } private static Index num=new Index(); //创建一个Integer型的线程本地变量 public static final ThreadLocal<Index> local = new ThreadLocal<Index>() { @Override protected Index initialValue() { return num; } }; //计数 static class Counter implements Runnable{ @Override public void run() { //获取当前线程的本地变量,然后累加5次 Index num = local.get(); for (int i = 0; i < 10000; i++) { num.increase(); } //重新设置累加后的本地变量 local.set(num); System.out.println(Thread.currentThread().getName() + " : "+ local.get().getValue()); } } public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { threads[i] = new Thread(new Counter() ,"CounterThread-[" + i+"]"); } for (int i = 0; i < 5; i++) { threads[i].start(); } } }输出:
private static ThreadLocal<Index> local = new ThreadLocal<Index>() { @Override protected Index initialValue() { return new Index(); //注意这里 } };
标签:
原文地址:http://blog.csdn.net/sunxianghuang/article/details/51944798