码迷,mamicode.com
首页 > 编程语言 > 详细

多线程生产者消费者问题处理

时间:2019-10-27 11:09:14      阅读:90      评论:0      收藏:0      [点我收藏+]

标签:否则   adp   可见性   使用   必须   生产线   阻塞   bsp   final   

一、比较低级的办法是用wait和notify来解决这个问题。

消费者生产者问题:

这个问题是一个多线程同步问题的经典案例,生产者负责生产对象,消费者负责将生成者产生的对象取出,两者不断重复此过程。这过程需要注意几个问题:

不论生产者和消费者有几个,必须保证:

1.生产者每次产出的对象必须不一样,产生的对象有且仅有出现一次;

2.消费者每次取出的对象必须不一样,取出的对象有且仅有出现一次;

3.一定是先产生该对象,然后对象才能被取出,顺序不能乱;

第一种情况:
多个生产者轮流负责生产,多个消费者负责取出。一旦生产者产生一个对象,其他生产者不能生产,只能由消费者执行取出操作;

需要的对象有商品类、消费者、生产者;

//测试类
public class ProducerConsumer {
 
    public static void main(String[] args) {
        // 定义资源对象
        Resource r = new Resource();
 
        //定义一个生产者和一个消费者
        Producer p = new Producer(r);
        Consumer c = new Consumer(r);
 
        //启动四个线程,2个负责生产者,两个消费者
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(p);
        Thread t3 = new Thread(c);
        Thread t4 = new Thread(c);
 
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
 
}
 
//商品类
class Resource{
    private String name;
    private int count = 1;
    private  boolean flag = false;
 
    //产生商品
    public synchronized void set(String name) {
        while (flag) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.name = name + "---" + count++;
        System.out.println(Thread.currentThread().getName() + " 生产者" + this.name);
        flag = true;
        //唤醒所有线程
        this.notifyAll();
 
    }
    //取出商品
    public synchronized void out() {
            while (!flag) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + " 消费者________" + this.name);
            flag = false;
            this.notifyAll();
    }
}
 
//定义生产者
class Producer implements Runnable{
 
    private Resource res;
 
    public Producer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.set("+商品+");
        }
    }
}
 
//定义消费者
class Consumer implements Runnable{
 
    private Resource res;
 
    Consumer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.out();
        }
    }
}

  


运行结果是产生一个,随即取出一个,循环往复,其运行结果的部分如下:

Thread-2 消费者________+商品+---67821
Thread-1 生产者+商品+---67822
Thread-3 消费者________+商品+---67822
Thread-0 生产者+商品+---67823
Thread-2 消费者________+商品+---67823
Thread-1 生产者+商品+---67824
Thread-3 消费者________+商品+---67824
Thread-0 生产者+商品+---67825
Thread-2 消费者________+商品+---67825
Thread-1 生产者+商品+---67826
Thread-3 消费者________+商品+---67826
Thread-0 生产者+商品+---67827
Thread-2 消费者________+商品+---67827
Thread-1 生产者+商品+---67828
Thread-3 消费者________+商品+---67828
Thread-0 生产者+商品+---67829
Thread-2 消费者________+商品+---67829
Thread-1 生产者+商品+---67830
Thread-3 消费者________+商品+---67830
Thread-0 生产者+商品+---67831
Thread-2 消费者________+商品+---67831
Thread-1 生产者+商品+---67832

  


第二种情况:
目标:生产者与消费者轮换着抢夺执行权,但是生产者最多可以库存5个,消费者最多可以连续取出5个

此时需要定义一种中间对象:仓库类。该类是生产者和消费者共享的一块区域,里面数据类型选择链表结果存放产生的对象。仓库是有容量上限的,当数量达到上限后,生产者不允许继续生产产品.当前线程进入等待状态,等待其他线程唤醒。当仓库没有产品时,消费者不允许继续消费,当前线程进入等待状态,等待其他线程唤醒。

第一种解决方式,采用同步代码块(synchronized),结合着 wait() 和 notifyAll() 的方法,具体代码如下:

package Thread;
/**
 * 2个消费者,3个生产者
 */
 
import java.util.LinkedList;
 
public class ProConThreadDemo {
    public static void main(String[] args) {
        Respository res = new Respository();
 
        //定义2个消费者,3个生产者
        Worker p1 = new Worker(res,"手机");
        Worker p2 = new Worker(res,"电脑");
        Worker p3 = new Worker(res,"鼠标");
        Constomer c1 = new Constomer(res);
        Constomer c2 = new Constomer(res);
 
        Thread t1 = new Thread(p1,"甲");
        Thread t2 = new Thread(p2,"乙");
        Thread t3 = new Thread(p3,"丙");
        Thread t4 = new Thread(c1,"aaa");
        Thread t5 = new Thread(c2,"bbb");
 
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
 
    }
 
 
 
}
//仓库类
class Respository{
 
    private LinkedList<Product> store = new LinkedList<Product>();
 
    //生产者的方法,用于向仓库存货
    //最多只能有一个线程同时访问该方法.
    public synchronized void push(Product p,String ThreadName){
        //设置仓库库存最多能存5个商品
        /* 仓库容量最大值为5,当容量等于5的时候进入等待状态.等待其他线程唤醒
         * 唤醒后继续循环,等到仓库的存量小于5时,跳出循环继续向下执行准备生产产品.
         */
        while (store.size()==5){
            try {
                System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                //因为仓库容量已满,无法继续生产,进入等待状态,等待其他线程唤醒.
                this.wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        this.notifyAll();
        store.addLast(p);
        System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());
        //为了方便观察运行结果,每次生产完后等待0.1秒
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
 
    }
 
 
    //消费者的方法,用于仓库出货
    //最多只能有一个线程同时访问该方法.
    public synchronized void pop(String ThreadName){
        /* 当仓库没有存货时,消费者需要进行等待.等待其他线程来唤醒
         * 唤醒后继续循环,等到仓库的存量大于0时,跳出循环继续向下执行准备消费产品.
         */
        while (store.size()==0){
            try {
                System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                //因为仓库容量已空,无法继续消费,进入等待状态,等待其他线程唤醒.
                this.wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        this.notifyAll();
        //定义对象。存放pollFirst()方法删除的对象,
        Product p = store.pollFirst();
        System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());
        //为了方便观察运行结果,每次取出后等待0.1秒
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
 
    }
 
 
}
 
 
//产品类
class Product{
    //产品的唯一标识Id
    public int Id;
    //产品的名称
    public String Name;
 
    public Product(String name, int id) {
        Name = name;
        Id = id;
    }
 
}
 
//生产者
class Worker implements Runnable{
    //关键字volatile 是为了保持 Id 的可见性,一旦Id被修改,其他任何线程用到Id的地方,都会相应修改
    //否则下方run方法容易出问题,生产商品的Id和名称 与到时候消费者取出商品的Id和名称不一致
    public volatile Integer Id = 0;
 
    public volatile String name;
 
    //引用一个产品
    private Product p;
    //引用一个仓库
    Respository res;
 
    boolean flag = true;
 
    public Worker(Respository res,String name) {
        this.res = res;
        this.name = name;
    }
 
    @Override
    public void run() {
        while (flag){
            p  = new Product(name,Id);
            res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
        }
    }
}
 
class Constomer implements Runnable{
    boolean flag = true;
 
    //引用一个仓库
    Respository res;
 
    public Constomer(Respository res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (flag) {
 
            res.pop(Thread.currentThread().getName());
 
 
        }
 
    }
}

  


运行结果如下,可见仓库最多库存为5个,接近于实际生产

aaa 发现:仓库空了,赶紧安排生产
乙 给仓库添加 电脑0号名称为  当前库存量为:1
丙 给仓库添加 鼠标0号名称为  当前库存量为:2
甲 给仓库添加 手机0号名称为  当前库存量为:3
bbb买走 电脑0 当前库存量为:2
bbb买走 鼠标0 当前库存量为:1
甲 给仓库添加 手机1号名称为  当前库存量为:2
丙 给仓库添加 鼠标1号名称为  当前库存量为:3
乙 给仓库添加 电脑1号名称为  当前库存量为:4
aaa买走 手机0 当前库存量为:3
aaa买走 手机1 当前库存量为:2
aaa买走 鼠标1 当前库存量为:1
aaa买走 电脑1 当前库存量为:0
aaa 发现:仓库空了,赶紧安排生产
乙 给仓库添加 电脑2号名称为  当前库存量为:1
丙 给仓库添加 鼠标2号名称为  当前库存量为:2
甲 给仓库添加 手机2号名称为  当前库存量为:3
bbb买走 电脑2 当前库存量为:2
bbb买走 鼠标2 当前库存量为:1
甲 给仓库添加 手机3号名称为  当前库存量为:2
丙 给仓库添加 鼠标3号名称为  当前库存量为:3
乙 给仓库添加 电脑3号名称为  当前库存量为:4
aaa买走 手机2 当前库存量为:3
乙 给仓库添加 电脑4号名称为  当前库存量为:4
乙 给仓库添加 电脑5号名称为  当前库存量为:5

  


第二种方法,利用 lock类 替代 synchronized的使用,这样可以优化代码,主要是在唤醒的时候可以根据条件去唤醒指定的某些线程。例如:当库存为空的时候,第一种方法是唤醒所有等待的线程,也包括取出的线程;而此时lock类 可以设置在库存为空的时候,只唤醒生产线程,取出的线程依旧处于等待状态,具体代码如下:

package Thread;
 
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class ProConThreadPool {
 
    public static void main(String[] args) {
 
       Respository res = new Respository();
 
       Worker p1 = new Worker(res,"手机");
       Worker p2 = new Worker(res,"电脑");
       Worker p3 = new Worker(res,"鼠标");
 
       Constomer c1 = new Constomer(res);
       Constomer c2 = new Constomer(res);
 
       Thread t1 = new Thread(p1,"甲");
       Thread t2 = new Thread(p2,"乙");
       Thread t3 = new Thread(p3,"丙");
       Thread t4 = new Thread(c1,"aaa");
       Thread t5 = new Thread(c2,"bbb");
 
       t1.start();
       t2.start();
       t3.start();
       t4.start();
       t5.start();
 
    }
 
 
 
}
//仓库类
class Respository{
 
    private Lock lock = new ReentrantLock();
 
    private LinkedList<Product> store = new LinkedList<Product>();
 
    private Condition condition_pro = lock.newCondition();
    private Condition condition_con = lock.newCondition();
 
    public LinkedList<Product> getStore() {
        return store;
    }
 
    public void setStore(LinkedList<Product> store) {
        this.store = store;
    }
    //向仓库存货
    public  void push(Product p,String ThreadName) throws InterruptedException{
        lock.lock();
        try {
            //设置仓库库存最多能存5个商品
            while (store.size()==5){
                    System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                    condition_pro.await();
            }
            condition_con.signalAll();
            store.addLast(p);
            System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());
 
        }finally {
            lock.unlock();
        }
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
 
    }
 
 
    //仓库出货
    public void pop(String ThreadName) throws InterruptedException
    {
        lock.lock();
        try{
            while (store.size()==0){
 
                    System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                    condition_con.await();
            }
            condition_pro.signalAll();
            Product p = store.pollFirst();
            System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());
 
        }
        finally {
            lock.unlock();
        }
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
 
    }
 
 
}
 
 
 
class Product{
    public int Id;
 
    public String Name;
 
    public Product(String name, int id) {
        Name = name;
        Id = id;
    }
 
}
 
class Worker implements Runnable{
 
    public volatile Integer Id = 0;
 
    public volatile String name;
 
    //引用一个产品
    private Product p;
    //引用一个仓库
    Respository res;
 
    boolean flag = true;
 
    public Worker(Respository res,String name) {
        this.res = res;
        this.name = name;
    }
 
    @Override
    public void run(){
        while (flag){
                p  = new Product(name,Id);
                try {
                    res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
 
            }
    }
}
 
class Constomer implements Runnable{
    boolean flag = true;
 
    //引用一个仓库
    Respository res;
 
    public Constomer(Respository res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (flag) {
            try {
                res.pop(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
 
    }
}

  


运行结果与上面类似:

aaa 发现:仓库空了,赶紧安排生产
bbb 发现:仓库空了,赶紧安排生产
丙 给仓库添加 鼠标0号名称为  当前库存量为:1
乙 给仓库添加 电脑0号名称为  当前库存量为:2
甲 给仓库添加 手机0号名称为  当前库存量为:3
aaa买走 鼠标0 当前库存量为:2
bbb买走 电脑0 当前库存量为:1
bbb买走 手机0 当前库存量为:0
乙 给仓库添加 电脑1号名称为  当前库存量为:1
甲 给仓库添加 手机1号名称为  当前库存量为:2
aaa买走 电脑1 当前库存量为:1
丙 给仓库添加 鼠标1号名称为  当前库存量为:2
aaa买走 手机1 当前库存量为:1
甲 给仓库添加 手机2号名称为  当前库存量为:2
乙 给仓库添加 电脑2号名称为  当前库存量为:3
bbb买走 鼠标1 当前库存量为:2
丙 给仓库添加 鼠标2号名称为  当前库存量为:3
aaa买走 手机2 当前库存量为:2
甲 给仓库添加 手机3号名称为  当前库存量为:3
bbb买走 电脑2 当前库存量为:2
乙 给仓库添加 电脑3号名称为  当前库存量为:3
丙 给仓库添加 鼠标3号名称为  当前库存量为:4

 

二、比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型。

 BlockingQueue 是线程安全的,并且在调用 put,take 方法时会阻塞线程。

基于以上特性,可以不加任何锁解决生产者消费者问题。

直接上代码:

public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> bq = new LinkedBlockingQueue<>(2);
 
        CountDownLatch cdl = new CountDownLatch(2);
 
        Thread t1 = new Thread(()->{ // 生产者线程
                try {
                    for (int i = 0; i < 100; i++)
                        bq.put("z" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    cdl.countDown();
                }
        });
 
        Thread t2 = new Thread(()->{ // 消费者线程
                try {
                    for (int i = 0; i < 100; i++)
                        System.out.println(bq.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    cdl.countDown();
                }
        });
        t2.start();
        t1.start();
        cdl.await(); // 等待两个线程结束
        System.out.println(bq.size());
 
    }

  

参考:https://blog.csdn.net/zjt980452483/article/details/81348668

     https://blog.csdn.net/qq_29697901/article/details/90405141

 

多线程生产者消费者问题处理

标签:否则   adp   可见性   使用   必须   生产线   阻塞   bsp   final   

原文地址:https://www.cnblogs.com/wjqhuaxia/p/11746675.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!