生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的N个线程生产者线程生产物品,然后将物品放置在一个空缓冲区中供N个消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
生产者个数、消费者个数、还是缓冲区大小、每个生产者生产产品的个数等。
生产者-消费者并发执行的过程。消费者消费完所有的产品结束。
//main.cpp #include "Storage.h" //代码如下 #include <signal.h> #include <pthread.h> #include <unistd.h> #include <string.h> #include <stdio.h> #include <stdio.h> #include <iostream> using namespace std; const int sleepTime = 1; struct passStruct { pthread_mutex_t *m_mutex; //所有线程所共享的互斥量 int *m_products; //生产者一次生产的产品数 Storage *m_storage; //共享的存储区 int *m_nThreadNum; //线程标号 }; pthread_cond_t notempty; pthread_cond_t notfull; void *producer(void *arg) { //获取从控制线程中传来的数值 passStruct tmp = *static_cast<passStruct *>(arg); //生产者每次生产的产品数 int *products = tmp.m_products; //控制线程中已经初始化了的互斥量 pthread_mutex_t *mutex = tmp.m_mutex; //获取仓库 Storage *storage = tmp.m_storage; char strToStore[128],strCurrentStored[128]; sprintf(strToStore,"\tAdd %d product to storage...\n",*products); while (true) { //获取互斥量 pthread_mutex_lock(mutex); //如果当前仓库已满(已经没有空间可以填充),则打印阻塞消息 if (!storage -> isHaveSpace()) { cout << "++ producer " << *(tmp.m_nThreadNum) << " Block\n" << endl; //等待条件变量,并且释放互斥量 pthread_cond_wait(?full,mutex); } //仓库中已经有空间了,先睡一会再进行生产 sleep(sleepTime); //打印激活消息 cout << "++ producer " << *(tmp.m_nThreadNum) << " Activate" << endl; //开始生产产品 write(STDOUT_FILENO,strToStore,strlen(strToStore)); storage -> addToStorage(*products); //打印当前仓库总的产品数 sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n", storage -> currentCount()); write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored)); //通知等待信号量notempty的第一个消费者线程,当前仓库非空 pthread_cond_signal(?empty); pthread_mutex_unlock(mutex); //睡一会儿参与下一轮竞争 sleep(sleepTime); } pthread_exit(NULL); } void *consumer(void *arg) { passStruct tmp = *static_cast<passStruct *>(arg); pthread_mutex_t *mutex = tmp.m_mutex; Storage *storage = tmp.m_storage; string strToDisplay("\tGet a Product from storage...\n"); char strCurrentStored[128]; while (true) { //获取互斥量 pthread_mutex_lock(mutex); //当前仓库中已空(仓库中没有产品),则打印阻塞消息,并等待条件变量的到来 if (!storage -> isHaveProduct()) { cout << "-- consumer " << *(tmp.m_nThreadNum) << " Block\n" << endl; pthread_cond_wait(?empty,mutex); } //当前仓库中已经有产品了^^,先睡一会在进行消费 sleep(sleepTime); //打印激活消息 cout << "-- consumer " << *(tmp.m_nThreadNum) << " Activate" << endl; write(STDOUT_FILENO,strToDisplay.c_str(),strToDisplay.size()); storage -> getFromStorage(); //打印当前仓库中的产品数 sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n", storage -> currentCount()); write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored)); //告知第一个阻塞在notfull条件变量的生产者线程,当前仓库已经有空间了 pthread_cond_signal(?full); pthread_mutex_unlock(mutex); //先睡一会再参与竞争 sleep(sleepTime); } pthread_exit(NULL); } //信号捕捉函数 void onSignal(int signalNumber) { switch (signalNumber) { //如果捕捉到SIGUSR1,则整个程序退出,SIGUSR1由stop.sh程序产生 case SIGUSR1: cout << "Main Program Ending..." << endl; _exit(0); break; case SIGINT: cout << "Can‘t Kill The Program with Ctrl+C, Please Use the Shell Script stop.sh!" << endl; sleep(5); break; default: break; } } int main() { //注册信号 signal(SIGUSR1,onSignal); signal(SIGINT,onSignal); //freopen("back.txt","w",stdout); //初始化互斥量以及条件变量 pthread_mutex_t *mutex = new pthread_mutex_t; pthread_mutex_init(mutex,NULL); pthread_cond_init(?empty,NULL); pthread_cond_init(?full,NULL); int numberOfProducer; cout << "Please input the number of Producer: "; cin >> numberOfProducer; int numberOfConsumer; cout << "Please input the number of Consumer: "; cin >> numberOfConsumer; int numberOfProducts; cout << "Please input the number of Products for ONE PRODUCER: "; cin >> numberOfProducts; int sizeOfStorage; cout << "Please input the size of the Storage: "; cin >> sizeOfStorage; Storage storage(sizeOfStorage); //初始化所传递的值 passStruct passValue; passValue.m_mutex = mutex; passValue.m_products = &numberOfProducts; passValue.m_storage = &storage; pthread_t pthreadProducer,pthreadConsumer; for (int i = 0; i != numberOfConsumer; ++i) { passValue.m_nThreadNum = new int(i+1); pthread_create(&pthreadConsumer,NULL,consumer, static_cast<void *>(&passValue)); } for (int i = 0; i != numberOfProducer; ++i) { passValue.m_nThreadNum = new int(i+1); pthread_create(&pthreadProducer,NULL,producer, static_cast<void *>(&passValue)); } //等待线程结束 pthread_join(pthreadProducer,NULL); pthread_join(pthreadConsumer,NULL); pthread_mutex_destroy(mutex); delete mutex; pthread_cond_destroy(?empty); pthread_cond_destroy(?full); return 0; }
//Storage.h #ifndef STORAGE_H_INCLUDED #define STORAGE_H_INCLUDED class Storage { public: Storage(int); ~Storage(); int currentCount() { return hasBeenStored; } bool isHaveSpace() { if (hasBeenStored < bufferSize) { return true; } return false; } bool isHaveProduct() { if (hasBeenStored != 0) { return true; } return false; } bool isEmpty() { return hasBeenStored == 0; } void addToStorage(int n); void getFromStorage(); private: int bufferSize; int hasBeenStored; }; #endif // STORAGE_H_INCLUDED
//Storage.cpp #include "Storage.h" Storage::Storage(int n = 0):bufferSize(n),hasBeenStored(0) { } Storage::~Storage() { } void Storage::addToStorage(int n) { if (hasBeenStored + n > bufferSize) hasBeenStored = bufferSize; else hasBeenStored += n; } void Storage::getFromStorage() { if (!isEmpty()) -- hasBeenStored; else hasBeenStored = 0; }
//Makefile CC = g++ CPPFLAGS = -Wall -g -pthread SOURCES = $(wildcard *.cpp) OBJECTS = $(SOURCES:.cpp=.o) BIN = main .PHONY: all clean all: $(BIN) $(BIN): $(OBJECTS) $(CC) $(CPPFLAGS) -o $@ $^ @echo "# # # # # # OK! # # # # # " %.o: %.cpp $(CC) $(CPPFLAGS) -c $^ -o $@ clean: -rm -rf $(BIN) $(OBJECTS) *.cbp *.layout
main程序的启动脚本
#!/bin/bash # A Shell Script for Start the main program BIN=main ISEXIST=$(/bin/ls | /bin/grep main$) #if this program not exits, make it. if [ "$ISEXIST" = "" ] ; then /usr/bin/make fi #if this program not running, start it. PID=$(/usr/bin/pgrep $BIN) if [ "$PID" = "" ] ; then ./main fi
main程序的终止脚本
#!/bin/bash # A Shell Script for Stop the main Program PID=$(/usr/bin/pgrep main) if [ "$PID" != "" ] ; then /bin/kill -USR1 $PID #给main程序发送SIGUSR1信号 fi
原文地址:http://blog.csdn.net/zjf280441589/article/details/41287091