Threads, step by step
t = threading.Thread(target=fun, args=())
t.start()
Execution flow:
threading.Thread(target=fun, args=()) →
    self.__target = target
    self.__name = str(name or _newname())
    self.__args = args
t.start() →
    _start_new_thread(self.__bootstrap, ())
    _start_new_thread → thread.start_new_thread → pass
self.__bootstrap → self.__bootstrap_inner() → self.run() →
    self.__target(*self.__args, **self.__kwargs)
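To make the flow concrete, here is a minimal sketch (the fun function and the MyThread class are illustrative names, not from the original post) showing that start() ends up in run(), and that the default run() simply calls the target passed to the constructor:

import threading

def fun(msg):
    # Any callable works; threading.Thread stores it as self.__target.
    print(msg)

# Default behaviour: start() -> __bootstrap -> run() -> self.__target(*args)
t = threading.Thread(target=fun, args=('called via target',))
t.start()
t.join()

# Overriding run() replaces that last step entirely.
class MyThread(threading.Thread):
    def run(self):
        fun('called via an overridden run()')

t2 = MyThread()
t2.start()
t2.join()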
Here is an example found online that rewrites the Thread class's run() method; it is a good way to extend the idea:
http://www.cnblogs.com/vamei/archive/2012/10/11/2720042.html
# A program to simulate selling tickets in multi-thread way
# Written by Vamei
import threading
import time
import os

# This function could be any function to do other chores.
def doChore():
    time.sleep(0.5)

# Function for each thread
class BoothThread(threading.Thread):
    def __init__(self, tid, monitor):
        self.tid = tid
        self.monitor = monitor
        threading.Thread.__init__(self)
    def run(self):
        while True:
            self.monitor['lock'].acquire()          # Lock; or wait if other thread is holding the lock
            if self.monitor['tick'] != 0:
                self.monitor['tick'] = self.monitor['tick'] - 1     # Sell tickets
                print(self.tid, ':now left:', self.monitor['tick']) # Tickets left
                doChore()                           # Other critical operations
            else:
                print("Thread_id", self.tid, " No more tickets")
                os._exit(0)                         # Exit the whole process immediately
            self.monitor['lock'].release()          # Unblock
            doChore()                               # Non-critical operations

# Start of the main function
monitor = {'tick': 100, 'lock': threading.Lock()}

# Start 10 threads
for k in range(10):
    new_thread = BoothThread(k, monitor)
    new_thread.start()
First, a class BoothThread is defined that inherits from threading.Thread, and all the work to be done is put into BoothThread's run() method. When instantiating BoothThread, the first argument no longer has to be a function; the constructor can take whatever parameters the BoothThread class needs.
Notes
1. Because BoothThread inherits from threading.Thread, its __init__ must call the base class constructor, threading.Thread.__init__(self).
2. The example does not use a global declaration. Instead it stores the shared state in a dict, monitor, and passes that dict to each thread as a parameter. Because a dict is a mutable object, the function receiving it still operates on the very same object, so the dict is effectively shared by all the threads. This is a useful technique in multithreaded and even multiprocess programming (the global approach should be avoided where possible, since it does not carry over to child processes on Windows).
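A minimal sketch of the shared-dict idea in isolation (the names are illustrative): because the dict object itself is passed to each thread, every thread mutates the same object, and the main thread sees the combined result:

import threading

lock = threading.Lock()

def worker(shared):
    # Every thread receives and mutates the same dict object; no global statement is needed.
    with lock:
        shared['tick'] -= 1

shared = {'tick': 10}
threads = [threading.Thread(target=worker, args=(shared,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(shared['tick'])   # 0 -- all ten threads decremented the same dict entry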
Pipe and Queue
IPC, Inter-Process Communication, refers to the ways in which multiple processes talk to each other and exchange information; the common mechanisms are sockets, RPC, pipes and message queues. multiprocessing provides two IPC primitives, Pipe and Queue, which make concurrent multiprocess Python programs more efficient.
Pipe
import multiprocessing as mul

def proc1(pipe):
    pipe.send('hello,i am 1')
    print('proc1 rec:', pipe.recv())

def proc2(pipe):
    pipe.send('hello,i am 2')
    print('proc2 rec:', pipe.recv())
    # pipe.send('hello, too')

# Build a pipe
pipe = mul.Pipe()
# Pass an end of the pipe to process 1
p1 = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
Output:
('proc1 rec:', 'hello,i am 2')
('proc2 rec:', 'hello,i am 1')
multiprocessing.Pipe([duplex])
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
Pipe() produces two Connection objects, one for each end of the pipe, and the two ends are used to talk to each other: calling send() on one end transmits an object, and calling recv() on the other end receives it.
duplex defaults to True, i.e. the pipe is bidirectional. A unidirectional pipe only lets the process at one end send and the process at the other end receive, whereas a bidirectional pipe allows sending and receiving at both ends. Pipe is mainly used for communication between two processes. Several processes may also share one end and talk to the process at the other end, but processes holding the same end cannot communicate with each other.
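For the duplex=False case described above, a minimal sketch (illustrative names; like the surrounding examples it assumes a Unix-style fork, so no __main__ guard is shown): conn1 can only receive and conn2 can only send, matching the documentation quoted earlier.

import multiprocessing as mul

def sender(conn):
    conn.send('one-way message')
    conn.close()

recv_conn, send_conn = mul.Pipe(duplex=False)   # recv_conn: receive only, send_conn: send only
p = mul.Process(target=sender, args=(send_conn,))
p.start()
print(recv_conn.recv())    # 'one-way message'
p.join()

The next example goes back to a duplex pipe and shows several processes sharing one end of it.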
import multiprocessing as mul

def proc1(pipe):
    pipe.send('hello,i am 1')
    print('proc1 rec:', pipe.recv())
    print('proc1 rec:', pipe.recv())

def proc2(pipe):
    pipe.send('hello,i am 2')
    print('proc2 rec:', pipe.recv())

def proc3(pipe):
    pipe.send('hello,i am 3')

pipe = mul.Pipe()
p1 = mul.Process(target=proc1, args=(pipe[0],))
p2 = mul.Process(target=proc2, args=(pipe[1],))
p3 = mul.Process(target=proc3, args=(pipe[1],))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
Output:
('proc1 rec:', 'hello,i am 2')
('proc1 rec:', 'hello,i am 3')
('proc2 rec:', 'hello,i am 1')
Queue
Queue is similar to Pipe: both are first-in, first-out structures. But a Queue allows multiple processes to put objects in and multiple processes to take objects out. A queue is created with multiprocessing.Queue(maxsize), where maxsize is the maximum number of objects the queue can hold. (The short example below uses the standard-library Queue.Queue just to show the basic put/get/qsize interface; a multiprocessing example follows.)
import Queue

queue = Queue.Queue(10)
for i in range(5):
    queue.put(i)

print queue.get()
print queue.get()
print queue.qsize()
The following example (from http://www.cnblogs.com/vamei/):
# Written by Vamei
import os
import multiprocessing
import time

#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue, lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()

#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)

# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ, args=(queue,))
    process.start()
    record1.append(process)

# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ, args=(queue, lock))
    process.start()
    record2.append(process)

for p in record1:
    p.join()

queue.close()   # No more object will come, close the queue

for p in record2:
    p.join()
multiprocessing
queues.py      __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
    Queue         -- a queue type using a pipe, buffer and thread
    JoinableQueue -- a queue type which also supports join() and task_done() methods
    SimpleQueue   -- really just a locked pipe
connection.py
    def Pipe(duplex=True)
Queue.py       __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
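As a small illustration of the join()/task_done() pair mentioned above, here is a minimal JoinableQueue sketch (the consumer function and the None sentinel are illustrative choices, not from the original post):

import multiprocessing

def consumer(q):
    while True:
        item = q.get()
        if item is None:       # sentinel value: stop consuming
            q.task_done()
            break
        print('consumed:', item)
        q.task_done()          # tell the queue this item has been handled

q = multiprocessing.JoinableQueue()
p = multiprocessing.Process(target=consumer, args=(q,))
p.start()
for i in range(5):
    q.put(i)
q.put(None)
q.join()     # blocks until task_done() has been called once per put()
p.join()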
Some of the processes use put() to place a string into the Queue; the string contains the process's PID and a timestamp. The other processes take strings out of the Queue and print their own PID together with the string they get().
If multiprocessing.Queue in this example is replaced with Queue.Queue, it no longer works. Why? Because a Queue.Queue lives entirely in the memory of one process: after the processes are forked, each child works on its own copy of the queue, so an item put in one process is never visible to the others.
Also, Queue.Queue has no close() method, while multiprocessing.Queue does.
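A minimal sketch of why Queue.Queue cannot cross the process boundary (assuming the Unix fork start method, as in the examples above):

import Queue
import multiprocessing

q = Queue.Queue()

def put_item(q):
    q.put('from child')     # lands in the child's private copy of the queue

p = multiprocessing.Process(target=put_item, args=(q,))
p.start()
p.join()
print(q.qsize())    # 0 -- the parent's queue never received the item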
Thread pools
A simple thread pool
import Queue
import threading
import time

class threadpool(object):
    def __init__(self, maxnum=20):
        self.queue = Queue.Queue(maxnum)
        for i in xrange(maxnum):
            self.queue.put(threading.Thread)
    def get_thread(self):
        return self.queue.get()
    def add_thread(self):
        self.queue.put(threading.Thread)

pool = threadpool(10)

def foo(arg, pool):
    print arg
    time.sleep(0.2)
    pool.add_thread()      # give a slot back to the pool when done

for i in range(20):
    thread = pool.get_thread()     # blocks if the pool is empty
    t = thread(target=foo, args=(i, pool))
    t.start()
The same thing implemented with a list instead of a Queue:
import time
import threading

class threadpool():
    def __init__(self, maxnum=20):
        self.t_list = []
        self.lock = threading.Lock()
        for i in xrange(maxnum):
            self.t_list.append(threading.Thread)
    def get_thread(self):
        self.lock.acquire()          # used to simulate thread safety
        while len(self.t_list) == 0:
            time.sleep(1)
        x = self.t_list.pop(0)
        self.lock.release()
        return x
    def add_thread(self):
        self.t_list.append(threading.Thread)

pool = threadpool(10)

def foo(i, p):
    print i
    time.sleep(2)
    p.add_thread()

for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=foo, args=(i, pool))
    t.start()
A more complex thread pool
from Queue import Queue
import contextlib      # context managers
import threading

WorkerStop = object()  # a sentinel with no meaning of its own; it only marks "stop"

class ThreadPool:
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, maxthreads=20, name=None):
        self.q = Queue(0)
        self.max = maxthreads
        self.name = name
        self.waiters = []
        self.working = []

    def start(self):
        while self.workers < min(self.max, self.q.qsize()):
            self.startAWorker()

    def startAWorker(self):
        self.workers += 1
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
        newThread = self.threadFactory(target=self._worker, name=name)
        newThread.start()

    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)

    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        ct = self.currentThread()
        o = self.q.get()
        while o is not WorkerStop:
            with self._workerState(self.working, ct):
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    result = None
                del function, args, kwargs
                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        # context.call(ctx, log.err)
                        pass
                del onResult, result
            with self._workerState(self.waiters, ct):
                o = self.q.get()

    def stop(self):
        while self.workers:
            self.q.put(WorkerStop)
            self.workers -= 1

def show(arg):
    import time
    time.sleep(1)
    print arg

pool = ThreadPool(20)
for i in range(500):
    pool.callInThread(show, i)
pool.start()
pool.stop()
Original article: http://120662.blog.51cto.com/110662/1735465