标签:
上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁。
首先对环形缓冲区做下说明:
然后对涉及到的几个技术做下说明:
⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本语义如下:
int CAS(address,old_value,new_value) { int ret = *addresss; if(ret == old_value){ *address = new_value; } return ret; }
为了方便调用者知道调用结果,通常被编译器改写成如下形式:
bool CAS(address,old_value,new_value) { int ret = *addresss; if(ret == old_value){ *address = new_value; return true; } return ret; }
⑵sched_yield(),调用sched_yield可以使当前线程让出cpu,内核会把当前线程插入到线程优先级所对应的就绪队列,然后调度新的线程占有cpu,Stack Overflow上的解释:
无锁队列的实现依托3个变量,in,out,max_out
in,被所有生产者共享,表示第一个可写入的位置,每次成功读取,值加1
out,被消费者共享,表示第一个课读取的位置,每次成功读取,值加1
max_out,生产者用于发布数据,[out,max_out]这一段表示消费者可消费的数据,max_out只能被生产者有序修改
out=max_out时,表示没有数据可消费
que_size,环形缓冲区大小,总是2的幂
in-out表示缓冲区中有多少数据
in-out = que_size,表示缓冲区满
1 template<typename ELEM_TYPE> 2 class queue{ 3 public: 4 queue(int size);//把size上调至2的幂保存到que_size中 5 bool enqueue(const ELEM_TYPE& in_data); 6 bool dequeue(ELEM_TYPE& out_data ); 7 ...//其他成员 8 private: 9 ELEM_TYPE *arry; 10 int in; 11 int out; 12 int max_out;//用于读写线程同步 13 int que_size; 14 ... //其他成员 15 } 16 17 template <typename ELEM_T> 18 bool queue<ELEM_T>::enqueue(const ELEM_T& in_data) 19 { 20 int cur_in ; 21 int cur_out; 22 23 do{ 24 cur_in = in; 25 cur_out = out; 26 27 if(cur_in - cur_out == que_size){ 28 return false; 29 } 30 31 }while(!CAS(&in,cur_in,cur_in+1))//如果cur_in==in依然成立,说明没有其他线程修改in,cur_in是可用的,并修改in;如果cur_in!=in,说明in值已被其他线程修改 32 33 arry[cur_in&(que_size-1)] = in_data; 34 35 while(!CAS(&max_out,cur_in,cur_in+1)){//发布数据 36 sched_yield();//发布数据失败,cur_in之前还有数据没有发布出去,让出cpu,让其他线程先执行 37 } 38 39 return true; 40 } 41 42 template <typename ELEM_T> 43 bool queue<ELEM_T,QUE_SIZE>::dequeue(ELEM_T& out_data) 44 { 45 int cur_out; 46 int cur_max_out; 47 48 do{ 49 cur_out = out; 50 cur_max_out = max_out; 51 52 if(cur_out == cur_max_out){ 53 return false; 54 } 55 56 out_data = arry[cur_out&(que_size-1)] ;//先尝试获取数据 57 58 }while(!CAS(&out,cur_out,cur_out+1)) 59 60 return true; 61 }
enqueue操作:
23行到30行:
先预取可插入位置cur_in
然后判断缓冲区是否满,满了就不插入了,直接退出,返回false,表示缓冲区满
然后使用CAS(&in,cur_in,cur_in+1)判断该位置是否被使用过,在单线程中,25行到31行,cur_in和in永远相等,因为这之间根本就没有修过cur_in或in的语句,但是,在多线程中,in是所有生产者共享的,可能被其他线程在31行修改,所以,执行到31行时,要么cur_in<in,要么cur_in=in,CAS操作刚好可以判断相等的这个关系,如果相等,说明没有别的线程修改过in,也就是则预取的插入位置cur_in有效;如果不等,忙等待。
35行到37行:
这是生产者发布自己生产的数据,发布的方式就是把max_out指向当前线程的插入位置,同样,max_out可能会被其他线程修改,所以可能导致CAS失败,这时就不做忙等待了,而是让出cpu,仔细想一下,CAS失败的原因就是现在这个线程执行的太快了,导致这个线程插入位置cur_in之前还有数据没有发布出去,所以这个线程先让出cpu,先让没发布好数据的生产者先发布数据。
其实这样做会存在问题:如果线程A发布数据时,发现在他之前,还有没发布好数据的线程,假设为B,那么线程B在32行到34行之间挂掉之后,B之后的所有线程会一直处于忙等待的状态。我现在还不知道这个问题要怎么解决。
dequeue操作类似
说明一下,下面18行、21行、31行、34行是伪码,看注释就知道其含义了。
1 queue<int> dataque(1024);//环形缓冲区 2 3 queue<int> asleep_producers(32);//缓冲区满时,生产者应该阻塞,该队列就是生产者等待队列 4 queue<int> asleep_consumers(32);//缓冲区空时,消费者应该阻塞,该队列就是消费者等待队列 5 6 7 void produce(){ 8 if(!dataque.enqueue(in_data)){//生产的数据入队列 9 10 /* 数据入队列失败,把当前数据丢掉,或者保存到磁盘中等等*/ 11 ... 12 ... 13 14 if(!asleep_producers.enqueue(this->gettid())){//把当前生产者线程加入等待队列 15 pthread_exit();//如果加入失败,说明已经达到了等待队列的最大值,那么线程主动退出 16 } 17 18 producer_poller.wait();//睡眠,等待信号的到来 19 } 20 21 wakeup_consumers(asleep_consumers);//生产了一个数据,应该从消费者等待队列中取一个线程消费数据,本质上可以发送一个数据可消费者的poller 22 } 23 24 25 void consume() 26 { 27 if(!dataque.dequeue(out_data)){ 28 29 asleep_consumers.enqueue(this->gettid());//没能成功取得数据,把自己加入到消费者等待队列中 30 31 consumer_poller.wait();//睡眠,等待信号到来 32 } 33 34 wakeup_producers(asleep_producers);//消费了一个数据,应该从生产者等待队列中取一个线程生产数据,本质上可以发送一个数据给生产者的poller 35 }
关于producer_poller、consumer_poller、wait、wakeup的实现可以用下面的方案:
每个线程内含一个socketpair和select,select用于监听socketpair的读端。
生产者线程起来的时候先去尝试一次生产(调用produce函数),失败之后使用select监听消费者发过来的可生产信号,select成功监听到事件后,再去生产(调用produce)
消费者线程起来的时候先去尝试一次消费(调用consume函数),失败之后使用select监听生产者发过来的可消费信号,select成功监听到事件后,再去消费(调用consume)
具体的实现方案可以参考:zeromq源码分析笔记之线程间收发命令(2)
参考资料
sched_yield(2) - Linux man page
标签:
原文地址:http://www.cnblogs.com/zengzy/p/5145899.html