码迷,mamicode.com
首页 > 编程语言 > 详细

线程池

时间:2016-07-21 18:09:55      阅读:398      评论:0      收藏:0      [点我收藏+]

标签:

线程池

引入线程池的背景

为什么需要线程池呢?

        设想一下,如果我们使用有任务就开启一个子线程处理,处理完成后,销毁子线程或等得子线程自然死亡,那么如果我们的任务所需时间比较短,但是任务数量比较多,那么更多的时间是花在线程的创建和结束上面,效率肯定就低了。

  线程池的原理:

        既然是线程池(Thread pool),其实名字很形象,就是把指定数量的可用子线程放进一个"池里",有任务时取出一个线程执行,任务执行完后,并不立即销毁线程,而是放进线程池中,等待接收下一个任务。这样内存和cpu的开销也比较小,并且我们可以控制线程的数量。

线程池的实现:

        线程池有很多种实现方式,在python中,已经给我们提供了一个很好的实现方式:Queue-队列。因为python中Queue本身就是同步的,所以也就是线程安全的,所以我们可以放心的让多个线程共享一个Queue。

        那么说到线程池,那么理应也得有一个任务池,任务池中存放着待执行的任务,各个线程到任务池中取任务执行,那么用Queue来实现任务池是最好不过的。

技术分享

线程池的注意事项:

虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
1、线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。

一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。
如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。
2、并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
3、线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

 线程池要点:

线程池要点:
1、通过判断等待的任务数量和线程池中的最大值,取最小值来判断开启多少线程来工作
比如:
任务数是3,进程池最大20  ,那么咱们只需要开启3个线程就行了。
任务数是500,进程池是20,那么咱们只开20个线程就可以了。
取最小值

2、实现线程池正在运行,有一个查看的功能,查看一下现在线程里面活跃的线程是多少等待的是多少?

线程总共是多少,等待中多少,正在运行中多少
作用:
方便查看当前线程池状态
能获取到这个之后就可以当线程一直处于空闲状态

查看状态用:上下文管理来做,非常nice的一点

3、关闭线程

简单线程池实现

技术分享
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import Queue
import threading
import time

‘‘‘
这个简单的例子的想法是通过:
1、利用Queue特性,在Queue里创建多个线程对象
2、那我执行代码的时候,去queue里去拿线程!
如果线程池里有可用的,直接拿。
如果线程池里没有可用,那就等。
3、线程执行完毕,归还给线程池
‘‘‘

class ThreadPool(object): #创建线程池类
    def __init__(self,max_thread=20):#构造方法,设置最大的线程数为20
        self.queue = Queue.Queue(max_thread) #创建一个队列
        for i in xrange(max_thread):#循环把线程类加入到队列中
            self.queue.put(threading.Thread)
            #把线程的类名放进去,执行完这个Queue

    def get_thread(self):#定义方法从队列里获取线程
        return self.queue.get()

    def add_thread(self):#定义方法在队列里添加线程
        self.queue.put(threading.Thread)

pool = ThreadPool(10)

def func(arg,p):
    print arg
    time.sleep(2)
    p.add_thread() #当前线程执行完了,我在队列里加一个线程!

for i in xrange(300):
    thread = pool.get_thread() #线程池10个线程,每一次循环拿走一个!默认queue.get(),如果队列里没有数据就会等待。
    t = thread(target=func,args=(i,pool))
    t.start()


‘‘‘
self.queue.put(threading.Thread) 添加的是类不是对象,在内存中如果相同的类只占一份内存空间
并且如果这里存储的是对象的话每次都的新增都得在内存中开辟一段内存空间

还有如果是对象的话:下面的这个语句就不能这么调用了!
for i in xrange(300):
    thread = pool.get_thread()
    t = thread(target=func,args=(i,pool))
    t.start()
    通过查看源码可以知道,在thread的构造函数中:self.__args = args  self.__target = target  都是私有字段那么调用就应该这么写

for i in xrange(300):
    ret = pool.get_thread()
    ret._Thread__target = func
    ret._Thread__args = (i,pool)
    ret.start()
‘‘‘
View Code

多功能线程池实现方法

技术分享base 1
技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#  Author: Jason Wang

import contextlib
import threading
import time
import random

doing = []
def number(l2):
    while True:
        print(len(l2))
        time.sleep(1)

t = threading.Thread(target=number,args=(doing,))  #开启一个线程,每一秒打印列表,当前工作中的线程数量
t.start()


#添加管理上下文的装饰器
@contextlib.contextmanager
def show(li,iterm):
    li.append(iterm)
    yield
    ‘‘‘
    yield冻结这次操作,就出去了,with就会捕捉到,然后就会执行with下的代码块,当with下的代码块
    执行完毕后就会回来继续执行yield下面没有执行的代码块!
    然后就执行完毕了
    如果with代码块中的非常耗时,那么doing的长度是不是一直是1,说明他没执行完呢?我们就可以获取到正在执行的数量,当他with执行完毕后
    执行yield的后续的代码块。把他移除后就为0了!
    ‘‘‘
    li.remove(iterm)


def task(arg):
    with show(doing,1):#通过with管理上下文进行切换
        print(len(doing))
        time.sleep(10) #等待10秒这里可以使用random模块来操作~

for i in range(20): #开启20个线程执行
    temp = threading.Thread(target=task,args=(i,))
    temp.start()

‘‘‘
作用:我们要记录正在工作的的列表
比如正在工作的线程我把加入到doing这个列表中,如果工作完成的把它从doing列表中移除。
通过这个机制,就可以获取现在正在执行的线程都有多少
‘‘‘
base 2

 

技术分享
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = luo_t
from Queue import Queue
import contextlib
import threading

WorkerStop = object()


class ThreadPool:
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, maxthreads=20, name=None):
        self.q = Queue(0) #这里创建一个队列,如果是0的话表示不限制,现在这个队列里放的是任务
        self.max = maxthreads #定义最大线程数
        self.name = name
        self.waiters = []#这两个是用来计数的
        self.working = []#这两个是用来技术的

    def start(self):
        #self.max 最大线程数
        #q.qisze(),任务个数
        needSize = self.q.qsize()
        while self.workers < min(self.max, needSize):#min(10,20)取最小值
            #wokers默认为0  【workers = 0】
            ‘‘‘
            举例来说:
            while self.workers < min(self.max, needSize):
            这个循环,比如最大线程为20,咱们的任务个数为10,取最小值为10
            每次循环开1个线程,并且workers自增1,那么循环10次后,开了10个线程了workers = 10 ,那么workers就不小于10了
            就不开线程了,我线程开到最大了,你们这10个线程去消耗这10个任务去吧
            并且这里不阻塞,创建完线程就去执行了!
            每一个线程都去执行_worker方法去了
            ‘‘‘
            self.startAWorker()

    def startAWorker(self):
        self.workers += 1
        newThread = self.threadFactory(target=self._worker, name=shuaige) #创建一个线程并去执行_worker方法
        newThread.start()

    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)


    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        ct = self.currentThread()
        o = self.q.get() #去队列里取任务,如果有任务就O就会有值,每个任务是个元组,有方法,有参数
        while o is not WorkerStop:
            with self._workerState(self.working, ct):  #上下文切换
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    if onResult is None:
                        pass
                    else:
                        pass

                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        #context.call(ctx, log.err)
                        pass

                del onResult, result

            with self._workerState(self.waiters, ct): #当线程工作完闲暇的时候,在去取任务执行
                o = self.q.get()

    def stop(self): #定义关闭线程方法
        while self.workers: #循环workers值
            self.q.put(WorkerStop) #在队列中增加一个信号~
            self.workers -= 1 #workers值-1 直到所有线程关闭


def show(arg):
    import time
    time.sleep(1)
    print arg


pool = ThreadPool(10)

#创建500个任务,队列里添加了500个任务
#每个任务都是一个元组(方法名,动态参数,动态参数,默认为NoNe)
for i in range(100):
    pool.callInThread(show, i)

pool.start()  #队列添加完成之后,开启线程让线程一个一个去队列里去拿

pool.stop() #当上面的任务都执行完之后,线程中都在等待着在队列里去数据呢!
‘‘‘
我们要关闭所有的线程,执行stop方法,首先workers这个值是当前的线程数量,我们给线程发送一个信号“WorkerStop”
在线程的工作里:        while o is not WorkerStop:   如果线程获取到这个值就不执行了,然后这个线程while循环就停止了,等待
python的垃圾回收机制,回收。

然后在self.workers -= 1 ,那么所有的线程收到这个信号之后就会停止!!!
over~
‘‘‘
复杂thread pool

 

程序启动之初只将最小线程数的线程放在池中,并将线程设置为阻塞状态,用守护线程来查看任务队列,当任务队列中有任务时,则停止线程的阻塞状态,让它们到队列中去获取任务,执行,如果需要返回结果,则将结果返回结果队列。当任务很多,线程池中没有闲置的线程且当前线程数小于线程池最大线程数时,将创建新的线程(这里使用了yield)来接收新的任务,线程执行完毕后,则回到阻塞状态,长期闲置的线程会自动销毁,但池中线程永远不小于在最小线程数。当最小线程数和最大线程数相等的时候,内部就基本和野生线程相同啦~~~

技术分享

武sir的线程池

技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
teacher code v1

 

老师的方法和上面的例子中不同的是,自定义了线程的start方法,当启动线程的时候才初始化线程池,并根据线程池定义的数量和任务数量取min,而不是先开启定义的线程数等待命令,在一定程度上避免了空线程对内存的消耗。

with知识点

  这里要介绍一个知识点。我们在做上下文管理的时候,用到过with。

  我们如何自定义一个with方法呢?

技术分享

如此一来,我们便可以实现对线程状态的监控和管理了。将正在运行中的线程,加入到一个列表中,并使用yield返回,当线程执行完之后,再从这个列表中移除,就可以知道哪些线程是正在运行的啦。

python 再带线程池模块threadpool,可以直接用pip安装

代码如下:

技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-


"""
Easy to use object-oriented thread pool framework.

A thread pool is an object that maintains a pool of worker threads to perform
time consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. This then performs the requested operation in the
background and puts the results in another queue.

The thread pool object can then collect the results from all threads from
this queue as soon as they become available or after all threads have
finished their work. It‘s also possible, to define callbacks to handle
each result as it comes in.

The basic concept and some code was taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O‘Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.

Basic usage::

    # >>> pool = ThreadPool(poolsize)
    # >>> requests = makeRequests(some_callable, list_of_args, callback)
    # >>> [pool.putRequest(req) for req in requests]
    # >>> pool.wait()

See the end of the module code for a brief, annotated usage example.

Website : http://chrisarndt.de/projects/threadpool/

"""
# __docformat__ = "restructuredtext en"
#
# __all__ = [
#     ‘makeRequests‘,
#     ‘NoResultsPending‘,
#     ‘NoWorkersAvailable‘,
#     ‘ThreadPool‘,
#     ‘WorkRequest‘,
#     ‘WorkerThread‘
# ]
# standard library modules
import sys
import threading
import traceback
#导入消息队列
try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3


# exceptions
##定义未产生结果处于等待状态异常类
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass
##处理不存在可以使用的线程异常类
class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass


# internal module helper functions
##内部异常解析函数
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)


# utility functions
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests


# classes
class WorkerThread(threading.Thread):
    """后台线程,真正的工作线程,从请求队列(requestQueue)中获取work,
    并将执行后的结果添加到结果队列(resultQueue)"""

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediatedly.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.

        """
        threading.Thread.__init__(self, **kwds)
        ##设置为守护进程
        self.setDaemon(1)
        #定义请求队列
        self._requests_queue = requests_queue
        #定义结果队列
        self._results_queue = results_queue
        ##设置timeout时间
        self._poll_timeout = poll_timeout
        #设定线程锁
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        # 每个线程尽可能多的执行work,所以采用loop,
        # 只要线程可用,并且requestQueue有work未完成,则一直loop

        while True:
            if self._dismissed.isSet():
                # 如果线程被终止,则退出循环
                break
            #获取下一个的执行的任务,如果在超时时间内获取不到新的任务,继续循环,等待进程是否被终止
            try:
            # Queue.Queue队列设置了线程同步策略,并且可以设置timeout。
            # 一直block,直到requestQueue有值,或者超时
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # 之所以在这里再次判断dimissed,是因为之前的timeout时间里,很有可能,该线程被dismiss
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit loop
                    #如果进程被终止了,要把获取的任务重新放回消息队里中去
                    self._requests_queue.put(request)
                    break
                try:
                    #将任务执行结果放到self._results_queue队列中去
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        # 设置一个标志,表示完成当前work之后,退出
        self._dismissed.set()


class WorkRequest:
    """放到消息队列中可以被执行的请求
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """
        :param callable_:可定制的,执行work的函数
        :param args:列表参数
        :param kwds:字典参数
        :param requestID:请求id
        :param callback:可定制的,处理resultQueue队列元素的函数
        :param exc_callback:可定制的,处理异常的函数
        :return:
        """

        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" %             (self.requestID, self.args, self.kwds, self.exception)

class ThreadPool:
    """线程池,分配工作请求,收集处理结果
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """
        :param num_workers: 初始化线程的数量
        :param q_size: requestQueue队列的初始大小
        :param resq_size: result队列的初始大小
        :param poll_timeout:
        :return:设置工作线程WorkerThread的timeout,也就是等待requestQueue的timeout
        """
        """建立线程池,并且开启num_workers个线程,当resuestQUEUE,resultQueue队列中有任务,并且队列以满
        线程池会阻塞把任务分配给线程接
        当请求队列和处理结果队列都不为空是,又出现死锁的可能,因此要设置timout超时时间大于0,
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibilty of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
        ##产生制定数目的线程,设定超时时间
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        ##创造新的线程给线程池,并设定判断检查线程是否被取消的时间间隔
        """Add num_workers worker threads to the pool.

        ``poll_timout`` sets the interval in seconds (int or float) for how
        ofte threads should check whether they are dismissed, while waiting for
        requests.

        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        ‘‘‘停用num_workers数量的线程,并加入dismiss_list‘‘‘
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        ‘‘‘join 所有停用的thread‘‘‘
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        ##判断确认是WorkRequest的实例
        assert isinstance(request, WorkRequest)
        # don‘t reuse old work requests
        #判断不存在exception异常的情况
        assert not getattr(request, exception, None)
        # 当queue满了,也就是容量达到了前面设定的q_size,它将一直阻塞,直到有空余位置,或是timeout
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue."""
        ##处理任何队列中任何新的结果
        while True:
            # 如果没有新的任务,触发无结果异常
            if not self.workRequests:
                raise NoResultsPending
            # 如果线程被阻塞,并且五可用线程,触发无进程异常
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occured?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not                        (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break


################
# USAGE EXAMPLE
################

if __name__ == __main__:
    import random
    import time

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        time.sleep(random.randint(1,5))
        result = round(random.random() * data, 5)
        # just to show off, we throw an exception once in a while
        if result > 5:
            raise RuntimeError("Something extraordinary happened!")
        return result

    # this will be called each time a result is available
    def print_result(request, result):
        print("**** Result from request #%s: %r" % (request.requestID, result))

    # this will be called when an exception occurs within a thread
    # this example exception handler does little more than the default handler
    def handle_exception(request, exc_info):
        if not isinstance(exc_info, tuple):
            # Something is seriously wrong...
            print(request)
            print(exc_info)
            raise SystemExit
        print("**** Exception occured in request #%s: %s" %           (request.requestID, exc_info))

    # assemble the arguments for each job to a list...
    data = [random.randint(1,10) for i in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # to use the default exception handler, uncomment next line and comment out
    # the preceding one.
    #requests = makeRequests(do_something, data, print_result)

    # or the other form of args_lists accepted by makeRequests: ((,), {})
    data = [((random.randint(1,10),), {}) for i in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception)
        #makeRequests(do_something, data, print_result)
        # to use the default exception handler, uncomment next line and comment
        # out the preceding one.
    )

    # we create a pool of 3 worker threads
    print("Creating thread pool with 3 worker threads.")
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [main.putRequest(req) for req in requests]

    # ...and wait for the results to arrive in the result queue
    # by using ThreadPool.wait(). This would block until results for
    # all work requests have arrived:
    # main.wait()

    # instead we can poll for results while doing something else:
    i = 0
    while True:
        try:
            time.sleep(0.5)
            main.poll()
            print("Main thread working...")
            print("(active worker threads: %i)" % (threading.activeCount()-1, ))
            if i == 10:
                print("**** Adding 3 more worker threads...")
                main.createWorkers(3)
            if i == 20:
                print("**** Dismissing 2 worker threads...")
                main.dismissWorkers(2)
            i += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if main.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main.joinAllDismissedWorkers()
threadpool

 

##线程池之TaskManager

线程池的实现主要分两部分,一部分是TaskMagager,即任务管理类,用来调度任务,一部分是Work,即具体需要执行的业务代码。线程池的这种设计模式在很多地方都可以借鉴。

技术分享
class TaskManager():

    def __init__(self,maxTasks,maxThreads):
        #最大任务书,也就是Queue的容量
        self._maxTasks = maxTasks;
        #线程池中线程数量    
        self._maxThreads = maxThreads;
        #业务代码
        ….
        ….

        #任务池
        self._taskQueue = Queue.Queue(maxTasks);
        #线程池,使用列表实现
        self._threads = [];

        #在__init__中调用方法
        self.initThreads();
        self.initTaskQueue();

    #初始化任务池
    def initTaskQueue(self):
        while True:
        #业务代码
            if not self._taskQueue.full():
                getTasks(self._maxTasks - self._taskQueue.qsize());
                for task in taskMap["tasks"]:
                self._taskQueue.put(task);
                time.sleep(1);

    #初始化线程池
    def initThreads(self):
        for i in range(self._maxThreads):
        #调用每个线程执行的具体任务
        self._threads.append(Work(self,self._reportUrl));

    def getTask(self):
        return self._taskQueue.get();

#具体执行的任务
class Work(threading.Thread):
    def __init__(self,taskmgr):
    threading.Thread.__init__(self);
    self._logger = logging.getLogger("");
    self.start();

    def run(self):
        while True:
            try:
                #取出任务并执行相关操作
                self._taskmgr.getTask();
                ……
                ……

                time.sleep(1);
            except Exception,e:
                self._logger.exception(e);            
TaskManager

 

TaskManager 

    先来看TaskManager,主要包含四个方法,一个构造方法,接受传进来的参数,执行任务池和线程池的大小等初始化信息,然后调用initTaskQueue和initThread方法初始化任务池和线程池。

    最后一个方法getTask返回TaskManager类的一个实例。

Work 

    执行具体的业务

过程分析 

  1. TaskManager的__init__方法初始化线程池和任务池
  2. initTaskQueue方法,初始化任务池,将任务填充到任务队列。
  3. initThreads方法,初始化线程池,调用Work类执行任务。
  4. getTask方法,返回TaskManager实例,主要作用是传给Work类,让子线程从任务队列中取出任务执行。
  5. Work类的__init__方法初始化线程,并启动线程。
  6. Run方法,执行任务,并且从任务队列中取出任务。

关键点:

  1. 在主线程,也即TaskManager的initTaskQueue方法中获取任务并填充任务池
  2. 在各个子线程中,也即Work类的run方法中获取任务池中的任务并执行。

    这里需要注意的是,前面提到过,Python中的Queue是线程安全的,Queue的get方法是阻塞式,也即,如果Queue为空,子线程取不到任务,会进行等待,直到Queue中有任务可取。

    三、在TaskManager的__init__方法中,最好先启动线程,在启动任务池。

self.initThreads();

self.initTaskQueue();

否则在initTaskQueue(主线程)中的while循环会一直执行,将会阻塞线程池的执行。在第二点中说明过,先启动线程池,就算任务池没有任务,子线程也会阻塞等待任务池中出现新任务

 参考:

http://www.cnblogs.com/coser/archive/2012/03/10/2389264.html

http://www.cnblogs.com/wupeiqi/articles/4839959.html

http://www.cnblogs.com/Eva-J/p/5106564.html

线程池

标签:

原文地址:http://www.cnblogs.com/jasonwang-2016/p/5687831.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!