码迷,mamicode.com
首页 > 其他好文 > 详细

生产者/消费者模式(一)

时间:2014-11-18 17:28:27      阅读:248      评论:0      收藏:0      [点我收藏+]

标签:des   style   blog   http   io   color   ar   os   使用   

  生产者消费者问题是一个多线程同步问题的经典案例,大多数多线程编程问题都是以生产者-消费者模式为基础,扩展衍生来的。在生产者消费者模式中,缓冲区起到了连接两个模块的作用:生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

  bubuko.com,布布扣

  可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。

  阻塞队列的特点为:“当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻 塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从 队列中移除一个或者多个元素,或者完全清空队列。” 阻塞队列的实现通常是对普通队列进行同步控制,封装起来。一个linux下的通用队列定义如下,代码所示的队列没有为满的情况,不设定队列元素总数的上限:

  1 template<class T>
  2 class BlockQueue
  3 {
  4 public:
  5     BlockQueue();
  6     ~BlockQueue();
  7 
  8     void Add(T *pData);
  9     T *Get(int timeout=0);
 10     int Count();
 11     void SetBlockFlag();
 12 private:
 13     typedef struct _Node
 14     {
 15         T * m_pData;
 16         struct _Node * m_pNext;
 17     } Node;
 18     Node * m_pFirst;
 19     Node * m_pLast;
 20     int m_nCount;
 21 
 22     bool m_bBlockFlag;           //是否阻塞
 23     CCond m_condQueue;
 24 };
 25 
 26 template<class T>
 27 BlockQueue<T>::BlockQueue(int nMetuxFlag)
 28 {
 29     m_pFirst = NULL;    //首元素
 30     m_pLast  = NULL;    //末元素
 31     m_nCount = 0;    //元素格式
 32 
 33     m_bBlockFlag = false;
 34 }
 35 
 36 template<class T>
 37 void BlockQueue<T>::SetBlockFlag()
 38 {
 39     m_bBlockFlag = true;
 40 }
 41 
 42 template<class T>
 43 BlockQueue<T>::~BlockQueue()
 44 {
 45     Node *pTmp = NULL;
 46     while (m_pFirst != NULL)
 47     {
 48         pTmp = m_pFirst;
 49         m_pFirst = m_pFirst->m_pNext;
 50         delete pTmp;
 51         pTmp = NULL;
 52     }
 53     m_pLast = NULL;
 54 }
 55 
 56 template<class T>
 57 void BlockQueue<T>::Add(T *pData)
 58 {
 59     (m_condQueue.GetMutex()).EnterMutex();
 60 
 61     Node *pTmp = new Node;
 62     if (pTmp == NULL)
 63     {
 64         (m_condQueue.GetMutex()).LeaveMutex();
 65         return;
 66     }
 67     pTmp->m_pData = pData;
 68     pTmp->m_pNext = NULL;
 69     if (m_nCount == 0)
 70     {
 71         m_pFirst = pTmp;
 72         m_pLast  = pTmp;
 73     }
 74     else
 75     {
 76         m_pLast->m_pNext = pTmp;
 77         m_pLast = pTmp;
 78     }
 79     m_nCount++;
 80 
 81     if (m_bBlockFlag)
 82     {
 83         m_condQueue.Signal();
 84     }
 85      
 86     (m_condQueue.GetMutex()).LeaveMutex();
 87 }
 88 
 89 template<class T>
 90 T *BlockQueue<T>::Get(int timeout)
 91 {
 92     (m_condQueue.GetMutex()).EnterMutex();
 93 
 94     while (m_nCount == 0)
 95     {
 96         if (!m_bBlockFlag)
 97         {
 98             (m_condQueue.GetMutex()).LeaveMutex();
 99             return NULL;
100         }
101         else
102         {
103             if (m_condQueue.WaitNoLock(timeout) == 1)
104             {
105                 (m_condQueue.GetMutex()).LeaveMutex();
106                 return NULL;
107             }            
108         }
109     }
110 
111     T * pTmpData = m_pFirst->m_pData;
112     Node *pTmp = m_pFirst;
113 
114     m_pFirst = m_pFirst->m_pNext;
115     delete pTmp;
116     pTmp = NULL;
117     m_nCount --;
118     if (m_nCount == 0)
119     {
120         m_pLast = NULL;
121     }
122     
123     (m_condQueue.GetMutex()).LeaveMutex();
124 
125     return pTmpData;
126 }
127 
128 template<class T>
129 int BlockQueue<T>::Count()
130 {
131     (m_condQueue.GetMutex()).EnterMutex();
132     int nCount = m_nCount;
133     (m_condQueue.GetMutex()).LeaveMutex();
134 
135     return nCount;
136 }

  队列中使用了Cond条件变量,实现多线程环境中队列的同步与阻塞:

  • 条件变量自己持有mutex,在向队列中Add/Get元素时,mutex将队列锁住,同一时刻只允许一个线程对队列进行操作;
  • 消费者从队列中获取数据时,如果设置了阻塞模式,那么当队列元素为空时,需要阻塞在条件变量上进行等待,其他线程调用Add函数,向队列中添加元素后,唤醒阻塞在条件变量上的线程。注意这里用到的等待条件是while(count == 0)而不是if(count == 0),因为在多核处理器环境中,Signal唤醒操作可能会激活多于一个线程(阻塞在条件变量上的线程),使得多个调用等待的线程返回。所以用while循环对condition多次判断,可以避免这种假唤醒。
  • 队列元素为包含T类型指针的Node类型指针,析构时只负责释放队列中的Node元素,而真正指向T类型数据的指针,需要调用者进行分配和释放,一般由生产者调用new分配,消费者使用后调用delete释放。这里需要特别注意,因为如果忘记释放,会造成内存泄露。

  这段代码是我在多线程环境下,较为简单的生产者-消费者模式中,通用的一个队列模型,目前队列在服务器中稳定运行,还未出现太大的性能问题,当然这与应用模块逻辑较为简单、交易量较小有很大关。仔细分析这段代码,几乎对每个函数的操作都进行了加锁保护,临界区从始到末,粒度较大;再者,队列中持有的T对象指针,均是由调用者动态分配和释放的,在交易量很大的情况下,频繁调用Add和Get函数,不仅会导致加锁解锁的性能消耗,也会使系统产生大量内存碎片。

  未完待续……

生产者/消费者模式(一)

标签:des   style   blog   http   io   color   ar   os   使用   

原文地址:http://www.cnblogs.com/Tour/p/4106085.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!