loki库之内存池SmallObj
介绍
loki库的内存池实现主要在文件smallobj中,顾名思义它的优势主要在小对象的分配与释放上,loki库是基于策略的方法实现的,简单的说就是把某个类通过模板参数传递给主类,比如某个对象的创建可以通过不同的创建策略进行创建,本文主要讲loki的大致实现。
smallobj层次
loki.smallobj主要分四层:
- 应用层smallobject,重载了operator new 和operator delete,内存通过底层获取
- 内存分配smallobjAllocator,这一层相当C语言的malloc和free,底层由数组Loki::FixedAllocator组成,根据需要的内存大小判断调用哪一个下标的Loki::FixedAllocator
- 固定内存分配器FixAllocator,这是组成上层的基础,初始化的时候需要设置固定分配的大小blocksize*n,n对应上一层中数组的下标
- 内存块管理chunk,可以简单的理解为这就是一片连续的内存
整体结构图如下:
下面我们自低向上分析smallobj源代码。
chunk
chunk是一块连续内存数组,初始化时候内存大小已经固定,每次分配出去的内存大小也是固定的,考虑到分配的效率,它通过数组下标的方式将连续的内存链接成链表,节点个数最多为256个(用char表示大小,原因参看C++ modern design),这样达到灵活分配的目的,内存示意图如下:
初始化函数如下:
bool Chunk::Init( std::size_t blockSize, unsigned char blocks ) { const std::size_t allocSize = blockSize * blocks; Data_ = static_cast< unsigned char * >( ::std::malloc( allocSize ) ); Reset( blockSize, blocks ); return true; }
比较重要的函数在Reset里面,它的主要功能是将chunk连续的内存划分为一个个固定大小的节点,通过下标指针的方式链接成链表,下标指针存储在每个节点内存的起始处,chunk有下面三个成员变量:
/// Pointer to array of allocated blocks. unsigned char * pData_; /// Index of first empty block. unsigned char firstAvailableBlock_; /// Count of empty blocks. unsigned char blocksAvailable_; //将pData_内存链接成链表 void Chunk::Reset(std::size_t blockSize, unsigned char blocks) { firstAvailableBlock_ = 0; blocksAvailable_ = blocks; unsigned char i = 0; //链表链接起来 for ( unsigned char * p = pData_; i != blocks; p += blockSize ) { *p = ++i; } }
上层调用chunk的分配内存函数Allocate的时候传递的大小是与init的参数blocksize是一致的,下面的函数allocate就是分配链表的头节点,并将当前链表长度减1
void* Chunk::Allocate(std::size_t blockSize) { if ( IsFilled() ) return NULL; assert((firstAvailableBlock_ * blockSize) / blockSize == firstAvailableBlock_); unsigned char * pResult = pData_ + (firstAvailableBlock_ * blockSize); firstAvailableBlock_ = *pResult; --blocksAvailable_; return pResult; }
FixedAllocator
FixedAllocator,顾名思义每次调用它分配的内存大小是确定的,分配的大小在初始化的时候确定,底层维护则一个chunk vector以及三个重要的指针,这三个指针或者为NULL或者指向chunks_中的元素,主要的目的是提高内存分配的效率:
typedef std::vector< Chunk > Chunks; /// Container of Chunks. Chunks chunks_; /// Pointer to Chunk used for last or next allocation. Chunk * allocChunk_; //用于分配 /// Pointer to Chunk used for last or next deallocation. Chunk * deallocChunk_;//用于回收 /// Pointer to the only empty Chunk if there is one, else NULL. Chunk * emptyChunk_;//指向一个内存都在自己手里的chunk
这里需要注意这三个指针的含义:
emptyChunk_
主要起到一个缓冲或者中介的作用,它指向的chunk是chunks_中唯一一个空的chunk(内存未分配出去,全部在手上),chunks_里面只能用一个空chunk,如果没用空的chunk那么emptyChunk_为NULL。emptyChunk重要其的是一个缓冲的作用,当allocChunk指向的chunk分配完之后那么久将emptyChunk给allocChunk用,如果deallocChunk_指向chunk把分配出去的内存都回收回来变为一个空chunk之后,如果
allocChunk_
指向chunks_中的元素,主要用于分配内存,如果为NULL或者指向的chunk已经分配完(掌管的内存都给别人用了),那么就将emptyChunk_指向的chunk给allocChunk_用,如果emptyChunk为NULL说明chunks_里面没有一个空的chunk(这里空chunk指的是未分配过内存到外面或者内存已经全部回收回来的chunk,即自己通过pData_掌管的内存都在自己手里),那么就会新建一个然后添加到chunks_中,并且将allocChunk_指向新添加的
deallocChunk_
指向chunk_中的元素,主要用于回收内存,当指向的chunk回收全部已经分配出去的内存之后就将这个chunk交给emptyChunk_管理,然后指向新的chunk
分配与回收过程代码分析
可以看出其实emptyChunk_起到桥梁的作用,它要保证chunks_中有且只有一个空的chunk,这样可以节约内存的使用,同时又能快速的分配,下面看一下FixedAllocator分配与回收内存代码.
/*********************************************************************** * 分配内存 * ***********************************************************************/ void * FixedAllocator::Allocate( void ) { if ( ( NULL == allocChunk_ ) || allocChunk_->IsFilled() ) //如果allocChunk_不可用的话 { if ( NULL != emptyChunk_ ) { allocChunk_ = emptyChunk_; //将emptyChunk_保存的emptyChunk_交给allocChunk_ emptyChunk_ = NULL; } else { // 在chunks_中找到一个合适的chunk给allocChunk否则就新建一个chunk并push到chunks中 for ( ChunkIter i( chunks_.begin() ); ; ++i ) { if ( chunks_.end() == i ) { if ( !MakeNewChunk() ) return NULL; break; } if ( !i->IsFilled() ) { allocChunk_ = &*i; break; } } } } else if ( allocChunk_ == emptyChunk_) // detach emptyChunk_ from allocChunk_, because after // calling allocChunk_->Allocate(blockSize_); the chunk // is no longer empty. emptyChunk_ = NULL; assert( allocChunk_ != NULL ); assert( !allocChunk_->IsFilled() ); //通过 void * place = allocChunk_->Allocate( blockSize_ ); // prove either emptyChunk_ points nowhere, or points to a truly empty Chunk. assert( ( NULL == emptyChunk_ ) || ( emptyChunk_->HasAvailable( numBlocks_ ) ) ); assert( CountEmptyChunks() < 2 ); return place; } /*********************************************************************** * 回收内存 * ***********************************************************************/ bool FixedAllocator::Deallocate( void * p, Chunk * hint ) { // VicinityFind的作用是从chunks中找到p所属的chunk,可以简单的遍历所用的chunks元素,但是loki库中考虑到效率,用了比较特殊的查找方法,看后面代码 Chunk * foundChunk = ( NULL == hint ) ? VicinityFind( p ) : hint; if ( NULL == foundChunk ) return false; assert( foundChunk->HasBlock( p, numBlocks_ * blockSize_ ) ); deallocChunk_ = foundChunk; DoDeallocate(p); assert( CountEmptyChunks() < 2 ); //确保chunks中不会多余一个空的chunk return true; } void FixedAllocator::DoDeallocate(void* p) { // call into the chunk, will adjust the inner list but won‘t release memory deallocChunk_->Deallocate(p, blockSize_); if ( deallocChunk_->HasAvailable( numBlocks_ ) ) // 判断deallocChunk指向的chunk是否已经回收完自己分配出去的内存,如果是就将这个chunk交给emptyChunk管理 { if ( NULL != emptyChunk_ )//如果emptyChunk已经指向一个空的chunk的话,需要将这个chunk释放掉 { // If last Chunk is empty, just change what deallocChunk_ // points to, and release the last. Otherwise, swap an empty // Chunk with the last, and then release it. // 这里为了效率,将vector的最后一个元素与emptyChunk交换,然后pop_back ,而不是直接erase Chunk * lastChunk = &chunks_.back(); if ( lastChunk == deallocChunk_ ) //最后一个元素,直接指向emptyChunk就可以 deallocChunk_ = emptyChunk_; else if ( lastChunk != emptyChunk_ ) std::swap( *emptyChunk_, *lastChunk ); assert( lastChunk->HasAvailable( numBlocks_ ) ); lastChunk->Release(); chunks_.pop_back(); //防止该release的chunk是allocChunk if ( ( allocChunk_ == lastChunk ) || allocChunk_->IsFilled() ) allocChunk_ = deallocChunk_; } emptyChunk_ = deallocChunk_; } // prove either emptyChunk_ points nowhere, or points to a truly empty Chunk. assert( ( NULL == emptyChunk_ ) || ( emptyChunk_->HasAvailable( numBlocks_ ) ) ); }
最后剩下一个问题就是,在dealloc的时候如何知道释放的内存所在的chunk,在dealloc函数中的第一行代码可以看出算法在VicintyFind中实现,大致思想是用deallocChunk指向的位置向两端查找,通过Chunk::HasBlock判断p指向的内存是否属于某个Chunk
Chunk * FixedAllocator::VicinityFind( void * p ) const { if ( chunks_.empty() ) return NULL; const std::size_t chunkLength = numBlocks_ * blockSize_; Chunk * lo = deallocChunk_; Chunk * hi = deallocChunk_ + 1; const Chunk * loBound = &chunks_.front(); const Chunk * hiBound = &chunks_.back() + 1; // Special case: deallocChunk_ is the last in the array if (hi == hiBound) hi = NULL; { if (lo) { if ( lo->HasBlock( p, chunkLength ) ) return lo; if ( lo == loBound ) { lo = NULL; if ( NULL == hi ) break; } else --lo; } if (hi) { if ( hi->HasBlock( p, chunkLength ) ) return hi; if ( ++hi == hiBound ) { hi = NULL; if ( NULL == lo ) break; } } } return NULL; }
SmallObjAllocator
SmallObjAllocator维护一个固定长度的FixedAllocator数组,初始化的时候确定,SmallObjAllocator维护着三个成员变量:
/// Pointer to array of fixed-size allocators. Loki::FixedAllocator * pool_; /// Largest object size supported by allocators. const std::size_t maxSmallObjectSize_; /// Size of alignment boundaries. const std::size_t objectAlignSize_;
通过构造函数能够了解数据结构的含义:
// SmallObjAllocator::SmallObjAllocator --------------------------------------- //底层由多个([maxobjectSize/objectAlignSize]个)pool组成,每个pool内存大小最大为pagesize, //pool的一块内存由numBlock个blockSize组成,其中blocksize=(i+1)*alignSize, numBlock=pagesize/blockSize //numBlock <= UCHAR_MAX,不同下标的pool分配的block不一样,下标i的pool分配的block大小为(i+1)*alignSize SmallObjAllocator::SmallObjAllocator( std::size_t pageSize, std::size_t maxObjectSize, std::size_t objectAlignSize ) : pool_( NULL ), maxSmallObjectSize_( maxObjectSize ), objectAlignSize_( objectAlignSize ) { assert( 0 != objectAlignSize ); const std::size_t allocCount = GetOffset( maxObjectSize, objectAlignSize );//GetOffset向上取整( numBytes + alignExtra ) / alignment 其中numBytes=maxObjectSize, alignExtra = objectAlignSize -1 pool_ = new FixedAllocator[ allocCount ]; for ( std::size_t i = 0; i < allocCount; ++i ) pool_[ i ].Initialize( ( i+1 ) * objectAlignSize, pageSize ); }
SmallObjAllocator中的维护的pool不同下标的FixedAllocator分配的内存大小不同,下标约大分配的内存越大,pagesize指定最底层的连续内存chunk的大小,通过SmallObjAllocator分配内存的时候,如果需要的内存大小超过了maxObjectSize则自动调用malloc或者new进行分配,如果小于maxObjectSize则通过计算从pool_中得到一个合适的FiexObjAllocator,它分配的内存大小刚好大于或者等于所需的内存大小:
void * SmallObjAllocator::Allocate( std::size_t numBytes, bool doThrow ) { //超过MaxObjSize,调用C语言的malloc或者C++的new if ( numBytes > GetMaxObjectSize() ) return DefaultAllocator( numBytes, doThrow ); if ( 0 == numBytes ) numBytes = 1; const std::size_t index = GetOffset( numBytes, GetAlignment() ) - 1; // 得到对应pool中的小标,对应的FiexAllocator固定分配的内存大小刚好满足numBytes const std::size_t allocCount = GetOffset( GetMaxObjectSize(), GetAlignment() ); (void) allocCount; assert( index < allocCount ); FixedAllocator & allocator = pool_[ index ]; assert( allocator.BlockSize() >= numBytes ); assert( allocator.BlockSize() < numBytes + GetAlignment() ); void * place = allocator.Allocate(); //内存不足的情况出现,因为底层的chunk可能有空的chunk,所以调用TrimExcessMemory,尝试释放pool中每个FixedAllocator下chunks可能存在的emptyChunk,尽可能的把用户态内存先归还给操作系统,然后在分配给用户态 if ( ( NULL == place ) && TrimExcessMemory() ) place = allocator.Allocate(); if ( ( NULL == place ) && doThrow ) //如果还不行的话,没救了,看看要不要抛出异常,否则返回NULL { #ifdef _MSC_VER throw std::bad_alloc( "could not allocate small object" ); #else // GCC did not like a literal string passed to std::bad_alloc. // so just throw the default-constructed exception. throw std::bad_alloc(); #endif } return place; }
上面的TrimExcessMemory不做分析,感兴趣可以自己看源代码.释放内存的时候比较麻烦,因为你不知道释放的内存多大,如果知道释放的内存大小,那么久能很快的找到对应的FixedAllocator在pool的位置,但是我们通过C语言的free和C++ 的delete是不需要传入大小的,因此需要通过特殊的算法查找释放的内存所属的FixedAllocator.
Loki的做法比较简单,直接遍历所有的pool中的FixedAllocator,查看该块内存是否属于自己的chunk,如果是就进行释放
void SmallObjAllocator::Deallocate( void * p ) { if ( NULL == p ) return; assert( NULL != pool_ ); FixedAllocator * pAllocator = NULL; const std::size_t allocCount = GetOffset( GetMaxObjectSize(), GetAlignment() ); Chunk * chunk = NULL; for ( std::size_t ii = 0; ii < allocCount; ++ii ) { chunk = pool_[ ii ].HasBlock( p ); //遍历pool,查看该块内存是否属于对应的FixedAllocator if ( NULL != chunk ) { pAllocator = &pool_[ ii ]; break; } } if ( NULL == pAllocator ) { DefaultDeallocator( p ); return; } assert( NULL != chunk ); const bool found = pAllocator->Deallocate( p, chunk ); (void) found; assert( found ); }
smallObject
上面的文字仅仅是分析SmallObj中内存的分配、回收过程以及内部的数据结构,前面说过loki库是基于策略的,也就是说通过实现一个模板框架,通过传入算法策略(这里是通过模板参数)我们能够改变框架内部的过程,通过不同策略的组合我们能够实现无数的功能.SmallObj还有两个外包类:
AllocatorSingleton
它继承自SmallObjAllocator,AllocatorSingleton有多个模板参数,主要的策略算法有用于管理自己单件模式的生命期策略,线程同步策略,类原型:
template < template <class, class> class ThreadingModel = LOKI_DEFAULT_THREADING_NO_OBJ_LEVEL, std::size_t chunkSize = LOKI_DEFAULT_CHUNK_SIZE, std::size_t maxSmallObjectSize = LOKI_MAX_SMALL_OBJECT_SIZE, std::size_t objectAlignSize = LOKI_DEFAULT_OBJECT_ALIGNMENT, template <class> class LifetimePolicy = LOKI_DEFAULT_SMALLOBJ_LIFETIME, class MutexPolicy = LOKI_DEFAULT_MUTEX > class AllocatorSingleton : public SmallObjAllocator { // ... // 单件模式 inline static AllocatorSingleton & Instance( void ) { return MyAllocatorSingleton::Instance(); } }
SmallObjectBase
该类提供线程锁操作,重载了new与delete,属于应用层提供给用户使用,内存的申请与释放都是通过单例AllocatorSingleton.allocate来操作
template < template <class, class> class ThreadingModel, std::size_t chunkSize, std::size_t maxSmallObjectSize, std::size_t objectAlignSize, template <class> class LifetimePolicy, class MutexPolicy > class SmallObjectBase{ //.... typedef AllocatorSingleton< ThreadingModel, chunkSize, maxSmallObjectSize, objectAlignSize, LifetimePolicy > ObjAllocatorSingleton; typedef typename ObjAllocatorSingleton::MyAllocatorSingleton MyAllocatorSingleton; // 重载new操作符 static void * operator new ( std::size_t size ) throw ( std::bad_alloc ) { typename MyThreadingModel::Lock lock; (void)lock; // get rid of warning return MyAllocatorSingleton::Instance().Allocate( size, true ); } //.... }
实际使用的SmallObject仅仅是继承自SmallObjectBase
结束语
至此分析Loki.SmallObj源代码暂时结束,最后发现完完整整的写下自己的思考与分析过程是比较重要的,但是要写清楚写明白,能让自己以后看懂,让别人看懂更难。写完下来发现自己的写作水平有待提高,因为一直没有写作的习惯,分析代码都是在源代码中写注释和笔记,后来发现这样的学习方式不太好,因为一些重要的要点不可能说以后忘记了再去打开source insight从一堆代码里面查找,这样的效率太低,不利于记忆与学习。通过写blog的方式可以将自己的思路与想法表达出来,利于记忆,以后能够回过头来复习,同时还能锻炼自己写作与表达能力,分享知识。还是一句话:坚持。
原文 http://www.cnblogs.com/UnGeek/p/4537114.html