转自:http://blog.chinaunix.net/uid-28458801-id-4262445.html
操作系统:ubuntu10.04
前言:
在嵌入式开发中,只要是带操作系统的,在其上开发产品应用,基本都需要用到多线程。
为了提高效率,尽可能的提高并发率。因此,线程之间的通信就是问题的核心。
根据当前产品需要,使用 环形缓冲区 解决。
一,环形缓冲区的实现
1,cbuf.h
点击(此处)折叠或打开
-
#ifndef __CBUF_H__
-
#define __CBUF_H__
-
-
#ifdef __cplusplus
-
extern "C" {
-
#endif
-
-
/* Define to prevent recursive inclusion
-
-------------------------------------*/
-
#include "types.h"
-
#include "thread.h"
-
-
-
typedef struct _cbuf
-
{
-
int32_t size; /* 当前缓冲区中存放的数据的个数 */
-
int32_t next_in; /* 缓冲区中下一个保存数据的位置 */
-
int32_t next_out; /* 从缓冲区中取出下一个数据的位置 */
-
int32_t capacity; /* 这个缓冲区的可保存的数据的总个数 */
-
mutex_t mutex; /* Lock the structure */
-
cond_t not_full; /* Full -> not full condition */
-
cond_t not_empty; /* Empty -> not empty condition */
-
void *data[CBUF_MAX];/* 缓冲区中保存的数据指针 */
-
}cbuf_t;
-
-
-
/* 初始化环形缓冲区 */
-
extern int32_t cbuf_init(cbuf_t *c);
-
-
/* 销毁环形缓冲区 */
-
extern void cbuf_destroy(cbuf_t *c);
-
-
/* 压入数据 */
-
extern int32_t cbuf_enqueue(cbuf_t *c,void *data);
-
-
/* 取出数据 */
-
extern void* cbuf_dequeue(cbuf_t *c);
-
-
-
/* 判断缓冲区是否为满 */
-
extern bool cbuf_full(cbuf_t *c);
-
-
/* 判断缓冲区是否为空 */
-
extern bool cbuf_empty(cbuf_t *c);
-
-
/* 获取缓冲区可存放的元素的总个数 */
-
extern int32_t cbuf_capacity(cbuf_t *c);
-
-
-
#ifdef __cplusplus
-
}
-
#endif
-
-
#endif
-
/* END OF FILE
- ---------------------------------------------------------------*/
2,cbuf.c
点击(此处)折叠或打开
-
#include "cbuf.h"
-
-
-
-
/* 初始化环形缓冲区 */
-
int32_t cbuf_init(cbuf_t *c)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = mutex_init(&c->mutex)) != OPER_OK)
-
{
-
#ifdef DEBUG_CBUF
-
debug("cbuf init fail ! mutex init fail !\n");
-
#endif
-
return ret;
-
}
-
-
if((ret = cond_init(&c->not_full)) != OPER_OK)
-
{
-
#ifdef DEBUG_CBUF
-
debug("cbuf init fail ! cond not full init fail !\n");
-
#endif
-
mutex_destroy(&c->mutex);
-
return ret;
-
}
-
-
if((ret = cond_init(&c->not_empty)) != OPER_OK)
-
{
-
#ifdef DEBUG_CBUF
-
debug("cbuf init fail ! cond not empty init fail !\n");
-
#endif
-
cond_destroy(&c->not_full);
-
mutex_destroy(&c->mutex);
-
return ret;
-
}
-
-
c->size = 0;
-
c->next_in = 0;
-
c->next_out = 0;
-
c->capacity = CBUF_MAX;
-
-
#ifdef DEBUG_CBUF
-
debug("cbuf init success !\n");
-
#endif
-
-
return ret;
-
}
-
-
-
/* 销毁环形缓冲区 */
-
void cbuf_destroy(cbuf_t *c)
-
{
-
cond_destroy(&c->not_empty);
-
cond_destroy(&c->not_full);
-
mutex_destroy(&c->mutex);
-
-
#ifdef DEBUG_CBUF
-
debug("cbuf destroy success \n");
-
#endif
-
}
-
-
-
-
/* 压入数据 */
-
int32_t cbuf_enqueue(cbuf_t *c,void *data)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = mutex_lock(&c->mutex)) != OPER_OK) return ret;
-
-
/*
-
* Wait while the buffer is full.
-
*/
-
while(cbuf_full(c))
-
{
-
#ifdef DEBUG_CBUF
-
debug("cbuf is full !!!\n");
-
#endif
-
cond_wait(&c->not_full,&c->mutex);
-
}
-
-
c->data[c->next_in++] = data;
-
c->size++;
-
c->next_in %= c->capacity;
-
-
mutex_unlock(&c->mutex);
-
-
/*
-
* Let a waiting consumer know there is data.
-
*/
-
cond_signal(&c->not_empty);
-
-
#ifdef DEBUG_CBUF
-
// debug("cbuf enqueue success ,data : %p\n",data);
-
debug("enqueue\n");
-
#endif
-
-
return ret;
-
}
-
-
-
-
/* 取出数据 */
-
void* cbuf_dequeue(cbuf_t *c)
-
{
-
void *data = NULL;
-
int32_t ret = OPER_OK;
-
-
if((ret = mutex_lock(&c->mutex)) != OPER_OK) return NULL;
-
-
/*
-
* Wait while there is nothing in the buffer
-
*/
-
while(cbuf_empty(c))
-
{
-
#ifdef DEBUG_CBUF
-
debug("cbuf is empty!!!\n");
-
#endif
-
cond_wait(&c->not_empty,&c->mutex);
-
}
-
-
data = c->data[c->next_out++];
-
c->size--;
-
c->next_out %= c->capacity;
-
-
mutex_unlock(&c->mutex);
-
-
-
/*
-
* Let a waiting producer know there is room.
-
* 取出了一个元素,又有空间来保存接下来需要存储的元素
-
*/
-
cond_signal(&c->not_full);
-
-
#ifdef DEBUG_CBUF
-
// debug("cbuf dequeue success ,data : %p\n",data);
-
debug("dequeue\n");
-
#endif
-
-
return data;
-
}
-
-
-
/* 判断缓冲区是否为满 */
-
bool cbuf_full(cbuf_t *c)
-
{
-
return (c->size == c->capacity);
-
}
-
-
/* 判断缓冲区是否为空 */
-
bool cbuf_empty(cbuf_t *c)
-
{
-
return (c->size == 0);
-
}
-
-
/* 获取缓冲区可存放的元素的总个数 */
-
int32_t cbuf_capacity(cbuf_t *c)
-
{
-
return c->capacity;
- }
二,辅助文件
为了提高程序的移植性,对线程相关进行封装。
1,thread.h
点击(此处)折叠或打开
-
#ifndef __THREAD_H__
-
#define __THREAD_H__
-
-
#ifdef __cplusplus
-
extern "C" {
-
#endif
-
-
/* Define to prevent recursive inclusion
-
-------------------------------------*/
-
#include "types.h"
-
-
-
-
-
-
typedef struct _mutex
-
{
-
pthread_mutex_t mutex;
-
}mutex_t;
-
-
-
typedef struct _cond
-
{
-
pthread_cond_t cond;
-
}cond_t;
-
-
-
typedef pthread_t tid_t;
-
typedef pthread_attr_t attr_t;
-
typedef void* (* thread_fun_t)(void*);
-
-
-
typedef struct _thread
-
{
-
tid_t tid;
-
cond_t *cv;
-
int32_t state;
-
int32_t stack_size;
-
attr_t attr;
-
thread_fun_t fun;
-
}thread_t;
-
-
-
-
/* mutex */
-
extern int32_t mutex_init(mutex_t *m);
-
extern int32_t mutex_destroy(mutex_t *m);
-
extern int32_t mutex_lock(mutex_t *m);
-
extern int32_t mutex_unlock(mutex_t *m);
-
-
-
/* cond */
-
extern int32_t cond_init(cond_t *c);
-
extern int32_t cond_destroy(cond_t *c);
-
extern int32_t cond_signal(cond_t *c);
-
extern int32_t cond_wait(cond_t *c,mutex_t *m);
-
-
-
-
/* thread */
-
/* 线程的创建,其属性的设置等都封装在里面 */
-
extern int32_t thread_create(thread_t *t);
-
//extern int32_t thread_init(thread_t *t);
-
-
#define thread_join(t, p) pthread_join(t, p)
-
#define thread_self() pthread_self()
-
#define thread_sigmask pthread_sigmask
-
-
-
#ifdef __cplusplus
-
}
-
#endif
-
-
#endif
-
/* END OF FILE
- ---------------------------------------------------------------*/
2,thread.c
点击(此处)折叠或打开
-
#include "thread.h"
-
-
-
-
-
/* mutex */
-
int32_t mutex_init(mutex_t *m)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_mutex_init(&m->mutex, NULL)) != 0)
-
ret = -THREAD_MUTEX_INIT_ERROR;
-
-
return ret;
-
}
-
-
-
int32_t mutex_destroy(mutex_t *m)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_mutex_destroy(&m->mutex)) != 0)
-
ret = -MUTEX_DESTROY_ERROR;
-
-
return ret;
-
}
-
-
-
-
int32_t mutex_lock(mutex_t *m)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_mutex_lock(&m->mutex)) != 0)
-
ret = -THREAD_MUTEX_LOCK_ERROR;
-
-
return ret;
-
}
-
-
-
-
int32_t mutex_unlock(mutex_t *m)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_mutex_unlock(&m->mutex)) != 0)
-
ret = -THREAD_MUTEX_UNLOCK_ERROR;
-
-
return ret;
-
}
-
-
-
-
-
-
-
/* cond */
-
int32_t cond_init(cond_t *c)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_cond_init(&c->cond, NULL)) != 0)
-
ret = -THREAD_COND_INIT_ERROR;
-
-
return ret;
-
}
-
-
-
-
int32_t cond_destroy(cond_t *c)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_cond_destroy(&c->cond)) != 0)
-
ret = -COND_DESTROY_ERROR;
-
-
return ret;
-
}
-
-
-
-
int32_t cond_signal(cond_t *c)
-
{
-
int32_t ret = OPER_OK;
-
-
-
if((ret = pthread_cond_signal(&c->cond)) != 0)
-
ret = -COND_SIGNAL_ERROR;
-
-
return ret;
-
}
-
-
-
-
-
int32_t cond_wait(cond_t *c,mutex_t *m)
-
{
-
int32_t ret = OPER_OK;
-
-
if((ret = pthread_cond_wait(&c->cond, &m->mutex)) != 0)
-
ret = -COND_WAIT_ERROR;
-
-
return ret;
- }
三,测试
1,测试代码
点击(此处)折叠或打开
-
/*
-
* cbuf begin
-
*/
-
#define OVER (-1)
-
-
static cbuf_t cmd;
-
static int line_1[200];
-
static int line_2[200];
-
//static int temp = 0;
-
-
static bool line1_finish = false;
-
static bool line2_finish = false;
-
-
void* producer_1(void *data)
-
{
-
int32_t i = 0;
-
-
for(i = 0; i < 200; i++)
-
{
-
line_1[i] = i+1000;
-
cbuf_enqueue(&cmd, &line_1[i]);
-
-
if(0 == (i % 9)) sleep(1);
-
}
-
-
line1_finish = true;
-
-
return NULL;
-
}
-
-
void* producer_2(void *data)
-
{
-
int32_t i = 0;
-
-
for(i = 0; i < 200; i++)
-
{
-
line_2[i] = i+20000;
-
cbuf_enqueue(&cmd, &line_2[i]);
-
-
if(0 == (i % 9)) sleep(1);
-
}
-
-
line2_finish = true;
-
-
return NULL;
-
}
-
-
-
void* consumer(void *data)
-
{
-
int32_t *ptr = NULL;
-
-
while(1)
-
{
-
ptr = cbuf_dequeue(&cmd);
-
printf("%d\n",*ptr);
-
-
if(cbuf_empty(&cmd) && line2_finish && line1_finish)
-
{
-
printf("quit\n");
-
break;
-
}
-
}
-
-
return NULL;
-
}
-
-
-
void test_cbuf_oper(void)
-
{
-
pthread_t l_1;
-
pthread_t l_2;
-
pthread_t c;
-
-
cbuf_init(&cmd);
-
-
pthread_create(&l_1,NULL,producer_1,0);
-
pthread_create(&l_2,NULL,producer_2,0);
-
pthread_create(&c,NULL,consumer,0);
-
-
pthread_join(l_1,NULL);
-
pthread_join(l_2,NULL);
-
pthread_join(c,NULL);
-
-
cbuf_destroy(&cmd);
-
}
-
-
-
void test_cbuf(void)
-
{
-
test_cbuf_oper();
-
}
-
-
-
/*
-
* cbuf end
- */
2,测试结果
四,参考文件
1,《bareos-master》源码
2,《nginx》源码