标签:
转自:http://blog.csdn.net/lmh12506/article/details/7753952
前段时间在github上开了个库,准备实现自己的线程池的,因为换工作的事,一直也没有实现,参考这篇文章准备着手实现一下。
什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。
下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中
- while (pool->cur_queue_size == 0)
- {
- pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
- }
表示如果任务链表中没有任务,则该线程出于阻塞等待状态。否则从队列中取出任务并执行。
pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。
下面贴出完整代码
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <pthread.h>
- #include <assert.h>
- /*
- *线程池里所有运行和等待的任务都是一个CThread_worker
- *由于所有任务都在链表里,所以是一个链表结构
- */
- typedef struct worker
- {
-
- void *(*process) (void *arg);
- void *arg;
- struct worker *next;
- } CThread_worker;
- /*线程池结构*/
- typedef struct
- {
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_ready;
-
- CThread_worker *queue_head;
-
- int shutdown;
- pthread_t *threadid;
-
- int max_thread_num;
-
- int cur_queue_size;
- } CThread_pool;
- int pool_add_worker (void *(*process) (void *arg), void *arg);
- void *thread_routine (void *arg);
- static CThread_pool *pool = NULL;
- void
- pool_init (int max_thread_num)
- {
- pool = (CThread_pool *) malloc (sizeof (CThread_pool));
- pthread_mutex_init (&(pool->queue_lock), NULL);
- pthread_cond_init (&(pool->queue_ready), NULL);
- pool->queue_head = NULL;
- pool->max_thread_num = max_thread_num;
- pool->cur_queue_size = 0;
- pool->shutdown = 0;
- pool->threadid =
- (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
- int i = 0;
- for (i = 0; i < max_thread_num; i++)
- {
- pthread_create (&(pool->threadid[i]), NULL, thread_routine,
- NULL);
- }
- }
- /*向线程池中加入任务*/
- int
- pool_add_worker (void *(*process) (void *arg), void *arg)
- {
-
- CThread_worker *newworker =
- (CThread_worker *) malloc (sizeof (CThread_worker));
- newworker->process = process;
- newworker->arg = arg;
- newworker->next = NULL;
- pthread_mutex_lock (&(pool->queue_lock));
-
- CThread_worker *member = pool->queue_head;
- if (member != NULL)
- {
- while (member->next != NULL)
- member = member->next;
- member->next = newworker;
- }
- else
- {
- pool->queue_head = newworker;
- }
- assert (pool->queue_head != NULL);
- pool->cur_queue_size++;
- pthread_mutex_unlock (&(pool->queue_lock));
-
- 注意如果所有线程都在忙碌,这句没有任何作用*/
- pthread_cond_signal (&(pool->queue_ready));
- return 0;
- }
- /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直
- 把任务运行完后再退出*/
- int
- pool_destroy ()
- {
- if (pool->shutdown)
- return -1;
- pool->shutdown = 1;
-
- pthread_cond_broadcast (&(pool->queue_ready));
-
- int i;
- for (i = 0; i < pool->max_thread_num; i++)
- pthread_join (pool->threadid[i], NULL);
- free (pool->threadid);
-
- CThread_worker *head = NULL;
- while (pool->queue_head != NULL)
- {
- head = pool->queue_head;
- pool->queue_head = pool->queue_head->next;
- free (head);
- }
-
- pthread_mutex_destroy(&(pool->queue_lock));
- pthread_cond_destroy(&(pool->queue_ready));
-
- free (pool);
-
- pool=NULL;
- return 0;
- }
- void *
- thread_routine (void *arg)
- {
- printf ("starting thread 0x%x\n", pthread_self ());
- while (1)
- {
- pthread_mutex_lock (&(pool->queue_lock));
-
- pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
- while (pool->cur_queue_size == 0 && !pool->shutdown)
- {
- printf ("thread 0x%x is waiting\n", pthread_self ());
- pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
- }
-
- if (pool->shutdown)
- {
-
- pthread_mutex_unlock (&(pool->queue_lock));
- printf ("thread 0x%x will exit\n", pthread_self ());
- pthread_exit (NULL);
- }
- printf ("thread 0x%x is starting to work\n", pthread_self ());
-
- assert (pool->cur_queue_size != 0);
- assert (pool->queue_head != NULL);
-
-
- pool->cur_queue_size--;
- CThread_worker *worker = pool->queue_head;
- pool->queue_head = worker->next;
- pthread_mutex_unlock (&(pool->queue_lock));
-
- (*(worker->process)) (worker->arg);
- free (worker);
- worker = NULL;
- }
-
- pthread_exit (NULL);
- }
下面是测试代码
- void *
- myprocess (void *arg)
- {
- printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
- sleep (1);
- return NULL;
- }
- int
- main (int argc, char **argv)
- {
- pool_init (3);
-
-
- int *workingnum = (int *) malloc (sizeof (int) * 10);
- int i;
- for (i = 0; i < 10; i++)
- {
- workingnum[i] = i;
- pool_add_worker (myprocess, &workingnum[i]);
- }
-
- sleep (5);
-
- pool_destroy ();
- free (workingnum);
- return 0;
- }
将上述所有代码放入threadpool.c文件中,
在Linux输入编译命令
$ gcc -o threadpool threadpool.c -lpthread
以下是运行结果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit
Linux平台下线程池的原理及实现
标签:
原文地址:http://www.cnblogs.com/lit10050528/p/5116572.html