标签:
先问自己两个问题:
是否有很多请求需要重复性地进行处理,并且每个请求是相互独立的?
你是否需要等待IO操作,或是文件操作?
如果你回答YES,那么你需要一个线程池来帮助你。
通常情况下,IO操作都会需要很长的一段时间才能完成。所以,在一个单线程的应用程序中,在IO操作期间,系统资源都会进行等待。如果使用多线程,效率就会大大的提高。所以我们需要线程池更高效的完成多线程操作。
一个应用程序其实应该避免频繁的创建和销毁线程。
线程池就是可以帮助管理线程,避免频繁的创建和销毁,更加高效。
线程池会创建一些特定的线程,并且等待被请求。一旦有请求到达了线程池,一个线程就会被激活并且开始执行。当这个请求完成的时候,这个线程会退回到等待请求的状态。当用户请求destroy的时候,线程池中所有的线程都会退出。
设计图如下:
ThreadPool
这个类将会创建、管理、析构线程池。所以,你的应用程序需要创建ThreadPool类的一个对象。
AbstractRequest
这表示线程池的一个请求。客户端应用程序需要继承这个类,并且根据自己的情况实现线程中要执行的函数。
Logger
记录一些日志,错误等信息。
.h
#ifndef _THREAD_POOL_MGR_H_
#define _THREAD_POOL_MGR_H_
#include <windows.h>
#include <list>
#include <string>
namespace TP
{
/**
* Logger - Polymorphic base class for thread pool logging.
* Users of the ThreadPool may derive from it and override the
* single-argument LogError() and LogInfo() to plug in their own logging
* mechanism. The default implementations write to the debugger output window.
*/
class Logger
{
public:
    // Constructor
    Logger() {}
    // Destructor (virtual: the class is meant to be derived from)
    virtual ~Logger() {}

    // Decorate the message with thread pool details and pass it on to the
    // single-argument LogError().
    void LogError( const long lActiveReq_i, const std::wstring& wstrError_i );
    // Decorate the message with thread pool details and pass it on to the
    // single-argument LogInfo().
    void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i );

    // Customization point for errors. Default log goes to the output window.
    virtual void LogError( const std::wstring& wstrError_i );
    // Customization point for information. Default log goes to the output window.
    virtual void LogInfo( const std::wstring& wstrInfo_i );

private:
    // Prefix the message with thread ID, active request count and last error.
    void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io );
};
/**
* SyncObject - Wrapper around a Win32 CRITICAL_SECTION that provides the
* synchronization primitive used by the thread pool.
* The critical section is initialized in the constructor and deleted in
* the destructor.
*/
class SyncObject
{
public:
    // Constructor: initialize the critical section.
    SyncObject()
    {
        ::InitializeCriticalSection( &m_stCriticalSection );
    }

    // Destructor: delete the critical section.
    ~SyncObject()
    {
        ::DeleteCriticalSection( &m_stCriticalSection );
    }

    // Enter the critical section. Always returns true.
    bool Lock()
    {
        ::EnterCriticalSection( &m_stCriticalSection );
        return true;
    }

    // Leave the critical section. Always returns true.
    bool Unlock()
    {
        ::LeaveCriticalSection( &m_stCriticalSection );
        return true;
    }

private:
    // Copying is disabled: both operations are private and have no
    // definition in this file.
    SyncObject( const SyncObject& );
    SyncObject& operator = ( const SyncObject& );

    // Critical section object.
    CRITICAL_SECTION m_stCriticalSection;
};
/**
* AutoLock - Scoped lock: takes ownership of a synchronization object during
* construction and releases it during destruction.
*
* The class is not copyable. A copy would call Unlock() a second time on the
* same SyncObject when it goes out of scope, releasing a critical section
* the thread no longer owns.
*/
class AutoLock
{
public:
    /**
    * Parameterized constructor. Locks the given synchronization object.
    *
    * @param LockObj_i - Synchronization object; must outlive this AutoLock.
    * @return Nil
    * @exception Nil
    * @see Nil
    * @since 1.0
    */
    AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i )
    {
        // The pointer comes from a reference, so it can never be NULL here.
        m_pSyncObject->Lock();
    }

    /**
    * Destructor. Unlocks the synchronization object taken in the constructor.
    *
    * @param Nil
    * @return Nil
    * @exception Nil
    * @see Nil
    * @since 1.0
    */
    ~AutoLock()
    {
        m_pSyncObject->Unlock();
        m_pSyncObject = NULL;
    }

private:
    // Copying is disabled (declared, never defined) - same convention as
    // SyncObject and ThreadPool.
    AutoLock( const AutoLock& );
    AutoLock& operator = ( const AutoLock& );

private:
    // Synchronization object held for the lifetime of this guard.
    SyncObject* m_pSyncObject;
};
/**
* AbstractRequest - Abstract, polymorphic base class for a unit of work that
* is processed by the thread pool. Users of the ThreadPool derive from it and
* implement Execute().
* The request ID and the abort flag are read and written under
* m_LockWorkerThread.
*/
class AbstractRequest
{
public:
    // Constructor: not aborted, request ID zero.
    AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u )
    {
    }

    // Destructor
    virtual ~AbstractRequest()
    {
    }

    // Work to be done on a pool thread; implemented by the derived class.
    // The implementation should return when the request has been aborted,
    // which it can detect by polling IsAborted() during long operations.
    // The pool logs any non-zero return value as a failed execution.
    virtual long Execute() = 0;

    // Get request ID.
    unsigned short GetRequestID()
    {
        AutoLock Guard( m_LockWorkerThread );
        const unsigned short usCurrentID = m_usRequestID;
        return usCurrentID;
    }

    // Set request ID.
    void SetRequestID( unsigned short uRequestID_i )
    {
        AutoLock Guard( m_LockWorkerThread );
        m_usRequestID = uRequestID_i;
    }

    // Ask the request to stop processing (raises the abort flag).
    void Abort()
    {
        AutoLock Guard( m_LockWorkerThread );
        m_bAborted = true;
    }

    // Lower the abort flag so the same request can be posted again.
    void ClearAbortFlag()
    {
        AutoLock Guard( m_LockWorkerThread );
        m_bAborted = false;
    }

protected:
    // Tell whether Abort() has been called since the flag was last cleared.
    bool IsAborted()
    {
        AutoLock Guard( m_LockWorkerThread );
        const bool bAbortRequested = m_bAborted;
        return bAbortRequested;
    }

    // Prepare error or information log (prefixes the request ID).
    void PrepareLog( std::wstring& wstrLog_io );

protected:
    // Synchronization object guarding the members below.
    SyncObject m_LockWorkerThread;

private:
    // Abort flag.
    bool m_bAborted;
    // Request Identifier.
    unsigned short m_usRequestID;
};
/**
* AutoCounter - Scoped counter: increments the referenced counter on
* construction and decrements it on destruction. Both updates are made while
* holding the supplied synchronization object.
*/
class AutoCounter
{
public:
    // Constructor: bump the counter under the lock.
    AutoCounter( unsigned short& usCount_io, SyncObject& Lock_io )
        : m_usCount( usCount_io )
        , m_LockThread( Lock_io )
    {
        AutoLock Guard( m_LockThread );
        ++m_usCount;
    }

    // Destructor: drop the counter under the lock.
    ~AutoCounter()
    {
        AutoLock Guard( m_LockThread );
        --m_usCount;
    }

private:
    // Counter variable (owned by the caller).
    unsigned short& m_usCount;
    // Synchronization object protecting the counter (owned by the caller).
    SyncObject& m_LockThread;
};
// Queue of posted requests. It holds raw AbstractRequest pointers; nothing in
// this file deletes the pointed-to objects.
typedef std::list<AbstractRequest*> REQUEST_QUEUE;
/**
* ThreadPool - Creates and destroys a pool of worker threads on request.
* Work is posted to the pool as objects derived from AbstractRequest.
* A class derived from Logger can be supplied for error and information
* logging; otherwise a built-in Logger instance is used.
* The class is not copyable (copy operations are private).
*/
class ThreadPool
{
public:
// Constructor. The pool is not usable until Create() succeeds.
ThreadPool();
// Destructor. Calls Destroy() if the pool is still created.
~ThreadPool();
// Create thread pool with specified number of threads (1 to 64 accepted).
// When pLogger_io is NULL the built-in Logger member is used.
bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
// Destroy the existing thread pool: drop queued requests, signal the worker
// threads to exit and wait for them. Returns false if no pool exists.
bool Destroy();
// Post request to thread pool for processing. Only the pointer is stored.
bool PostRequest( AbstractRequest* pRequest_io );
private:
// Take the front request out of the queue; returns 0 when the queue is empty.
AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io );
// Allocate the handle list and start m_usThreadCount worker threads.
bool AddThreads();
// Release the semaphore by one and increment m_usSemaphoreCount.
bool NotifyThread();
// Worker loop run by every pool thread: wait, pop a request, execute it.
bool ProcessRequests();
// Block on the semaphore; decrements m_usSemaphoreCount once released.
bool WaitForRequest();
// Wait for all worker threads to exit, then close handles and free the list.
bool DestroyPool();
// Read the destroy flag under the lock.
bool IsDestroyed();
// Write the destroy flag under the lock.
void SetDestroyFlag( const bool bFlag_i );
// Clear the request queue under the lock.
void CancelRequests();
// Forward an error to the current logger along with m_lActiveThread.
void LogError( const std::wstring& wstrError_i );
// Forward an information message to the current logger along with m_lActiveThread.
void LogInfo( const std::wstring& wstrInfo_i );
// Worker thread entry point; pParam_i carries the ThreadPool instance.
static UINT WINAPI ThreadProc( LPVOID pParam_i );
private:
// Copying is disabled (declared private).
ThreadPool( const ThreadPool& );
ThreadPool& operator = ( const ThreadPool& );
private:
// Used for thread pool destruction: worker threads exit when they see it set.
bool m_bDestroyed;
// Hold thread count in the pool.
unsigned short m_usThreadCount;
// Semaphore releases issued by NotifyThread() that no worker has consumed
// yet in WaitForRequest().
unsigned short m_usSemaphoreCount;
// Active thread count: workers currently past WaitForRequest() in their loop.
unsigned short m_lActiveThread;
// Pending request count: queued requests for which no semaphore release has
// been issued yet.
unsigned short m_usPendingReqCount;
// Semaphore the worker threads wait on; one release wakes one worker.
HANDLE m_hSemaphore;
// Hold thread handles (array of m_usThreadCount entries; NULL when no pool exists).
HANDLE* m_phThreadList;
// Request queue.
REQUEST_QUEUE m_RequestQueue;
// Synchronization object for resource locking.
SyncObject m_LockWorkerThread;
// Logger in use: the user supplied one or &m_Logger.
Logger* m_pLogger;
// Default error and information logger.
Logger m_Logger;
};
} // namespace TP
#endif // #ifndef _THREAD_POOL_MGR_H_
.cpp
/**
* @author : Suresh
*/
#include "ThreadPool.h"
#include <sstream>
#include <iomanip>
namespace TP
{
/**
* Log an error message decorated with thread pool details.
*
* @param lActiveReq_i - Count of active requests.
* @param wstrError_i - Error message.
*/
void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
{
    // Work on a copy: PrepareLog() rewrites its argument in place.
    std::wstring wstrDecorated = wstrError_i;
    PrepareLog( lActiveReq_i, wstrDecorated );
    // Hand the finished line to the (overridable) single-argument overload.
    LogError( wstrDecorated );
}
/**
* Log an information message decorated with thread pool details.
*
* @param lActiveReq_i - Count of active requests.
* @param wstrInfo_i - Information message.
*/
void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
{
    // Work on a copy: PrepareLog() rewrites its argument in place.
    std::wstring wstrDecorated = wstrInfo_i;
    PrepareLog( lActiveReq_i, wstrDecorated );
    // Hand the finished line to the (overridable) single-argument overload.
    LogInfo( wstrDecorated );
}
/**
* Override this function to log errors. Default log will be in output window.
*
* @param wstrError_i - Error description
*/
void Logger::LogError( const std::wstring& wstrError_i )
{
OutputDebugString( wstrError_i.c_str());
}
/**
* Override this function to log informations. Default log will be in output window.
*
* @param wstrInfo_i - Information description.
*/
void Logger::LogInfo( const std::wstring& wstrInfo_i )
{
OutputDebugString( wstrInfo_i.c_str());
}
/**
* Log thread ID, Active thread count and last error.
*
* @param lActiveReq_i - Active thread count.
* @param wstrLog_io - Error or information description
*/
void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
{
std::wstringstream wstrmLog;
wstrmLog << L"##TP## [TID=" << std::setfill( L‘0‘ ) << std::setw(8) << ::GetCurrentThreadId()
<< L"] [ACTIVE REQUEST=" << std::setw(4) << lActiveReq_i
<< L"] [LAST ERROR=" << std::setw(4) << ::GetLastError()
<< L"] " << wstrLog_io.c_str() << + L"]";
wstrLog_io = wstrmLog.str();
}
/**
* Prepare error or information log.
*
* Rewrites the message in place as
*   ##RQ## [RID=<request id>] [Desc=<message>]
* with the request ID zero-padded to eight digits.
*
* @param wstrLog_io - Log information; replaced by the decorated text on return.
*/
void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
{
    std::wstringstream wstrmLog;
    wstrmLog << std::setfill( L'0' );
    wstrmLog << L"##RQ## [RID=" << std::setw(8) << GetRequestID()
             << L"] [Desc=" << wstrLog_io.c_str() << L"]";
    wstrLog_io = wstrmLog.str();
}
/**
* Constructor. Leaves the pool in the "not created" state: no threads, no
* semaphore, empty queue, and the built-in logger selected.
*/
ThreadPool::ThreadPool()
    : m_bDestroyed( false )
    , m_usThreadCount( 0u )
    , m_usSemaphoreCount( 0u )
    , m_lActiveThread( 0u )
    , m_usPendingReqCount( 0u )
    , m_hSemaphore( NULL )
    , m_phThreadList( NULL )
    , m_RequestQueue()
    , m_LockWorkerThread()
    , m_pLogger( &m_Logger )
    , m_Logger()
{
}
/**
* Destructor. Tears the pool down if it is still created and logs an error
* when that fails.
*/
ThreadPool::~ThreadPool()
{
    // No handle list means there is no pool to tear down.
    if( NULL == m_phThreadList )
    {
        return;
    }

    const bool bTornDown = Destroy();
    if( !bTornDown )
    {
        LogError( L"Destroy() failed" );
    }
}
/**
* Create thread pool with specified number of threads.
*
* @param usThreadCount_i - Thread count; values from 1 to 64 are accepted.
* @param pLogger_i - Logger instance to log errors and informations; the
*                    built-in logger is used when NULL.
* @return true when the semaphore and all worker threads were created,
*         false otherwise (the reason is logged).
*/
bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
{
    try
    {
        // Select the logger first so that every message below reaches it.
        if( NULL == pLogger_i )
        {
            m_pLogger = &m_Logger;
        }
        else
        {
            m_pLogger = pLogger_i;
        }

        // Reject the call if a pool already exists or the count is out of range.
        const wchar_t* pwszRejectReason = NULL;
        if( NULL != m_phThreadList )
        {
            pwszRejectReason = L"ThreadPool already created";
        }
        else if( 0 == usThreadCount_i )
        {
            pwszRejectReason = L"Minimum allowed thread count is one";
        }
        else if( usThreadCount_i > 64 )
        {
            pwszRejectReason = L"Maximum allowed thread count is 64";
        }
        if( NULL != pwszRejectReason )
        {
            LogError( pwszRejectReason );
            return false;
        }

        LogInfo( L"Thread pool creation requested" );

        // Start from a clean bookkeeping state.
        m_usThreadCount = usThreadCount_i;
        m_usPendingReqCount = 0u;
        m_usSemaphoreCount = 0u;
        m_lActiveThread = 0u;

        // The semaphore starts at zero so every worker blocks until a request arrives.
        m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
        if( NULL == m_hSemaphore )
        {
            LogError( L"Semaphore creation failed" );
            m_usThreadCount = 0u;
            return false;
        }

        // Start the worker threads; roll back through Destroy() on failure.
        const bool bThreadsStarted = AddThreads();
        if( !bThreadsStarted )
        {
            LogError( L"Threads creation failed" );
            Destroy();
            return false;
        }

        SetDestroyFlag( false );
        LogInfo( L"Thread pool created successfully" );
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in Create()" );
        return false;
    }
}
/**
* Destroy thread pool.
*
* Drops all queued requests, raises the destroy flag, wakes the workers that
* are not currently active and then waits for every worker thread to exit.
*
* @return true when the pool was torn down, false when no pool exists or a
*         step failed (the reason is logged).
*/
bool ThreadPool::Destroy()
{
    try
    {
        // Nothing to do without a handle list.
        if( NULL == m_phThreadList )
        {
            LogError( L"ThreadPool is already destroyed or not created yet" );
            return false;
        }

        // Drop queued work, then tell the workers to exit.
        CancelRequests();
        SetDestroyFlag( true );

        // Wake every worker that is not active so it can observe the flag.
        {
            AutoLock Guard( m_LockWorkerThread );
            const long lIdleThreads = static_cast<long>( m_usThreadCount )
                                    - static_cast<long>( m_lActiveThread );
            if( lIdleThreads > 0 && !ReleaseSemaphore( m_hSemaphore, lIdleThreads, NULL ))
            {
                LogError( L"Failed to release Semaphore" );
                return false;
            }
        }

        // Wait for the workers to finish and release the pool resources.
        const bool bPoolCleaned = DestroyPool();
        if( !bPoolCleaned )
        {
            LogError( L"Thread pool destruction failed" );
            return false;
        }

        LogInfo( L"Thread Pool destroyed successfully" );
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in Destroy()" );
        return false;
    }
}
/**
* Post request to thread pool for processing
*
* The request is appended to the queue. If fewer semaphore releases are
* outstanding than there are threads, a worker is notified right away;
* otherwise the request is counted as pending and a worker picks it up after
* finishing its current request.
*
* A NULL request is rejected: once queued it would use up a semaphore
* release and could not be told apart from an empty queue when popped.
*
* @param pRequest_io - Request to be processed. Only the pointer is stored.
* @return true when the request was queued; false when the pool does not
*         exist, the request is NULL, or notifying a worker failed (in that
*         last case the request stays queued and is counted as pending).
*/
bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
{
    try
    {
        AutoLock LockThread( m_LockWorkerThread );
        if( NULL == m_phThreadList )
        {
            LogError( L"ThreadPool is destroyed or not created yet" );
            return false;
        }
        if( NULL == pRequest_io )
        {
            LogError( L"Invalid (NULL) request" );
            return false;
        }
        m_RequestQueue.push_back( pRequest_io );
        if( m_usSemaphoreCount < m_usThreadCount )
        {
            // Thread available to process, so notify thread.
            if( !NotifyThread())
            {
                LogError( L"NotifyThread failed" );
                // Request notification failed. Try after some time.
                m_usPendingReqCount++;
                return false;
            }
        }
        else
        {
            // Thread not available to process.
            m_usPendingReqCount++;
        }
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in PostRequest()" );
        return false;
    }
}
/**
* Pop request from queue for processing.
*
* Only the front entry is removed. Removing by value (list::remove) would
* erase every entry holding the same pointer, so a request that was posted
* more than once would lose its later entries, and it would scan the whole
* list on each pop.
*
* @param RequestQueue_io - Request queue.
* @return AbstractRequest* - Request pointer, or 0 when the queue is empty.
*/
AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
{
    AutoLock LockThread( m_LockWorkerThread );
    if( RequestQueue_io.empty())
    {
        return 0;
    }
    AbstractRequest* pRequest = RequestQueue_io.front();
    RequestQueue_io.pop_front();
    return pRequest;
}
/**
* Create specified number of threads. Initial status of threads will be waiting.
*/
bool ThreadPool::AddThreads()
{
try
{
// Allocate memory for all threads.
m_phThreadList = new HANDLE[m_usThreadCount];
if( NULL == m_phThreadList )
{
LogError( L"Memory allocation for thread handle failed" );
return false;
}
// Create worker threads.
DWORD dwThreadID = 0;
for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
{
// Create worker thread
m_phThreadList[usIdx] = CreateThread( 0, 0,
reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
this, 0, &dwThreadID );
if( NULL == m_phThreadList[usIdx] )
{
LogError( L"CreateThread failed" );
return false;
}
}
return true;
}
catch( ... )
{
LogError( L"Exception occurred in AddThreads()" );
return false;
}
}
/**
* Release semaphore by one so that one waiting worker thread wakes up, and
* record the release in m_usSemaphoreCount.
*
* This function never touches the request queue: queuing is done by the
* caller. The exception path therefore leaves the queue alone. Removing the
* back entry there would drop a request this function did not add, and is
* undefined on an empty queue.
*
* @return true when the semaphore was released, false otherwise.
*/
bool ThreadPool::NotifyThread()
{
    try
    {
        AutoLock LockThread( m_LockWorkerThread );
        // Release semaphore by one to process this request.
        if( !ReleaseSemaphore( m_hSemaphore, 1, NULL ))
        {
            LogError( L"ReleaseSemaphore failed" );
            return false;
        }
        m_usSemaphoreCount++;
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in NotifyThread()" );
        return false;
    }
}
/**
* Process request in queue.
*
* Worker loop executed by every pool thread. Each iteration:
*   1. waits on the semaphore for a request,
*   2. exits if the pool is being destroyed,
*   3. pops and executes one request,
*   4. exits if the pool was destroyed meanwhile,
*   5. hands one pending request (if any) to a worker by releasing the
*      semaphore.
*
* A request that fails (non-zero return) or throws is logged, and the
* iteration then continues with steps 4 and 5. Skipping them would send the
* thread back to waiting without checking the destroy flag and would leave
* pending requests without a notification.
*
* @return true when the loop ends because the pool is being destroyed.
*/
bool ThreadPool::ProcessRequests()
{
    for( ;; )
    {
        try
        {
            LogInfo( L"Thread WAITING" );
            // Wait for request.
            if( !WaitForRequest())
            {
                LogError( L"WaitForRequest() failed" );
                continue;
            }
            // Thread counter: this thread counts as active until the end of
            // the iteration.
            AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
            LogInfo( L"Thread ACTIVE" );
            // Check thread pool destroy request.
            if( IsDestroyed())
            {
                LogInfo( L"Thread EXITING" );
                break;
            }
            // Get request from request queue.
            AbstractRequest* pRequest = PopRequest( m_RequestQueue );
            if( NULL == pRequest )
            {
                LogError( L"PopRequest failed" );
                continue;
            }
            // Execute the request. A failure is reported but must not stop
            // the bookkeeping below.
            try
            {
                if( 0 != pRequest->Execute())
                {
                    LogError( L"Request execution failed" );
                }
            }
            catch( ... )
            {
                LogError( L"Exception occurred in ProcessRequests()" );
            }
            // Check thread pool destroy request.
            if( IsDestroyed())
            {
                LogInfo( L"Thread EXITING" );
                break;
            }
            AutoLock LockThread( m_LockWorkerThread );
            // Inform thread if any pending request.
            if( m_usPendingReqCount > 0 && m_usSemaphoreCount < m_usThreadCount )
            {
                // Thread available to process, so notify thread.
                if( !NotifyThread())
                {
                    LogError( L"NotifyThread failed" );
                    continue;
                }
                m_usPendingReqCount--;
            }
        }
        catch( ... )
        {
            LogError( L"Exception occurred in ProcessRequests()" );
            continue;
        }
    }
    return true;
}
/**
* Wait for request queuing to thread pool.
*/
bool ThreadPool::WaitForRequest()
{
try
{
// Wait released when requested queued.
DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
if( WAIT_OBJECT_0 != dwReturn )
{
LogError( L"WaitForSingleObject failed" );
return false;
}
AutoLock LockThread( m_LockWorkerThread );
m_usSemaphoreCount--;
// Clear previous error.
::SetLastError( 0 );
return true;
}
catch( ... )
{
LogError( L"Exception occurred in WaitForRequest()" );
return false;
}
}
/**
* Destroy and clean up thread pool.
*/
bool ThreadPool::DestroyPool()
{
try
{
// Wait for the exist of threads.
DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount, m_phThreadList, TRUE, INFINITE );
if( WAIT_OBJECT_0 != dwReturn )
{
LogError( L"WaitForMultipleObjects failed" );
return false;
}
// Close all threads.
for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
{
if( TRUE != CloseHandle( m_phThreadList[uIdx] ))
{
LogError( L"CloseHandle failed for threads" );
return false;
}
}
// Clear memory allocated for threads.
delete[] m_phThreadList;
m_phThreadList = 0;
// Close the semaphore
if( TRUE != CloseHandle( m_hSemaphore ))
{
LogError( L"CloseHandle failed for semaphore" );
return false;
}
// Clear request queue.
m_RequestQueue.clear();
return true;
}
catch( ... )
{
LogError( L"Exception occurred in DestroyPool()" );
return false;
}
}
/**
* Check for destroy request.
*
* @return true once the destroy flag has been set, false otherwise.
*/
inline bool ThreadPool::IsDestroyed()
{
    // Read the flag under the same lock SetDestroyFlag() writes it with.
    // Worker threads exit when they see it set.
    AutoLock Guard( m_LockWorkerThread );
    const bool bDestroyRequested = m_bDestroyed;
    return bDestroyRequested;
}
/**
* Set destroy flag
*
* @param bFlag_i - New value of the flag; true asks the worker threads to exit.
*/
inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
{
    // Written under the lock that IsDestroyed() reads it with.
    AutoLock Guard( m_LockWorkerThread );
    m_bDestroyed = bFlag_i;
}
/**
* Cancel all queued requests in pool.
*
* Logs the destroy request and empties the request queue while holding the
* lock. Only the stored pointers are dropped.
*/
void ThreadPool::CancelRequests()
{
    try
    {
        // Hold the lock so no worker pops from the queue while it is emptied.
        AutoLock Guard( m_LockWorkerThread );
        LogInfo( L"Thread pool destroy requested" );
        // Clear main queue.
        m_RequestQueue.clear();
    }
    catch( ... )
    {
        LogError( L"Exception occurred in CancelRequests()" );
    }
}
/**
* Log error in thread pool.
* Forwards the message, together with the active thread count, to the
* logger currently in use.
*
* @param wstrError_i - Error description.
*/
void ThreadPool::LogError( const std::wstring& wstrError_i )
{
    // Without a logger there is nowhere to report to.
    if( NULL == m_pLogger )
    {
        return;
    }
    m_pLogger->LogError( m_lActiveThread, wstrError_i );
}
/**
* Log information in thread pool.
* Forwards the message, together with the active thread count, to the
* logger currently in use.
*
* @param wstrInfo_i - Information description.
*/
void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
{
    // Without a logger there is nowhere to report to.
    if( NULL == m_pLogger )
    {
        return;
    }
    m_pLogger->LogInfo( m_lActiveThread, wstrInfo_i );
}
/**
* worker thread procedure.
*
* Runs the request processing loop of the pool passed in pParam_i.
* The pool pointer is declared once, outside the try block, so the catch
* handler sees the same pointer and can report the exception through the
* pool's logger.
*
* @param pParam_i - ThreadPool instance.
* @return UINT - Return 0 on success, 1 on a NULL parameter, a failed
*                processing loop or an exception.
*/
UINT ThreadPool::ThreadProc( LPVOID pParam_i )
{
    ThreadPool* pThreadPool = static_cast<ThreadPool*>( pParam_i );
    try
    {
        if( NULL == pThreadPool )
        {
            return 1;
        }
        if( !pThreadPool->ProcessRequests())
        {
            pThreadPool->LogError( L"ProcessRequests() failed" );
            return 1;
        }
        return 0;
    }
    catch( ... )
    {
        if( NULL != pThreadPool )
        {
            pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
        }
        return 1;
    }
}
} // namespace TP
Whether Thread Pool is Needed for You?
标签:
原文地址:http://blog.csdn.net/wangshubo1989/article/details/51246494