Tags: des style blog http io color os ar usage
template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
public:
    void push(const Data& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    Data& front()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    Data const& front() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    void pop()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.pop();
    }
};
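Calls to front and pop can race with each other, but this is not a problem in a system with a single consumer. If the queue is empty, threads may have nothing to do and fall into a wait loop:

while(some_queue.empty())
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
}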
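Although sleeping wastes far less CPU than busy waiting, this design still has drawbacks. First, the thread has to wake every 50 ms (or whatever interval is chosen) to lock the mutex, check the queue and unlock the mutex, forcing a context switch each time. Second, the sleep interval puts a floor under the response time, that is, the delay between data being added to the queue and the thread reacting to it: anywhere from 0 ms to 50 ms, 25 ms on average.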
A condition variable addresses this. The waiting is implemented as a member function of concurrent_queue, wait_for_data:

template<typename Data>
class concurrent_queue
{
private:
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
    }

    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};
Unlocking the mutex before notifying lets the woken thread acquire it straight away:

template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);

        lock.unlock(); // unlock the mutex before notifying

        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};
wait_for_data, front and pop all lock the mutex, so the consumer still locks and unlocks it in quick succession. Combining wait and pop into a single operation reduces the number of lock and unlock operations:

template<typename Data>
class concurrent_queue
{
public:
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }
    // rest as before
};
(Translator's note: the original suggests boost::optional to avoid a problem here, which I did not fully follow. The original passage reads as follows.) This does, of course, require that an instance of Data can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional to avoid this requirement.

wait_and_pop not only removes the locking overhead, it also brings an extra benefit: it automatically allows multiple consumers. Separate fine-grained functions are prone to races unless callers add their own locking (the same reasoning is used against making containers such as std::vector thread-safe: you need external locking for many common operations, which makes the internal locking a waste of resources), whereas the combined function safely handles concurrent calls.

If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop concurrently, the while loop ensures that only one thread will do the pop, and the others will wait.
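The boost::optional suggestion quoted above can be sketched as follows. This is only an illustration under stated assumptions, not the original author's code: it assumes C++17 and swaps in std::optional, std::mutex and std::condition_variable for the Boost types, and the names optional_queue, job and optional_queue_demo are hypothetical. Because the pop functions return the value rather than filling in a caller-supplied reference, the caller never needs to default-construct a Data.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical payload with no default constructor.
struct job {
    explicit job(std::string n) : name(std::move(n)) {}
    std::string name;
};

// Sketch: same locking scheme as concurrent_queue, but pops return values.
template<typename Data>
class optional_queue {
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
    std::condition_variable the_condition_variable;
public:
    void push(Data data) {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(std::move(data));
        }  // mutex released here, before notifying
        the_condition_variable.notify_one();
    }

    // Returns an empty optional (instead of false) when the queue is empty.
    std::optional<Data> try_pop() {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return std::nullopt;
        std::optional<Data> result(std::move(the_queue.front()));
        the_queue.pop();
        return result;
    }

    // Blocks until a value is available, then returns it by value.
    Data wait_and_pop() {
        std::unique_lock<std::mutex> lock(the_mutex);
        the_condition_variable.wait(lock, [this] { return !the_queue.empty(); });
        Data result(std::move(the_queue.front()));
        the_queue.pop();
        return result;
    }
};

// Usage sketch: returns the popped names in order, comma-separated.
std::string optional_queue_demo() {
    optional_queue<job> q;
    assert(!q.try_pop());                     // empty queue: empty optional
    q.push(job("parse"));
    q.push(job("render"));
    job first = q.wait_and_pop();             // data is queued, so no blocking
    std::optional<job> second = q.try_pop();
    assert(second.has_value());
    return first.name + "," + second->name;
}
```

One trade-off of returning by value is that, in principle, if moving the Data out throws after the element has been removed from the queue, that value is lost. The reference-taking version in the article avoids this.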
Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all() instead of notify_one() when waking threads, or call notify_one() whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn't enough data for them. If we notify with every push() then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:
template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }
    // rest as before
};
There is one benefit that the separate functions give over the combined one: the ability to check for an empty queue, and do something else if the queue is empty. empty itself still works in the presence of multiple consumers, but the value that it returns is transitory. There is no guarantee that it will still apply by the time a thread calls wait_and_pop, whether it was true or false. For this reason it is worth adding an additional function: try_pop, which returns true if there was a value to retrieve (in which case it retrieves it), or false to indicate that the queue was empty.
template<typename Data>
class concurrent_queue
{
public:
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }
    // rest as before
};
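As an illustration of "doing something else" when the queue is empty, here is a minimal sketch of a consumer that polls with try_pop. It is an assumption-laden example rather than the article's code: std::mutex and std::thread stand in for the Boost types, and poll_with_try_pop, producer_done and idle_rounds are hypothetical names. The flag is read before each try_pop so that a failed pop after the producer has finished really does mean the queue is drained.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>

// std:: stand-in for the parts of concurrent_queue used here (assumption).
template<typename Data>
class concurrent_queue {
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
public:
    void push(Data const& data) {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push(data);
    }
    bool try_pop(Data& popped_value) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }
};

// Hypothetical driver: a producer pushes 1..items while the calling thread
// polls. Returns the sum of the values consumed; idle_rounds counts how often
// the queue was empty and the consumer did "something else" instead.
long poll_with_try_pop(int items, long& idle_rounds) {
    assert(items >= 0);
    concurrent_queue<int> q;
    std::atomic<bool> producer_done(false);
    std::thread producer([&] {
        for (int i = 1; i <= items; ++i) q.push(i);
        producer_done = true;  // set only after the last push
    });

    long sum = 0;
    idle_rounds = 0;
    for (;;) {
        // Read the flag first: if it was already true, every push is visible,
        // so a failed try_pop below means there is nothing left to consume.
        bool const finished = producer_done.load();
        int value = 0;
        if (q.try_pop(value)) {
            sum += value;
        } else if (finished) {
            break;
        } else {
            ++idle_rounds;              // placeholder for other useful work
            std::this_thread::yield();
        }
    }
    producer.join();
    return sum;
}
```

The value of idle_rounds depends on thread scheduling and varies from run to run; only the sum is deterministic.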
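With the front and pop functions removed, our simple, naive implementation has now become a usable multiple-producer, multiple-consumer queue. The final version of the multiple-producer, multiple-consumer queue: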
template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }
};
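To show the final queue in use, the sketch below runs several producers and several consumers against it. It rests on a few assumptions that are not in the article: the std:: threading types replace the Boost ones, the driver run_producers_and_consumers is a hypothetical name, and shutdown uses a sentinel value of -1 (one per consumer), which is just one possible convention.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// std:: stand-in for the Boost-based concurrent_queue above (assumption).
template<typename Data>
class concurrent_queue {
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
    std::condition_variable the_condition_variable;
public:
    void push(Data const& data) {
        std::unique_lock<std::mutex> lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();  // notify on every push
    }
    void wait_and_pop(Data& popped_value) {
        std::unique_lock<std::mutex> lock(the_mutex);
        while (the_queue.empty()) {
            the_condition_variable.wait(lock);
        }
        popped_value = the_queue.front();
        the_queue.pop();
    }
};

// Hypothetical driver: each producer pushes 1..items_each; each consumer pops
// until it receives the sentinel -1. Returns the total of all consumed values.
long run_producers_and_consumers(int producers, int consumers, int items_each) {
    assert(producers >= 0 && consumers > 0 && items_each >= 0);
    concurrent_queue<int> q;
    std::mutex total_mutex;
    long total = 0;

    std::vector<std::thread> consumer_threads;
    for (int c = 0; c < consumers; ++c) {
        consumer_threads.emplace_back([&] {
            long local = 0;
            for (;;) {
                int value = 0;
                q.wait_and_pop(value);
                if (value == -1) break;  // sentinel: no more work
                local += value;
            }
            std::lock_guard<std::mutex> lock(total_mutex);
            total += local;
        });
    }

    std::vector<std::thread> producer_threads;
    for (int p = 0; p < producers; ++p) {
        producer_threads.emplace_back([&q, items_each] {
            for (int i = 1; i <= items_each; ++i) q.push(i);
        });
    }
    for (auto& t : producer_threads) t.join();

    // The queue is FIFO, so sentinels pushed after all producers have
    // finished are dequeued only after every real item.
    for (int c = 0; c < consumers; ++c) q.push(-1);
    for (auto& t : consumer_threads) t.join();
    return total;
}
```

With 3 producers each pushing 1 to 100, the consumers together should account for 3 × 5050 = 15150, however the work happens to be divided among them.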
Posted by Anthony Williams
Implementing a Thread-Safe Queue Using Condition Variables
Original article: http://www.cnblogs.com/kevin-/p/4064073.html