标签:
本系列文章旨在分享Java5多线程与并法库的高级应用示例,所用到的大多数类均在java.util.concurrent包下。
package ustc.lichunchun.thread; /* * 创建线程的两种传统方式 */ public class TraditionalThread { public static void main(String[] args) { //在Thread子类覆盖的run方法中编写运行代码 Thread t1 = new Thread(){ public void run(){ while(true){ try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("1: " + Thread.currentThread().getName()); System.out.println("2: " + this.getName()); } } }; t1.start(); //----------------------------------------------------------- //在传递给Thread对象的Runnable对象的run方法中编写代码 Thread t2 = new Thread(new Runnable(){ @Override public void run() { while(true){ try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("1: " + Thread.currentThread().getName()); } } }); t2.start(); //----------------------------------------------------------- //涉及知识点:匿名内部类对象的构造方法如何调用父类的非默认构造方法 new Thread(new Runnable(){ @Override public void run() { while(true){ try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("Runnable: " + Thread.currentThread().getName()); } } }){ public void run(){ while(true){ try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("Thread: " + Thread.currentThread().getName()); } } }.start();//Thread: Thread-2 } }
package ustc.lichunchun.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Res{ private String name; private int count = 1; private boolean flag; private Lock lock = new ReentrantLock(); private Condition producer_con = lock.newCondition(); private Condition consumer_con = lock.newCondition(); public void set(String name){ lock.lock(); try{ while(flag){ try{ producer_con.await(); }catch(InterruptedException e){ //e.printStackTrace(); } } this.name = name + "-" + count; count++; System.out.println(Thread.currentThread().getName()+ "......生产者 ...... " + this.name); this.flag = true; consumer_con.signal(); }finally{ lock.unlock(); } } public void get(){ lock.lock(); try{ while(!flag){ try{ consumer_con.await(); }catch(InterruptedException e){ //e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+ "...消费者 ... " + this.name); this.flag = false; producer_con.signal(); }finally{ lock.unlock(); } } } class Producer implements Runnable{ private Res r; Producer(Res r){ this.r = r; } @Override public void run() { while(true){ r.set("小龙虾"); } } } class Consumer implements Runnable{ private Res r; Consumer(Res r){ this.r = r; } @Override public void run() { while(true){ r.get(); } } } public class ProducerConsumerDemo { public static void main(String[] args) { Res r = new Res(); Producer pro = new Producer(r); Consumer con = new Consumer(r); Thread t0 = new Thread(pro); Thread t1 = new Thread(pro); Thread t2 = new Thread(con); Thread t3 = new Thread(con); t0.start(); t1.start(); t2.start(); t3.start(); } }
package ustc.lichunchun.thread; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Timer; import java.util.TimerTask; //完成一个定时调度的程序,每个2秒打印一次时间 class MyTask extends TimerTask{ //任务调度类都要继承TimerTask @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); System.out.println("当前的系统时间为:" + sdf.format(new Date())); } } public class TraditionalTimerDemo { public static void main(String[] args) { Timer t = new Timer(); //建立Timer类对象 MyTask mytask = new MyTask(); //定义任务 t.schedule(mytask, 1000, 2000); //设置任务的执行,1秒后开始,每2秒重复 } }schedule()与scheduleAtFixedRate()方法的区别:
package ustc.lichunchun.thread; import java.util.Date; import java.util.Timer; import java.util.TimerTask; public class TraditionalTimerTest { private static int count = 0; class MyTimerTaskA extends TimerTask{ @Override public void run() { System.out.println(Thread.currentThread().getName() + "......Bombing!"); new Timer().schedule(new MyTimerTaskB(), 4000); } } class MyTimerTaskB extends TimerTask{ @Override public void run() { System.out.println(Thread.currentThread().getName() + "......Bombing!"); new Timer().schedule(new MyTimerTaskA(), 2000); } } public static void main(String[] args) { //1.主线程每个1秒打印一次当前秒数,定时器Timer线程设定为10秒后执行单次bombing打印 new Timer().schedule(new TimerTask() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "......Bombing!"); } }, 10000); while(true){ System.out.println(Thread.currentThread().getName() + "..." + new Date().getSeconds()); try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } } //-------------------------------------------------------------------- //2.Timer线程10秒后, 每隔2秒打印一次bombing new Timer().schedule(new TimerTask() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "......Bombing!"); } }, 10000, 2000); //-------------------------------------------------------------------- //3.1.Timer线程每2秒、4秒各炸一次,循环往复(方法一) new Timer().schedule(new TraditionalTimerTest().new MyTimerTaskA(), 2000); while(true){ System.out.println(Thread.currentThread().getName() + "..." + new Date().getSeconds()); try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } } //-------------------------------------------------------------------- //3.2.Timer线程每2秒、4秒各炸一次,循环往复(方法二) class MyTimerTaskC extends TimerTask{ @Override public void run() { count = (count + 1) % 2; System.out.println(Thread.currentThread().getName() + "......Bombing!"); new Timer().schedule(new MyTimerTaskC(), 2000 + 2000*count); } } new Timer().schedule(new MyTimerTaskC(), 2000); while(true){ System.out.println(Thread.currentThread().getName() + "..." + new Date().getSeconds()); try{ Thread.sleep(1000); }catch(InterruptedException e){ e.printStackTrace(); } } } }
package ustc.lichunchun.thread; import java.util.concurrent.locks.ReentrantLock; /* * 线程要运行的代码就相当于共享资源厕所的一个坐席,互斥锁就相当于厕所坐席里的门闩。 */ public class TraditionalThreadSynchronized { public static void main(String[] args) { new TraditionalThreadSynchronized().init(); /* final ReentrantLock lock = new ReentrantLock(); Thread t = new Thread(){ public void run() { lock.lock(); System.out.println("thread t execute"); lock.unlock(); }; }; lock.lock(); lock.lock(); t.start(); Thread.sleep(200); System.out.println("release one once"); lock.unlock(); 上面的代码会出现死锁,因为主线程2次获取了锁,但是却只释放1次锁,导致线程t永远也不能获取锁。 */ } private void init(){ final Outputer outputer = new Outputer(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output("lichunchun"); } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output2("alibaba"); } } }).start(); } static class Outputer{ public void output(String name){ int len = name.length(); synchronized (Outputer.class) { //this<-->output1() || Outputer.class<-->output2() for(int i = 0; i < len; i++){ System.out.print(name.charAt(i)); } System.out.println(); //output2(name);//synchronized也是一把可重入锁, } } public synchronized void output1(String name){ int len = name.length(); for(int i = 0; i < len; i++){ System.out.print(name.charAt(i)); } System.out.println(); } public static synchronized void output2(String name){ int len = name.length(); for(int i = 0; i < len; i++){ System.out.print(name.charAt(i)); } System.out.println(); } } }
这里,我们通过一道面试题来讲解。
子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程又循环100次,如此循环50次。
方法一:
package ustc.lichunchun.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Business{ private boolean bShouldSub = true; public synchronized void sub(int i){ while(!bShouldSub){ //线程有可能在没有被通知的时候"伪唤醒",所以用while判断更加可靠 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j = 1; j <= 10; j++){ System.out.println("sub thread sequence of " + j + ", loop of " + i); } bShouldSub = false; this.notify(); } public synchronized void main(int i){ while(bShouldSub){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j = 1; j <= 100; j++){ System.out.println("main thread sequence of " + j + ", loop of " + i); } bShouldSub = true; this.notify(); } } public class TraditionalThreadCommunication { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable(){ @Override public void run() { for(int i = 1; i <= 50; i++){ business.sub(i); } } }).start(); for(int i = 1; i <= 50; i++){ business.main(i); } } }方法二:
/* * 下面使用jdk5中的并发库来实现 */ class Business{ private boolean bShouldSub = true; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void sub(int i){ lock.lock(); try{ while(!bShouldSub){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j = 1; j <= 10; j++){ System.out.println("sub thread sequence of " + j + ", loop of " + i); } bShouldSub = false; condition.signal(); }finally{ lock.unlock(); } } public void main(int i){ lock.lock(); try{ while(bShouldSub){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j = 1; j <= 100; j++){ System.out.println("main thread sequence of " + j + ", loop of " + i); } bShouldSub = true; condition.signal(); }finally{ lock.unlock(); } } } public class TraditionalThreadCommunication{ public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable(){ @Override public void run() { for(int i = 1; i <= 50; i++){ business.sub(i); } } }).start(); for(int i = 1; i <= 50; i++){ business.main(i); } } }
package ustc.lichunchun.thread; import java.util.HashMap; import java.util.Map; import java.util.Random; /* * 线程范围内的共享变量,各线程之间相互独立 */ public class ThreadScopeShareData { //private static int data = 0; private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); public static void main(String[] args) { for(int i = 0; i < 2; i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data: " + data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("A from " + Thread.currentThread().getName() + " get data: " + data); } } static class B{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("B from " + Thread.currentThread().getName() + " get data: " + data); } } }
线程范围内共享变量的应用:ThreadLocal类的实用技巧
由上一节的原理代码示例和插图可以知道,ThreadLocal类的作用和目的:
用于实现线程内的数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。
实现对ThreadLocal变量的封装,让外界不要直接操作ThreadLocal变量。
----对基本类型的数据的封装,这种应用相对很少见。
----对对象类型的数据的封装,比较常见,即让某个类针对不同线程分别创建一个独立的实例对象。
实验案例:定义一个全局共享的ThreadLocal变量,然后启动多个线程向该ThreadLocal变量中存储一个随机值,接着各个线程调用另外其他多个类的方法,这多个类的方法中读取这个ThreadLocal变量的值,就可以看到多个类在同一个线程中共享同一份数据。
package ustc.lichunchun.thread; import java.util.Random; /* 定义一个全局共享的ThreadLocal变量,然后启动多个线程向该ThreadLocal变量中存储一个随机值, 接着各个线程调用另外其他多个类的方法,这多个类的方法中读取这个ThreadLocal变量的值, 就可以看到多个类在同一个线程中共享同一份数据。 */ public class ThreadLocalTest{ private static ThreadLocal<Integer> x = new ThreadLocal<Integer>(); private static ThreadLocal<MyThreadScopeData> myThreadScopeData = new ThreadLocal<MyThreadScopeData>(); public static void main(String[] args) { for(int i = 0; i < 2; i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data: " + data); x.set(data); MyThreadScopeData myData = new MyThreadScopeData(); myData.setName("name:"+data); myData.setAge(data); myThreadScopeData.set(myData); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = x.get(); System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data); MyThreadScopeData myData = myThreadScopeData.get(); System.out.println("A from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } static class B{ public void get(){ int data = x.get(); System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data); MyThreadScopeData myData = myThreadScopeData.get(); System.out.println("B from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } }进阶版案例:
package ustc.lichunchun.thread; import java.util.Random; public class ThreadLocalTest { public static void main(String[] args) { for (int i = 0; i < 2; i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data: " + data); MyThreadScopeData.getInstance().setName("name:" + data); MyThreadScopeData.getInstance().setAge(data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get() { MyThreadScopeData myData = MyThreadScopeData.getInstance(); System.out.println("A from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + ", age:" + myData.getAge()); } } static class B{ public void get() { MyThreadScopeData myData = MyThreadScopeData.getInstance(); System.out.println("B from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + ", age:" + myData.getAge()); } } } //基于饿汉单例模式的改造,每个线程对应一个MyThreadScopeData实例 class MyThreadScopeData{ private MyThreadScopeData(){} private static ThreadLocal<MyThreadScopeData> map = new ThreadLocal<MyThreadScopeData>(); public static MyThreadScopeData getInstance(){ MyThreadScopeData instance = map.get(); if(instance == null){ instance = new MyThreadScopeData(); map.set(instance); } return instance; } private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }ThreadLocal的应用场景
1. 如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据。
例如,四个窗口售100张票:
package ustc.lichunchun.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MultiThreadShareData { public static void main(String[] args) { ShareData1 data1 = new ShareData1(); new Thread(data1).start(); new Thread(data1).start(); new Thread(data1).start(); new Thread(data1).start(); } } class ShareData1 implements Runnable{ private int count = 100; @Override public void run() { while(true){ synchronized(this){ if(this.count > 0){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+": "+count--); }else{ break; } } } } }
2. 如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下三种方式来实现这些Runnable对象之间的数据共享:
(1)将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。
public class MultiThreadShareData{ public static void main(String[] args) { ShareData data = new ShareData();//含有共享数据的对象只有一个,并作为参数传递给各个Runnable线程对象作为其各自的任务 for(int i = 0; i < 2; i++){ new Thread(new Inc(data)).start(); new Thread(new Dec(data)).start(); } } } class ShareData{ private int j = 0;//共享数据 private Lock lock = new ReentrantLock(); //private Condition con = lock.newCondition();//可用于等待唤醒机制,配上flag标志 //下面是对共享数据进行要进行的两个操作,定义在这个对象中,可以方便的实现互斥 public void inc(){ lock.lock(); j++; System.out.println("j="+(this.j-1)+" increase by "+Thread.currentThread().getName()+": j="+this.j); lock.unlock(); } public void dec(){ lock.lock(); j--; System.out.println("j="+(this.j+1)+" decrease by "+Thread.currentThread().getName()+": j="+this.j); lock.unlock(); } } class Inc implements Runnable{//对共享数据进行加法的线程任务 private ShareData data; public Inc(ShareData data){ this.data = data; } @Override public void run() { while(true){ data.inc(); } } } class Dec implements Runnable{//对共享数据进行减法的线程任务 private ShareData data; public Dec(ShareData data){ this.data = data; } @Override public void run() { while(true){ data.dec(); } } }
(2)将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,对象作为这个外部类中的成员变量或方法中的局部变量,每个线程的Runnable对象作为外部类中的成员内部类或局部内部类。
public class MultiThreadShareData{ private static ShareData data = new ShareData(); public static void main(String[] args) { //final ShareData data = new ShareData();//方法的局部final变量也可以 new Thread(new Runnable(){ @Override public void run() { while(true){ data.inc(); } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ data.dec(); } } }).start(); } } class ShareData{ private int j = 0; public synchronized void inc(){ j++; System.out.println("j="+(this.j-1)+" increase by "+Thread.currentThread().getName()+": j="+this.j); } public synchronized void dec(){ j--; System.out.println("j="+(this.j+1)+" decrease by "+Thread.currentThread().getName()+": j="+this.j); } }
(3)将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量(内部类可以直接操作外部类成员变量),每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。
public class MultiThreadShareData{ private int j = 0; public static void main(String[] args) { MultiThreadShareData data = new MultiThreadShareData(); Inc inc = data.new Inc(); Dec dec = data.new Dec(); for(int i = 0; i < 2; i++){ new Thread(inc).start(); new Thread(dec).start(); } } private synchronized void inc(){ j++; System.out.println(Thread.currentThread().getName()+" inc: "+j); } private synchronized void dec(){ j--; System.out.println(Thread.currentThread().getName()+" dec: "+j); } class Inc implements Runnable{ @Override public void run() { for(int i = 0; i < 100; i++){ inc(); } } } class Dec implements Runnable{ @Override public void run() { for(int i = 0; i < 100; i++){ dec(); } } } }
总结:要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。
1. 何谓原子操作?
Atomic一词跟原子有点关系,后者曾被人认为是最小物质的单位。计算机中的Atomic是指不能分割成若干部分的意思。如果一段代码被认为是Atomic,则表示这段代码在执行过程中,是不能被中断的。通常来说,原子指令由硬件提供,供软件来实现原子方法(某个线程进入该方法后,就不会被中断,直到其执行完成)
在x86 平台上,CPU提供了在指令执行期间对总线加锁的手段。CPU芯片上有一条引线#HLOCK pin,如果汇编语言的程序中在一条指令前面加上前缀"LOCK",经过汇编以后的机器代码就使CPU在执行这条指令的时候把#HLOCK pin的电位拉低,持续到这条指令结束时放开,从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。
2. JDK1.5的原子包:java.util.concurrent.atomic
这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。其中的类可以分成4组
Atomic类的作用
这四种基本类型用来处理布尔,整数,长整数,对象四种数据。
4. 示例
例如,类 AtomicLong 和 AtomicInteger 提供了原子增量方法。一个应用程序将按以下方式生成序列号:
class Sequencer { <span style="white-space:pre"> </span>private AtomicLong sequenceNumber = new AtomicLong(0); <span style="white-space:pre"> </span>public long next() { return sequenceNumber.getAndIncrement(); } }AtomicBoolean使用示例:
private AtomicBoolean running = new AtomicBoolean(false); @Override protected Object execute() throws Exception { if (running.compareAndSet(false, true)) { try { //TODO } finally { running.set(false); } } }
标签:
原文地址:http://blog.csdn.net/zhongkelee/article/details/51720521