标签:
条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。
消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!
消息队列在服务器开发过程中主要用于什么对象呢?
1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!
2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。
3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!
给出源代码:
BlockingQueue.h文件
/* * BlockingQueue.h * * Created on: Apr 19, 2013 * Author: archy_yu */ #ifndef BLOCKINGQUEUE_H_ #define BLOCKINGQUEUE_H_ #include <queue> #include <pthread.h> typedef void * CommonItem; class BlockingQueue { public : BlockingQueue(); virtual ~BlockingQueue(); int peek(CommonItem &item); int append(CommonItem item); private : pthread_mutex_t _mutex; pthread_cond_t _cond; std::queue<CommonItem> _read_queue; std::queue<CommonItem> _write_queue; }; #endif /* BLOCKINGQUEUE_H_ */ |
BlockingQueue.cpp 文件代码
/* * BlockingQueue.cpp * * Created on: Apr 19, 2013 * Author: archy_yu */ #include "BlockingQueue.h" BlockingQueue::BlockingQueue() { pthread_mutex_init(& this ->_mutex,NULL); pthread_cond_init(& this ->_cond,NULL); } BlockingQueue::~BlockingQueue() { pthread_mutex_destroy(& this ->_mutex); pthread_cond_destroy(& this ->_cond); } int BlockingQueue::peek(CommonItem &item) { if ( ! this ->_read_queue.empty() ) { item = this ->_read_queue.front(); this ->_read_queue.pop(); } else { pthread_mutex_lock(& this ->_mutex); while ( this ->_write_queue.empty()) { pthread_cond_wait(& this ->_cond,& this ->_mutex); } while (! this ->_write_queue.empty()) { this ->_read_queue.push( this ->_write_queue.front()); this ->_write_queue.pop(); } pthread_mutex_unlock(& this ->_mutex); } return 0; } int BlockingQueue::append(CommonItem item) { pthread_mutex_lock(& this ->_mutex); this ->_write_queue.push(item); pthread_cond_signal(& this ->_cond); pthread_mutex_unlock(& this ->_mutex); return 0; } |
测试代码:
BlockingQueue _queue; void * process( void * arg) { int i=0; while ( true ) { int *j = new int (); *j = i; _queue.append(( void *)j); i ++; } return NULL; } int main( int argc, char ** argv) { pthread_t pid; pthread_create(&pid,0,process,0); long long int start = get_os_system_time(); int i = 0; while ( true ) { int * j = NULL; _queue.peek(( void * &)j); i ++; if (j != NULL && (*j) == 100000) { long long int end = get_os_system_time(); printf ( "consume %d\n" ,end - start); break ; } } return 0; } |
欢迎拍砖!!!
标签:
原文地址:http://www.cnblogs.com/androidshouce/p/5589766.html