码迷,mamicode.com
首页 > 其他好文 > 详细

BlockingQueue的使用

时间:2015-04-04 15:19:12      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:java多线程

1、BlockingQueue常用API如下:

技术分享

技术分享

截图自:http://dlc-cdn.sun.com/jdk/jdk-api-localizations/jdk-api-zh-cn/publish/1.6.0/html/zh_CN/api/java/util/concurrent/BlockingQueue.html

1.1、放入数据:

         offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,

    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.

1.2、获取数据:

  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,BlockingQueue为空,阻断进入等待状态直到
    
BlockingQueue有新的数据被加入; 
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

2、BlockingQueue接口的具体实现类如下:

技术分享

技术分享

引自:http://blog.csdn.net/vernonzheng/article/details/8247564

    1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

    2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

    3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

    4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

    其中LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  

3、应用1——ArrayBlockingQueue

package concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueTest {
	// 定义装苹果的篮子,用BlockingQueue模拟
	public static class Basket{
		// 篮子,能够装5个苹果
		// ArrayBlockingQueue必须有容量限制
		private final BlockingQueue<String> basket=new ArrayBlockingQueue<String>(5);
		// 生产苹果
		public void produce(){
			try{
				basket.put("An apple");
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
		// 消费苹果
		public String consume(){
			String str=null;
			try{
				str=basket.take();
			}catch(InterruptedException e){
				e.printStackTrace();
			}
			return str;
		}
	}
	// 测试方法
	public static void testBasket(){
		// 实例化篮子
		final Basket basket=new Basket();
		// 定义苹果生产者
		class Producer implements Runnable{
			public void run(){
				while(true){
					// 生产苹果
					System.out.println("准备生产苹果:"+
							System.currentTimeMillis());
					basket.produce();
					System.out.println("生产苹果完毕:"+
							System.currentTimeMillis());
					try{
						Thread.sleep(300);
					}catch(InterruptedException e){
						e.printStackTrace();
					}
				}
			}
		}
		// 定义苹果消费者
		class Consumer implements Runnable{
			public void run(){
				while(true){
					// 消费苹果
					System.out.println("准备消费苹果:"+
							System.currentTimeMillis());
					basket.consume();
					System.out.println("消费苹果完毕:"+
							System.currentTimeMillis());
					try{
						Thread.sleep(500);
					}catch(InterruptedException e){
						e.printStackTrace();
					}
				}
			}
		}
		ExecutorService exec=Executors.newCachedThreadPool();
		Producer producer=new Producer();
		Consumer consumer=new Consumer();
		exec.execute(producer);
		exec.execute(consumer);
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		// 强行终止正在执行的线程,shutdown等待正在执行的线程结束,等待列表的线程不再执行
		exec.shutdownNow();
	}
	public static void main(String[] args){
		testBasket();
	}
}

输出结果:如下:exec.shutdownNow()只是试图去强制关闭正在执行的线程,事实上并没有真正关闭成功。本例是手动停止程序运行的,否则会一直持续运行。

准备生产苹果:1428121675968
生产苹果完毕:1428121675968
准备消费苹果:1428121675968
消费苹果完毕:1428121675968
准备生产苹果:1428121676265
生产苹果完毕:1428121676265
准备消费苹果:1428121676468
消费苹果完毕:1428121676468
准备生产苹果:1428121676562
生产苹果完毕:1428121676562
准备生产苹果:1428121676874
生产苹果完毕:1428121676874
准备消费苹果:1428121676968
消费苹果完毕:1428121676968
准备生产苹果:1428121677171
生产苹果完毕:1428121677171
准备消费苹果:1428121677468
消费苹果完毕:1428121677468
准备生产苹果:1428121677468
生产苹果完毕:1428121677468
准备生产苹果:1428121677765
生产苹果完毕:1428121677765
准备消费苹果:1428121677968
消费苹果完毕:1428121677968
准备生产苹果:1428121678078
生产苹果完毕:1428121678078
准备生产苹果:1428121678374
生产苹果完毕:1428121678374
准备消费苹果:1428121678468
消费苹果完毕:1428121678468
准备生产苹果:1428121678671
生产苹果完毕:1428121678671
准备消费苹果:1428121678968
消费苹果完毕:1428121678968
准备生产苹果:1428121678968
生产苹果完毕:1428121678968
准备生产苹果:1428121679281
生产苹果完毕:1428121679281
准备消费苹果:1428121679468
消费苹果完毕:1428121679468
准备生产苹果:1428121679578
生产苹果完毕:1428121679578
准备生产苹果:1428121679874
准备消费苹果:1428121679968
生产苹果完毕:1428121679968
消费苹果完毕:1428121679968
准备生产苹果:1428121680265
准备消费苹果:1428121680468
消费苹果完毕:1428121680468
生产苹果完毕:1428121680468
准备生产苹果:1428121680765
java.lang.InterruptedException: sleep interrupted
生产苹果完毕:1428121680968
准备消费苹果:1428121680968
消费苹果完毕:1428121680968
	at java.lang.Thread.sleep(Native Method)
	at concurrency.BlockingQueueDemo$1Consumer.run(BlockingQueueDemo.java:66)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(Unknown Source)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
	at java.util.concurrent.ArrayBlockingQueue.put(Unknown Source)
	at concurrency.BlockingQueueDemo$Basket.produce(BlockingQueueDemo.java:17)
	at concurrency.BlockingQueueDemo$1Producer.run(BlockingQueueDemo.java:44)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
准备生产苹果:1428121681265
生产苹果完毕:1428121681265
准备消费苹果:1428121681468
消费苹果完毕:1428121681468
准备生产苹果:1428121681578
生产苹果完毕:1428121681578
准备生产苹果:1428121681874
准备消费苹果:1428121681968
消费苹果完毕:1428121681968
生产苹果完毕:1428121681968
准备生产苹果:1428121682265


4、应用2——LinkedBlockingQueue

BlockingQueueTest测试类:
package concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
 
/**
 * @author jackyuj
 */
public class BlockingQueueTest {
 
    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);
 
        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);
 
        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
 
        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

Producer类:
package concurrency;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 生产者线程
 * 
 * @author jackyuj
 */
public class Producer implements Runnable {
 
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        String data = null;
        Random r = new Random();
 
        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
 
                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }
 
    public void stop() {
        isRunning = false;
    }
 
    private volatile boolean      isRunning               = true;
    private BlockingQueue queue;
    private static AtomicInteger  count                   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
 
}

Consumer类:
package concurrency;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
 
/**
 * 消费者线程
 * 
 * @author jackyuj
 */
public class Consumer implements Runnable {
 
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
 
    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("拿到数据:" + data);
                    System.out.println("正在消费数据:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }
 
    private BlockingQueue<String> queue;
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
}

输出结果:
启动生产者线程!
正在生产数据...
启动消费者线程!
正从队列获取数据...
启动生产者线程!
正在生产数据...
启动生产者线程!
正在生产数据...
将数据:data:1放入队列...
正在生产数据...
拿到数据:data:1
正在消费数据:data:1
将数据:data:2放入队列...
正在生产数据...
将数据:data:3放入队列...
正在生产数据...
将数据:data:4放入队列...
正在生产数据...
将数据:data:5放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:2
正在消费数据:data:2
将数据:data:6放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:3
正在消费数据:data:3
将数据:data:7放入队列...
正在生产数据...
将数据:data:8放入队列...
正在生产数据...
将数据:data:9放入队列...
正在生产数据...
将数据:data:10放入队列...
正在生产数据...
将数据:data:11放入队列...
正在生产数据...
将数据:data:12放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:4
正在消费数据:data:4
将数据:data:13放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:5
正在消费数据:data:5
将数据:data:14放入队列...
正在生产数据...
将数据:data:15放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:6
正在消费数据:data:6
正从队列获取数据...
拿到数据:data:7
将数据:data:16放入队列...
正在消费数据:data:7
正在生产数据...
将数据:data:17放入队列...
正在生产数据...
将数据:data:18放入队列...
正从队列获取数据...
拿到数据:data:8
正在消费数据:data:8
正在生产数据...
正从队列获取数据...
拿到数据:data:9
正在消费数据:data:9
将数据:data:19放入队列...
正在生产数据...
将数据:data:20放入队列...
将数据:data:21放入队列...
正从队列获取数据...
拿到数据:data:10
正在消费数据:data:10
正在生产数据...
将数据:data:22放入队列...
将数据:data:23放入队列...
正从队列获取数据...
正在生产数据...
拿到数据:data:11
正在消费数据:data:11
正从队列获取数据...
拿到数据:data:12
正在消费数据:data:12
正在生产数据...
正从队列获取数据...
拿到数据:data:13
正在消费数据:data:13
正在生产数据...
将数据:data:24放入队列...
将数据:data:25放入队列...
将数据:data:26放入队列...
正从队列获取数据...
拿到数据:data:14
正在消费数据:data:14
正在生产数据...
将数据:data:27放入队列...
正从队列获取数据...
正在生产数据...
拿到数据:data:15
正在消费数据:data:15
正从队列获取数据...
正在生产数据...
拿到数据:data:16
正在消费数据:data:16
正从队列获取数据...
正在生产数据...
拿到数据:data:17
正在消费数据:data:17
将数据:data:28放入队列...
将数据:data:29放入队列...
将数据:data:30放入队列...
正从队列获取数据...
正在生产数据...
拿到数据:data:18
正在消费数据:data:18
正从队列获取数据...
正在生产数据...
拿到数据:data:19
正在消费数据:data:19
将数据:data:31放入队列...
正从队列获取数据...
拿到数据:data:20
正在生产数据...
正在消费数据:data:20
将数据:data:32放入队列...
正从队列获取数据...
拿到数据:data:21
正在生产数据...
正在消费数据:data:21
将数据:data:33放入队列...
正从队列获取数据...
拿到数据:data:22
正在消费数据:data:22
正在生产数据...
将数据:data:34放入队列...
正从队列获取数据...
拿到数据:data:23
正在消费数据:data:23
退出生产者线程!
将数据:data:35放入队列...
正从队列获取数据...
退出生产者线程!
拿到数据:data:24
正在消费数据:data:24
正从队列获取数据...
拿到数据:data:25
退出生产者线程!
正在消费数据:data:25
正从队列获取数据...
拿到数据:data:26
正在消费数据:data:26
正从队列获取数据...
拿到数据:data:27
正在消费数据:data:27
正从队列获取数据...
拿到数据:data:28
正在消费数据:data:28
正从队列获取数据...
拿到数据:data:29
正在消费数据:data:29
正从队列获取数据...
拿到数据:data:30
正在消费数据:data:30
正从队列获取数据...
拿到数据:data:31
正在消费数据:data:31
正从队列获取数据...
拿到数据:data:32
正在消费数据:data:32
正从队列获取数据...
拿到数据:data:33
正在消费数据:data:33
正从队列获取数据...
拿到数据:data:34
正在消费数据:data:34
正从队列获取数据...
拿到数据:data:35
正在消费数据:data:35
正从队列获取数据...
退出消费者线程!

参考链接:
BlockingQueue队列详解 

BlockingQueue的使用

标签:java多线程

原文地址:http://blog.csdn.net/ldrmcml/article/details/44871963

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!