标签:
(可能会有很多错误,请谨慎阅读,如果本人发现会及时更新)。
最近在学习多线程编程,周末的时候用java写了一个生产者消费模型,这里做一些记录和总结。
Producer
while(true)
data = generateData()
queue.enqueue(data)
Consumer
while(true)
data = queue.dequeue()
main
sharedQueue = new Queue()
producer1, producer2 ...
consumer1, consumer2 ...
start producer1, producer2 ...
start consumer1, consumer2 ...
stop producer1, producer2 ...
stop consumer1, consumer2 ...
这个程序中工作任务很简单就是生产者不断放入一些随机数到队列中,而消费者就是不断取出这些数并打印。
public class ProducerConsumer {
private static Queue<Integer> queue;
public static void main(String[] args) {
queue = new LinkedList<Integer>();
int producerNum = 1;
Producer[] producers = new Producer[producerNum];
for (int i = 0; i < producerNum; i++) {
producers[i] = new Producer(queue);
producers[i].start();
}
int consumerNum = 2;
Consumer[] consumers = new Consumer[consumerNum];
for (int j = 0; j < consumerNum; j++) {
consumers[j] = new Consumer(queue);
consumers[j].start();
}
try {
Thread.sleep(2 * 1000);
System.out.println("Main thread is awakened now!");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// stopping all producers and consumers
for (int i = 0; i < producerNum; i++) {
producers[i].stopLoop();
}
for (int j = 0; j < consumerNum; j++) {
consumers[j].stopLoop();
}
// make sure all threads is stopping
for (int i = 0; i < producerNum; i++) {
try {
producers[i].join();
System.out.println("Producer: " + producers[i].getId() + " is stopped!");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 0; j < consumerNum; j++) {
try {
consumers[j].join();
System.out.println("Consumer: " + consumers[j].getId() + " is stopped!");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
private Queue<Integer> queue;
private boolean running = true;
private long threadId; // 因为每次run之后,才会进入到新的thread
private static Lock lock = new ReentrantLock();
public Producer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
threadId = Thread.currentThread().getId();
while (running) {
try {
double item = Math.random() * 100;
lock.lock(); // 这里为什么加锁?因为queue我们使用的是LinkedList,所以offer操作不是同步的,不能让不同的生产者互相争抢
queue.offer((int)item);
System.out.println("Produce:" + threadId + "--" + item);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 放在finally中,保证程序不会中途出错而导致解锁步骤不运行
}
}
}
public void stopLoop() {
System.out.println("Stopping Producer -- " + threadId);
running = false;
}
}
class Consumer extends Thread {
/**
* 这版开始加锁,不再出现NullPointerException错误
* 消费的过程中其实和生产的过程没有冲突,所以只要消费者之间共享一把锁就行了
* 让queue.peek 和 poll绑定,使之检查是有效的
*/
private Queue<Integer> queue;
private boolean running = true;
private static Lock lock = new ReentrantLock();
private long threadId;
public Consumer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
threadId = Thread.currentThread().getId();
while (running) {
if (queue.peek() != null) {
try {
lock.lock(); // 加锁第一是因为poll非同步,还有peek和poll之间非原子性,不加锁会导致peek检查很容易失效
if (queue.peek() != null) { // double check
int item = queue.poll();
System.out.println("Consume:" + threadId +" -- " + item);
} else {
System.out.println("Consume:" + threadId +" -- empty");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public void stopLoop() {
System.out.println("Stopping Consumer: " + threadId);
running = false;
}
}
开始点评这个版本:
- 巨大的缺点就是queue是无界的,很容易造成queue被爆掉
- running这个变量非同步,所以在主线程中调用stopLoop虽然可能生效,但是按照java的内存模型来说,没有同步的变量在不同线程中可能不能被互相观察到,这导致consumer的线程都观察不到running已经被主线程设置为false了。从而导致程序停不下来(查看Effective java的66条)。解决这个问题有两种方法,一种是使用synchronized修饰的方法来封装running的读写,一种是把running秀事成volatile(参看深入理解Java虚拟机:JVM高级特性与最佳实践)。
自己利用LinkedList实现了一个BlockQueue,当然java中本身就有个这个而数据结构,这个造轮子只是为了理解其中的原理。
这次这个程序稍微复杂一些,存取的不在是随机数,而是一个个log。
/**
* 1、第一版的时候我们只用了linkedlist来模拟queu,但是让出现producer特别慢,但是consumer又总是再探测浪费资源?
* 2、还有就是producer产生速度过去快,难道就让它把内存挤爆吗?
* 3、所以我们需要blockingQueue,第一是限制queue的大小,第二是协调两方的生产和消费速度。
* @author xiedandan
*
* 疑问点:使用monitor来设计一个blocking Queue
* 1. 不对queue进行synchronize,会爆出IllegalMonitorException
* 2. 然后开始不断调整synchronize位置,比如在while(running)外边,在while(running)里面
* 3. 然后就开了怎么都唤不醒consumer的bug
*/
public class ProducerConsumerBlockingQueue {
private static SudoBlockingQueue<Log> queue;
public static void main(String[] args) {
// 和格版本一致
}
}
/* mimic a bin-log restore system.
* Producer to produce operating logs and consumer to read the logs and try to restore the record.
* log schema: transactionId, operatingType, dataValue(before current operation)
*
* version 1: Suppose we only have one row record in whole table and only add operation,
* so the log order is not important.
*/
class ProducerLog extends Thread {
private SudoBlockingQueue<Log> queue;
private boolean volatile running = true;
private long threadId;
// private Lock lock; // 1. 为啥在这里我们不用lock呢?因为都封装在queue自身中了。
public ProducerLog(SudoBlockingQueue<Log> queue) {
this.queue = queue;
}
@Override
public void run() {
threadId = Thread.currentThread().getId();
while(running) {
try {
synchronized(queue) { // 2. 试试把这句话取掉会产生结果?
// 1.if (queue.isFull()) { // 3. 为什么不能使用if,而要使用while?
while (queue.isFull()) {
System.out.println("Proudce:" + threadId + "--- wait");
queue.wait();
}
boolean e = queue.isEmpty();
Log log = new Log(1, 1, (int)(Math.random() * 100));
System.out.println("Proudce:" + threadId + "---" + log);
queue.offer(log);
if (e) {
System.out.println("Proudce:" + threadId + "--- notify all");
queue.notifyAll();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopLoop() {
System.out.println("Stopping Prouder:" + threadId);
running = false;
queue.syncNotifyAll(); // 4. 为啥这里还有再次notifyall一次呢?
}
}
class ConsumerLog extends Thread {
private SudoBlockingQueue<Log> queue;
private boolean running = true;
private long threadId;
public ConsumerLog(SudoBlockingQueue<Log> queue) {
this.queue = queue;
}
@Override
public void run() {
threadId = Thread.currentThread().getId();
while(running) {
try {
synchronized(queue) {
while (running && queue.isEmpty()) { //注意这里不仅要检查empty,还有检查running,不然程序会迅速再次陷入wait不能退出。
System.out.println("Consumer:" + threadId + "--wait");
queue.wait();
}
boolean f = queue.isFull();
Log log = queue.poll();
System.out.println("Consumer:" + threadId + "--" + log);
if (f) {
System.out.println("Consumer:" + threadId + "--notify");
queue.notifyAll();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopLoop() {
System.out.println("Stopping Consumer -- " + threadId);
running = false;
queue.syncNotifyAll();
}
}
class Log {
public int tranId;
public int operaType;
public int data;
public Log(int tranId, int operaType, int data) {
this.tranId = tranId;
this.operaType = operaType;
this.data = data;
}
@Override
public String toString() {
return "TranId:" + tranId + ",OperaType:" + operaType + ",Data" + data;
}
}
class SudoBlockingQueue <T> extends LinkedList<T> {
private static final long serialVersionUID = 13344L;
private final int CAPACITY;
public SudoBlockingQueue(int capacity) {
CAPACITY = capacity;
}
boolean isFull() {
return super.size() == CAPACITY;
}
int getCapacity() {
return CAPACITY;
}
public synchronized void syncNotifyAll() {
this.notifyAll();
}
public synchronized void syncWait() {
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
回答上面的问题:
这个版本设计的也比较糟糕,因为需要调用多次的stopLoop,可以把running设计成static,只用调用一次stopLoop即可。
写正确一个并发程序真实不容易,特别容易遇到各种死锁而结束不了程序。学会并发编程:
1. 生产者消费模型
2. 线程池
3. 进程池
标签:
原文地址:http://blog.csdn.net/physicsdandan/article/details/51931975