标签:style blog http color java 使用 os io
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;/** *
@author jackyuj */public class BlockingQueueTest
{ public static void main(String[]
args) throws InterruptedException
{ //
声明一个容量为10的缓存队列 BlockingQueue<String>
queue = new LinkedBlockingQueue<String>(10); Producer
producer1 = new Producer(queue); Producer
producer2 = new Producer(queue); Producer
producer3 = new Producer(queue); Consumer
consumer = new Consumer(queue); //
借助Executors ExecutorService
service = Executors.newCachedThreadPool(); //
启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); //
执行10s Thread.sleep(10 *
1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); //
退出Executor service.shutdown(); }} |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;/** *
消费者线程 *
*
@author jackyuj */public class Consumer
implements Runnable
{ public Consumer(BlockingQueue<String>
queue) { this.queue
= queue; } public void run()
{ System.out.println("启动消费者线程!"); Random
r = new Random(); boolean isRunning
= true; try { while (isRunning)
{ System.out.println("正从队列获取数据..."); String
data = queue.poll(2,
TimeUnit.SECONDS); if (null !=
data) { System.out.println("拿到数据:" +
data); System.out.println("正在消费数据:" +
data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); }
else { //
超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning
= false; } } }
catch (InterruptedException
e) { e.printStackTrace(); Thread.currentThread().interrupt(); }
finally { System.out.println("退出消费者线程!"); } } private BlockingQueue<String>
queue; private static final int DEFAULT_RANGE_FOR_SLEEP
= 1000;}import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** *
生产者线程 *
*
@author jackyuj */public class Producer
implements Runnable
{ public Producer(BlockingQueue
queue) { this.queue
= queue; } public void run()
{ String
data = null; Random
r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning)
{ System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data
= "data:" +
count.incrementAndGet(); System.out.println("将数据:" +
data + "放入队列..."); if (!queue.offer(data,
2,
TimeUnit.SECONDS)) { System.out.println("放入数据失败:" +
data); } } }
catch (InterruptedException
e) { e.printStackTrace(); Thread.currentThread().interrupt(); }
finally { System.out.println("退出生产者线程!"); } } public void stop()
{ isRunning
= false; } private volatile boolean isRunning
= true; private BlockingQueue
queue; private static AtomicInteger
count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP
= 1000;} |
标签:style blog http color java 使用 os io
原文地址:http://blog.csdn.net/aigoogle/article/details/38441987