Producer Consumer
生产者创建数据,通过中介控制流量并安全传递给消费者。
适用环境
生产者生产数据的速度与消费者处理数据的速度不一致,中介者通过缓存和阻塞对消费者的数据压力进行调整。
样例
4个生产者生产产品,放入市场,由2个消费者消费。
产品
package ProducerConsumer;

/**
 * Immutable value object representing a single product that is passed from a
 * producer to a consumer.
 *
 * <p>The identifier is assigned once in the constructor and never changes, so
 * an instance can be handed between threads without further synchronization
 * on the product itself.
 */
public class Product {

    /** Identifier supplied at construction time; may be {@code null} if the caller passes {@code null}. */
    private final String prdId;

    /**
     * Creates a product with the given identifier.
     *
     * @param prdId the product identifier, stored as-is
     */
    public Product(String prdId) {
        this.prdId = prdId;
    }

    /**
     * Returns the identifier this product was created with.
     *
     * <p>The method name is kept for compatibility with existing callers even
     * though the value is the product id.
     *
     * @return the product identifier passed to the constructor
     */
    public String getOrderId() {
        return this.prdId;
    }
}
市场
package ProducerConsumer;

import java.util.LinkedList;

/**
 * Bounded, monitor-based buffer that sits between producers and consumers.
 *
 * <p>{@link #put(Product)} blocks while the buffer holds {@code size} or more
 * items; {@link #get()} blocks while the buffer is empty and the mart is still
 * open ({@code state == 0}). All access to the internal list and to
 * {@code state} happens inside {@code synchronized} methods on this instance.
 */
public class Mart {

    /** FIFO storage for products; guarded by this object's monitor. */
    private final LinkedList<Product> items = new LinkedList<Product>();

    /** Maximum number of items allowed before {@code put} blocks. */
    private final int size;

    /** {@code 0} while the mart is open; any other value lets {@code get} return without an item. */
    private int state = 0;

    /**
     * Creates a mart with the given capacity.
     *
     * @param size number of items at which {@link #put(Product)} starts blocking
     */
    public Mart(int size) {
        this.size = size;
    }

    /**
     * Adds a product, waiting while the mart is at capacity.
     *
     * <p>The wait is not abandoned on interruption (the method has no way to
     * report it), but the caller's interrupt status is restored before
     * returning so the interruption is not lost.
     *
     * @param prd the product to append to the queue
     */
    public synchronized void put(Product prd) {
        boolean interrupted = false;
        while (items.size() >= size) {
            try {
                // Timed wait: re-check the condition at least every 100 ms.
                wait(100);
            } catch (InterruptedException e) {
                // Remember the interrupt and keep waiting; restored below.
                interrupted = true;
            }
        }
        items.add(prd);
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and returns the oldest product, waiting while the mart is empty
     * and still open.
     *
     * <p>As with {@link #put(Product)}, an interruption does not end the wait
     * but the interrupt status is restored before returning.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     *         because the mart's state is no longer {@code 0}
     */
    public synchronized Product get() {
        boolean interrupted = false;
        while (items.isEmpty() && state == 0) {
            try {
                wait(100);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        // poll() yields null when the loop ended because of a state change.
        Product ret = items.poll();
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return ret;
    }

    /**
     * Reports whether the mart currently holds no products.
     *
     * @return {@code true} if the queue is empty
     */
    public synchronized boolean isEmpty() {
        return items.isEmpty();
    }

    /**
     * Sets the mart state and wakes all waiting threads so that consumers
     * blocked in {@link #get()} re-evaluate their condition immediately
     * instead of waiting for the next timeout.
     *
     * @param st new state; {@code 0} means open, any other value releases
     *           consumers waiting on an empty queue
     */
    public synchronized void setState(int st) {
        this.state = st;
        notifyAll();
    }
}
生产者
package ProducerConsumer;

import java.util.UUID;

/**
 * Runnable that repeatedly creates products with random UUID identifiers and
 * puts them into a {@link Mart} until it is told to stop.
 */
public class Producer implements Runnable {

    /** Label used as a prefix in this producer's console output. */
    private final String myName;

    /** Destination for produced items. */
    private final Mart mart;

    /**
     * Run flag: the loop continues while this is {@code 0}. Declared
     * {@code volatile} because it is written by another thread through
     * {@link #setState(int)} and read by the thread executing {@link #run()}.
     */
    private volatile int state = 0;

    /**
     * @param myName label printed in log lines
     * @param mart   mart that receives the produced items
     */
    public Producer(String myName, Mart mart) {
        this.myName = myName;
        this.mart = mart;
    }

    /**
     * Produces one product per iteration, pausing 100 ms after each
     * {@code put}, until {@code state} becomes non-zero or the thread is
     * interrupted during the pause.
     */
    @Override
    public void run() {
        while (state == 0) {
            Product p = new Product(UUID.randomUUID().toString());
            mart.put(p);
            boolean interrupted = false;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this task and
                // treat it as a request to stop producing.
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            // The product is already in the mart at this point, so report it
            // even when the pause was cut short.
            System.out.println(this.myName + " 生产的产品:" + p.getOrderId() + " 已经进入市场。");
            if (interrupted) {
                break;
            }
        }
        System.out.println(this.myName + " 已停止生产。");
    }

    /**
     * Updates the run flag.
     *
     * @param st {@code 0} keeps the producer running; any other value makes
     *           the loop exit at its next condition check
     */
    public void setState(int st) {
        this.state = st;
    }
}
消费者
package ProducerConsumer;

/**
 * Runnable that repeatedly takes products from a {@link Mart} and "consumes"
 * them (a 200 ms pause followed by a console message) until told to stop.
 */
public class Consumer implements Runnable {

    /** Label used as a prefix in this consumer's console output. */
    private final String myName;

    /** Source of products. */
    private final Mart mart;

    /**
     * Run flag: the loop continues while this is {@code 0}. Declared
     * {@code volatile} because it is written by another thread through
     * {@link #setState(int)} and read by the thread executing {@link #run()}.
     */
    private volatile int state = 0;

    /**
     * @param myName label printed in log lines
     * @param mart   mart to take products from
     */
    public Consumer(String myName, Mart mart) {
        this.myName = myName;
        this.mart = mart;
    }

    /**
     * Takes one product per iteration and processes it, until {@code state}
     * becomes non-zero or the thread is interrupted while processing.
     * A {@code null} result from {@code mart.get()} is skipped.
     */
    @Override
    public void run() {
        while (state == 0) {
            Product p = mart.get();
            if (p == null) {
                // Nothing was handed out; go back and re-check the run flag.
                continue;
            }
            boolean interrupted = false;
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // Preserve the interrupt and treat it as a request to stop.
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            // The product has already been removed from the mart, so report
            // it even when the pause was cut short.
            System.out.println(this.myName + " 消费了产品:" + p.getOrderId() + "。");
            if (interrupted) {
                break;
            }
        }
        System.out.println(this.myName + " 已停止消费。");
    }

    /**
     * Updates the run flag.
     *
     * @param st {@code 0} keeps the consumer running; any other value makes
     *           the loop exit at its next condition check
     */
    public void setState(int st) {
        this.state = st;
    }
}
测试类
package ProducerConsumer;

/**
 * Demo driver: four producers fill a mart of capacity 10, two consumers are
 * started five seconds later, and after another five seconds everything is
 * shut down in order (producers, then the mart, then the consumers).
 */
public class Test {

    /**
     * Runs the demo.
     *
     * <p>If the main thread is interrupted while sleeping or waiting, the
     * remaining waits are skipped, the interrupt status is restored, and all
     * stop flags are still set in the {@code finally} block.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("主线开始。");
        Mart m = new Mart(10);

        Producer[] producers = {
            new Producer("p1", m),
            new Producer("p2", m),
            new Producer("p3", m),
            new Producer("p4", m)
        };
        Consumer[] consumers = {
            new Consumer("c1", m),
            new Consumer("c2", m)
        };

        Thread[] consumerThreads = new Thread[consumers.length];
        for (int i = 0; i < consumers.length; i++) {
            consumerThreads[i] = new Thread(consumers[i]);
        }
        Thread[] producerThreads = new Thread[producers.length];
        for (int i = 0; i < producers.length; i++) {
            producerThreads[i] = new Thread(producers[i]);
        }

        // Only producers run at first so the mart fills up and they block.
        for (Thread t : producerThreads) {
            t.start();
        }

        try {
            System.out.println("主线程休眠5秒。");
            Thread.sleep(5000);
            System.out.println("主线程休眠结束。");
            System.out.println("市场到达饱和,生产者被阻塞,开启消费者。");
            for (Thread t : consumerThreads) {
                t.start();
            }
            System.out.println("主线程再次休眠5秒。");
            Thread.sleep(5000);
            System.out.println("主线程休眠结束。");

            System.out.println("设置生产者毒丸。");
            stopProducers(producers);

            System.out.println("等待队列清空。");
            // Wait for every producer to finish first: a producer that passed
            // its state check before the flag was set may still be about to
            // put one more product, which would otherwise race with the
            // emptiness check below.
            for (Thread t : producerThreads) {
                t.join();
            }
            while (!m.isEmpty()) {
                Thread.sleep(1000);
            }
            System.out.println("队列已清空。");
        } catch (InterruptedException e) {
            // Stop waiting, keep the interrupt visible, and fall through to
            // the shutdown steps in finally.
            Thread.currentThread().interrupt();
        } finally {
            // Setting the producer flag again is harmless and guarantees it
            // is set even when the try block was cut short.
            stopProducers(producers);
            m.setState(-1);
            System.out.println("设置消费者毒丸。");
            for (Consumer c : consumers) {
                c.setState(-1);
            }
        }
    }

    /** Sets the stop flag on every producer. */
    private static void stopProducers(Producer[] producers) {
        for (Producer p : producers) {
            p.setState(-1);
        }
    }
}
本文出自 “JAVA技术栈笔记” 博客,请务必保留此出处http://stroll.blog.51cto.com/11038467/1856913
JAVA多线程(七)模式-Producer Consumer
原文地址:http://stroll.blog.51cto.com/11038467/1856913