标签:
欢迎转载,转载请注明出处。尊重他人的一丢丢努力,谢谢啦!
阅读本篇之后,如果你觉得说得还有点道理,那不妨先戳一下从生产者消费者窥探线程同步(下) ,两篇一起嚼才更好呢。
最近复习了下生产者消费者模式,虽然对它不太陌生,但要说认认真真地实现,还真从来没有过,这里将它总结一下,有不妥或者见识不到之处,欢迎留言指出。
大概基于以下2点:
(1)可以实现解耦
大多数设计模式,都会创造出一个第三者来担任解耦角色。比如末班模式的模板类,工厂模式的工厂类等。而消费者观察者模式则是使用拥塞队列来给两者解耦的。解耦之后,生产者和消费者就是两个相对独立的个体,他们之间不再进行直接的交互,而是通过拥塞队列来中转完成。
(2)线程安全
既然提到了拥塞队列,肯定就少不了并发问题,就少不了线程安全。更具体一点来说,整个安全控制要做到以下6点:
1 同一时间只有一个生产者生产;
2 同一时间只有一个消费者消费;
3 生产的同时不能消费;
4 消费的同时不能生产;
5 拥塞队列为空,不能消费;
6 拥塞队列为满,不能生产;
总之,就是一个时间点,只能进行一种活动。
从它的特点来看,要想通过不同的实现方式,必然要在线程安全这一块花点心思。代码层面的线程同步,主要有三种实现方式:阻塞,非阻塞和一些不需要同步方案的代码(本身就是安全的)。而在这三种方式中,使用最多的恐怕要数阻塞方式,也就是互斥同步,网上一些博文对这个概念似乎有偏颇之嫌,这里先明确一下两个概念:互斥和同步。
所谓的互斥,就是互斥同步(下文简称互斥),它是实现同步的一种阻塞方案,互斥是方法,同步是目的。它们两个并不是并列关系,而应该算是一种因果关系。
互斥的实现方式包括临界区Critical Section,互斥量Metux,信号量Semaphore,当然还有伟大的synchronized、以及Java 5以后提供的Lock等等。
网上大多数实现都是synchronized、Lock、BlockingQueue这三总方式,毋庸置疑,这三种方式确实用的比较多。值得指出的是,通过Semaphore和Metux的PV操作,同样可以达到目的。
前面已经说过,设计模式大多数都是奔着解耦去的,能使一团糟糕的代码变得条理清晰。在上代码不妨先来看一下程序结构:
主要四个文件构成,各自的作用如名字所示:
(1)首先得有产品吧,不然生产毛线,对应Product;
(2)有了产品,自然就有生产者和消费者,对应Producer和Consumer,实质是两个线程;
(3)有了生产者消费者,如何实现他们之间的交互,也就是怎么解决何时生产何时该消费呢,这就用到了前面说过的拥塞队列,对应StorageQueue。
惯例,到了能上代码就不说话环节。
//定义产品
public class Product {
long id;
String name;
public Product(long id, String name) {
super();
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "产品 详情:[id= " + id + " , name= " + name + "]";
}
}
//消费者
public class Consumer extends Thread {
int num;
public Consumer(int num) {
super();
this.num = num;
}
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
//注意,不同的实现,这里需要更改为对应的仓库
StorageLock.consume(num);
}
}
//生产者
public class Producer extends Thread {
int num;
public Producer(int num) {
// TODO Auto-generated constructor stub
this.num = num;
}
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
//注意,不同的实现,这里需要更改为对应的仓库
StorageLock.produce(num);
}
}
核心:拥塞控制完全交给BlockQueue来实现,这个队列内部用到了可重入锁的await()和singal()方法,队列满时,再存放就阻塞;队列空时,再取就阻塞。
值得注意,BlockQueue有两套(实际上是三套)存取的方法,分别是put()和take()、offer()和poll()。它们对着应不同的处理策略,说白了就是当队列满时,调用put()方法会阻塞,一直等到队列有空闲然后将元素放进去。而后者offer()不会等待,而是直接丢弃,返回false,它看起来更像是add()方法的线程安全版!!!自己动手写的时候,一定要注意。
public class StorageQueue {
public static Integer MAX = 50;
public static ArrayBlockingQueue<Product> list = new ArrayBlockingQueue<>(
MAX);
public static void main(String[] args) {
// TODO Auto-generated method stub
ExecutorService s = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
s.submit(new Producer(2));
s.submit(new Consumer(1));
}
}
public static void produce(Integer num) {
if (list.size() == MAX) {
System.out.println(Thread.currentThread().getName()
+ " 我是生产,我在等待... ");
}
try {
for (int i = 0; i < num; i++) {
list.put(new Product(i, ""));
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 库存: "
+ list.size());
}
public static void consume(Integer num) {
if (list.size() == 0) {
System.out.println(Thread.currentThread().getName()
+ " 我是消费,我在等待... ");
}
try {
for (int i = 0; i < num; i++) {
list.take();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 库存: "
+ list.size());
}
}
这是部分输出,考虑到篇幅,就不全部贴上了,后文附有源码下载链接,感兴趣的可自行下载运行:
pool-1-thread-1 库存: 2
pool-1-thread-2 库存: 1
pool-1-thread-4 库存: 0
pool-1-thread-5 库存: 3
pool-1-thread-1 库存: 1
pool-1-thread-2 库存: 2
pool-1-thread-6 库存: 2
...
核心:没什么好说的,主要是使用对象的notify()和wait()来实现线程间通信。可以用同步方法或者同步代码块,这里采用的是同步代码块。
public class StorageSync {
public static Integer MAX = 50;
public static List<Product> list = new ArrayList<>();
public static void main(String[] args) {
// TODO Auto-generated method stub
ExecutorService s = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
s.submit(new Producer(50));
s.submit(new Consumer(5));
}
}
public static void produce(Integer num) {
synchronized (list) {
AtomicInteger m = new AtomicInteger(0);
while (list.size() + num > MAX) {
// if (list.size() + num > MAX) {
try {
m.addAndGet(1);
System.out.println(Thread.currentThread().getName() + " 阻塞"
+ " m: " + m);
System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
+ "\t暂时不能执行生产任务!");
list.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// else {
for (int i = 0; i < num; i++) {
list.add(new Product(i, ""));
}
System.out.println(Thread.currentThread().getName() + " m: " + m);
System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
list.notify();
// }
}
}
public static void consume(Integer num) {
synchronized (list) {
// if (num > list.size()) {
while (num > list.size()) {
try {
System.out
.println(Thread.currentThread().getName() + " 阻塞");
System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
+ "\t暂时不能执行消费任务!");
list.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// } else {
for (int i = 0; i < num; i++) {
list.remove(0);
}
System.out.println(Thread.currentThread().getName());
System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
list.notifyAll();
// }
}
}
}
部分运行结果:
pool-1-thread-1 m: 0
已经生产数:50 现仓储量为:50
pool-1-thread-1 阻塞 m: 1
要生产的数量:50 库存量:50 暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-1 阻塞 m: 2
要生产的数量:50 库存量:45 暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:40
pool-1-thread-1 阻塞 m: 3
要生产的数量:50 库存量:40 暂时不能执行生产任务!
pool-1-thread-2 阻塞 m: 1
要生产的数量:50 库存量:40 暂时不能执行生产任务!
pool-1-thread-65
已经消费数:5 现仓储量为:35
...
注意代码中的AtomicInteger m
是我用来跟踪线程状态的变量,表次该线程阻塞的次数,完全可以删去。
不知道你有没有这样的疑问:代码中使用了这样的循环语句while (list.size() + num > MAX)
,为什么不用if(list.size() + num > MAX)
来判断呢?这里怎么看都应该是个顺序控制,而不应该是个循环呀?再说了,在执行到list.wait();
之后,线程不是阻塞了吗?后面的for循环语句自然就不会执行,为什么还要用while()来循环判断,岂不多余?
乍一听,上面的分析确实”蛮有道理”,而且我相信,大多数人第一次写的时候,很容易就想到if上来了。我们不妨先顺着这思路写一下,看看有什么后果。将代码中的if语句屏蔽去掉。
public static void produce(Integer num) {
synchronized (list) {
AtomicInteger m = new AtomicInteger(0);
// while (list.size() + num > MAX) {
if (list.size() + num > MAX) {
try {
m.addAndGet(1);
System.out.println(Thread.currentThread().getName() + " 阻塞"
+ " m: " + m);
System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
+ "\t暂时不能执行生产任务!");
list.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// else {
for (int i = 0; i < num; i++) {
list.add(new Product(i, ""));
}
System.out.println(Thread.currentThread().getName() + " m: " + m);
System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
list.notify();
// }
}
}
public static void consume(Integer num) {
synchronized (list) {
if (num > list.size()) {
// while (num > list.size()) {
try {
System.out
.println(Thread.currentThread().getName() + " 阻塞");
System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
+ "\t暂时不能执行消费任务!");
list.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// } else {
for (int i = 0; i < num; i++) {
list.remove(0);
}
System.out.println(Thread.currentThread().getName());
System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
list.notifyAll();
// }
}
}
部分运行结果:
pool-1-thread-1 m: 0
已经生产数:50 现仓储量为:50
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-2 阻塞 m: 1
要生产的数量:50 库存量:45 暂时不能执行生产任务!
pool-1-thread-1 阻塞 m: 1
要生产的数量:50 库存量:45 暂时不能执行生产任务!
pool-1-thread-3 阻塞 m: 1
要生产的数量:50 库存量:45 暂时不能执行生产任务!**
pool-1-thread-4
已经消费数:5 现仓储量为:40
pool-1-thread-3 m: 1
已经生产数:50 现仓储量为:90**
pool-1-thread-1 m: 1
已经生产数:50 现仓储量为:140
pool-1-thread-2 m: 1
已经生产数:50 现仓储量为:190
...
what?丑旦,你的仓库容量都飙到190啦?这还了得!汗…
冷静下来,先分析一下原因。不妨从有代表性的pool-1-thread-3
入手,根据输出的结果来看,第一下pool-1-thread-3
阻塞的时候,它的输出是正常的,即打印了要生产的数量:50 库存量:45 暂时不能执行生产任务!
,然而第二次阻塞的时候,明知道仓库容不下,Mr pool-1-thread-3
先生还是不听话地生产了50个,一下子就爆仓了。
为什会这样呢?让我们暂且回到代码的结构上看一下。
根据这个图,再来重现一下上面的情况。
我们的Mr pool-1-thread-3
先生运气不太好,仓库已经满了,不能大展拳脚进行生产,只得乖乖滴执行了list.wait();
,放弃手中所持有的同步锁,目前处于阻塞状态(第一次阻塞)。注意虽然Mr pool-1-thread-3
先生已经放弃了同步锁,但他此时仍然处于”monitorenter”和”monitorexit”字节码之间,也就是说他目前的状态是,仍然处于同步块之中,暂时丧失了获得锁的权利,直到一个notify来临。得,那Mr pool-1-thread-3
先生,你先凉快一会儿去吧。
接下来,该我们的Miss pool-1-thread-4
小姐登场。仓库已满,只是消费的时候。Miss pool-1-thread-4
小姐毫不客气地消费了5个资源之后,爽快地抛了一个notifyAll()。
处于阻塞状态的每一双眼睛立刻闪现出欲望的光芒。原来救世主是你,我们的Mr pool-1-thread-3
先生心下大喜,立刻伸出双手。哎,你别说,再搓比的屌丝也有春天,再倒霉的线程也有狗屎运的时候。托总管JVM的福,Mr pool-1-thread-3
先生拿到了这把锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。
不好,一下子就进到了for循环里面,仓库就崩塌了,悲剧就发生了…
更可怕的时,Mr pool-1-thread-3
先生完全没有意识到自己闯下的大祸,在一顿愉快的生产之后,竟然也大手一挥,扮演起救世主的角色,继续抛出notify(),后面的情况可想而知。
反应快的同学可能要说了,为什么不在for循环前面加上一个else呢?
这是另外一个问题了。
不妨先把代码修改一下,看看有什么现象。
确实,一切”正常”,并没有爆仓,所有的生产、消费好像都没什么问题。但是细心的话,你会发现我这里打印的”m”的值,要么是0要么是1,永远不会有别的值。而使用while的则不一样。
至于这种想现象,还要回到代码上来看。
同样的,我们Mr pool-1-thread-3
先生拿到了Miss pool-1-thread-4
小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。
不好,下面是else分支,我进不去啊,而下面也没有可让我执行的语句了,Mr pool-1-thread-3
先生暗暗叫苦,吐血三升,倒地而亡,当然在蹬腿之前,他还要把自己辛苦拿到的锁释放出来…
这样也没什么问题呃,Mr pool-1-thread-3
先生生产不了,还有下一个Mr pool-1-thread-3
先生呢?
真的是这样吗?
Miss pool-1-thread-4
小姐发出的唤醒,本意是给那些阻塞在生产线,希望继续生产的优质男们,结果优质男们拿到锁之后并不能生产,而是直接挂了。对,就是挂了。而最终执行生产的都是那些一开始并没有被阻塞起来的线程。
这样看起来,有点类似于非阻塞的同步控制但并不是(遗忘的请自行回到篇首复习,说白了,就是一个线程先进行操作,如果没有发生竞争,那就成功了;如果发生竞争,这个线程就不断地重试,直到成功)。事实上,这种做法,选择的是一种抛弃策略,就是一个线程无法生产,那就放弃它,让下一个线程来尝试生产,直到仓库存满为止。造成的后果,就是资源浪费,想生产的不能保证都能生产到,该消费的也不保证都能消费到。
反观,while()则巧妙地化解了这个问题。
依然是,我们Mr pool-1-thread-3
先生拿到了Miss pool-1-thread-4
小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。
不好,Mr pool-1-thread-3
先生发现自己依然处在循环之中,要想出去,必须得满足判断条件。于是,他开始计算list.size() + num > MAX
是否成立,运气好的话(不成立),跳出循环,开始愉快地生产。运气不好(成立),依然被圈在while()里,只得再次执行了list.wait();
,释放同步锁,等待下一个救世主的到来…
所以我们也能看到,使用while()的程序输出的m值是不确定的,而且一个线程对应的m值,会呈现出增长的态势,也说它的状态是唤醒–等待–唤醒–等待…想生产而不得,委屈ing…也从侧面吻合了我们的分析。JDK源码中类似的并发控制,也都是用的while(),所有以后就放心地使用它吧。
如果你对这个地方的写法还有疑问,一定要自己把程序跑起来,对着输出分析一下。
后两种实现,我们放在下一篇讲。
标签:
原文地址:http://blog.csdn.net/luochoudan/article/details/51693854