码迷,mamicode.com
首页 > 编程语言 > 详细

java线程间通信

时间:2016-07-13 17:17:07      阅读:234      评论:0      收藏:0      [点我收藏+]

标签:

等待通知机制的实现

方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是object类的方法,该方法的作用是将当前线程置入”预执行队列中”,并且在wait()所在的代码行处停止执行,直到接到通知,或者被中断为止。 在调用wait()方法执行,线程需要先获得该对象的对象级别锁,也就是说,只能在同步方法,或者同步块中调用wait()方法,在执行wait()方法后,当前线程释放锁,在从wait()方法返回前,线程与其他线程竞争重新获得锁,如果调用wait()是没有持有适当的锁,则会跑出IllegalMonitorStateException。

方法notify(),也需要在同步方法或者同步代码块中调用,即在调用前,线程必须获得该对象的对象级别锁,如果调用notify()时候,没有持有适当的锁,也会跑出IllegalMonitorStateException异常。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个等待线程,则由线程规划器随机挑选出一个呈wait状态的线程,对其发出notify,并使它等待获取该对象的对象锁。需要注意的是,执行notify()方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也不是马上获取对象锁,需要等到执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁。而呈wait状态的其线程才能获取该对象锁

wait使线程停止运行,而notify使停止的线程继续运行。

public static void main(String[] args) {
        try {
            String str = "abc";
            str.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

此时运行代码会有如下效果:
技术分享
这是因为没有”对象监视器”,也就是没有同步加锁。

public static void main(String[] args) {
        String str = new String();
        System.out.println("sync 上面....");
        try {
            synchronized (str) {
                System.out.println("sync 第一行....");
                str.wait();
                System.out.println("wait 下面的代码....");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

此时效果如下:
技术分享
可以看到这里程序卡在了wait方法这里,这是因为没有地方调用notify来唤醒当前的线程。

使用wait & notify

技术分享

public static void main(String[] args) {
        Object obj = new Object();
        ThreadFirst first = new ThreadFirst(obj);
        first.start();
        ThreadSecond second = new ThreadSecond(obj);
        second.start();
    }

此时程序运行效果如下:
技术分享

小结:

  • 关键字synchronized可以将任何一个Object对象作为同步对象来看待。
  • wait()方法可以是调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,知道被再次唤醒。
  • notify()方法可以随机唤醒等待队列鸿额等待同一共享资源 的”一个”线程,并使该线程提出等待队列,进入可运行状态。
  • notifyAll()方法可以使所有正在等待队列中等待同一共享资源的”全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的线程最先执行,但也有可能随机执行。
  • 每一个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列,就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程,一个线程被唤醒后,才会进入就绪队列,等待CPU的调度,反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

方法notify执行后,不立即释放锁

创建MyService类,定义两个方法,lockMethod,notifyMethod分别用来锁住和释放同一锁对象,当然,该锁对象是通过参数传递进来的

public class MyService {


    public void lockMethod(Object lock) {
        synchronized (lock) {
            try {
                System.out.println("begin wait "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
                lock.wait();
                System.out.println("end wait "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void notifyMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin notify "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
                lock.notify();
                Thread.sleep(5000);
                System.out.println("end notify "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

技术分享
测试代码:

public static void main(String[] args) {
        Object obj = new Object();
        LockThread first = new LockThread(obj);
        first.start();
        NotifyThread second = new NotifyThread(obj);
        second.start();
    }

此时运行效果如下:
技术分享
可以看到:必须执行完成notify()方法所咋的同步synchronized代码块以后才释放锁

interrupt方法和wait方法

public class LockThread extends Thread {
    private Object obj;

    public LockThread(Object obj) {
        super();
        this.obj = obj;
    }

    @Override
    public void run() {
        super.run();
        MyService myService = new MyService();
        myService.lockMethod(obj);
    }

    public static void main(String[] args) {
        Object obj = new Object();
        LockThread first = new LockThread(obj);
        first.start(); //调用start方法,线程运行以后,会处于wait状态
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        first.interrupt();
    }
}

此时程序运行效果如下:
技术分享
可以看到,当线程呈wait()状态时候,调用线程对象的interrupt()方法会出现InterruptedException异常。

方法wait(long)使用

方法wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个事件则自动唤醒。

public void lockMethod(Object lock) {
        synchronized (lock) {
            try {
                System.out.println("begin wait "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
                lock.wait(3000);
                System.out.println("end wait "+Thread.currentThread().getName()+" the thread name is :"+Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

这里调用wait(3000);来使当前线程处于wait状态,3秒过后,自动唤醒。效果如下:
技术分享

生产者消费者模式实现

等待/通知模式最经典的案例就是”生产者”/”消费者”模式。

一生产一消费

  • ValueObject :用来中间存放生产内容的类
  • Produce : 用来生产产品
  • Customer 用来消费产品
  • ThreadProduce: 用来不断的通过Produce 生产产品的线程
  • ThreadCustomer : 用来通过Customer 消费产品的线程

ValueObject .java

public class ValueObject {
    public static String value = "";
}

Produce .java

public class Produce {
    private String lock;

    public Produce(String lock) {
        super();
        this.lock = lock;
    }

    /**
     * 生产一个value,并通知消费者消费
     */
    public void setValue() {
        try {
            synchronized (lock) {
                // 如果没有被消费,则处于等待状态
                if (!"".equals(ValueObject.value)) {
                    lock.wait();
                }
                // 生产一个值,并通过notify方法通知消费者消费
                String value = System.currentTimeMillis()+"";
                ValueObject.value = value;
                System.out.println("设置的值是:"+ValueObject.value);
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Customer .java

public class Customer {
    private String lock;

    public Customer(String lock) {
        super();
        this.lock = lock;
    }

    /**
     * 消费一个value,并通知生产者继续生产
     */
    public void getValue() {
        try {
            synchronized (lock) {
                // if条件表示生产者还没有生产完成
                if ("".equals(ValueObject.value)) {
                    lock.wait();
                }
                System.out.println("消费的值是:"+ValueObject.value);
                // 设置value为空, 表示已经消费,并通过notify方法通知生产者生产
                ValueObject.value = "";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadProduce.java

public class ThreadProduce extends Thread {

    private Produce produce;

    public ThreadProduce(Produce produce) {
        super();
        this.produce = produce;
    }

    @Override
    public void run() {
        super.run();
        while (true) {
            produce.setValue();
        }
    }
}

ThreadCustomer.java

public class ThreadCustomer extends Thread {

    private Customer customer;

    public ThreadCustomer(Customer customer) {
        super();
        this.customer = customer;
    }

    @Override
    public void run() {
        super.run();
        while (true) {
            customer.getValue();
        }
    }
}

测试代码:TestPC.java

public class TestPC {

    private static String lock = new String();

    public static void main(String[] args) {

        Produce produce = new Produce(lock);
        Customer customer = new Customer(lock);

        ThreadProduce threadProduce = new ThreadProduce(produce);
        ThreadCustomer threadCustomer = new ThreadCustomer(customer);

        threadProduce.start();
        threadCustomer.start();
    }
}

此时运行效果如下:
技术分享
可以看到,这里,每当生产者生产一个value,就会通过notify通知消费者消费,同样,消费者,每消费一个value,就会通知生产者在生产。

通过管道进行线程间通信:字节流

管道流,是一种特殊的流,用于在不同线程之间直接传递数据,一个线程发送数据到输入管道,另一个线程从输入管道读取数据。在JDK中,提供了4个类来使线程间可以进行通信:

  • PipedInputStream 和 PipedOutputStream
  • PipedReader 和 PipedWriter

ReadData.java

public class ReadData {
    // 从输入管道中读取数据
    public void readData(PipedInputStream pipedInputStream) {
        System.out.println("read.....");
        try {
            byte[] byteArray = new byte[20];
            int readLength = pipedInputStream.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray,0,readLength);
                System.out.print(newData);
                readLength = pipedInputStream.read(byteArray);
            }
            System.out.println();
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

WriteData.java

public class WriteData {
    // 向输出管道中写入数据
    public void writeData(PipedOutputStream pipedOutputStream) {
        System.out.println("write.....");
        try {
            for (int i = 0; i < 20; i++) {
                String outData = " "+(i+1);
                pipedOutputStream.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

ThreadWrite.java

public class ThreadWrite extends Thread {
    private WriteData writeData;
    private PipedOutputStream pipedOutputStream;
    public ThreadWrite(WriteData writeData, PipedOutputStream pipedOutputStream) {
        super();
        this.writeData = writeData;
        this.pipedOutputStream = pipedOutputStream;
    }

    @Override
    public void run() {
        super.run();
        writeData.writeData(pipedOutputStream);
    }
}

ThreadRead.java

public class ThreadRead extends Thread {
    private ReadData readData;
    private PipedInputStream pipedInputStream;
    public ThreadRead(ReadData readData, PipedInputStream pipedInputStream) {
        super();
        this.readData = readData;
        this.pipedInputStream = pipedInputStream;
    }

    @Override
    public void run() {
        super.run();
        readData.readData(pipedInputStream);
    }
}

Run.java

public class Run {
    public static void main(String[] args) {

        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();

            PipedOutputStream pipedOutputStream = new PipedOutputStream();
            PipedInputStream pipedInputStream = new PipedInputStream();
            // 使两个stream之间产生通信链接
            pipedInputStream.connect(pipedOutputStream);

            ThreadWrite threadWrite = new ThreadWrite(writeData, pipedOutputStream);
            ThreadRead threadRead = new ThreadRead(readData, pipedInputStream);

            // 首先启动读取线程,此时,由于输出管道没有任何输出,所以会进入阻塞状态
            threadRead.start();
            threadWrite.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

此时运行结果如下:
技术分享
可以看到两个线程之间通过管道流进行通信。

使用等待通知实现交替打印

下面我创建20个线程,其中10个打印”AAA”,另外10个打印”BBB”,并且是交叉进行打印的。
PrintTool.java

public class PrintTool {

    volatile boolean isPrintA = false;

    // 使用类级别的锁,确保printA或者printB一次执行完成
    synchronized public void printA() {
        try {
            while (isPrintA) {
                wait();
            }

            for (int i = 0; i < 5; i++) {
                System.out.print("AAA");
            }
            System.out.println();
            isPrintA = true;
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    synchronized public void printB() {
        try {
            while (!isPrintA) {
                wait();
            }

            for (int i = 0; i < 5; i++) {
                System.out.print("BBB");
            }
            System.out.println();
            isPrintA = false;
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadPrintB.java和ThreadPrintA分别如下:
技术分享

PrintRun.java

public class PrintRun {

    public static void main(String[] args) {
        PrintTool printTool = new PrintTool();
        // 分别创建5个线程ThreadPrintA 和ThreadPrintB 交替打印
        for (int i = 0; i < 5; i++) {
            ThreadPrintA threadPrintA = new ThreadPrintA(printTool);
            ThreadPrintB threadPrintB = new ThreadPrintB(printTool);

            threadPrintA.start();
            threadPrintB.start();
        }
    }
}

此时运行效果如下:
技术分享

方法join()的使用

在很多情况下,主线程创建并启动子线程,如果子线程中药进行大量的耗时运算,主线程往往将早于子线程结束之前结束,此时如果主线程想要等到子线程执行完成之后在结束,就要用到join()方法了。join()方法的作用是等待线程对象销毁。

join()方法实践

下面,我在main方法中运行一个线程,该线程每次执行的时间都是随机的,我们要做的就是在当前线程执行完成之后,在结束main线程。

public class RandomThread extends Thread{

    @Override
    public void run() {
        super.run();
        try {
            int seconds = (int) (Math.random() * 10000);
            sleep(seconds);
            System.out.println("RandomThread cost "+seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            RandomThread randomThread = new RandomThread();
            randomThread.start();
            randomThread.join();

            System.out.println("我是main线程,等待RandomThread执行结束了。。。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

此时程序的运行结果如下:
技术分享

可以看到方法join具有使线程排队运行的作用,类似于同步的效果。join与synchronized的区别是:join内部使用wait()方法进行等待,而synchronized关键字使用的是”对象监视器”,作为同步。

ThreadLocal的使用

在java中,变量值的共享可以使用public static 变量的形式,所有的线程都使用同一个public static变量,如果想实现每一个线程都有自己的共享变量在JDK中提供了一个类ThreadLocal。

类ThreadLocal解决的是变量在不同线程间的隔离性,也就是不同线程拥有自己的值,不同线程中的值是可以放入ThreadLocal类中进行保存的。
Tools.java

public class Tools {
    // 用来保存不同线程的值
    public static ThreadLocal local = new ThreadLocal();

}

技术分享
测试代码如下:

public static void main(String[] args) {
        ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();

        threadA.start();
        threadB.start();
    }

此时系统运行结果如下:
技术分享

java线程间通信

标签:

原文地址:http://blog.csdn.net/mockingbirds/article/details/51878431

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!