标签:
#include <pthread.h>
struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
    int m_id;
};
msg* workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg(void)
{
    msg* mp;
    for ( ; ; )
    {
        pthread_mutex_lock(&qlock);
        while (workq == NULL)
        {
            pthread_cond_wait(&qready, &qlock);
        }
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);
        /* now process the message mp */
    }
}
void enqueue_msg(msg* mp)
{
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);
}
// threads/condvar.c 11-9
#include <stdlib.h>
// #include <stdio.h>
// #include <string.h>
// #include <unistd.h>
#include <pthread.h>
#include "apue.h"
struct job {
    job(pthread_t threadid): j_id(threadid)
    {
        memset((char*)this + sizeof(pthread_t), 0,
                sizeof(job) - sizeof(pthread_t));
    }
    pthread_t   j_id;   /* tells which thread handles this job */
    struct job *j_next;
    struct job *j_prev;
    /* ... more stuff here ... */
    bool        finished;
};
struct queue {
    job*            q_head;
    job*            q_tail;
    pthread_mutex_t q_lock;
    pthread_cond_t  q_cond;
};
/*
 * Initialize a queue.
 */
int queue_init(struct queue *qp)
{
    int err;
    qp->q_head = NULL;
    qp->q_tail = NULL;
    err = pthread_mutex_init(&qp->q_lock, NULL);
    if (err != 0)
        return(err);
    err = pthread_cond_init(&qp->q_cond, NULL);
    if (err != 0)
        return(err);
    /* ... continue initialization ... */
    return(0);
}
/*
 * Insert a job at the head of the queue.
 */
void job_insert(struct queue *qp, struct job *jp)
{
    pthread_mutex_lock(&qp->q_lock);
    jp->j_next = qp->q_head;
    jp->j_prev = NULL;
    if (qp->q_head != NULL)
        qp->q_head->j_prev = jp;
    else
        qp->q_tail = jp;	/* list was empty */
    qp->q_head = jp;
    pthread_mutex_unlock(&qp->q_lock);
}
/*
 * Append a job on the tail of the queue.
 */
void job_append(struct queue *qp, struct job *jp)
{
    pthread_mutex_lock(&qp->q_lock);
    printf("%ld %s write lock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
    jp->j_next = NULL;
    jp->j_prev = qp->q_tail;
    if (qp->q_tail != NULL)
    {
        qp->q_tail->j_next = jp;
    }
    else
    {
        qp->q_head = jp;	/* list was empty */
    }
    qp->q_tail = jp;
    pthread_mutex_unlock(&qp->q_lock);
    printf("%ld %s unlock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
    pthread_cond_signal(&qp->q_cond);
}
/*
 * Remove the given job from a queue.
 */
void job_remove(struct queue *qp, struct job *jp)
{
    pthread_mutex_lock(&qp->q_lock);
    printf("%ld %s write lock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
    if (jp == qp->q_head)
    {
        qp->q_head = jp->j_next;
        if (qp->q_tail == jp)
        {
            qp->q_tail = NULL;
        }
        else
        {
            jp->j_next->j_prev = jp->j_prev;
        }
    } else if (jp == qp->q_tail)
    {
        qp->q_tail = jp->j_prev;
        jp->j_prev->j_next = jp->j_next;
    }
    else
    {
        jp->j_prev->j_next = jp->j_next;
        jp->j_next->j_prev = jp->j_prev;
    }
    pthread_mutex_unlock(&qp->q_lock);
    printf("%ld %s unlock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
}
/*
 * Find a job for the given thread ID.
 */
job* job_find(struct queue *qp, pthread_t id)
{
    struct job *jp;
    if (pthread_mutex_lock(&qp->q_lock) != 0)
    {
        return(NULL);
    }
    printf("%ld %s read lock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
    for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
    {
        if (pthread_equal(jp->j_id, id))
        {
            break;
        }
    }
    pthread_mutex_unlock(&qp->q_lock);
    printf("%ld %s unlock\n", pthread_self(), __FUNCTION__);
    fflush(stdout);
    return(jp);
}
void DoSomeWork(job* pJob)
{
    pthread_t threadid = pthread_self();
    printf("%ld DoSomeWork threadid \n", threadid);
    fflush(stdout);
    pJob->finished = true;
    usleep(10);
}
void* thr_fn(void* arg)
{
    pthread_t threadid = pthread_self();
    printf("%ld thread start threadid \n", threadid);
    fflush(stdout);
    int job_done = 0;
    job* pJob;
    for ( ; ; )
    {
        printf("%ld begin job finding \n", threadid);
        fflush(stdout);
        pJob = job_find(static_cast<queue*>(arg), threadid);
        if (NULL == pJob)
        {   // without the condtion, I have to do a lot of query, testing if pJob is NULL
            // not I could just wait for the condition, and then invoke the job_find method
            pthread_mutex_lock(&static_cast<queue*>(arg)->q_lock);
            pthread_cond_wait(&static_cast<queue*>(arg)->q_cond, &static_cast<queue*>(arg)->q_lock);
            pthread_mutex_unlock(&static_cast<queue*>(arg)->q_lock);
        }
        else
        {
            printf("%ld job found \n", threadid);
            fflush(stdout);
            DoSomeWork(pJob);
            printf("%ld %d job done \n", threadid, job_done + 1);
            fflush(stdout);
            job_remove(static_cast<queue*>(arg), pJob);
            if (50000 == ++job_done)
            {
                break; // thread has do 5 job, thread return;
            }
        }
    }
    printf("%ld I have finised %d job , returning threadid \n", threadid, job_done);
    fflush(stdout);
    return((void *)0);
}
int main()
{
    pthread_t threadid0;
    pthread_t threadid1;
    void* ret;
    int err;
    queue queue0;
    queue_init(&queue0);
    err = pthread_create(&threadid0, NULL, thr_fn, static_cast<void*>(&queue0));
    if (0 != err)
        err_quit("can‘t create thread: %s\n", strerror(err));
    err = pthread_create(&threadid1, NULL, thr_fn, static_cast<void*>(&queue0));
    if (0 != err)
        err_quit("can‘t create thread: %s\n", strerror(err));
    for (int i = 0; i < 100000; i++)
    {
        job* pJob;
        if (0 == i % 2)
        {
            pJob = new job(threadid0);
        }
        else
        {
            pJob = new job(threadid1);
        }
        usleep(100);
        job_append(&queue0, pJob);
    }
    printf("begin pthread_join thread0 \n");
    err = pthread_join(threadid0, &ret);
    if (err != 0)
        err_quit("can‘t join with thread 0: %s\n", strerror(err));
    printf("thread 0 exit code %d\n", (int)(long)ret);
    printf("begin pthread_join thread1 \n");
    err = pthread_join(threadid1, &ret);
    if (err != 0)
        err_quit("can‘t join with thread 1: %s\n", strerror(err));
    printf("thread 1 exit code %d\n", (int)(long)ret);
    printf("main thread exiting\n");
    return 0;
}
我没有完全使用msg,这个例子,我修改了读写锁的例子来完成,条件变量的例子
标签:
原文地址:http://www.cnblogs.com/sunyongjie1984/p/4282042.html