1 线程池技术简介
“池”化技术通常都是为了应对“小”的特点而开发出来的,比如:
内存池是针对小块内存的申请和释放过于频繁导致的效率低下问题。先分配一定量的内存,按照大小分类,当程序需要小块内存(这里的小是相对而言的,看实现方式)时,就从某个大块内存中截取小块内存,用完了之后,就再放入大块内存中。当然,这里说的只是基本的思想,在实现的时候,有针对不同的分配方式的优化方案。
2 线程池的实现方案
初始化时,线程池创建默认个数的线程,并将线程ID存放到工作者队列中。
在线程池中设置了一个finish的结束变量,当用户调用了threadpool_destroy()时,就设置该变量,并通知所有的工作者,然后等待所有的工作者线程退出。
每个线程循环检测工作队列,当工作队列不为空时,就从该队列中取出一个工作执行,如果工作队列为空并且设置了finish变量,就退出工作者线程,如果工作队列为空,但是没有设置finish变量,则继续等待工作。
本实现没有考虑负载均衡。
3 线程池的源代码及注释
/* threadpool.h */
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H
#include <pthread.h>
#define THREADPOOL_MAX_THREADS 8 /* 最大线程数 */
#define THREADPOOL_MIN_THREADS 1 /* 最小线程数 */
#define THREADPOOL_DEF_THREADS 4 /* 默认线程数 */
struct work_queue_s {
void* (*routine)(void *); /* 工作例程 */
void *arg; /* 工作例程参数 */
struct work_queue_s *next;
};
typedef struct work_queue_s work_queue_t; /* 工作队列 */
struct worker_queue_s {
pthread_t id; /* 线程ID */
struct worker_queue_s *next;
};
typedef struct worker_queue_s worker_queue_t; /* 工作者队列 */
struct threadpool_s {
int finish; /* 是否结束 */
int cur_thread_num; /* 当前线程数 */
worker_queue_t *workers; /* 工作者队列 */
work_queue_t *first; /* 工作队列首指针 */
work_queue_t *last; /* 工作队列尾指针 */
pthread_cond_t queue_nonempty; /* 工作队列是否为空的条件变量 */
pthread_mutex_t queue_lock; /* 工作队列锁 */
};
typedef struct threadpool_s threadpool_t; /* 线程池结构 */
/* 创建线程池 */
threadpool_t* threadpool_create();
/* 向线程池中添加工作 */
int threadpool_insert_work(threadpool_t*, void* (*)(void *), void*);
/* 创建工作者 */
int threadpool_create_worker(threadpool_t*);
/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t*);
/* 销毁线程池 */
int threadpool_destroy(threadpool_t*);
#endif/* threadpool.c */
#include <stdio.h>
#include <stdlib.h>
#include "threadpool.h"
/* 创建线程,分配线程池结构,初始化结构中的各个元素,创建默认个数的线程 */
threadpool_t* threadpool_create()
{
threadpool_t *tp = (threadpool_t *)malloc(sizeof(threadpool_t));
if(tp == NULL) {
fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
return tp;
}
tp->finish = 0;
tp->cur_thread_num = 0;
tp->workers = NULL;
tp->first = NULL;
tp->last = NULL;
if(pthread_cond_init(&tp->queue_nonempty, NULL) != 0) {
fprintf(stderr, "%s: pthread_cond_init failed\n", __FUNCTION__);
free(tp);
return NULL;
}
if(pthread_mutex_init(&tp->queue_lock, NULL) != 0) {
fprintf(stderr, "%s: pthread_mutex_init failed\n", __FUNCTION__);
free(tp);
return NULL;
}
int cnt = THREADPOOL_DEF_THREADS;
while(cnt--) {
threadpool_create_worker(tp);
}
printf("create threadpool success\n");
printf("contain %d threads\n", THREADPOOL_DEF_THREADS);
return tp;
}
/* 往线程池中添加工作 */
int threadpool_insert_work(threadpool_t *tp, void* (*routine)(void *), void* arg)
{
work_queue_t *wq = malloc(sizeof(work_queue_t));
if(wq == NULL) {
fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
return -1;
}
wq->routine = routine;
wq->arg = arg;
wq->next = NULL;
pthread_mutex_lock(&tp->queue_lock);
if(tp->first == NULL && tp->last == NULL) {
tp->first = wq;
tp->last = wq;
}
else {
tp->last->next = wq;
tp->last = wq;
}
pthread_mutex_unlock(&tp->queue_lock);
pthread_cond_signal(&tp->queue_nonempty);
return 0;
}
/* 工作者线程的执行函数 */
void* thread_routine(void* arg)
{
work_queue_t *wq = NULL;
threadpool_t *tp = (threadpool_t *)arg;
while(1) {
pthread_mutex_lock(&tp->queue_lock);
while(tp->finish == 0 && tp->first == NULL) {
pthread_cond_wait(&tp->queue_nonempty, &tp->queue_lock);
}
if(tp->finish && tp->first == NULL) {
pthread_mutex_unlock(&tp->queue_lock);
pthread_exit(NULL);
}
work_queue_t *wq = NULL;
wq = tp->first;
tp->first = wq->next;
if(wq->next == NULL) {
tp->last = NULL;
}
pthread_mutex_unlock(&tp->queue_lock);
wq->routine(wq->arg);
printf("current thread: %u\n", (unsigned int)pthread_self());
free(wq);
}
}
/* 创建工作者,并将工作者的线程ID记录到工作者队列中 */
int threadpool_create_worker(threadpool_t *tp)
{
pthread_t tid;
if(pthread_create(&tid, NULL, thread_routine, tp) != 0) {
fprintf(stderr, "%s: pthread_create failed\n", __FUNCTION__);
return -1;
}
worker_queue_t *worker = (worker_queue_t *)malloc(sizeof(worker_queue_t));
if(worker == NULL) {
fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
return -1;
}
worker->id = tid;
worker->next = NULL;
worker->next = tp->workers;
tp->workers = worker;
tp->cur_thread_num++;
printf("create worker %u\n", (unsigned int)tid);
return 0;
}
/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t *tp)
{
worker_queue_t *worker = NULL;
if(tp->workers == NULL) {
return -1;
}
worker = tp->workers;
tp->workers = worker->next;
tp->cur_thread_num--;
pthread_t tid = worker->id;
free(worker);
if(pthread_join(tid, NULL) != 0) {
fprintf(stderr, "%s: pthread_join failed\n", __FUNCTION__);
return -1;
}
printf("destroy %u success\n", (unsigned int)tid);
return 0;
}
/* 销毁线程池,等待所有线程结束 */
int threadpool_destroy(threadpool_t *tp)
{
pthread_mutex_lock(&tp->queue_lock);
tp->finish = 1;
pthread_mutex_unlock(&tp->queue_lock);
pthread_cond_broadcast(&tp->queue_nonempty);
int cnt = tp->cur_thread_num;
printf("ready to destroy %d worker\n", cnt);
while(cnt--) {
threadpool_destroy_worker(tp);
}
free(tp);
return 0;
}/* test_threadpool.c */
#include <stdio.h>
#include "threadpool.h"
void *routine(void *arg)
{
printf("%d\n", (int)arg);
return NULL;
}
int main(int argc, char const *argv[])
{
threadpool_t *tp = threadpool_create();
int i = 0;
while(i < 10) {
threadpool_insert_work(tp, routine, (void*)i);
++i;
}
threadpool_destroy(tp);
return 0;
}
create worker 3076107072 create worker 3067714368 create worker 3059321664 create worker 3050928960 create threadpool success contain 4 threads 0 ready to destroy 4 worker 1 current thread: 3067714368 2 current thread: 3067714368 3 current thread: 3067714368 4 current thread: 3067714368 5 current thread: 3067714368 6 current thread: 3067714368 7 current thread: 3067714368 8 current thread: 3067714368 9 current thread: 3067714368 current thread: 3076107072 destroy 3050928960 success destroy 3059321664 success destroy 3067714368 success destroy 3076107072 success
如果将工作函数改成这样:
void *routine(void *arg)
{
sleep(1);
printf("%d\n", (int)arg);
return NULL;
}create worker 3075525440 create worker 3067132736 create worker 3058740032 create worker 3050347328 create threadpool success contain 4 threads ready to destroy 4 worker 2 1 current thread: 3067132736 0 current thread: 3075525440 3 current thread: 3050347328 current thread: 3058740032 5 current thread: 3075525440 6 7 current thread: 3058740032 current thread: 3050347328 4 current thread: 3067132736 destroy 3050347328 success 9 current thread: 3058740032 8 current thread: 3075525440 destroy 3058740032 success destroy 3067132736 success destroy 3075525440 success
5 实现过程中遇到的问题
(1)在编写多线程程序时,通常会用到锁。使用锁的时候,特别要注意的是:锁保护的是哪那个成员,那个成员是否有必要用锁保护。比如,这里的工作队列,由于工作队列可能会被多个线程使用,某些线程想要从中获取工作,某个线程想向其中添加工作,于是,需要用锁来进行保护。而这里的工作者队列呢?由于工作者队列不会被多个线程使用,它只能被主线程使用,因此,不需要用锁进行保护。
(2)关于条件变量。在使用条件变量时,如果有多个条件,在修改任意一个条件时,都要进行通知。
原文地址:http://blog.csdn.net/luofengmacheng/article/details/26450473