环形无锁队列
Table of Contents
1 环形无锁队列的实现
数据结构定义:
template class LockFreeQueue
{
private:
ElementT *mArray;
int mCapacity;
int mFront;
int mTail;
}
由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调。这其中会用到CAS操作:
入队操作伪码:
……
do {
获取当前的mTail的值:curTailIndex;
计算新的mTail的值:newTailIndex = (newTailIndex + 1) % size;
} while(!CAS(mTail, curTailIndex, newTailIndex));
插入元素到curTailIndex;
其中的do-while循环实现的是一个忙式等待:线程试图获取当前的队列尾部空间的控制权;一旦获取成功,则向其中插入元素。
但是这样出队的时候就出现了问题:如何判断队首的位置里是否有相应元素呢?仅使用mFront来判断是不行的,这只能保证出队进程不会对同一个索引位置进行出队操作,而不能保证mFront的位置中一定有有效的元素。因此,为了保证出队队列与入队队列之间的协调,需要在LockFreeQueue中添加一个标志数组:
char *mFlagArray;
mFlagArray中的元素标记mArray中与之对应的元素位置是否有效。mFlagArray中的元素有4个取值:
- 0表示对应的mArray中的槽位为空;
- 1表示对应槽位已被申请,正在写入;
- 2表示对应槽位中为有效的元素,可以对其进行出对操作;
- 3则表示正在弹出操作。
修改后的无锁队列的代码如下:
template class LockFreeQueue
{
public:
LockFreeQueue(int s = 0)
{
mCapacity = s;
mFront = 0;
mTail = 0;
mSize = 0;
}
~LockFreeQueue() {}
/**
* 初始化queue。分配内存,设定size
* 非线程安全,需在单线程环境下使用
*/
bool initialize()
{
mFlagArray = new char[mCapacity];
if (NULL == mFlagArray)
return false;
memset(mFlagArray, 0, mCapacity);
mArray = reinterpret_cast(new char[mCapacity * sizeof(ElementT)]);
if (mArray == NULL)
return false;
memset(mArray, 0, mCapacity * sizeof(ElementT));
return true;
}
const int capacity(void) const
{
return mCapacity;
}
const int size(void) const
{
return mSize;
}
/**
* 入队函数,线程安全
*/
bool push(const ElementT & ele)
{
if (mSize >= mCapacity)
return false;
int curTailIndex = mTail;
char *cur_tail_flag_index = mFlagArray + curTailIndex;
//// 忙式等待
// while中的原子操作:如果当前tail的标记为已占用(1),则更新cur_tail_flag_index,继续循环;否则,将tail标记设为已经占用
while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1))
{
curTailIndex = mTail;
cur_tail_flag_index = mFlagArray + curTailIndex;
}
//// 两个入队线程之间的同步
int update_tail_index = (curTailIndex + 1) % mCapacity;
// 如果已经被其他的线程更新过,则不需要更新;
// 否则,更新为 (curTailIndex+1) % mCapacity;
__sync_bool_compare_and_swap(&mTail, curTailIndex, update_tail_index);
// 申请到可用的存储空间
*(mArray + curTailIndex) = ele;
// 写入完毕
__sync_fetch_and_add(cur_tail_flag_index, 1);
// 更新size;入队线程与出队线程之间的协作
__sync_fetch_and_add(&mSize, 1);
return true;
}
/**
* 出队函数,线程安全
*/
bool pop(ElementT *ele)
{
if (mSize <= 0)
return false;
int cur_head_index = mFront;
char *cur_head_flag_index = mFlagArray + cur_head_index;
while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3))
{
cur_head_index = mFront;
cur_head_flag_index = mFlagArray + cur_head_index;
}
// 取模操作可以优化
int update_head_index = (cur_head_index + 1) % mCapacity;
__sync_bool_compare_and_swap(&mFront, cur_head_index, update_head_index);
*ele = *(mArray + cur_head_index);
// 弹出完毕
__sync_fetch_and_sub(cur_head_flag_index, 3);
// 更新size
__sync_fetch_and_sub(&mSize, 1);
return true;
}
private:
ElementT *mArray;
int mCapacity; // 环形数组的大小
int mSize; //队列中元素的个数
int mFront;
int mTail;
char *mFlagArray; // 标记位,标记某个位置的元素是否被占用
};
2 死锁及饥饿
LockFreeQueue实现了基本的多线程之间的协调,不会存在多个线程同时对同一个资源进行操作的情况,也就不会产生数据竞跑,这保证了对于这个队列而言,基本的访问操作(出队、入队)的执行都是安全的,其结果是可预期的。
在多线程环境下,LockFreeQueue会不会出现死锁的情况呢?死锁有四个必要条件:
- 对资源的访问是互斥的;
- 请求和保持请求;
- 资源不可剥夺;
- 循环等待。
在LockFreeQueue中,所有的线程都是对资源进行申请后再使用,一个线程若申请到了资源(这里的资源主要指环形队列中的内存槽位),就会立即使用,并且在使用完后释放掉该资源。不存在一个线程使用A资源的同时去申请B资源的情况,因此并不会出现死锁。
但LockFreeQueue可能出现饥饿状态。例如,对两个出队线程A、B,两者都循环进行出队操作。当队列中有元素时,A总能申请到这个元素并且执行到弹出操作,而B则只能在DeQueue函数的while循环中一直循环下去。
3 一些优化
对LockFreeQueue可以进行一些优化。比如:
- 对于环形数组大小,可以设定为2的整数倍,如1024。这样取模的操作即可以简化为与mCapacity-1的按位与操作。
- 忙式等待的时候可能会出现某个线程一直占用cpu的情况。此时可以使用sleep(0),让该线程让出CPU时间片,从就绪态转为挂起态。