码迷,mamicode.com
首页 > 编程语言 > 详细

python学习第十课续 :线程池

时间:2016-01-16 07:45:29      阅读:620      评论:0      收藏:0      [点我收藏+]

标签:线程池 pipe queue

线程分步走

         t=threading.Thread(target=fun,args=())

         t.start()

         执行流程:

        threading.Thread(target=fun,args=()) à
          self.__target = target
          self.__name = str(name or
_newname())
          self.__args = args
        t.start  à
          _start_new_thread(self.__bootstrap, ())

                            _start_new_thread àthread.start_new_thread àpass

                        self.__bootstrapàself.__bootstrap_inner()àself.run()à

                self.__target(*self.__args,**self.__kwargs)

 

在网上找到一篇改写Thread类的run方法的例子,可以扩展思路

         http://www.cnblogs.com/vamei/archive/2012/10/11/2720042.html

# A program to simulate selling tickets in multi-thread way

# Written by Vamei

 

import threading

import time

import os

 

# This function could be any function to do other chores.

def doChore():

    time.sleep(0.5)

 

# Function for each thread

class BoothThread(threading.Thread):

    def __init__(self, tid, monitor):

        self.tid          = tid

        self.monitor = monitor

        threading.Thread.__init__(self)

    def run(self):

        while True:

            monitor[‘lock‘].acquire()    # Lock; or wait if other thread is holding the lock

            if monitor[‘tick‘] != 0:

                monitor[‘tick‘] = monitor[‘tick‘] - 1          # Sell tickets

                print(self.tid,‘:now left:‘,monitor[‘tick‘])   # Tickets left

                doChore()                          # Other critical operations

            else:

                print("Thread_id",self.tid," No more tickets")

                os._exit(0)                   # Exit the whole process immediately

            monitor[‘lock‘].release()                          # Unblock

            doChore()                               # Non-critical operations

 

# Start of the main function

monitor = {‘tick‘:100, ‘lock‘:threading.Lock()}

 

# Start 10 threads

for k in range(10):

    new_thread = BoothThread(k, monitor)

    new_thread.start()

 

         首先,定义了一个类BoothThread, 这个类继承自thread.Threading类。然后把要进行的操作统统放入到BoothThread类的run()方法中。实例化booththread的时候,第一个参数就不必为一个函数了,它可以定义为类booththread需要的参数。

注意

1. booththread的类中,因为它继承自threading.Thread,所以在它的__init__中,必须调用基本类的构造器threading.Thread.__init__(self)

2. 本例没有使用全局变量声明global,而是使用了一个词典monitor存放全局变量,然后把词典作为参数传递给线程函数。由于词典是可变数据对象,所以当它被传递给函数的时候,函数所使用的依然是同一个对象,相当于被多个线程所共享。这也是多线程乃至于多进程编程的一个技巧 (应尽量避免global声明的用法,因为它并不适用于windows平台)

 

Pipe Queue

IPC,进程间通信(Inter-Process Communication)就是指多个进程之间相互通信,交换信息的方法,常用的一般是sockerpcpipe和消息队列等。multiprocessing提供了IPC(PipeQueue),使Python多进程并发,效率上更高

         Pipe

import multiprocessing as mul

 

def proc1(pipe):

    pipe.send(‘hello,i am 1‘)

    print(‘proc1 rec:‘,pipe.recv())

 

def proc2(pipe):

    pipe.send(‘hello,i am 2‘)

    print(‘proc2 rec:‘,pipe.recv())

#    pipe.send(‘hello, too‘)

 

# Build a pipe

pipe = mul.Pipe()

 

# Pass an end of the pipe to process 1

p1   = mul.Process(target=proc1, args=(pipe[0],))

# Pass the other end of the pipe to process 2

p2   = mul.Process(target=proc2, args=(pipe[1],))

p1.start()

p2.start()

p1.join()

p2.join()

                                             

输出结果:

(‘proc1 rec:‘, ‘hello,i am 2‘)

(‘proc2 rec:‘, ‘hello,i am 1‘)

        

multiprocessing.Pipe([duplex])

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.

#生成两个对象,用这两个对象,来互相的交流。每个对象代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

duplex默认为True,表示双向,单向管道只允许管道一端的进程输入,另一端输出,而双向管道则允许从两端输入输出。Pipe主要是用来两个进程之间进行通信。也可以多个进程占用一端,与另一端的进程通信,但同一端的进程之间不能进行通信

import multiprocessing as mul

 

def proc1(pipe):

    pipe.send(‘hello,i am 1‘)

    print(‘proc1 rec:‘,pipe.recv())

    print(‘proc1 rec:‘,pipe.recv())

 

def proc2(pipe):

    pipe.send(‘hello,i am 2‘)

    print(‘proc2 rec:‘,pipe.recv())

 

def proc3(pipe):

    pipe.send(‘hello,i am 3‘)

 

pipe = mul.Pipe()

 

p1   = mul.Process(target=proc1, args=(pipe[0],))

p2   = mul.Process(target=proc2, args=(pipe[1],))

p3   = mul.Process(target=proc3, args=(pipe[1],))

p1.start()

p2.start()

p3.start()

p1.join()

p2.join()

p3.join()

         输出结果:

         (‘proc1 rec:‘, ‘hello,i am 2‘)

         (‘proc1 rec:‘, ‘hello,i am 3‘)

         (‘proc2 rec:‘, ‘hello,i am 1‘)

 

         Queue

         QueuePipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

         import Queue

queue = Queue.Queue(10)

for i in range (5):

    queue.put(i)

print queue.get()

print queue.get()

print queue.qsize()

 

下面的例子:(http://www.cnblogs.com/vamei/

# Written by Vamei

import os

import multiprocessing

import time

#==================

# input worker

def inputQ(queue):

    info = str(os.getpid()) + ‘(put):‘ + str(time.time())

    queue.put(info)

 

# output worker

def outputQ(queue,lock):

    info = queue.get()

    lock.acquire()

    print (str(os.getpid()) + ‘(get):‘ + info)

    lock.release()

#===================

# Main

record1 = []   # store input processes

record2 = []   # store output processes

lock  = multiprocessing.Lock()    # To prevent messy print

queue = multiprocessing.Queue(3)

 

# input processes

for i in range(10):

    process = multiprocessing.Process(target=inputQ,args=(queue,))

    process.start()

    record1.append(process)

 

# output processes

for i in range(10):

    process = multiprocessing.Process(target=outputQ,args=(queue,lock))

    process.start()

    record2.append(process)

 

for p in record1:

    p.join()

 

queue.close()  # No more object will come, close the queue

 

for p in record2:

    p.join()

        

 

multiprocessing

         queues.py  __all__ = [‘Queue‘, ‘SimpleQueue‘, ‘JoinableQueue‘]

                  Queue -- type using a pipe, buffer and thread

                  joinablequeue -- A queue type which also supports join() and task_done() methods

                  simplequeue -- really just a locked pipe

         connection.py

                  def Pipe(duplex=True)

Queue.py   __all__ = [‘Empty‘, ‘Full‘, ‘Queue‘, ‘PriorityQueue‘, ‘LifoQueue‘]

一些进程使用put()Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。

如果例中的multiprocessing.Queue换成Queue.Queue,就不能执行,为什么?

, Queue.Queue没有close()方法,multiprocess.Queue

 

 

线程池

         简单的线程池 

         import Queue,time

         class threadpool(object):

                  def __init__(self,maxnum=20):

                      self.queue = Queue.Queue(maxnum)

                          for i in xrange(maxnum):

                                   self.queue.put(threading.Thread)

                  def get_thread(self):

                          self.queue.get()

                  def add_thread(self):

                          self.queue.put(threading.Thread)

         pool = threadpool(10)

         def foo(arg,pool):

                  print arg

                  time.sleep(0.2)

                  pool.add_thread()

                 

         for i in range(20):

                  thread = pool.get_thread()

                  t = thread(target = foo,args=(i,pool))

                  t.start()

 

         用列表来代替queue实现相同功能

         import time

         import threading

         class threadpool():

                  def __init__(self,maxnum = 20):

                          self.t_list = []

                          self.lock = threading.Lock()

                          for i in xrange(maxnum):

                                   self.t_list.append(threading.Thread)

                  def get_thread(self):

                          self.lock.acquire()            #用来模拟线程安全

                          while len(self.t_list) == 0:

                                   time.sleep(1)

                          x = self.t_list.pop(0)

                          self.lock.release()

                          return x

                  def add_thread(self):

                          self.t_list.append(threading.Thread)

         pool = threadpool(10)

         def foo(i,p):

                  print i

                  time.sleep(2)

                  p.add_thread()

         for i in xrange(30):

                  thread = pool.get_thread()

                  t = thread(target=foo,args=(i,pool))

                  t.start()

 

         复杂的线程池

         from Queue import Queue

         import contextlib  --上下文管理

         import threading

          

         WorkerStop = object()  #定义一个对象,没有什么含义,只是用来当个东西用

         class ThreadPool:

                  workers = 0

                  threadFactory = threading.Thread

                  currentThread = staticmethod(threading.currentThread)

                  def __init__(self, maxthreads=20, name=None):

                           self.q = Queue(0)

                          self.max = maxthreads

                          self.name = name

                          self.waiters = []

                          self.working = []

                 def start(self):

                          while self.workers < min(self.max, self.q.qsize()):

                                   self.startAWorker()

                 def startAWorker(self):

                          self.workers += 1

                          name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)

                          newThread = self.threadFactory(target=self._worker, name=name)

                          newThread.start()

                 def callInThread(self, func, *args, **kw):

                          self.callInThreadWithCallback(None, func, *args, **kw)

                 def callInThreadWithCallback(self, onResult, func, *args, **kw):

                          o = (func, args, kw, onResult)

                          self.q.put(o)

                  

                  @contextlib.contextmanager

                  def _workerState(self, stateList, workerThread):

                          stateList.append(workerThread)

                          try:

                                   yield

                          finally:

                                   stateList.remove(workerThread)

                 def _worker(self):

                          ct = self.currentThread()

                          o = self.q.get()

                          while o is not WorkerStop:

                                   with self._workerState(self.working, ct):

                                            function, args, kwargs, onResult = o

                                            del o

                                            try:

                                                     result = function(*args, **kwargs)

                                                     success = True

                                            except:

                                                     success = False

                                                     if onResult is None:

                                                             pass

                                                    else:

                                                             pass

          

                                            del function, args, kwargs

                                           if onResult is not None:

                                                     try:

                                                             onResult(success, result)

                                                     except:

                                                             #context.call(ctx, log.err)

                                                             pass

                                                    del onResult, result

                                  with self._workerState(self.waiters, ct):

                                            o = self.q.get()

          

                  def stop(self):

                          while self.workers:

                                   self.q.put(WorkerStop)

                                   self.workers -= 1

          

         def show(arg):

                  import time

                  time.sleep(1)

                  print arg

         pool = ThreadPool(20)

          

         for i in range(500):

                  pool.callInThread(show, i)

          

         pool.start()

         pool.stop()

 

python学习第十课续 :线程池

标签:线程池 pipe queue

原文地址:http://120662.blog.51cto.com/110662/1735465

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!