标签:
1.说明
接触多线程已经有较长一段时间了,在工作中也经常用到多线程线程池等,于是打算写一个通用的模板类,方便以后的调用。当开始写的时候,我觉得这应该比较easy,能够很快的实现。而在写的过程中才发现不是那么容易。
这篇文档有模板的相关内容:http://blog.csdn.net/lqk1985/article/details/3136364
1、首先考虑到的是通用性,怎样让这个多线程通用呢?可能会因为业务的不同效果不一样,思来想去,决定采用resquest->process(多线程处理)->res模型吧。而不是将处理结果直接放在request(直接放在request应该可以减少多次new res实例吧)
2、对于任务的传入,结果的传出,是否采用阻塞模式呢?后来决定通过函数重载两种模式都支持。
3、对于多任务,多输出,很自然的采用了std::queue,构造一个线程安全的队列予以处理。
4、多线程怎样根据不同的业务进行不同的处理,重载任务基类的实现函数,由业务自己决定处理方式。只是将这个任务的执行这交给线程池来实现。
感觉所有的问题都解决了,就开始动工了,前前后后大概花了1天的时间,写成了一个通用的模板,个人感觉适用性还是挺强的,直接任务的传入,就可以取结果了,由线程池予以任务处理。方便又高效。
5、居然忘了说:该模板类只支持类指针模式,不支持类模式,后面正确两种模式都支持吧。
6、任务封装感觉好麻烦,以后还是重构下吧,居然没有UML(2015-7-3重新整理该笔记)
好了废话少说直接贴代码(在linux平台测试并通过):
2.实现
1、模板线程池cthreadpool.h
#ifndef __CTHREADPOOL_H
#define __CTHREADPOOL_H
/*
*threadpool::初始化时,只能用类指针
*
*push_work::添加任务
*
*get_workres::获取处理结果
*/
#include <iostream>
#include <queue>
#include <pthread.h>
#include "ctaskqueue.h"
template <class DTYPEIN, class DTYPEOUT>
class cthreadpool
{
public:
cthreadpool(){}
~cthreadpool(){}
static cthreadpool<DTYPEIN, DTYPEOUT>* getinstance();
static bool create_threadpool(int num);
static void destroy_threadpool();
virtual void process();
public:
static void* run(void* arg);
bool create_thread();
void push_work(DTYPEIN request);
void task_deal();
DTYPEOUT get_workres(); //阻塞
DTYPEOUT get_workres(int type); //非阻塞
private:
static cthreadpool<DTYPEIN, DTYPEOUT>* m_threadpool;
ctaskqueue<DTYPEIN> m_quein;
ctaskqueue<DTYPEOUT> m_queout;
};
template <class DTYPEIN, class DTYPEOUT>
cthreadpool< DTYPEIN, DTYPEOUT> * cthreadpool<DTYPEIN, DTYPEOUT>::m_threadpool=NULL;
template <class DTYPEIN, class DTYPEOUT>
cthreadpool<DTYPEIN, DTYPEOUT>* cthreadpool< DTYPEIN, DTYPEOUT>::getinstance()
{
return m_threadpool;
}
template <class DTYPEIN, class DTYPEOUT>
bool cthreadpool< DTYPEIN, DTYPEOUT>::create_threadpool(int num)
{
m_threadpool = new cthreadpool<DTYPEIN, DTYPEOUT>;
for (int i=0; i<num; i++)
{
if (!m_threadpool->create_thread())
return false;
}
return true;
}
template <class DTYPEIN, class DTYPEOUT>
void cthreadpool< DTYPEIN, DTYPEOUT>::destroy_threadpool()
{
delete m_threadpool;
}
template <class DTYPEIN, class DTYPEOUT>
void cthreadpool< DTYPEIN, DTYPEOUT>::process()
{
task_deal();
}
template <class DTYPEIN, class DTYPEOUT>
void* cthreadpool< DTYPEIN, DTYPEOUT>::run(void* arg)
{
cthreadpool<DTYPEIN, DTYPEOUT>* pthis = (cthreadpool<DTYPEIN, DTYPEOUT>*) arg;
pthis->process();
}
template <class DTYPEIN, class DTYPEOUT>
bool cthreadpool< DTYPEIN, DTYPEOUT>::create_thread()
{
pthread_t id;
if (pthread_create(&id, NULL, run, this) == 0)
return true;
return false;
}
template <class DTYPEIN, class DTYPEOUT>
void cthreadpool< DTYPEIN, DTYPEOUT>::push_work(DTYPEIN request)
{
m_quein.push_element(request);
}
template <class DTYPEIN, class DTYPEOUT>
void cthreadpool< DTYPEIN, DTYPEOUT>::task_deal()
{
while (1)
{
DTYPEIN taskin = NULL;
taskin = m_quein.get_element();
if (taskin != NULL)
{
DTYPEOUT taskout = dynamic_cast<DTYPEOUT>(taskin->task_process());
m_queout.push_element(taskout);
delete taskin;
}
}
}
template <class DTYPEIN, class DTYPEOUT>
DTYPEOUT cthreadpool< DTYPEIN, DTYPEOUT>::get_workres()
{
return m_queout.get_element();
}
template <class DTYPEIN, class DTYPEOUT>
DTYPEOUT cthreadpool< DTYPEIN, DTYPEOUT>::get_workres(int type)
{
return m_queout.get_element(type);
}
#endif
2、线程池任务队列 ctaskqueue.h
#ifndef __CTASKQUEUE_H
#define __CTASKQUEUE_H
#include <queue>
#include <pthread.h>
#include <iostream>
#include "csemlock.hpp"
#include "cscalock.hpp"
#include "cmutex.hpp"
#include <semaphore.h>
template <class KIND>
class ctaskqueue
{
public:
ctaskqueue()
{
sem_init(&_sem_cond, 0, 1);
}
~ctaskqueue()
{
sem_destroy(&_sem_cond);
}
void push_element(KIND com);
KIND get_element();
KIND get_element(int type);
private:
std::queue<KIND> m_queue;
cmutex _mutex_lock;
//csemlock _cond_lock;
cscalock _sca_lock;
sem_t _sem_cond;
};
template <class KIND>
void ctaskqueue<KIND>::push_element(KIND com)
{
_mutex_lock.lock_mutex();
if (m_queue.empty())
{
m_queue.push(com);
sem_post(&_sem_cond);
std::cout<<"thrd-id:"<<pthread_self()<<" send a broadcast."<<std::endl;
}
else
{
m_queue.push(com);
}
_mutex_lock.unlock_mutex();
}
template <class KIND>
KIND ctaskqueue<KIND>::get_element()
{
KIND taskin = NULL;
while(true)
{
_mutex_lock.lock_mutex();
if (m_queue.empty())
{
_mutex_lock.unlock_mutex();
std::cout<<"thrd-id:"<<pthread_self()<<" will block."<<std::endl;
sem_wait(&_sem_cond);
continue;
}
KIND val = NULL;
if (!m_queue.empty())
{
val = m_queue.front();
m_queue.pop();
}
_mutex_lock.unlock_mutex();
return val;
}
}
template <class KIND>
KIND ctaskqueue<KIND>::get_element(int type)
{
KIND val = NULL;
_mutex_lock.lock_mutex();
if (!m_queue.empty())
{
val = m_queue.front();
m_queue.pop();
}
_mutex_lock.unlock_mutex();
return val;
}
#endif
3、线程同步和信号量相关代码,确保线程安全在在任务到来时能够及时唤醒
信号量封装csemlock.h
#ifndef __CSEMLOCK_H
#define __CSEMLOCK_H
#include <pthread.h>
class csemlock
{
public:
csemlock();
~csemlock();
void sem_condwait();
void sem_condsend();
void sem_releaselock();
private:
pthread_mutex_t* _semmutex;
pthread_cond_t* _semcond;
};
inline void csemlock::sem_condwait()
{
pthread_mutex_lock(_semmutex);
pthread_cond_wait( _semcond, _semmutex);
}
inline void csemlock::sem_condsend()
{
pthread_cond_broadcast(_semcond);
}
inline void csemlock::sem_releaselock()
{
pthread_mutex_unlock(_semmutex);
}
inline csemlock::csemlock()
{
_semmutex = new pthread_mutex_t;
pthread_mutex_init(_semmutex, NULL);
_semcond = new pthread_cond_t;
pthread_cond_init(_semcond, NULL);
}
inline csemlock::~csemlock()
{
pthread_mutex_destroy(_semmutex);
pthread_cond_destroy(_semcond);
delete _semmutex;
delete _semcond;
}
#endif
线程锁的封装cmutex
#ifndef __CMUTEX_H
#define __CMUTEX_H
#include <pthread.h>
class cmutex
{
public:
cmutex()
{
_lock = new pthread_mutex_t;
pthread_mutex_init(_lock, NULL);
}
~cmutex()
{
pthread_mutex_destroy(_lock);
delete _lock;
}
void lock_mutex();
void unlock_mutex();
int trylock_mutex();
private:
pthread_mutex_t* _lock;
} ;
inline void cmutex::lock_mutex()
{
pthread_mutex_lock(_lock);
}
inline void cmutex::unlock_mutex()
{
pthread_mutex_unlock(_lock);
}
inline int cmutex::trylock_mutex()
{
return pthread_mutex_trylock(_lock);
}
#endif
无锁线程封装,锁的切换比互斥锁块2-3倍(测试结果,主机不同可能实际效果不一样,但性能确实会高很多)
cscalock.hpp
#ifndef __NOLOCKMODE_H
#define __NOLOCKMODE_H
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
class cscalock
{
public:
cscalock()
{
m_mutex = 0;
m_lock = 1;
m_unlock = 0;
}
~cscalock(){}
void sca_lock();
void sca_unlock();
bool sca_trylock();
private:
int m_mutex;
int m_lock;
int m_unlock;
};
inline void cscalock::sca_lock()
{
while (!(__sync_bool_compare_and_swap(&m_mutex, m_unlock, m_lock)))
{
usleep(10);
}
}
inline bool cscalock::sca_trylock()
{
int count = 0;
while (count < 5)
{
if (__sync_bool_compare_and_swap(&m_mutex, m_unlock, m_lock)) //if true
return true;
count++;
usleep(10);
}
return false;
}
inline void cscalock::sca_unlock()
{
__sync_bool_compare_and_swap(&m_mutex, m_lock, m_unlock);
}
#endif
好了以上就是线程池的实现代码了,现在要做的就是测试了。
4、线程池的任务基类(任务类必须继承该类,并实现处理函数)
threadtask.h 任务基类
#ifndef __CTHREADTASK_H
#define __CTHREADTASK_H
/*
*brief:线程池任务基类,需要使用该线程池必须要继承该类,并实现处理函数
*/
class cthreadtask
{
public:
virtual cthreadtask* task_process()=0;
};
#endif
任务输入类封装,包含了输入和输出:
#ifndef __CTESTCLASS_H
#define __CTESTCLASS_H
#include "cthreadtask.hpp"
#include <string.h>
#include <pthread.h>
#include <iostream>
/*
*brief:测试类,testclass为需要处理的类,testclassB 为处理的结果类
*
*处理过程为将testclass::m_data 字符串逆序。
*/
class testclassB:public cthreadtask
{
public:
testclassB(char* pdata)
{
memcpy(m_data, pdata, strlen(pdata)+1);
}
~testclassB(){}
void get_res()
{
std::cout<<"thrd-id:"<<m_thrdid<<" deal-res:"<<m_data<<std::endl;
}
void set_thrdid(pthread_t id)
{
m_thrdid = id;
}
public:
cthreadtask* task_process(){}
private:
char m_data[100];
pthread_t m_thrdid;
};
class testclass:public cthreadtask
{
public:
testclass(char* pdata)
{
memcpy(m_data, pdata, strlen(pdata)+1);
}
~testclass(){}
public:
cthreadtask* task_process();
private:
char m_data[100];
};
cthreadtask* testclass::task_process()
{
int ncur = strlen(m_data);
int nlen = ncur / 2;
ncur = ncur - 1;
char tmp;
for (int i=0; i<nlen; i++)
{
tmp = m_data[i];
m_data[i] = m_data[ncur];
m_data[ncur] = tmp;
ncur--;
}
testclassB* dealres = new testclassB(m_data);
dealres->set_thrdid(pthread_self());
return dealres;
}
#endif
main函数调用,让线程开始工作
#include <iostream>
#include <string.h>
#include "cthreadpool.h"
#include "cthreadtask.hpp"
#include "ctestclass.hpp"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
int dealnum = 0;
int putnum = 0;
void signal_deal(int no)
{
std::cout<<"put-num:"<<putnum<<std::endl;
std::cout<<"deal-num:"<<dealnum<<std::endl;
exit(0);
}
static void* run(void* arg)
{
testclassB* res;
while (1)
{
res = cthreadpool<testclass*, testclassB*>::getinstance()->get_workres(0);
if (res != NULL)
{
res->get_res();
dealnum++;
res->get_res();
delete res;
}
else
{
std::cout<<"get res = NULL"<<std::endl;
}
std::cout<<"deal-num:"<<dealnum<<std::endl;
usleep(1000);
}
}
int main(void)
{
//处理线程 request
if (cthreadpool<testclass*, testclassB*>::create_threadpool(5) == false)
{
std::cout<<"thread pool create error."<<std::endl;
cthreadpool<testclass*, testclassB*>::destroy_threadpool();
exit(0);
}
signal(SIGINT, signal_deal );
//获取处理结果线程
pthread_t id;
pthread_create(&id, NULL, run, NULL);
int num = 0;
usleep(100*1000);
while (1)
{
testclass* task1 = new testclass ("1234567890");
testclass* task2 = new testclass ("abcdefghij");
testclass* task3 = new testclass ("111102222");
testclass* task4 = new testclass ("22222222~33333333");
cthreadpool<testclass*, testclassB*>::getinstance()->push_work(task1);
cthreadpool<testclass*, testclassB*>::getinstance()->push_work(task2);
cthreadpool<testclass*, testclassB*>::getinstance()->push_work(task3);
cthreadpool<testclass*, testclassB*>::getinstance()->push_work(task4);
putnum += 4;
std::cout<<"the num:"<<putnum<<std::endl;
usleep(10*1000);
}
return 0;
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:
原文地址:http://blog.csdn.net/zhang_int_int/article/details/46739783