标签:
采用多生产者、多消费者模型。
/**
 * 生产者
 */
P(nempty);
P(mutex);
// 写入一个空闲位置
V(mutex);
V(nstored);

/**
 * 消费者
 */
P(nstored);
P(mutex);
// 清空一个非空闲位置
V(mutex);
V(nempty);
全局性说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | #include "unpipc.h" #define NBUFF 10 #define MAXNTHREADS 100 int nitems, nproducers, nconsumers; /* read-only */ struct /* data shared by producers and consumers */ { int buff[NBUFF]; int nput; /* item number: 0, 1, 2, ... */ int nputval; /* value to store in buff[] */ int nget; /* item number: 0, 1, 2, ... */ int ngetval; /* value fetched from buff[] */ sem_t mutex, nempty, nstored; /* semaphores, not pointers */ } shared; void *produce( void *); void *consume( void *); /* end globals */ |
主函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | /* include main */ int main( int argc, char **argv) { int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS]; pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS]; if (argc != 4) err_quit( "usage: prodcons4 <#items> <#producers> <#consumers>" ); nitems = atoi (argv[1]); nproducers = min( atoi (argv[2]), MAXNTHREADS); nconsumers = min( atoi (argv[3]), MAXNTHREADS); /* 4initialize three semaphores */ Sem_init(&shared.mutex, 0, 1); Sem_init(&shared.nempty, 0, NBUFF); Sem_init(&shared.nstored, 0, 0); /* 4create all producers and all consumers */ Set_concurrency(nproducers + nconsumers); for (i = 0; i < nproducers; i++) { prodcount[i] = 0; Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]); } for (i = 0; i < nconsumers; i++) { conscount[i] = 0; Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]); } /* 4wait for all producers and all consumers */ for (i = 0; i < nproducers; i++) { Pthread_join(tid_produce[i], NULL); printf ( "producer count[%d] = %d\n" , i, prodcount[i]); } for (i = 0; i < nconsumers; i++) { Pthread_join(tid_consume[i], NULL); printf ( "consumer count[%d] = %d\n" , i, conscount[i]); } Sem_destroy(&shared.mutex); Sem_destroy(&shared.nempty); Sem_destroy(&shared.nstored); exit (0); } /* end main */ |
生产者线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | /* include produce */ void * produce( void *arg) { for ( ; ; ) { Sem_wait(&shared.nempty); /* wait for at least 1 empty slot */ Sem_wait(&shared.mutex); if (shared.nput >= nitems) { Sem_post(&shared.nstored); /* let consumers terminate */ Sem_post(&shared.nempty); Sem_post(&shared.mutex); return (NULL); /* all done */ } shared.buff[shared.nput % NBUFF] = shared.nputval; shared.nput++; shared.nputval++; Sem_post(&shared.mutex); Sem_post(&shared.nstored); /* 1 more stored item */ *(( int *) arg) += 1; } } /* end produce */ |
消费者线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | /* include consume */ void * consume( void *arg) { int i; for ( ; ; ) { Sem_wait(&shared.nstored); /* wait for at least 1 stored item */ Sem_wait(&shared.mutex); if (shared.nget >= nitems) { Sem_post(&shared.nstored); Sem_post(&shared.mutex); return (NULL); /* all done */ } i = shared.nget % NBUFF; if (shared.buff[i] != shared.ngetval) printf ( "error: buff[%d] = %d\n" , i, shared.buff[i]); shared.nget++; shared.ngetval++; Sem_post(&shared.mutex); Sem_post(&shared.nempty); /* 1 more empty slot */ *(( int *) arg) += 1; } } /* end consume */ |
标签:
原文地址:http://www.cnblogs.com/fengkang1008/p/4737021.html