标签:
看下这个场景,老张去厕所,发现门是锁着的,他就在门口等着里边人出来,此时小王也来了,他想了想,决定去楼上的厕所碰碰运气。
如果把门类比为一种竞争资源的话,老张就像mutex,而小王更像atomic,注意是像而已。
atomic跟传统的通过临界区加锁来避免竞争的多线程处理方式来说,它更像是一种状态机编程,根据当前的状态做出相应的逻辑。
而至于是小王还是老张谁先解决内急,无从得知,同样,mutex vs atomic 的性能对比,实测见真知。
atomic支持的数据类型有限,其他信息可以查看之前我的一篇blog:躲不开的多线程。
hashmap的数据存储一般是array,每个元素按照自己的index(下标)存放,数据结构天然决定了非常适合lock-free(atomic)。
但hashmap有两个‘讨厌‘的技术点:rehash和probe。
这里的rehash是说当‘空间不够时‘,需要重新申请一块大的内存,并对之前所有的元素重新hash计算,然后确认其新的index,最后
再一次插入的过程。rehash的同时其他操作不能并行,比如查找,试想一下,在高度并发情况下,百万级别元素的rehash操作是多么的糟糕,
当然这与选择atomic无关,并发情况下,最好不要支持rehash功能。
这里probe是说当多个key求数组下标时,冲突不可能完全避免,自然就有了解决冲突算法:probe,probe的效率相应的决定了查找的效率。
在高度并发情况下,开链比线性探查更适合,因为一个桶的冲突不会影响其他桶。当然开链没有线性探查的局部性好,代码也比线性探查复杂,
使用atomic来编写lock-free数据结构显示不是一件容易的事情,所以显然用后者更可控一些。
hashmap的关键指标可能是平均查询时间,也可能是较充分的内存利用。但在AtomicHashmap的设计里,我觉得并发执行效率是第一考虑因素,
所以‘锁‘一定发生在同一个元素操作中。
其他指标应该等同于一般的hashmap。
class Counters { private: AtomicHashMap<int64_t,int64_t> ahm; public: explicit Counters(size_t numCounters) : ahm(numCounters) {} void increment(int64_t obj_id) { auto ret = ahm.insert(make_pair(obj_id, 1)); if (!ret.first) { // obj_id already exists, increment NoBarrier_AtomicIncrement(&ret.first->second, 1); } } int64_t getValue(int64_t obj_id) { auto ret = ahm.find(obj_id); return ret != ahm.end() ? ret->second : 0; } // Serialize the counters without blocking increments string toString() { string ret = "{\n"; ret.reserve(ahm.size() * 32); for (const auto& e : ahm) { ret += folly::to<string>( " [", e.first, ":", NoBarrier_Load(&e.second), "]\n"); } ret += "}\n"; return ret; } };
数据为二层组织结构,一级数组存储的是二级数组指针,二级数组为元素的真实存储空间,元素是[key,value]的pair。
AtomicHashMap类主要负责对AtomicHashArray对象的创建和管理,以及接口的封装。
AtomicHashArray是hashmap的实现类:插入、查找、删除
当首次插入时
//容量为size和加载因子的除数 size_t capacity = size_t(maxSize / maxLoadFactor); size_t sz = sizeof(AtomicHashArray) + sizeof(value_type) * capacity; auto const mem = Allocator().allocate(sz); //将对象指针绑在mem上 new (mem) AtomicHashArray(capacity, c.emptyKey, c.lockedKey, c.erasedKey, c.maxLoadFactor, c.entryCountThreadCacheSize); //key全部初始化为empty FOR_EACH_RANGE(i, 0, map->capacity_) { cellKeyPtr(map->cells_[i])->store(map->kEmptyKey_, std::memory_order_relaxed); }
当插满时
//根据增长因子确认要分配的大小,要分配的大小<*2 size_t numCellsAllocated = (size_t) (primarySubMap->capacity_ * std::pow(1.0 + kGrowthFrac_, nextMapIdx - 1)); size_t newSize = (int) (numCellsAllocated * kGrowthFrac_); //初始化一个新submap Config config; config.emptyKey = primarySubMap->kEmptyKey_; config.lockedKey = primarySubMap->kLockedKey_; config.erasedKey = primarySubMap->kErasedKey_; config.maxLoadFactor = primarySubMap->maxLoadFactor(); config.entryCountThreadCacheSize = primarySubMap->getEntryCountThreadCacheSize(); subMaps_[nextMapIdx].store(SubMap::create(newSize, config).release(), std::memory_order_relaxed);
AtomicHashArray的插入算法
insertInternal(KeyT key_in, T&& value) { //hash + %size ,获取下标 size_t idx = keyToAnchorIdx(key_in); size_t numProbes = 0; for (;;) { value_type* cell = &cells_[idx]; //判断cell是否被使用 if (relaxedLoadKey(*cell) == kEmptyKey_) { if (isFull_.load(std::memory_order_acquire)) { //已经满了,就不能插入了 //返回capacity_,告诉AtomicHashMap类:我这个submap不能再插入了 return SimpleRetT(capacity_, false); } else { //还没有满 if (tryLockCell(cell)) { //tryLockCell其实是compare_exchange_strong,即当这个cell的key为empty时 //将empty状态修改为lock状态 new (&cell->second) ValueT(std::forward<T>(value)); //将cell的key字段标记为插入的key unlockCell(cell, key_in); // Sets the new key //已经插入的元素>=最大元素时,标记isFull_为true //最大元素maxEntries_ == 初始化时的参数size if (numEntries_.readFast() >= maxEntries_) { isFull_.store(NO_NEW_INSERTS, std::memory_order_relaxed); } //插入成功 return SimpleRetT(idx, true); } //注意:线程走到这里,说明之前没有成功trylockcell //线程继续往下走 } } if (kLockedKey_ == acquireLoadKey(*cell)) { //cell还在被锁定,说明其他线程还在插入这个cell //等待其他线程插入完成 //为什么要等待,因为其他线程插入可能会失败,也可能其他线程和本身线程插入的key一模一样, //需要特定的逻辑 FOLLY_SPIN_WAIT( kLockedKey_ == acquireLoadKey(*cell) ); } const KeyT thisKey = acquireLoadKey(*cell); if (EqualFcn()(thisKey, key_in)) { //比较两个key一样,那本次插入失败,因为之前已经有一个成功插入了 return SimpleRetT(idx, false); } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) { //两个key不一样,状态又不是插入成功的状态,那么continue了 continue; } ++numProbes; if (UNLIKELY(numProbes >= capacity_)) { //所有的元素空间都遍历一遍了,还是没成功插入,只有失败了 return SimpleRetT(capacity_, false); } //线性探测 idx = probeNext(idx, numProbes); } }
当AtomicHashArray插入失败,AtomicHashmap会创建一个新的submap,继续插入,周而复始,直至失败。
AtomicHashArray的查找算法
findInternal(const KeyT key_in) { //查找失败,那么遍历查找 for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0; ; idx = probeNext(idx, numProbes)) { //根据下标,获取key,如果一致,那么查找成功 const KeyT key = acquireLoadKey(cells_[idx]); if (LIKELY(EqualFcn()(key, key_in))) { return SimpleRetT(idx, true); } //key是个空,说明之前并没有插入过 if (UNLIKELY(key == kEmptyKey_)) { // if we hit an empty element, this key does not exist return SimpleRetT(capacity_, false); } ++numProbes; //最坏的情况,全部遍历了一遍,还是没查找,所以失败了 if (UNLIKELY(numProbes >= capacity_)) { // probed every cell...fail return SimpleRetT(capacity_, false); } } }
这里查找有三个关键点
当AtomicHashArray查找失败,AtomicHashmap会寻找下一个被初始化过的submap进行查找,周而复始,直至失败。
AtomicHashArray的删除算法
erase(KeyT key_in) { //遍历删除 for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0; ; idx = probeNext(idx, numProbes)) { //获取下标的key value_type* cell = &cells_[idx]; KeyT currentKey = acquireLoadKey(*cell); //如果key是空的,说明不存在 //如果key正在被插入,那么也是不存在的 if (currentKey == kEmptyKey_ || currentKey == kLockedKey_) { // If we hit an empty (or locked) element, this key does not exist. This // is similar to how it‘s handled in find(). return 0; } if (EqualFcn()(currentKey, key_in)) { //找到了元素 KeyT expect = currentKey; if (cellKeyPtr(*cell)->compare_exchange_strong(expect, kErasedKey_)) { numErases_.fetch_add(1, std::memory_order_relaxed); //将key标记为erase //为什么没有释放内存,为什么不再把key标记为empty //因为查找的时候,是把value的指针给的用户,没有时机做那些 return 1; } //别的线程已经搞定了删除,那么本线程返回就可以了 //返回0?应该返回1 return 0; } //key不一样,说明之前冲突过 //那么遍历往下找吧 ++numProbes; if (UNLIKELY(numProbes >= capacity_)) { return 0; } } }
当AtomicHashArray删除失败,AtomicHashmap会寻找下一个被初始化过的submap进行删除,周而复始,直至失败
标签:
原文地址:http://www.cnblogs.com/gistao/p/4583734.html