码迷,mamicode.com
首页 > 编程语言 > 详细

线程通信

时间:2017-01-14 23:10:00      阅读:338      评论:0      收藏:0      [点我收藏+]

标签:产生   can   value   cat   delay   now()   共享   out   执行   

线程通信

一 使用Synchronized的线程

1.当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但java也提供了一些机制来保证线程协调运行。
Object类提供了wait(),notify()和notifyAll()三个方法,这三个方法属于Object类,但是必须由同步监视器来调用,可以分为以下两种情况:
(1)对于使用synchronized修饰的同步方法,因为该类的默认实例this就是同步监视器,所以可以在同步方法中直接调用这三个方法
(2)对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法

2.wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。无时间参数的wait会一直等待,直到其他线程通知,带毫秒的wait会等到指定时间后自动苏醒。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都是在此同步监视器上等待,则会选择唤醒其中一个线程,选择是任意的。
notifyAll():唤醒在此同步监视器上等待的所有线程。

3.假设在系统中有两个线程,分别代表存款者和取钱者,系统要求存款者和取钱者不断重复存钱、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱,不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

程序中可以通过一个旗标来标识账户中是否已有存款,当旗标为false时,账户中没有存款,存款者线程可以向下执行,当存款者存款后,旗标为true,并调用notify()或notifyAll()方法来唤醒其他线程,并调用wait()让存款者线程等待,取款者的操作类似。

Account.java

public class Account {
    private String accountNo;
    private double balance;
    private boolean flag=false;
    public Account(String accountNo,double balance){
        this.accountNo=accountNo;
        this.balance=balance;
    }

    //因为账户余额不可以随便更改,所以只为balance提供getter方法
    public double getBalance() {
        return balance;
    }

    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Account account = (Account) o;

        return accountNo.equals(account.accountNo);

    }

    @Override
    public int hashCode() {
        return accountNo.hashCode();
    }

    public synchronized void draw(double drawAmount){
        try{
            if(!flag){
                wait();
            }else{
                System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                balance-=drawAmount;
                System.out.println("账户余额为:"+balance);
                flag=false;
                notifyAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }
    }

    public synchronized void depoist(double depoistAmount){
        try{
            if(flag){
                wait();
            }else{
                System.out.println(Thread.currentThread().getName()+"存款:"+depoistAmount);
                balance+=depoistAmount;
                System.out.println("账户余额为:"+balance);
                flag=true;
                notifyAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }
    }
}

DrawThread.java 

public class DrawThread extends Thread {
    private Account account;
    private double drawAmount;

    public DrawThread(String name, Account account, double drawAmount) {
        super(name);
        this.account = account;
        this.drawAmount = drawAmount;
    }
    public void run(){
        for(int i=0;i<100;i++) {
            account.draw(drawAmount);
        }    
    }
}

DepositThread.java

public class DepositThread extends Thread {
    private Account account;
    private double depositAmount;
    public DepositThread(String name, Account account, double depositAmount) {
        super(name);
        this.account = account;
        this.depositAmount = depositAmount;
    }
    public void run(){
        for(int i=0;i<100;i++){
            account.depoist(depositAmount);
        }
    }
}

DrawTest.java 

public class DrawTest {
    public static void main(String[] args){
        Account acct=new Account("1234567",0);
        new DrawThread("取钱者",acct,800).start();
        new DepositThread("存款者甲",acct,800).start();
        new DepositThread("存钱者乙",acct,800).start();
        new DepositThread("存钱者丙",acct,800).start();
    }
}

结果:
存款者甲存款:800.0
账户余额为:800.0
取钱者取钱:800.0
账户余额为:0.0


从结果可以发现,三个存款者线程随机的向账户中存款,只有一个取款者取钱。但是程序最后被阻塞无法继续向下执行,这是因为三个存款者线程共有300此存款操作,而一个取钱者线程只有100次取钱操作,程序最后被阻塞,如下:


存钱者丙存款:800.0
账户余额为:800.0

这里的阻塞并不是死锁,只是取钱者线程已经执行完毕,而存款者线程还在等待其他线程来取钱而已,并不是等待其他线程释放同步监视器。

 

二 使用Condition控制线程通信

1.如果程序中直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,就不能使用wait(),notify(),notifyAll()进行线程通信了。Java提供了一个Condition来保持协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock锁,也可以唤醒其他处于等待的线程。Condition实例被绑定在了一个Lock对象上,只要调用Lock对象的newCondition()方法即可。

2.Condition提供了如下三个方法:
await():同wait()方法,但是该方法有更多的变体,如long awaitNanos(long nanosTimeout),void awaitUninterruptibly(),awaitUtil(Date deadline)等
signal():同notify()
signalAll():同notifyAll()

3.Account.java

public class Account {
    private final Lock lock=new ReentrantLock();
    private final Condition cond=lock.newCondition();
    private String accountNo;
    private double balance;
    private boolean flag=false;
    public Account(String accountNo,double balance){
        this.accountNo=accountNo;
        this.balance=balance;
    }

    //因为账户余额不可以随便更改,所以只为balance提供getter方法
    public double getBalance() {
        return balance;
    }

    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Account account = (Account) o;

        return accountNo.equals(account.accountNo);

    }

    @Override
    public int hashCode() {
        return accountNo.hashCode();
    }

    public synchronized void draw(double drawAmount){
        lock.lock();
        try{
            if(!flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                balance-=drawAmount;
                System.out.println("账户余额为:"+balance);
                flag=false;
                cond.signalAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

    public synchronized void deposit(double depositAmount){
        lock.lock();
        try{
            if(flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount);
                balance+=depositAmount;
                System.out.println("账户余额为:"+balance);
                flag=true;
                cond.signalAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
}

其他的几个java文件不变,运行结果是一样的。

 

三 使用阻塞队列(BlockingQueue)控制线程通信
1.BlockingQueue是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。它的一个特征是:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。程序的两个线程交替向BlockingQueue中放入、取出元素,即可很好的控制线程的通信。

2.提供了两个支持阻塞的方法:
put(E e):试图把E元素放入BlockingQueue中,如果队列的元素已满,则阻塞该线程;
take():尝试从BlockingQueue的头部取出元素,如果队列的元素已空,则阻塞该线程;

3.BlockingQueue继承了Queue接口,可以使用Queue接口中的方法,归纳起来分为三组:
(1)在队列尾部插入元素,包括add(E e),offer(E e),put(E e),当队列已满时,分别会抛出异常、返回false、阻塞队列;
(2)在队列头部删除并返回删除的元素,包括remove(),poll()和take(),当队列已空时,分别会抛出异常、返回false、阻塞队列;
(3)在队列头部取出但不删除元素,包括element()和peek()方法,当队列已空时,着三个方法分别抛出异常、返回false

4.Java7之后又新增了一些阻塞队列,包含5个实现类:
(1)ArrayBlockingQueue:基于数组实现的BlockingQueue队列;
(2)LinkedBlockingQueue:基于链表实现的BlockingQueue队列;
(3)PriorityBlockingQueue:它并不是标准的阻塞队列,当调用remove()扥方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。判断元素的大小可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
(4)SynchronousQueue:同步队列,对该队列的存取操作必须交替进行;
(5)DelayQueue:它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现,并要求所有的集合元素都要实现Delay接口。

5.举例

public class BlockingQueueTest{
    public static void main(String[] args) throws Exception{
        BlockingQueue<String> bq=new ArrayBlockingQueue<String>(2);
        bq.put("java");
        bq.put("study");
        bq.put("javaee");  //阻塞线程
    }
}

 

 

 

 

四 线程池
1.系统启动一个新线程的成本是比较高的,因为涉及到与操作系统的交互,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量生存期很短的线程时,更要考虑使用线程池。线程池在系统启动时就创建大量的空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个线程来执行它的run()或call()方法,当方法结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一个Runnable对象的run()或call()方法。使用线程池还可以控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。

2.Java5之后支持内建线程池,通过一个Executors工厂类来产生线程池,该工厂类包含以下几个静态工厂方法来创建线程池:
(1)newCachedThreadPool():创建一个具有缓存功能的线程池
(2)newFixedThreadPool( int nThreads):创建一个可重用的、具有固定线程数的线程池
(3)newSingleThreadPool():创建一个只有单线程的线程池
(4)newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。
(6)newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
(7)ExecutorService newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争
(8)ExecutorService newWorkStealingPool():上一个方法的简化版本,如果当前机器有4个CPU,则目标并行级别被设置为4,也就是相当于前一个方法传入4作为参数
注意:前三个方法返回一个ExecutorService对象,该对象代表一个线程池,可以执行Runnable或Callable对象所代表的线程;中间两个方法返回ScheduledExecutorService线程池,它可以在指定延迟后执行线程任务;后两个是Java8新增的,充分利用了多CPU并行的能力,这两个方法生成的work stealing池,相当于后台线程池,如果所有的后台线程都死亡了,work stealing池中的线程会自动死亡。

3.ExecutorService里提供了如下三个方法:
(1)Future<?> submit(Runnable task): 将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。这里的Future对象代表Runnable任务的返回值,但run()方法无返回值,所以在run()方法执行结束后返回null。但可以调用Future的isDone(),isCancelled()来获得Runnable对象的执行状态。
(2)<T> Future<T> submit(Runnable task, T result): 这里的result显式指定线程执行结束的返回值,所以Future对象将在run()方法执行结束后返回result
(3)<T> Future<T> submit(Callable<T> task): 这里提交的是Callable对象

4.ScheduledExecutorService提供了4个方法:
(1)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit):指定callable任务将在delay延迟后执行。
(2)ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任务将在delay延迟后执行。
(3)ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 指定command任务将在delay延迟后执行,而且以设定频率重复执行,也就是说,在initialDelay后开始执行,一次在initialDelay+period,initialDelay+2*period...处重复执行。
(4)ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):
创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都会存在给定的延迟。如果任务在某次执行时遇到异常,就会取消后续执行;否则,只会通过程序来显示取消或终止该任务。

5.用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,线程池不再接收新任务,但会将所有已提交任务执行完成;也可以调用shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

6.举例

public class ThreadPoolTest {
    public static void main(String[] args) throws Exception{
        ExecutorService pool= Executors.newFixedThreadPool(6);
        Runnable target= ()->{
            for(int i=0;i<100;i++){
                System.out.println(Thread.currentThread().getPriority()+"的i值为:"+i);
            }
        };
        //向线程池中提交两个线程
        pool.submit(target)
        pool.submit(target);
        pool.shutdown();
    }
}

 

五 ThreadLocal类

1.java为ThreadLocal类增加了泛型支持,使用ThreadLocal类可以简化多线程编程时的并发访问,使用这个工具类可以很简捷的隔离多线程程序的竞争资源。

2.ThreadLocal,是Thread Local Variable (线程局部变量) 的意思,它的功能很简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立的改变自己的副本,而不会与其他线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量一样。

3.ThreadLocal类只提供了三个public方法:
T get():返回此线程局部变量中当前线程副本中的值
void remove():删除此线程局部变量中当前线程的值
void set(T value):设置此线程局部变量中当前线程副本中的值

4.ThreadLocalTest.java

 

class Account{
    //定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量,每个线程都会保留该变量的一个副本
    private ThreadLocal<String> name=new ThreadLocal<>();
    public Account(String str){
        this.name.set(str);
        //用于访问当前线程的name的副本
        System.out.println("---"+this.name.get());
    }
    public String getName(){
        return name.get();
    }
    public void setName(String str){
        this.name.set(str);
    }
}
class MyTest extends Thread{
    private Account account;

    public MyTest(String name, Account account) {
        super(name);
        this.account = account;
    }
    public void run(){
        for(int i=0;i<10;i++){
            if(i==6) {
                account.setName(getName());
            }
            System.out.println(account.getName()+"账户的i值:"+i);
        }
    }
}
public class ThreadLocalTest {
    public static void main(String[] args){
        //虽然两个线程共享同一个账户,即只有一个账户名,但由于账户名是ThreadLocal类型的,所以每个线程都完全拥有各自的账户名副本,因此在i==6之后,将看到两个线程访问同一个账户时出现不同的账户名
        Account at=new Account("初始名");
        new MyTest("线程甲",at).start();
        new MyTest("线程乙",at).start();
    }
}

 

结果:

---初始名
null账户的i值:0
null账户的i值:0
null账户的i值:1
null账户的i值:1
null账户的i值:2
null账户的i值:2
null账户的i值:3
null账户的i值:3
null账户的i值:4
null账户的i值:4
null账户的i值:5
null账户的i值:5
线程甲账户的i值:6
线程乙账户的i值:6
线程甲账户的i值:7
线程乙账户的i值:7
线程甲账户的i值:8
线程乙账户的i值:8
线程甲账户的i值:9
线程乙账户的i值:9

实际上账户名有三个副本,主线程一个,另外启动的两个线程各一个,它们的值互不干扰,每个线程都完全拥有自己的ThreadLocal变量。ThreadLocal将需要并发访问的资源复制多份,每个线程拥有一份资源,每个线程都拥有自己的线程副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象与线程相关的状态使用ThreadLocal保存。

5.ThreadLocal并不能代替同步机制,两者面向的问题领域不同,同步机制是为了同步多个线程对相同资源的并发访问,是多个线程之间进行通信的有效方式,而ThreadLocal是为了隔离多个线程的数据共享,从根本上避免多个线程之间对共享资源的竞争。

 

 

线程通信

标签:产生   can   value   cat   delay   now()   共享   out   执行   

原文地址:http://www.cnblogs.com/lyy-2016/p/6286279.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!