标签:des blog dha while self false 问题 adc creat
编程中,当涉及到多个线程需要访问同一个全局对象时,往往需要进行线程同步,而线程同步,往往是一个约定俗成的东西。比如说:
1 //*.cpp 2 int g_var ; pthread_mutex_t g_mtx ; 3 static void * threadFunc(void * Par) { 4 pthread_mutex_lock(&g_mtx) ; 5 6 g_var = //some value ; 7 8 return NULL ; 9 }
为了保持g_var的一致性,理应在所有线程改写其值时加锁保护,以防止其它线程同时对其进行修改而引发不好的行为。然而此时假如有某个线程,在改写全局对象g_var是并未加锁保护,或者使用了不同的锁,那么此前所有的保护动作都将前功尽弃。所以,在多线程中操作全局对象或者全局变量时,约定俗成的东西是这样:在任何线程访问需要同步的对象时,必须加锁保护,且所有线程必须为同一个锁。
基于以上的前提,当第一次看到pthread_cond_wait这个函数时,多少会感到有些无所适从。pthread_cond_wait函数用于等待某个信号的发生,当没有信号时,线程处于挂起的状态。pthread_cond_signal则是向指定的条件变量发送信号唤醒某个等待在该条件变量上的线程。所以直观地理解,这个两个函数该是配对使用的。让人疑惑的地方在于两个函数的参数是不同的。pthread_cond_signal(pthread_cond_t * cond) 和pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mtx) 按照官方文档的说法,pthread_cond_wait调用时,需要向其传入一个锁定的 mutex对象,用来保护把线程加入等待对列这个动作不被打断而因此错过条件变量上的信号。怎么理解呢,可以分别看看pthread_cond_signal和pthread_cond_wait的源代码。
大概的伪代码如下:
1 pthread_cond_signal(pthread_cond_t * cond) { 2 锁定cond->_data.__lock 3 if (cond 中有线程在等待) { 4 发送信号 5 } 6 7 解锁cond->_data.__lock 8 } 9 10 pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mtx) { 11 锁定cond->_data.__lock 12 解锁mtx 13 标记cond中变量,表示有线程等待在cond上 14 解锁cond->_data.__lock 15 进入等待 16 锁定mtx 17 }
可以看出,两个函数均使用了cond中的变量锁,为了保证pthread_cond_wait从函数调用到进入等待期间唯一占有cond,引用了一个cond外部的mutex, 在锁住cond->_data._lock 之后马上就释放掉, 就是为了保证在pthread_cond_wait进入等待前cond的内容不被外部修改。让人费解的地方在于,怎么pthread_cond_signal就没这个锁参数呢????按理说它是必须用到和pthread_cond_wait参数中同一把锁,否则无法得到上述的保证,万一调用者不小心使用了其它锁呢?看到网上有许多给出的例子这么写
1 pthread_cond_t cond ; 2 pthread_mutex_t mtx ; 3 4 static void * productor(void * par) { 5 while (1) { 6 pthread_mutex_lock(&mtx) ; 7 pthread_cond_signal(&cond) ; 8 pthrad_mutex_unlock(&mtx) ; 9 } 10 11 return NULL ; 12 } 13 14 static void * consumer(void * par) { 15 while (1) { 16 pthread_mutex_lock(&mtx) ; 17 pthread_cond_wait(&cond, &mtx) ; 18 pthread_mutex_unlock(&mtx) ; 19 } 20 21 return NULL ; 22 }
按照前面的分析,pthread_cond_wait确实能保证在它进入等待之前不会错过信号量。看起来代码没什么问题,但是想一下如下的场景:
当调用pthread_cond_signal时,没有线程等待在cond变量上,会发生什么呢?看pthread_cond_signal的源码可以知道,里面有一个关键的判断,只有检查到cond上有线程在等待时,才会发送信号,如果发送signal时没有线程在等待,该signal将丢失。在业务上可以表现为,接收线程收到了一条数据,要把数据发送给工作线程去处理,而此时所有的工作线程都在忙,没来得及调用pthread_cond_wait,那该数据就被丢掉了。为了防止这种情况发生,可以先建立一个缓冲区,存储所有的通知事件:
1 pthread_cond_t cond ; 2 pthread_mutex_t mtx ; 3 4 std::queue<task> TaskQueue ; 5 6 static void * productor(void * par) { 7 while (1) { 8 task t ; 9 pthread_mutex_lock(&mtx) ; 10 TaskQueue.push(t) ; 11 pthread_cond_signal(&cond) ; 12 pthrad_mutex_unlock(&mtx) ; 13 } 14 15 return NULL ; 16 } 17 18 static void * consumer(void * par) { 19 while (1) { 20 pthread_mutex_lock(&mtx) ; 21 if (TaskQueue.empty() == false) { 22 task t = TaskQueue.pop() ; 23 pthread_mutex_unlock(&mtx) ; 24 25 //do something //处理数据 26 } 27 else { 28 pthread_cond_wait(&cond, &mtx) ; //等待信号 29 if (TaskQueue.empty() != false) { 30 task t = TaskQueue.pop() ; 31 pthread_mutex_unlock(&mtx) ; 32 //do something //处理数据 33 34 continue ; 35 } 36 37 pthread_mutex_unlock(&mtx) ; 38 } 39 } 40 41 return NULL ; 42 }
完整测试工程代码:
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <queue> 4 5 6 typedef std::queue<int> TaskQueue ; 7 #define COUNT 20000 //每个生产线程生产的任务量 8 9 pthread_cond_t g_cond ; //全局信号量 10 pthread_mutex_t g_mtx ; //全局互斥变量 11 TaskQueue g_task ; //任务队列 12 bool g_bConsumerStop = false ; 13 const int g_productorCount = 2 ; //生产线程数 14 const int g_consumerCount = 5 ; //消费线程数 15 int g_proExitCount = 0 ; 16 17 //生产线程函数,生产指定数量的任务量,完成后线程退出 18 static void * productor(void * parameter) { 19 pthread_t tid = pthread_self() ; 20 21 int n = 0 ; 22 while (1) { 23 pthread_mutex_lock(&g_mtx) ; 24 g_task.push(n) ; 25 pthread_cond_signal(&g_cond) ; 26 pthread_mutex_unlock(&g_mtx) ; 27 28 if (++n >= COUNT) { 29 break ; 30 } 31 } 32 33 pthread_mutex_lock(&g_mtx) ; 34 if (++g_proExitCount == g_productorCount) { 35 g_bConsumerStop = true ; 36 pthread_cond_broadcast(&g_cond) ; //唤醒所有等待线程 37 } 38 pthread_mutex_unlock(&g_mtx) ; 39 40 printf("productor thread exit: tid[%lu] create task count[%d]\n", tid, n) ; 41 42 return NULL ; 43 } 44 45 //消费者线程函数,处理完所有任务后线程退出 46 static void * consumer(void * parameter) { 47 pthread_t tid = pthread_self() ; 48 49 int nCount = 0 ; 50 while (1) { 51 pthread_mutex_lock(&g_mtx) ; 52 if (g_task.empty() == false) { 53 g_task.pop() ; 54 pthread_mutex_unlock(&g_mtx) ; 55 56 nCount++ ; 57 } 58 else if (g_bConsumerStop == false){ 59 pthread_cond_wait(&g_cond, &g_mtx) ; 60 if (g_task.empty() == false) { 61 g_task.pop() ; 62 nCount++ ; 63 pthread_mutex_unlock(&g_mtx) ; 64 65 continue ; 66 } 67 pthread_mutex_unlock(&g_mtx) ; 68 } 69 else { 70 pthread_mutex_unlock(&g_mtx) ; 71 break ; 72 } 73 74 } 75 76 printf("consumer thread exit: tid[%lu], handlecount[%d]\n", tid, nCount) ; 77 78 return NULL ; 79 } 80 81 int main(int argc, char * argv[]) 82 { 83 84 pthread_cond_init(&g_cond, NULL) ; 85 pthread_mutex_init(&g_mtx, NULL) ; 86 87 pthread_t szCsmHanle[g_consumerCount] = {0} ; 88 pthread_t szPrdHanle[g_productorCount] = {0} ; 89 for (int i=0; i<g_consumerCount; i++) { 90 pthread_create(&szCsmHanle[i], NULL, consumer, NULL) ; 91 } 92 for (int i=0; i<g_productorCount; i++) { 93 pthread_create(&szPrdHanle[i], NULL, productor, NULL) ; 94 } 95 for (int i=0; i<g_consumerCount; i++) { 96 pthread_join(szCsmHanle[i], NULL) ; 97 } 98 for (int i=0; i<g_productorCount; i++) { 99 pthread_join(szPrdHanle[i], NULL) ; 100 } 101 pthread_cond_destroy(&g_cond) ; 102 pthread_mutex_destroy(&g_mtx) ; 103 104 return 0 ; 105 }
pthread_cond_wait、pthread_cond_signal 不深入的思考
标签:des blog dha while self false 问题 adc creat
原文地址:http://www.cnblogs.com/afx-/p/7417421.html