在传统服务器结构中,常是有一个总的监听线程监听有没有新的用户连接服务器,每当有一个新的用户进入,服务器就开启一个新的线程用户处理这 个用户的数据包。这个线程只服务于这个用户,当用户与服务器端关闭连接以后,服务器端销毁这个线程。(关于并发服务器更多详情,请看《并发服务器》)。
然而频繁地开辟与销毁线程极大地占用了系统的资源,而且在大量用户的情况下,系统为了开辟和销毁线程将浪费大量的时间和资源。线程池提供了一个解决外部大量用户与服务器有限资源的矛盾。
线程池和传统的一个用户对应一个线程的处理方法不同,它的基本思想就是在程序开始时就在内存中开辟一些线程,线程的数目是固定的,他们独自形成一个类,屏蔽了对外的操作,而服务器只需要将数据包交给线程池就可以了。当有新的客户请求到达时,不是新创建一个线程为其服务,而是从“池子”中选择一个空闲的线程为新的客户请求服务,服务完毕后,线程进入空闲线程池中。如果没有线程空闲的话,就将数据包暂时积累, 等待线程池内有线程空闲以后再进行处理。通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁的开销。当客户请求 时,线程对象已经存在,可以提高请求的响应时间,从而整体地提高了系统服务的表现。
一般来说实现一个线程池主要包括以下几个组成部分:
1)线程管理器:用于创建并管理线程池。
2)工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态,一般不占用 CPU,占用较小的内存空间。
3)任务接口:每个任务必须实现的接口,当线程池的任务队列中有可执行任务时,被空闲的工作线程调去执行(线程的闲与忙是通过互斥量实现的),把任务抽象出来形成接口,可以做到线程池与具体的任务无关。
4)任务队列:用来存放没有处理的任务,提供一种缓冲机制,实现这种结构有好几种方法,常用的是队列,主要运用先进先出原理,另外一种是链表之类的数据结构,可以动态的为它分配内存空间,应用中比较灵活,此教程就是用到的链表。
什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。
线程池实现示例代码如下:
thread_pool.h 的示例代码:
#ifndef __THREAD_POOL_H__ #define __THREAD_POOL_H__ #include <pthread.h> /********************************************************************* * 任务回调函数,也可根据需要自行修改 *********************************************************************/ typedef void *(*pool_task_f)(void *arg); /********************************************************************* * 任务句柄 *********************************************************************/ typedef struct _task{ pool_task_f process;/*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/ void *arg; /*回调函数的参数*/ struct _task *next; }pool_task; /********************************************************************* * 线程池句柄 *********************************************************************/ typedef struct { pthread_t *threadid; /* 线程号 */ int threads_limit; /* 线程池中允许的活动线程数目 */ int destroy_flag; /* 是否销毁线程池 , 0销毁,1不销毁*/ pool_task *queue_head; /* 链表结构,线程池中所有等待任务 */ int task_in_queue; /* 当前等待队列的任务数目 */ pthread_mutex_t queue_lock; /* 锁 */ pthread_cond_t queue_ready; /* 条件变量 */ }pool_t; /********************************************************************* *功能: 初始化线程池结构体并创建线程 *参数: pool:线程池句柄 threads_limit:线程池中线程的数量 *返回值: 无 *********************************************************************/ void pool_init(pool_t *pool, int threads_limit); /********************************************************************* *功能: 销毁线程池,等待队列中的任务不会再被执行, 但是正在运行的线程会一直,把任务运行完后再退出 *参数: 线程池句柄 *返回值: 成功:0,失败非0 *********************************************************************/ int pool_uninit(pool_t *pool); /********************************************************************* *功能: 向线程池中添加一个任务 *参数: pool:线程池句柄 process:任务处理函数 arg:任务参数 *返回值: 0 *********************************************************************/ int pool_add_task(pool_t *pool, pool_task_f process, void *arg); #endif
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <assert.h> #include "thread_pool.h" static void *pool_thread_server(void *arg); /********************************************************************* *功能: 初始化线程池结构体并创建线程 *参数: pool:线程池句柄 threads_limit:线程池中线程的数量 *返回值: 无 *********************************************************************/ void pool_init(pool_t *pool, int threads_limit) { pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; /*创建存放线程ID的空间*/ pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; /*初始化互斥锁和条件变量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /*循环创建threads_limit个线程*/ for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return; } /********************************************************************* *功能: 销毁线程池,等待队列中的任务不会再被执行, 但是正在运行的线程会一直,把任务运行完后再退出 *参数: 线程池句柄 *返回值: 成功:0,失败非0 *********************************************************************/ int pool_uninit(pool_t *pool) { pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag)/* 防止两次调用 */ return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); /* 唤醒所有等待线程,线程池要销毁了 */ pthread_cond_broadcast(&(pool->queue_ready)); /* 阻塞等待线程退出,否则就成僵尸了 */ for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); /* 销毁等待队列 */ pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); /*条件变量和互斥量也别忘了销毁*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0; } /********************************************************************* *功能: 向任务队列中添加一个任务 *参数: pool:线程池句柄 process:任务处理函数 arg:任务参数 *返回值: 无 *********************************************************************/ static void enqueue_task(pool_t *pool, pool_task_f process, void *arg) { pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!\n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) /* 将任务加入到任务链连的最后位置. */ member = member->next; member->next = task; }else{ pool->queue_head = task; /* 如果是第一个任务的话,就指向头 */ } printf("\ttasks %d\n", pool->task_in_queue); /* 等待队列中有任务了,唤醒一个等待线程 */ pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock)); } /********************************************************************* *功能: 从任务队列中取出一个任务 *参数: 线程池句柄 *返回值: 任务句柄 *********************************************************************/ static pool_task *dequeue_task(pool_t *pool) { pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); /* 判断线程池是否要销毁了 */ if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyed\n", pthread_self()); pthread_exit(NULL); } /* 如果等待队列为0并且不销毁线程池,则处于阻塞状态 */ if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is waitting\n", pthread_self()); /* 注意:pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁 */ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ /* 等待队列长度减去1,并取出队列中的第一个元素 */ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a task\n", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task; } /********************************************************************* *功能: 向线程池中添加一个任务 *参数: pool:线程池句柄 process:任务处理函数 arg:任务参数 *返回值: 0 *********************************************************************/ int pool_add_task(pool_t *pool, pool_task_f process, void *arg) { enqueue_task(pool, process, arg); return 0; } /********************************************************************* *功能: 线程池服务程序 *参数: 略 *返回值: 略 *********************************************************************/ static void *pool_thread_server(void *arg) { pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); /*调用回调函数,执行任务*/ if(task != NULL){ printf ("thread 0x%lx is busy\n", pthread_self()); task->process(task->arg); free(task); task = NULL; } } /*这一句应该是不可达的*/ pthread_exit(NULL); return NULL; }
#include <stdio.h> #include <unistd.h> #include "thread_pool.h" void *task_test(void *arg) { printf("\t\tworking on task %d\n", (int)arg); sleep(1); /*休息一秒,延长任务的执行时间*/ return NULL; } void thread_pool_demo(void) { pool_t pool; int i = 0; pool_init(&pool, 2);//初始化一个线程池,其中创建2个线程 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//添加一个任务 } sleep(4); pool_uninit(&pool);//删除线程池 } int main (int argc, char *argv[]) { thread_pool_demo(); return 0; }
参考资料:http://blog.csdn.net/hubi0952
原文地址:http://blog.csdn.net/tennysonsky/article/details/46490099