标签:生产者 pthread win 启动 lin amp upd stop let
最近任务需要在MFC下做多线程生产者消费者模式的东西,我找了半天貌似MFC没有类似Java里面BlockingQueue那样的工具(也许是我手残没找到)。
网上好像也有很多大佬去实现这个。但是我没仔细去找,看了看一些资料就想着造个轮子玩玩。
实现如下:
主要是利用CCriticalSection保护内置的std::list,然后用CEvent来实现生产者消费者的同步。
参考资料:http://stackoverflow.com/questions/6683356/c-templated-producer-consumer-blockingqueue-unbounded-buffer-how-do-i-end-el
接口文件:IBlockingQueue.h
1 #pragma once 2 3 template <class T> 4 class IBlockingQueue 5 { 6 public: 7 virtual ~IBlockingQueue() {} // 为了让实现这个接口的类的析构函数能被正确调用,参考:http://blog.csdn.net/chaoguodong/article/details/6935524 8 virtual int size() = 0; 9 virtual T pop_front() = 0; 10 virtual T pop_back() = 0; 11 virtual void push_front(T val) = 0; 12 virtual void push_back(T val) = 0; 13 virtual bool empty() = 0; 14 virtual void stop() = 0; 15 virtual bool is_stop() = 0; 16 };
实现文件:BlockingQueue.h
#pragma once #include <afxmt.h> #include <list> #include "Bridge.h" // 参考资料:http://stackoverflow.com/questions/6683356/c-templated-producer-consumer-blockingqueue-unbounded-buffer-how-do-i-end-el class StoppingException { public: LPCTSTR msg = _T("Stopping"); StoppingException(); ~StoppingException(); }; template <class T> class CBlockingQueue : public IBlockingQueue<T> { CCriticalSection m_cs; // 保护 m_lst CEvent m_emptyEvent; // m_lst 为空 就 reset,m_lst 不为空 就 set 放行 std::list<T> m_lst; bool m_stop; public: CBlockingQueue(); CBlockingQueue(const CBlockingQueue<T>& obj); CBlockingQueue<T>& operator=(const CBlockingQueue<T>& obj); virtual int size(); virtual T pop_front(); virtual T pop_back(); virtual void push_front(T val); virtual void push_back(T val); virtual bool empty(); virtual void stop(); virtual bool is_stop(); }; template <class T> CBlockingQueue<T>::CBlockingQueue() : m_emptyEvent(FALSE, TRUE, NULL, NULL), m_stop(false) // 初始为RESET,不自动RESET {} template <class T> CBlockingQueue<T>::CBlockingQueue(const CBlockingQueue<T>& obj) : m_emptyEvent(FALSE, TRUE, NULL, NULL), m_stop(false) // 初始为RESET,不自动RESET { m_cs.Lock(); obj.m_cs.Lock(); m_lst = obj.m_lst; obj.m_cs.Unlock(); m_cs.Unlock(); } template <class T> CBlockingQueue<T>& CBlockingQueue<T>::operator = (const CBlockingQueue<T>& obj) { m_cs.Lock(); obj.m_cs.Lock(); m_lst = obj.m_lst; obj.m_cs.Unlock(); m_cs.Unlock(); return *this; } template <class T> int CBlockingQueue<T>::size() { m_cs.Lock(); int sz = 0; sz = m_lst.size(); m_cs.Unlock(); return sz; } template <class T> T CBlockingQueue<T>::pop_front() { T val; bool done = false; while (!done && !is_stop()) { ::WaitForSingleObject(m_emptyEvent.m_hObject, INFINITE); if (is_stop()) throw StoppingException(); m_cs.Lock(); if (m_lst.empty()) { } else { val = m_lst.front(); m_lst.pop_front(); if (m_lst.empty()) m_emptyEvent.ResetEvent(); done = true; } m_cs.Unlock(); } return val; } template <class T> T CBlockingQueue<T>::pop_back() { T val; bool done = false; while (!done && !is_stop()) { ::WaitForSingleObject(m_emptyEvent.m_hObject, INFINITE); if (is_stop()) throw StoppingException(); m_cs.Lock(); if (m_lst.empty()) { } else { val = m_lst.back(); m_lst.pop_back(); if (m_lst.empty()) m_emptyEvent.ResetEvent(); done = true; } m_cs.Unlock(); } return val; } template <class T> void CBlockingQueue<T>::push_front(T val) { m_cs.Lock(); m_lst.push_front(val); m_emptyEvent.SetEvent(); m_cs.Unlock(); } template <class T> void CBlockingQueue<T>::push_back(T val) { m_cs.Lock(); m_lst.push_back(val); m_emptyEvent.SetEvent(); m_cs.Unlock(); } template <class T> bool CBlockingQueue<T>::empty() { m_cs.Lock(); bool bEmpty = m_lst.empty(); m_cs.Unlock(); return bEmpty; } template <class T> bool CBlockingQueue<T>::is_stop() { m_cs.Lock(); bool bStop = m_stop; m_cs.Unlock(); return bStop; } template <class T> void CBlockingQueue<T>::stop() { m_cs.Lock(); m_stop = true; m_emptyEvent.SetEvent(); m_cs.Unlock(); }
实现文件:BlockingQueue.cpp
#include "BlockingQueue.h" StoppingException::StoppingException() {} StoppingException::~StoppingException() {}
测试文件:MyApp.h
#pragma once #include <afxwin.h> class CMyApp : public CWinApp { public: virtual BOOL InitInstance(); };
测试文件:MyApp.cpp
#include "MyApp.h" #include "BlockingQueue.h" using namespace std; class CMainWindow : public CFrameWnd { public: CMainWindow(); DECLARE_MESSAGE_MAP() afx_msg void OnClose(); }; CMainWindow::CMainWindow() { Create(NULL, _T("The Hello Application"), WS_OVERLAPPED | WS_CAPTION | WS_SYSMENU | WS_MINIMIZEBOX | WS_THICKFRAME, CRect(32, 64, 352, 304)); } CMyApp myApp; // 共享的数据‘ IBlockingQueue<int>* pBQ = new CBlockingQueue<int>(); #define NUM_PRODUCER 9 // 生产者个数 #define NUM_CONSUMER 5 // 消费者个数 CWinThread* pThreadProducer[NUM_PRODUCER]; // 生产者线程 CWinThread* pThreadConsumer[NUM_CONSUMER]; // 消费者线程 HANDLE hConsumer[NUM_CONSUMER]; // 消费者HANDLE // 生产 UINT Produce(LPVOID pParam) { for (int i = 0; i < 10; ++i) { TRACE(_T("Producer[%d]Producing: %d\n"), ::GetCurrentThreadId(), i); pBQ->push_back(i); TRACE(_T("Producer[%d]Producing: %d\n"), ::GetCurrentThreadId(), i); pBQ->push_front(i); TRACE(_T("Producer[%d]Sleeping...\n"), ::GetCurrentThreadId()); ::Sleep(1000); } TRACE(_T("Producer[%d]Exiting...\n"), ::GetCurrentThreadId()); return 0; } // 消费 UINT Consume(LPVOID pParam) { try { while (true) { TRACE(_T("Consumer[%d]Waiting...\n"), ::GetCurrentThreadId()); int val = pBQ->pop_front(); TRACE(_T("Consumer[%d]Consuming: %d\n"), ::GetCurrentThreadId(), val); val = pBQ->pop_back(); TRACE(_T("Consumer[%d]Consuming: %d\n"), ::GetCurrentThreadId(), val); } } catch (StoppingException& e) { TRACE(_T("Consumer[%d]%s...\n"), ::GetCurrentThreadId(), e.msg); } TRACE(_T("Consumer[%d]Exiting...\n"), ::GetCurrentThreadId()); return 0; } // 主线程(UI) BOOL CMyApp::InitInstance() { _CrtSetBreakAlloc(210); m_pMainWnd = new CMainWindow; m_pMainWnd->ShowWindow(m_nCmdShow); m_pMainWnd->UpdateWindow(); // 共享的初始数据 for (int i = 0; i < 100; ++i) { pBQ->push_back(i); } // 创建消费者线程 for (int i = 0; i < NUM_CONSUMER; ++i) { CWinThread* pThread = ::AfxBeginThread(Consume, NULL, THREAD_PRIORITY_NORMAL, 0, CREATE_SUSPENDED); pThread->m_bAutoDelete = FALSE; pThreadConsumer[i] = pThread; hConsumer[i] = pThread->m_hThread; } // 启动消费者线程 for (int i = 0; i < NUM_CONSUMER; ++i) { pThreadConsumer[i]->ResumeThread(); } // 创建生产者线程 for (int i = 0; i < NUM_PRODUCER; ++i) { CWinThread* pThread = ::AfxBeginThread(Produce, NULL, THREAD_PRIORITY_NORMAL, 0, CREATE_SUSPENDED); pThread->m_bAutoDelete = FALSE; pThreadProducer[i] = pThread; } // 启动生产者线程 for (int i = 0; i < NUM_PRODUCER; ++i) { pThreadProducer[i]->ResumeThread(); } return TRUE; } BEGIN_MESSAGE_MAP(CMainWindow, CFrameWnd) ON_WM_CLOSE() END_MESSAGE_MAP() // 退出主线程 void CMainWindow::OnClose() { pBQ->stop(); ::WaitForMultipleObjects(NUM_CONSUMER, hConsumer, TRUE, INFINITE); for (int i = 0; i < NUM_CONSUMER; ++i) { delete pThreadConsumer[i]; } for (int i = 0; i < NUM_PRODUCER; ++i) { delete pThreadProducer[i]; } delete pBQ; CFrameWnd::OnClose(); }
标签:生产者 pthread win 启动 lin amp upd stop let
原文地址:http://www.cnblogs.com/qrlozte/p/6675006.html