码迷,mamicode.com
首页 > 其他好文 > 详细

Unix IPC之Posix信号量实现生产者消费者

时间:2015-08-17 18:48:12      阅读:211      评论:0      收藏:0      [点我收藏+]

标签:

采用多生产者,多消费者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * 生产者
 */
P(nempty);
P(mutex);
// 写入一个空闲位置
V(mutex);
V(nstored);
 
/**
 * 消费者
 */
P(nstored);
P(mutex);
// 清空一个非空闲位置
V(mutex);
V(nempty);

全局性说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include    "unpipc.h"
 
/* Number of slots in shared.buff; main() also uses it as the initial count of shared.nempty. */
#define NBUFF        10
/* Upper bound applied separately to the producer count and the consumer count; sizes the per-thread arrays in main(). */
#define MAXNTHREADS 100
 
/*
 * Run parameters taken from the command line in main() before any thread is
 * created; the worker threads only read nitems afterwards.
 */
int     nitems, nproducers, nconsumers;     /* read-only */
 
/*
 * State shared by every producer and consumer thread.  The integer fields are
 * only touched by produce()/consume() between Sem_wait(&shared.mutex) and
 * Sem_post(&shared.mutex).
 */
struct      /* data shared by producers and consumers */
{
    int   buff[NBUFF];    /* circular buffer, indexed with nput % NBUFF and nget % NBUFF */
    int   nput;           /* number of items stored so far: 0, 1, 2, ... */
    int   nputval;        /* value the next producer will store in buff[] */
    int   nget;           /* number of items fetched so far: 0, 1, 2, ... */
    int   ngetval;        /* value the next fetched item is expected to hold */
    /*
     * Semaphore objects embedded in the struct (not pointers).  main()
     * initializes mutex to 1, nempty to NBUFF and nstored to 0.
     */
    sem_t mutex, nempty, nstored;     /* semaphores, not pointers */
} shared;
 
/* Thread start routines; the argument is a pointer to that thread's int item counter. */
void    *produce(void *);
void    *consume(void *);
/* end globals */

主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/* include main */
int
main(int argc, char **argv)
{
    int     i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS]; 
 
    if (argc != 4)
        err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
    nitems = atoi(argv[1]);
    nproducers = min(atoi(argv[2]), MAXNTHREADS);
    nconsumers = min(atoi(argv[3]), MAXNTHREADS);
 
    /* 4initialize three semaphores */
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);
 
    /* 4create all producers and all consumers */
    Set_concurrency(nproducers + nconsumers);
    for (i = 0; i < nproducers; i++)
    {
        prodcount[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++)
    {
        conscount[i] = 0;
        Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
    }
 
    /* 4wait for all producers and all consumers */
    for (i = 0; i < nproducers; i++)
    {
        Pthread_join(tid_produce[i], NULL);
        printf("producer count[%d] = %d\n", i, prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++)
    {
        Pthread_join(tid_consume[i], NULL);
        printf("consumer count[%d] = %d\n", i, conscount[i]);
    }
 
    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
    exit(0);
}
/* end main */

生产者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* include produce */
void *
produce(void *arg)
{
    for ( ; ; )
    {
        Sem_wait(&shared.nempty);   /* wait for at least 1 empty slot */
        Sem_wait(&shared.mutex);
 
        if (shared.nput >= nitems)
        {
            Sem_post(&shared.nstored);  /* let consumers terminate */
            Sem_post(&shared.nempty);
            Sem_post(&shared.mutex);
            return(NULL);           /* all done */
        }
 
        shared.buff[shared.nput % NBUFF] = shared.nputval;
        shared.nput++;
        shared.nputval++;
 
        Sem_post(&shared.mutex);
        Sem_post(&shared.nstored);  /* 1 more stored item */
        *((int *) arg) += 1;
    }
}
/* end produce */

消费者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* include consume */
void *
consume(void *arg)
{
    int     i;
 
    for ( ; ; )
    {
        Sem_wait(&shared.nstored);  /* wait for at least 1 stored item */
        Sem_wait(&shared.mutex);
 
        if (shared.nget >= nitems)
        {
            Sem_post(&shared.nstored);
            Sem_post(&shared.mutex);
            return(NULL);           /* all done */
        }
 
        i = shared.nget % NBUFF;
        if (shared.buff[i] != shared.ngetval)
            printf("error: buff[%d] = %d\n", i, shared.buff[i]);
        shared.nget++;
        shared.ngetval++;
 
        Sem_post(&shared.mutex);
        Sem_post(&shared.nempty);   /* 1 more empty slot */
        *((int *) arg) += 1;
    }
}
/* end consume */







Unix IPC之Posix信号量实现生产者消费者

标签:

原文地址:http://www.cnblogs.com/fengkang1008/p/4737021.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!