<2>可以在C++代码中使用,因为头文件里增加了extern "C"声明。
可以在头文件设置 SEC_TIME,MSEC_TIME 来实现内部等待超时的秒级,微妙级超时等待。
e.g tm_queue_t my_queue = {.max = 100};//定义一个最大能缓冲100个元素的队列。
struct node my_node;//队列中出队入队的基本单位。用来临时缓存。
e.g queue_init(&my_queue);
e.g queue_in(&my_queue, &my_node);
e.g queue_out(&my_queue, BLOCK, &my_node);
e.g queue_clear(&my_queue);
e.g queue_destroy(&my_queue);
<7>该队列基本操作元素为struct node;使用者可以根据自己需要在该结构体内部设置自己的数据结构。
测试 *
先申请两个长度为MAX到数组,int buf_1[MAX];int buf_2[MAX]
<pre name="code" class="cpp">/****************************************************************************** 版权所有 (C), 2014-2017, XXXX科技股份有限公司 ****************************************************************************** 文 件 名 : tm_queue.h 版 本 号 : 初稿 作 者 : liuhuahan 生成日期 : 2014年12月2日 最近修改 : 功能描述 : 出队时可以阻塞等待的队列 函数列表 : 修改历史 : 1.日 期 : 2014年12月2日 作 者 : liuhuahan 修改内容 : 创建文件 ******************************************************************************/ /*----------------------------------------------* * 包含头文件 * *----------------------------------------------*/ /*----------------------------------------------* * 外部变量说明 * *----------------------------------------------*/ /*----------------------------------------------* * 外部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 内部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 全局变量 * *----------------------------------------------*/ /*----------------------------------------------* * 模块级变量 * *----------------------------------------------*/ /*----------------------------------------------* * 常量定义 * *----------------------------------------------*/ /*----------------------------------------------* * 宏定义 * *----------------------------------------------*/ #ifndef TM_QUEUE_H #define TM_QUEUE_H #include <pthread.h> #include <sys/time.h> #include <stdbool.h> #include <stdlib.h> #include <stdio.h> #ifdef __cplusplus #if __cplusplus extern "C"{ #endif #endif /* __cplusplus */ #define TIMEOUT 110 /* pthread_cond_timedwait函数等待超时的返回值 */ /* 调试开关 */ //#define BZQ_DEBUG #ifdef BZQ_DEBUG #define DBG_PRINT(fmt, args...) do{ printf("%s %s %d ",__FILE__, __FUNCTION__, __LINE__); printf(fmt, ##args); }while(0); #else #define DBG_PRINT(fmt, args...) #endif #define SEC_TIME 1 /* 等待超时时间,单位s */ #define MSEC_TIME 000 /* 等待超时时间,单位ms */ #define BLOCK 1 /* 阻塞出队标志 */ #define NOBLOCK 0 /* 非阻塞出队 */ struct node{ int buf[3]; }; typedef struct tm_queue{ pthread_cond_t msg_sem;/* 消息条件信号*/ pthread_mutex_t lock;/* 互斥锁 */ const int max;/*该队列缓冲区最大值;*/ int head;/* 队列头索引号 */ int tail;/* 队列尾索引号 */ struct node *nodes;/* 该队列缓冲区指针 */ } tm_queue_t; int queue_init(tm_queue_t *tm_que_p);/* 初始化函数 */ int queue_destroy(tm_queue_t *tm_que_p);/* 销毁队列 */ int queue_in(tm_queue_t *tm_que_p, struct node * node_p);/* 入队 */ int queue_out(tm_queue_t *tm_que_p, int flag, struct node * node_p);/* 出队 */ int get_queue_length( tm_queue_t *tm_que_p); //返回队列的长度 void queue_clear(tm_queue_t *tm_que_p); //清空队列 void queue_print(tm_queue_t *tm_que_p); //打印队列 #ifdef __cplusplus #if __cplusplus } #endif #endif /* __cplusplus */ #endif /* TM_QUEUE_H */
<pre name="code" class="cpp">tm_queue.c
/****************************************************************************** 版权所有 (C), 2014-2017, XXXX科技股份有限公司 ****************************************************************************** 文 件 名 : tm_queue.c 版 本 号 : 初稿 作 者 : liuhuahan 生成日期 : 2014年12月2日 最近修改 : 功能描述 : 出队时可以阻塞等待的队列 函数列表 : get_queue_length is_empty node_print queue_destroy queue_in queue_init queue_out queue_print set_timeout 修改历史 : 1.日 期 : 2014年12月2日 作 者 : liuhuahan 修改内容 : 创建文件 ******************************************************************************/ /*----------------------------------------------* * 包含头文件 * *----------------------------------------------*/ /*----------------------------------------------* * 外部变量说明 * *----------------------------------------------*/ /*----------------------------------------------* * 外部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 内部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 全局变量 * *----------------------------------------------*/ /*----------------------------------------------* * 模块级变量 * *----------------------------------------------*/ /*----------------------------------------------* * 常量定义 * *----------------------------------------------*/ /*----------------------------------------------* * 宏定义 * *----------------------------------------------*/ #include"tm_queue.h" static bool is_empty(tm_queue_t *tm_que_p); //判断队列是否为空 static int set_timeout(struct timespec * outtime, int sec, int msec); static void node_print(int num, struct node * message);/* 单个元素打印 */ /***************************************************************************** 函 数 名 : tm_queue_init 功能描述 : 队列初始化 输入参数 : tm_queue_t * tm_que_p int num 输出参数 : 无 返 回 值 : 0成功,-1 paramenter err,-2 malloc err,-3 mutex err,-4 cond err 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_init(tm_queue_t *tm_que_p) { int ret = 0; if((!tm_que_p)||(tm_que_p->max <= 0)){ return -1; } tm_que_p->head = 0; tm_que_p->tail = 0; tm_que_p->nodes = (struct node *)malloc(tm_que_p->max * sizeof(struct node)); if(tm_que_p->nodes == 0){ return -2; } ret = pthread_mutex_init(&(tm_que_p->lock),NULL); if(ret){ free(tm_que_p->nodes); return -3; } ret = pthread_cond_init(&(tm_que_p->msg_sem),NULL); if(ret){ free(tm_que_p->nodes); pthread_mutex_destroy(&tm_que_p->lock); return -4; } return ret; } /***************************************************************************** 函 数 名 : queue_destroy 功能描述 : 销毁队列 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 :0:成功,-1:参数错误,-2:mutex销毁失败,-3:cond销毁失败,-4:两个销毁均失败 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_destroy(tm_queue_t * tm_que_p) { int ret = 0, tmp = 0; if(!tm_que_p){ return -1; } tm_que_p->tail = 0; tm_que_p->head = 0; free(tm_que_p->nodes); tmp = pthread_mutex_destroy(&tm_que_p->lock); if(tmp){ DBG_PRINT("pthread_mutex_destroy failed\n") ret = -2; } tmp = pthread_cond_destroy(&tm_que_p->msg_sem); if(tmp){ DBG_PRINT("pthread_cond_destroy failed\n") if(ret){ return -4; }else{ return -3; } } return ret; } /***************************************************************************** 函 数 名 : queue_in 功能描述 : 入队函数 输入参数 : tm_queue_t * tm_que_p struct node * node_p 输出参数 : 无 返 回 值 : 0 success, -1 failed 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_in(tm_queue_t * tm_que_p,struct node * node_p) { int ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); return -1; } if((tm_que_p->tail+ 1) % tm_que_p->max == tm_que_p->head){ //队列已满 ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG DBG_PRINT("queue have full,queue_in failed\n"); if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); } #endif /* BZQ_DEBUG */ return -1; } tm_que_p->nodes[tm_que_p->tail] = *node_p; //元素进队 tm_que_p->tail = (tm_que_p->tail + 1) % (tm_que_p->max); //游标tail下移一位,如果已达最后,就移到前面 printf("id = %d, number = %d, tail = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->tail); ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); } #endif /* BZQ_DEBUG */ pthread_cond_signal(&(tm_que_p->msg_sem)); return 0; } /***************************************************************************** 函 数 名 : queue_out 功能描述 : 队头元素出队 输入参数 : tm_queue_t * tm_que_p int flag 输出参数 : struct node * node_p 返 回 值 : 0成功,-1失败 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ int queue_out(tm_queue_t * tm_que_p, int flag, struct node * node_p) { int ret, tmp; struct timespec outtime; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed,err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ while(1){ //如果队列为空解锁等待信号 if(is_empty(tm_que_p)){ /* 若为NOBLOCK模式,直接返回-1 */ if(flag == NOBLOCK){ return -1; } set_timeout(&outtime,SEC_TIME,MSEC_TIME); ret = pthread_cond_timedwait(&(tm_que_p->msg_sem), &(tm_que_p->lock), &outtime); if(!ret){ /* 严格讲该处应该再次判断一次,防止多个读线程可能引起的冲突, * 但是因为使用队列时只有一个读线程,故该处不判断也可以. */ if(is_empty(tm_que_p)){ DBG_PRINT("UNEXPECTED IS EMPTY\n") continue; } *node_p = tm_que_p->nodes[tm_que_p->head];//返回队头的元素 tm_que_p->head = (tm_que_p->head + 1) % (tm_que_p->max); //游标header向下移一位,如果是队列的末尾移动到最前面 tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG // printf("out ID = %d, number = %d, head = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->head); if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return 0; }else if(ret == TIMEOUT){ continue; }else{ DBG_PRINT("pthread_cond_timedwait failed,err = %d\n",ret); tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return -1; } }else{ *node_p = tm_que_p->nodes[tm_que_p->head];//返回队头的元素 tm_que_p->head = (tm_que_p->head + 1) % tm_que_p->max; //游标header向下移一位,如果是队列的末尾移动到最前面 tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG // printf("out ID = %d, number = %d, head = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->head); if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return 0; } } } /***************************************************************************** 函 数 名 : set_timeout 功能描述 : 设置超时时间 输入参数 : int sec:超时时间,单位s; int msec :超时时间,单位ms 输出参数 : struct timespec * outtime 返 回 值 : 0:success,-1:paramter invalid,-2 gettimeofday failed 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月28日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ static int set_timeout(struct timespec * outtime, int sec, int msec) { int ret; struct timeval now; if(outtime == NULL){ DBG_PRINT("paramer invalid\n"); return -1; } ret = gettimeofday(&now, NULL); if(ret){ DBG_PRINT("gettimeofday failed,err = %d\n",ret); return -2; } outtime->tv_sec = now.tv_sec + sec; outtime->tv_nsec = (now.tv_usec + msec*1000 )* 1000; return 0; } /***************************************************************************** 函 数 名 : is_empty 功能描述 : 检测队列是否为空,因该函数内部未加锁,故只供内部调用 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : 0:非空,1:空 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月28日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ static bool is_empty(tm_queue_t * tm_que_p) { return (tm_que_p->head == tm_que_p->tail)? true : false; } /***************************************************************************** 函 数 名 : get_queue_length 功能描述 : 获得队列中存储元素个数 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : -1失败,大于等于0成功,值为队列中的元素个数 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ int get_queue_length(tm_queue_t * tm_que_p) { int len, ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ len = (tm_que_p->tail - tm_que_p->head + tm_que_p->max) % tm_que_p->max; ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ return len; } /***************************************************************************** 函 数 名 : queue_print 功能描述 : 打印队列中的所有元素 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ void queue_print(tm_queue_t * tm_que_p) { int i,ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed,err = %d\n",ret); return; } DBG_PRINT("head = %d, tail = %d\n",tm_que_p->head,tm_que_p->tail); #endif /* BZQ_DEBUG */ if(tm_que_p->head == tm_que_p->tail) return; else if(tm_que_p->tail < tm_que_p->head) { for(i = tm_que_p->head; i < tm_que_p->max; ++i) node_print(i, &tm_que_p->nodes[i]); for(i = 0; i < tm_que_p->tail; ++i) node_print(i, &tm_que_p->nodes[i]); } else{ for(i = tm_que_p->head ;i < tm_que_p->tail;++i) node_print(i, &tm_que_p->nodes[i]); } ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed err = %d\n",ret); return; } #endif /* BZQ_DEBUG */ } /***************************************************************************** 函 数 名 : node_print 功能描述 : 打印单个元素 输入参数 : int num 元素ID struct node * message 元素指针 输出参数 : 无 返 回 值 : 无 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ static void node_print(int num,struct node * message) { printf("node[%d] = [%d][%d][%d]\n",num,message->buf[0],message->buf[1],message->buf[2]); }
#include "tm_queue.h" #include <stdlib.h> #include <unistd.h> #define MAXLEN 500 int buf1[MAXLEN]; int buf2[MAXLEN]; int _buf1[MAXLEN]; int _buf2[MAXLEN]; int err1 = 0; int err2 = 0; pthread_mutex_t buf1_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t buf2_mutex = PTHREAD_MUTEX_INITIALIZER; tm_queue_t que_test = {.max = 500,}; void * test_queue_1(void *p) { int i, tmp, id; struct node tmp_node; id = (int)p; tmp_node.buf[0] = id; for(i = 0; i < MAXLEN; i++) { // printf("test_queue_1 id = %d willing be writed\n",i); tmp = random(); tmp_node.buf[1] = tmp; tmp_node.buf[2] = i; pthread_mutex_lock(&buf1_mutex); if(queue_in(&que_test,&tmp_node)){ // printf("queue_in failed, i = %d\n",i); i--; err1++; pthread_mutex_unlock(&buf1_mutex); usleep(1); continue; }else{ buf1[i] = tmp; pthread_mutex_unlock(&buf1_mutex); //if(tmp == 0){ // printf("UNEXPECTED buf1[%d] = 0\n", i); //} //printf("WRITE BUF1[%d] = %d, tmp = %d\n",i, buf1[i], tmp); // printf("queue write success, i = %d rand = %d\n",i,tmp); } //sleep(1); } // printf("the test_queue_1 have quited, i = %d\n", i); return NULL; } void * test_queue_2(void *p) { int i, tmp, id; struct node tmp_node; id = (int)p; tmp_node.buf[0] = id; for(i = 0; i < MAXLEN; i++) { tmp = random(); tmp_node.buf[1] = tmp; tmp_node.buf[2] = i; pthread_mutex_lock(&buf2_mutex); if(queue_in(&que_test,&tmp_node)){ i--; err2++; pthread_mutex_unlock(&buf2_mutex); usleep(1); continue; }else{ buf2[i] = tmp; pthread_mutex_unlock(&buf2_mutex); //if(tmp == 0){ // printf("UNEXPECTED buf2[%d] = 0\n", i); //} //printf("WRITE BUF2[%d] = %d, tmp = %d\n",i, buf2[i], tmp); } //sleep(1); } return NULL; } int main(int argc, char *argv[]) { pthread_t id_1, id_2; int i = 0; //int m = 0; //int n = 0; int ret; struct node tmp_node; if(queue_init(&que_test)){ printf("queue_init failed \n"); return -1; } printf("queue-max = %d\n", que_test.max); pthread_create(&id_1,NULL,test_queue_1,(void *)1); pthread_create(&id_2,NULL,test_queue_2,(void *)2); // sleep(10); #if 1 for(i = 0; i < 2*MAXLEN; i++) { if(queue_out(&que_test, BLOCK, &tmp_node)){ printf("queue_out failed\n"); }else{ if(((volatile int )tmp_node.buf[0]) == 1){ pthread_mutex_lock(&buf1_mutex); if(buf1[tmp_node.buf[2]] != ((volatile int)tmp_node.buf[1])) printf("buf1[%d] = %d,but read is %d\n",tmp_node.buf[2], buf1[tmp_node.buf[2]], tmp_node.buf[1]); pthread_mutex_unlock(&buf1_mutex); //m++; } else if(tmp_node.buf[0] == 2){ pthread_mutex_lock(&buf2_mutex); //printf("node id = %d\n",tmp_node.buf[0]); if(buf2[tmp_node.buf[2]] != tmp_node.buf[1]) printf("buf2[%d] = %d,but read is %d\n",tmp_node.buf[2], buf2[tmp_node.buf[2]], tmp_node.buf[1]); pthread_mutex_unlock(&buf2_mutex); //n++; }else{ printf("there is err ID = %d\n",tmp_node.buf[0]); } //printf("out ID = %d, number = %d\n",tmp_node.buf[0],tmp_node.buf[2]); } } printf("err1 = %d, err2 = %d, read number = %d\n",err1,err2,i); #endif ret = get_queue_length(&que_test); if(ret){ printf("queue length is %d\n", ret); } queue_print(&que_test); pthread_join(id_1,NULL); pthread_join(id_2,NULL); return 0; }