码迷,mamicode.com
首页 > 编程语言 > 详细

浅析线程间通信一:互斥量和条件变量

时间:2015-01-03 21:08:40      阅读:386      评论:0      收藏:0      [点我收藏+]

标签:linux   线程   同步   互斥量   条件变量   

     线程同步的目的简单来讲就是保证数据的一致性。在Linux中,常用的线程同步方法有互斥量( mutex )、读写锁和条件变量,合理使用这三种方法可以保证数据的一致性,但值得的注意的是,在设计应用程序时,所有的线程都必须遵守相同的数据访问规则为前提,才能保证这些同步方法有效,如果允许某个线程在没有得到访问权限(比如锁)的情况下访问共享资源,那么其他线程在使用共享资源前都获得了锁,也会出现数据不一致的问题。另外还有自旋锁、barrier和信号量线程同步方法。本文将讨论互斥量和条件变量的使用,并给出了相应的代码和注意事项,相关代码也在我的github上下载。

    互斥量

    互斥量从本质上说是一把锁,在访问共享资源前主动对互斥量进行加锁,在访问完成后需要主动释放互斥量上的锁。对互斥量加锁后,其他线程试图对互斥量加锁时都会被阻塞直到互斥量的锁释放(注意如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入阻塞,即死锁状态。)。如果释放互斥量锁时有多个线程阻塞,所有阻塞在该互斥量的线程都会变成可运行状态,其中第一个变成可运行状态的线程获得互斥量的锁,其他线程会再次阻塞(注意那个阻塞线程的获得锁是跟实现相关的,是不确定的,比如可能是首先唤醒优先级最高的被阻塞线程,而优先级相同的,则按FIFO原则来唤醒)下面是使用互斥量来处理经典的生产者-消费者问题,代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXNITEMS       1000000
#define MAXNTHREADS         100
#define MIN(a,b) (((a) < (b))?(a):(b))

int     nitems;         /* read-only by producer and consumer,to express maximum item */
struct {
  pthread_mutex_t   mutex;
  int   buff[MAXNITEMS];
  int   nput;
  int   nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *), *consume(void *);

int main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
    {
        printf("usage: prodcons4 <#items> <#threads>\n");
        return 1;
    }
    nitems = MIN(atoi(argv[1]), MAXNITEMS);
    nthreads = MIN(atoi(argv[2]), MAXNTHREADS);

    printf("main:%d,%d,%d",shared.nput,shared.nval,shared.buff[0]);

    /* create all producers and one consumer */
    for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }   
    pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);        
}
    pthread_join(tid_consume, NULL);

    exit(0);
}

void * produce(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock(&shared.mutex);
        if (shared.nput >= nitems) {
            pthread_mutex_unlock(&shared.mutex);
            return(NULL);       /* array is full, we're done */
        }
        shared.buff[shared.nput] = shared.nval;
        shared.nput++;
        shared.nval++;
        pthread_mutex_unlock(&shared.mutex);
        *((int *) arg) += 1;
    }
}

void consume_wait(int i)
{
    for ( ; ; ) {
        pthread_mutex_lock(&shared.mutex);
        if (i < shared.nput) {
            pthread_mutex_unlock(&shared.mutex);
            return;         /* an item is ready */
        }
        pthread_mutex_unlock(&shared.mutex);
        sched_yield();
    }
}

void * consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++) {
        consume_wait(i);
        if (shared.buff[i] != i)
            printf("buff[%d] = %d\n", i, shared.buff[i]);
    }
    return(NULL);
}

编译运行后结果如下:

$gcc -Wall -lpthread mutex_example.c -o mutex_example
$./mutex_example 1000000 5
count[0] = 188090
count[1] = 197868
count[2] = 194924
count[3] = 211562
count[4] = 207556

上面程序实现是多个生产者线程和一个消费者线程同步,有以下几个地方值得注意:

I)结构体shared中包括一个互斥变量和需要同步的数据,把他们封装到一个结构体的目的是为了强调这些变量只应该在拥有其互斥锁时访问。把共享数据和它们的同步变量(互斥锁、条件变量或信号量)封装到一个结构体中,是一个很好的编程技巧。

II)在函数produce()中,每个线程的count元素的增加不属于锁的临界区,因为每个线程由各自的计数器count[i],应该总是尽可能减少由一个互斥锁锁住的代码量。

III)在函数consume()中,调用了函数consume_wait(i)来检测期待的条目是否准备好了,若没有准备好,则调用sched_yield(),调用该接口后,会使得消费线程让出当前的CPU,并把该线程移到相应优先级队列的末尾。在这里是消费线程把CPU让出给生产者线程。但这仍然不是理想的方法,理想的方法,当buff中有数据时(或有指定数目的数据时),生产者线程唤醒消费者线程。

    条件变量

     条件变量是线程可用的另一种同步机制。互斥量用于上锁,条件变量则用于等待,并且条件变量总是需要与互斥量一起使用。下面是用条件变量重新实现上面的消费者-生产者问题,代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXNITEMS       1000000
#define MAXNTHREADS         100
#define MIN(a,b) (((a) < (b))?(a):(b))

int     nitems;             /* read-only by producer and consumer,to express maximum item */
int     buff[MAXNITEMS];
struct {
  pthread_mutex_t   mutex;
  int               nput;   /* next index to store */
  int               nval;   /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };

struct {
  pthread_mutex_t   mutex;
  pthread_cond_t    cond;
  int               nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

int     nsignals; /*call pthread_cond_signal count*/

void    *produce(void *), *consume(void *); 

int main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
    {   
        printf("usage: prodcons4 <#items> <#threads>\n");
        return 1;
    }   
    nitems = MIN(atoi(argv[1]), MAXNITEMS);
    nthreads = MIN(atoi(argv[2]), MAXNTHREADS);

    /* create all producers and one consumer */
for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    pthread_join(tid_consume, NULL);
    printf("nsignals = %d\n", nsignals);

    exit(0);
}

void * produce(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock(&put.mutex);
        if (put.nput >= nitems) {
            pthread_mutex_unlock(&put.mutex);
            return(NULL);       /* array is full, we're done */
        }
        buff[put.nput] = put.nval;
        put.nput++;
        put.nval++;
        pthread_mutex_unlock(&put.mutex);

        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0) {
            pthread_cond_signal(&nready.cond);
            nsignals++;
        }
        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);

        *((int *) arg) += 1;
    }
}

void * consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++) {
        pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0)
            pthread_cond_wait(&nready.cond, &nready.mutex);
        nready.nready--;
        pthread_mutex_unlock(&nready.mutex);

        if (buff[i] != i)
            printf("buff[%d] = %d\n", i, buff[i]);
    }
    return(NULL);
}

编译运行后结果如下:

$gcc -Wall -lpthread cond_var_example.c  -o cond_var_example
$./cond_var_example  1000000 5
count[0] = 220234
count[1] = 201652
count[2] = 199165
count[3] = 182972
count[4] = 195977
nsignals = 3

上面程序实现是多个生产者线程和一个消费者线程同步,有以下几个地方值得注意:

      I)因为调度的不确定性,程序运行的结果每次都是不一样的。

      II)在consume()中可以看到,消费线程通过一个while循环等待nready.nready(用来统计当前有多少条目供消费者处理)变为非0,若nready.nready值为0,则调用pthread_cond_wait()阻塞线程,使线程进入休眠状态,直到其他线程调用pthread_cond_signal()唤醒它,而在唤醒它之前,该线程一直处于休眠状态,并不会参与调度或消耗CPU。关于函数pthread_cond_wait(),需要注意以下几点:

      a、线程在调用pthread_cond_wait()函数之前,需要获得与之关联的互斥量后才能调用改接口,在上面的例子中,就是计数器nready.nready对应的互斥量nready.mutex,只是获得改互斥量后,线程才能判断nready.nready是否为零,若是,进而以刚才互斥量的地址作为参数调用pthread_cond_wait()接口。若没有获取与之关联的互斥量就调用pthread_cond_wait,其结果是未定义的。

      b、调用pthread_cond_wait()函数,该函数原子地执行下面两个动作:首先给互斥量nready.mutex解锁;然后把调用线程投入睡眠,直到另外某个线程就本条件变量调用pthread_cond_signal。注意这两个操作必须是原子操作,否则线程在解锁后,投入睡眠之前,其他线程调用pthread_cond_signal的话,当前线程投入睡眠后,就一直得不到通知了,即错过条件的变化,在这里是nready.nready的值变化。执行上面两个操作后,线程已经睡眠了,此时函数pthread_cond_wait()还没有返回。

      c、当其他线程调用pthread_cond_signal或pthread_cond_broadcast时,会唤醒相应条件变量等待的线程,此时被唤醒的线程,可以参与调度了,此时被唤醒的线程继续执行pthread_cond_wait()函数,函数pthread_cond_wait()返回之前,会重新给条件变量对应的互斥量上锁,在这里就是nready.mutex,若该函数成功返回,则当前线程有重新获得了nready.mutex锁,当然nready.mutex也可能被其他线程继续占有,此时线程再次阻塞。总之,在函数pthread_cond_wait()成功返回之前,必然又对相应的互斥量成功加锁了,pthread_cond_wait从调用到返回,整个过程相当于:unlock, just_wait, lock这三个操作,并且前面两个操作是原子操作。

      d、在pthread_cond_wait()成功返回后,需要重新检查对应的条件是否成立,在这里是nready.nready的值。因为可能发生虚假的唤醒:期待的条件(在这里是消费线程期待nready.nready的值不为0)还不成立时的唤醒。各种线程实现都应该最大限度减少这些虚假唤醒的数量。

      III)在produce()可以看到,在nready.nready的值加1之前,如果计数器的值为0,那就调用pthread_cond_signal唤醒可能正在等待其值变为非零的线程。可以看出,互斥量是用来同步对nready.nready的访问,而关联的条件变量则用于等待和发送信号。如果仅仅是使用互斥量来同步,则就可能出现前面的忙等待(即使调用sched_yield也只是延迟了而已),而使用条件变量后,则线程就可以阻塞休眠了,直到需要的条件发生(在这里就是nready.nready的值非零),而改变这个条件的线程也是通过条件变量(以条件变量做为参数调用pthread_cond_signal或pthread_cond_broadcast)来通知阻塞在相应条件变量的线程。

      IV)调用pthread_cond_signal函数前不是一定要对互斥变量加锁的(当然也不存在返回时释放互斥量的情况),但通常修改条件变量时需要加锁,这就导致一个问题。pthread_cond_signal即可以放在pthread_mutex_lockpthread_mutex_unlock之间,也可以放在 pthread_mutex_lockpthread_mutex_unlock之后,到底放在什么位置比较好。在上面的例子中,使用的的属于第一种方法。即下面的形式:

pthread_mutex_lock
xxxxxxx
pthread_cond_signal
pthread_mutex_unlock
这样做的缺点是:在最话情况下,当该条件变量被发送信号后,系统立即调度等待其上的线程,该线程开始运行,但是在pthread_cond_wait返回之前,需要再次获得互斥量,而此时互斥量被其他线程(即唤醒它的那个线程)占有,因此被唤醒的线程再次阻塞,所以这样一来一回会有性能的问题。但是在LinuxThreads或者NPTL里面,就不会有这个问题,因为在Linux 线程中,有两个队列,分别是cond_wait队列和mutex_lock队列, pthread_cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗。另外一种情况形式如下:

pthread_mutex_lock
xxxxxxx
pthread_mutex_unlock
pthread_cond_signal

而我们上面的例子对应代码,可以改成:

int dosignal;
pthread_mutex_lock(&nready.mutex);
dosignal = (nready.nready == 0);
pthread_mutex_unlock(&nready.mutex);
if (dosignal)
pthread_cond_signal(&nready.cond);
这种形式优点是不会出现之前说的那个潜在的性能损耗,因为在通知等待线程之前就已经释放锁了;缺点是如果unlockpthread_cond_signal之间,有个低优先级的线程正在mutex上等待的话,那么这个低优先级的线程就会抢占高优先级的线程 (调用pthread_cond_wait的线程),而这在上面的放中间的模式下是不会出现的。

      V)通常pthread_cond_signal函数只唤醒等待在相应条件变量上的一个线程。如果没有线程处在阻塞等待在相应条件变量,pthread_cond_signal也会成功返回。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次(按照POSIX规定是至少一个线程唤醒,该接口实现也可能唤醒多个线程)。而在某些情况下,可能需要唤醒多个线程(比如一个写入者唤醒多个读者线程),此时可以调用pthread_cond_broadcast唤醒阻塞在相应条件变量上的所有线程。

      VI)总的来说,给条件变量发送信号的代码大体如下:

struct {    
  pthread_mutex_t   mutex;
  pthread_cond_t    cond;
  维护本条件的各个变量
  } var= { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
pthread_mutex_lock(&var.mutex); 
设置条件为真
pthread_cond_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);
在我们的例子中,用来维护条件的变量是一个整数计数器,设置条件的操作就是给该计数器加1。我们做了优化处理,即只有该计数器从0变为1时才发出条件变量的信号。测试条件并进入睡眠以等待条件变量为真的代码大体如下:

pthread_mutex_lock(&var.mutex);
while (条件为假)
pthread_cond_wait(&var.cond, &var.mutex);
修改条件
pthread_mutex_unlock(&var.mutex);
      VI)为什么使用条件变量,需要对一个互斥量加锁?因为条件变量的实现,是以已经获得的加锁的互斥量为前提的。若没有获取与之关联的互斥量就调用pthread_cond_wait,其结果是未定义的。这个互斥量就是用来对保护条件本身的访问。

参考资料

《UNIX环境高级编程》 11.6线程的同步 
《UNIX网络编程卷2:进程间通信》第7章、第8章和第10章
http://stackoverflow.com/questions/2763714/why-do-pthreads-condition-variable-functions-require-a-mutex 


浅析线程间通信一:互斥量和条件变量

标签:linux   线程   同步   互斥量   条件变量   

原文地址:http://blog.csdn.net/maximuszhou/article/details/42318169

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!