标签:
上一篇文章里说到了一个极简易队列的实现,然而它对于并发存在一个问题,就是当多个或者说就是两个线程并发地访问队列,分别调用 push() 与 tryPop() 时,可能就会导致数据争用或者死锁。
以下是一种思路,通过分离数据允许并发。其大致思路是预先分配一个不储存任何数据的结点占位,当 push() 进一个新结点时,将 push 的值作为 data 存入当前数据为空的 tail,然后再在 tail 后面构建个新的用于占位的空结点。这样的设计使 push() 操作仅访问 tail,而 tryPop() 虽然还是两个都访问,但是对于 head 的访问极短暂。
template<typename T>
class Queue
{
private:
struct Node{
std::shared_ptr<T> data;
std::unique_ptr<T> next;
};
std::unique_ptr<T> head;
Node* tail;
public:
Queue():head(std::make_shared<Node>()), tail(head.get ())
{}
Queue(const Queue&) = delete;
Queue& operator=(const Queue&) = delete;
std::shared_ptr<T> tryPop()
{
if (tail == head.get ()){
return std::shared_ptr<T>();
}
auto const result = head->data();
auto const oldHead = std::move(head);
head = std::move(oldHead->next);
return result;
}
void push(T newValue)
{
auto const tailData = std::make_shared<T>(std::move(newValue));
tail->data = tailData;
auto const nextTail = std::make_unique<T>();
auto const newTail = nextTail.get();
tail->next = std::move(nextTail);
tail = newTail;
}
};
当然目前还是单线程的,但是这个版本离线程安全的队列又进了一步。
而根据这个就能再进一步地写出相对更高效的并发队列:
template<typename T>
class ThreadsafeQueue
{
private:
struct Node
{
std::shared_ptr<T> data;
std::unique_ptr<T> next;
};
std::unique_ptr<Node> head;
Node* tail;
mutable std::mutex headMutex;
mutable std::mutex tailMutex;
Node* getTail() const
{
std::lock_guard<std::mutex> lock(tailMutex);
return tail;
}
std::unique_ptr<Node> popHead()
{
std::lock_guard<std::mutex> lock(headMutex);
if (getTail () == head.get ()){
return nullptr;
}
auto const oldHead = std::move(head);
head = std::move(oldHead->next);
return oldHead;
}
public:
ThreadsafeQueue():
head(std::make_shared<Node>()), tail(head.get ())
{}
ThreadsafeQueue(const ThreadsafeQueue&) = delete;
ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete;
std::shared_ptr<T> tryPop()
{
auto const popedHead = popHead ();
return (popedHead == nullptr? std::shared_ptr<T>() : popedHead->data;
}
void push(T newValue)
{
auto newData = std::make_shared<T>(std::move(newValue));
auto nextTail = std::make_unique<Node>();
auto newTail = nextTail.get();
std::lock_guard<std::mutex> lock(tailMutex);
tail->data = newData;
tail->next = std::move(nextTail);
tail = newTail;
}
};
这里面最重要的是 tailMutex 因为若没有这个互斥元,则 push() 和 tryPop() 可能会在不同的线程中造成数据竞争或未定义行为。而因为在 push() 和 tryPop() 中都存在 tailMutex 所以即便是在不同的线程里调用这两个函数也能保证 getTail() 要不得到的是真正 tail 的本身或者它的前一个结点。
另外一个重点就是 headMutex,这个的重要性还是用反证法说明,假如我这么写:
std::unique_ptr<Node> popHead()
{
auto const oldTail = getTail(); // 1
std::lock_grand<std::mutex> lock(headMutex); // 2
if (head.get() == oldTail)
{
return nullptr;
}
auto oldHead = std::move(head);
head = std::move(oldHead->next);
return oldHead;
}
这里就可能出现一个问题,就是如果线程 A 在 1 这个地方取得 tail 之后暂停,接着其他的线程 BCDEFG 中把这个 ThreadsafeQueue pop 得连 A 中取得的 tail 都出去了,接下来 A 再执行的时候直接就傻逼了,或者说就算 A 中取得的 tail 没出去,head 也被改变过了,后面的操作也成 UB 了。
标签:
原文地址:http://www.cnblogs.com/wuOverflow/p/4836236.html