码迷,mamicode.com
首页 > 编程语言 > 详细

简单线程池的实现

时间:2016-07-03 20:06:06      阅读:292      评论:0      收藏:0      [点我收藏+]

标签:

1. 什么是线程池

线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务。如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可。


线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销。一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为


1.     计算密集型任务;

2.     I/O密集型任务。


计算密集型任务是占用CPU资源的,很少被外界的事件打断,CPU的个数是一定的,所以并发数是一定的,因此线程个数等于CPU的个数时是最高效的。


I/O密集型任务意味着执行期间可能会被I/O中断,也就是说这个线程会被挂起,这时的线程个数应该大于CPU的个数。


线程池的本质是生产者与消费者模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果线程池有空闲线程,就唤醒空闲线程来执行任务,如果没有空闲线程,并且线程数没有达到阈值(线程池中线程的最大值),就创建新线程来执行任务。


当任务增加的时候能够动态的增加线程池中线程的数量,直到达到一个阈值。这个阈值就是线程池中线程的最大值。


当任务执行完毕时,能够动态的销毁线程池中的线程池。

 

2.线程池的实现

我在 Ubuntu 系统下用C语言写的程序(传送门:github),这是一个非常简单的线程池实现,代码量约300行,仅仅说明线程池的工作原理,我会文章最后给出扩展线程池的思路,使之成为一个拥有C/S架构,socket通信的线程池。


目前而言,用到的知识点就两个:

1.     pthread_mutex_t:互斥锁;

2.     pthread_cond_t:条件变量;


互斥锁和条件变量要配合使用,我在另一篇博客(点击打开链接)里给出了使用方法,有兴趣的童鞋可以去看一下~


我们的小线程池共有五个文件:

1.             condition.h:把互斥锁和条件变量组合在一起,形成一个条件结构体,condition.h就是这个结构体的声明;

2.             condition.c:与condition.h对应,定义了操作条件结构体的函数;

3.             threadpool.h:包含两个结构体,一个是线程控制块(TCB),另一个是线程池结构体,还有三个函数:初始化线程池、销毁线程池、向线程池中添加任务;

4.             threadpool.c:threadpool.h的实现;

5.             main.c:主函数;


除此之外,我使用autotool编译程序,因此还有两个脚本文件:

1.     makefile.am:定义文件之间的依赖关系;

2.     build.sh:编译脚本;

 

接下来让我们深入代码去理解线程池~

 

Condition.h,为了实现线程同步,普遍的做法是将互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)配合在一起使用,最好的做法就是让两者组合成一个结构体,pthread_mutex_t 和 pthread_cond_t 同属于 pthread.h 头文件:

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  chengonghao@yeah.net
******************************************************************/

#ifndef _CONDITION_H_
#define _CONDITION_H_

#include <pthread.h>

/* 将互斥锁和条件变量封装成一个结构体 */
typedef struct condition
{
   pthread_mutex_t pmutex ;
   pthread_cond_t pcond ;
} condition_t ;

/* 初始化结构体 */
int condition_init( condition_t* cond ) ;

/* 拿到结构体中的互斥锁 */
int condition_lock( condition_t* cond ) ;

/* 释放结构体中的互斥锁 */
int condition_unlock( condition_t* cond ) ;

/* 使消费者线程等待在条件变量上 */
int condition_wait( condition_t* cond ) ;

/* 使消费者线程等待在条件变量上,abstime:等待超时 */
int condition_timedwait( condition_t* cond, const struct timespec* abstime ) ;

/* 生产者线程通知等待在条件变量上的消费者线程 */
int condition_signal( condition_t* cond ) ;

/* 生产者线程向等待在条件变量上的消费者线程广播 */
int condition_broadcast( condition_t* cond ) ;

/* 销毁结构体 */
int condition_destroy( condition_t* cond ) ;

#endif


 

Condition.c,有8个针对条件结构体的函数,condition.c就是这8个函数的定义,很简单,都是转而调用pthread.h头文件中的库函数。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  chengonghao@yeah.net
******************************************************************/

#include "condition.h"

int condition_init( condition_t* cond ) 
{
   int status ;
   if ( ( status = pthread_mutex_init( &cond->pmutex, NULL ) ) )
   {
      return status ;
   }

   if ( ( status = pthread_cond_init( &cond->pcond, NULL ) ) )
   {
      return status ;
   }

   return 0 ;
}

int condition_lock( condition_t* cond ) 
{
   return pthread_mutex_lock( &cond->pmutex) ;
}

int condition_unlock( condition_t* cond ) 
{
   return pthread_mutex_unlock( &cond->pmutex ) ;
}

int condition_wait( condition_t* cond ) 
{
   return pthread_cond_wait( &cond->pcond, &cond->pmutex ) ;
}

int condition_timedwait( condition_t* cond, const struct timespec* abstime ) 
{
   return pthread_cond_timedwait( &cond->pcond, &cond->pmutex, abstime ) ;
}

int condition_signal( condition_t* cond ) 
{
   return pthread_cond_signal( &cond->pcond ) ;
}

int condition_broadcast( condition_t* cond ) 
{
   return pthread_cond_broadcast( &cond->pcond ) ;
}

int condition_destroy( condition_t* cond ) 
{
   int status ;
   if ( ( status = pthread_mutex_destroy( &cond->pmutex ) ) ) 
   {
      return status ;
   }

   if ( ( status = pthread_cond_destroy( &cond->pcond ) ) ) 
   {
      return status ;
   }

   return 0 ;
}


 

Pthreadpool.h:包含两个结构体,线程控制块(TCB)task_t和线程池结构体threadpool,还声明了三个针对线程池的操作:初始化线程池、销毁线程池、向线程池中添加任务。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  chengonghao@yeah.net
******************************************************************/

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include "condition.h"

/* 线程控制块(task control block),以单向链表的形式组织 TCB */
typedef struct task
{
   void *( *run ) ( void* arg ) ; // 线程的执行函数
   void *arg ; // 执行函数的参数
   struct tast* next ; // 指向下一个 TCB
} task_t ;

/* 线程池结构体 */
typedef struct threadpool
{
   condition_t ready ; // 条件变量结构体
   task_t* first ; // TCB 链表的头指针
   task_t* last ; // TCB 链表的尾指针
   int counter ; // TCB 的总数量
   int idle ; // 空闲 TCB 的个数
   int max_threads ; // 最大线程数量
   int quit ; // 线程池销毁标志
} threadpool_t ;

/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads ) ;

/* 向线程池中添加任务 */ 
void threadpool_add_task( threadpool_t* pool, void* ( *run )( void* arg ), void* arg ) ;

/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool ) ;

#endif


 

Threadpool.c,共四个函数,对线程池的操作。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  chengonghao@yeah.net
******************************************************************/

#include "threadpool.h"
#include <errno.h>
#include <time.h>

/* 线程入口函数 */
void* thread_runtime( void* arg ) 
{
   struct timespec abstime ;
   int timeout ;
   
   // 拿到线程池对象
   threadpool_t* pool = ( threadpool_t* ) arg ;

   while ( 1 ) 
   {
      timeout = 0 ;

      /************************** 进入临界区 ***********************/

      condition_lock( &pool->ready ) ;
      
      // 空闲线程数加 1
      ++ pool->idle ;

      // 如果线程链表为空,而且线程池处于运行状态,那么线程就该等待任务的到来
      while ( pool->first == NULL && pool->quit == 0 )
      {
         printf( "thread 0x%x is waiting\n", (int)pthread_self() ) ;
         clock_gettime( CLOCK_REALTIME, &abstime ) ;
         abstime.tv_sec += 2 ;
         int status = condition_timedwait( &pool->ready, &abstime ) ;
         if ( status == ETIMEDOUT )
         {
            printf( "thread 0x%x is wait timed out\n", (int)pthread_self() ) ;
            timeout = 1 ;
            break ;
         }
      }

      // 如果线程等待超时
      if ( timeout && pool->first == NULL )
      {
         -- pool->counter ; // 那么线程数量减 1
         condition_unlock( &pool->ready ) ; // 释放互斥锁
         break ; // 跳出 while,注意,break 之后,线程入口函数执行完毕,线程将不复存在
      }

      // 线程获得任务
      -- pool->idle ; // 线程池空闲数减 1
      if ( pool->first != NULL )
      {
         task_t* t = pool->first ; // 从链表头部取出 TCB
         pool->first = t->next ; // 指向下一个 TCB

         // 执行任务需要一定时间,所以要先解锁
         // 以便生产者可以往链表中加入任务
         // 以及其他消费者可以等待任务
         condition_unlock( &pool->ready ) ;

         t->run( t->arg ) ; // 执行任务的回调函数

         free( t ) ; // 任务执行完毕,销毁 TCB

         condition_lock( &pool->ready ) ;
      } 

      // quit == 1 说明要销毁线程池
      if ( pool->quit && pool->first == NULL )
      {
         -- pool->counter ;
         if ( pool->counter == 0 ) 
         {
            condition_signal( &pool->ready ) ; // 唤醒等待在条件变量上的主线程
         }
         condition_unlock( &pool->ready ) ;
         break ;
      }

      condition_unlock( &pool->ready ) ;

      /************************** 退出临界区 ***********************/
   }

   return NULL ;
}

/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads ) 
{
   condition_init( &pool->ready ) ; // 初始化条件变量结构体
   pool->first = NULL ; // 设置线程链表头指针
   pool->last = NULL ; // 设置线程链表尾指针
   pool->counter = 0 ; // 设置线程池当前线程数
   pool->idle = 0 ; // 设置线程池当前空闲线程数
   pool->max_threads = threads ; // 设置线程池最大线程数
   pool->quit = 0 ; // quit = 0,线程池运行状态;quit = 1,线程池销毁状态
}

/* 向线程池中添加线程 */
void threadpool_add_task( threadpool_t* pool, void* (*run)( void* arg ), void* arg ) 
{
   task_t* newtask = ( task_t* ) malloc ( sizeof( task_t ) ) ; // 创建线程控制块
   newtask->run = run ; // 设置线程的回调函数
   newtask->arg = arg ; // 设置回调函数的参数
   newtask->next = NULL ; // 新加入的线程会被添加到链表尾部

   /************************** 进入临界区 ***********************/

   condition_lock( &pool->ready ) ; // 拿到互斥锁

   // 把新创建的 TCB 添加到线程链表中
   if ( pool->first == NULL ) 
   {
   	  // 如果线程链表为空,则 TCB 作为链表头部
      pool->first = newtask ;
   }
   else
   {
   	  // 如果线程链表不为空,加入到链表尾部
      pool->last->next = newtask ;
   }
   pool->last = newtask ; // 修改链表尾指针
   
   // 如果有空闲线程,那么就唤醒空闲线程
   if ( pool->idle > 0 ) 
   {
      condition_signal( &pool->ready ) ; // 通知等待在条件变量上的空闲线程
   }
   else if ( pool->counter < pool->max_threads ) 
   {
   	  // 如果没有空闲线程可用,而且当前线程数量小于线程池的容量,我们就创建一个线程
      pthread_t tid ;
      pthread_create( &tid, NULL, thread_runtime, pool ) ; // 指定新线程的起始函数为 thread_runtime,把线程池传递给 thread_runtime
      ++ pool->counter ;
   }

   condition_unlock( &pool->ready ) ; // 释放互斥锁

   /************************** 退出临界区 ***********************/
}

/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool ) 
{
   if ( pool->quit ) 
   {
      return ;
   } 

   /************************** 进入临界区 ***********************/

   condition_lock( &pool->ready ) ;

   // 设置退出标志为真
   pool->quit = 1 ;

   // 如果线程池中正在运行着线程,那么我们需要等待线程执行完毕再销毁
   if ( pool->counter > 0 ) 
   {
      if ( pool->idle > 0 )
      {
         condition_broadcast( &pool->ready ) ;
      }
      while ( pool->counter > 0 ) 
      {
         condition_wait( &pool->ready ) ; // 主线程(main 函数所在线程)将等待在条件变量上
      }
   }

   condition_unlock( &pool->ready ) ;

   /************************** 退出临界区 ***********************/
   
   // 销毁条件变量
   condition_destroy( &pool->ready ) ;
}


 

最后是主函数啦,main.c

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  chengonghao@yeah.net
******************************************************************/

#include "threadpool.h"

/* 定义线程池最大线程数 */
#define MAX_POOL_SIZE	3

/* 每个任务的回调函数 */
void* mytask( void* arg ) 
{
   printf( " thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg ) ;
   sleep( 1 ) ;
   free( arg ) ;
   return NULL ;
}

int main( void ) 
{
   threadpool_t pool ; // 定义一个线程池变量
   threadpool_init( &pool, MAX_POOL_SIZE ) ; // 初始化线程池

   // 向线程池中添加 10 个任务,每个任务的处理函数都是 mytask
   for ( int i = 0; i < 10; ++ i ) 
   {
      int* arg = (int*)malloc( sizeof( int ) ) ;
      *arg = i ;
      threadpool_add_task( &pool, mytask, arg ) ;
   }

   threadpool_destroy( &pool ) ; // 销毁线程池
   
   return 0 ;
}


 

画个图来理解线程池的原理:

技术分享

可以发现生产者的代码主要位于 thread_add_task()函数,消费者的代码主要位于 thread_runtime() 函数。

细心的童鞋也许发现了:thread_runtime()中只有while循环,三个消费者同时执行while 循环,不可能保证循环有序的从生产线上取任务,任务来临时如何保证三个消费者不发生争抢行为呢?这就是条件变量的作用了,当消费者发现生产线为空时,就依次睡在条件变量上,睡在条件变量上的关键代码如下:

int status = condition_timedwait( &pool->ready, &abstime ) ;


如果睡超时了就要自动销毁喔~

当任务来临时,生产者有义务通知睡眠的消费者,唤醒后者,起来嗨~,关键代码如下:

condition_signal(&pool->ready ) ; // 唤醒等待在条件变量上的主线程


执行结果如下:

技术分享


我设置的线程池容量为 2,任务数是 10,因此有 2 个线程处理 10 个任务。

从上图可以发现,两个线程分别是0xaa1de700 和 0xaa9df700,这两个线程依次处理 0 ~ 9 号任务,没有任务处理时,便进入等待状态(is waiting),等待超时的结果就是自动销毁,然后主线程退出,程序结束。

 

3.扩展线程池

以上就是一个最最小的线程池实现了,这只是一个玩具,仅供了解线程池的实现。但是我们可以把小玩具扩展成一个有模有样的线程池:C/S架构、socket 通信,以下是扩展思路,所有扩展的代码都在:simple_thread_pool/extension/

客户端:

1.     把 socket 封个包,也就是把原生 socket 封装成一个类,我已经做了这一步,代码在:simple_thread_pool/extension/ossSocket/

2.     做一个命令工厂类(CommandFactory),根据客户端的输入,生产出一系列的命令对象;

3.     客户端类(Client),其中包含 socket 类和 CommandFactory;

技术分享 


服务器端:

       服务器端基本上就是对本文介绍的小线程池的扩展。

1.在main函数中添加监听函数:TCPListener();

2.TCPListener()的代码主要由 threadpool_add_task() 和 threadpool_runtime()的代码构成;

 

很多基础软件,比如数据库、服务器,底层都跑着一个线程池,要是有时间的话,我会扩展这个小线程池,然后写博客介绍技术实现。

简单线程池的实现

标签:

原文地址:http://blog.csdn.net/chengonghao/article/details/51791917

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!