标签:
多线程消费队列到指定个数时触发一个生产线程往队列中补充元素,保证队列中有足够的数据供消费,不至于使消费线程等待,也不至于在队列中堆得过多。假设10人消费,先放2个篮子,每个篮子10个(篮子得够大,怎么也得够在场的人分一次),吃完一篮子赶紧叫人再提一篮子来,谁负责叫人?吃篮子里最后一个的,或吃另外一篮第一个的,这样得知道哪个是最后一个,哪个是第一个。另外一个方法在篮子底部放个托盘,谁拿到托盘谁负责叫人,全部消费完时,篮子里不放托盘了,按人数在篮子里放甜点,每人一份,吃完收工。
final long startTime = System.currentTimeMillis();//开始时间 int index = 0;//模拟处理索引 final int person = 10;//模拟消费者个数 final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(1000); /**托盘*/ class Salver{}; /**甜点*/ class Dessert{}; /**模拟启动程序*/ void start() { /* 1、先来两篮子 */ for (int k = 0; k < 2; k++) { for (int i = 0; i < person; i++) { queue.add(++index); queue.add(new Salver()); } } /* 2、启动消费者 */ for (int i = 0; i < person; i++) { new Thread(new Consumer()).start(); } } /**消费者*/ class Consumer implements Runnable{ @Override public void run() { try { while (true) { Object thing = queue.take(); if (thing instanceof Salver) {//拿到托盘,叫人再来一篮子,接着取下一个 new Thread(new Producer()).start(); continue; } else if (thing instanceof Dessert) {//吃完甜点收工 break; } /*模拟实际处理*/ System.out.println(thing); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } } } /**生产者*/ class Producer implements Runnable { @Override public void run() { try { synchronized(Producer.class){//避免没拿来消费完同时拿 /* 消费1分钟停止,根据实际情况调整,比如库里没有待处理数据或不足一篮子 */ if (System.currentTimeMillis() - startTime > 60 * 1000) { /* 人均一份甜点 */ for (int i = 0; i < person; i++) { queue.put(new Dessert()); } } else { /* 加一篮子 */ for (int i = 0; i < person; i++) { queue.put(index++); } /* 放一托盘 */ queue.put(new Salver()); } } } catch (Exception e) { e.printStackTrace(); } } }
标签:
原文地址:http://my.oschina.net/h2do/blog/524605