标签:
今天已是学习Python的第十一天,来干一碗鸡汤继续今天的内容,今天的鸡汤是:超越别人对你的期望。本篇博客主要介绍以下几点内容:
线程的基本使用;
线程的锁机制;
生产者消费之模型(队列);
如何自定义线程池;
进程的基本使用;
进程的锁机制;
进程之间如何实现数据共享;
进程池;
协程的基本使用。
    上篇博客已经介绍过如何创建多线程的程序,在这里在复习一下如何创建线程过程以及线程的一些方法:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | importthreadingclassMyThread(threading.Thread):         #首先继承线程类        def__init__(self,func,args):        self.func =func        self.args =args        super(MyThread,self).__init__()   #执行父类的所有构造方法                                              defrun(self):                        #因为会在创建线程后自动触发run方法,我们自定义run方法,让线程来执行此方法        self.func(self.args)deff1(args):    print(args)obj =MyThread(f1,123)obj.start()                        #开启线程#结果输出:123 | 
线程的方法:
start:线程准备就绪,等待CPU调度;
setName:为线程设置名称;
getName:获取线程名称;
setDaemon(布尔值):设置为主线程是否等待子线程执行(默认False);
如果是将setDaemon设置成True,主线程执行过程中,子线程也在进行,主线程执行完毕后,子线程不论成功与否,均停止主线程不会等子线程;
如果值为False,主线程执行过程中,子线程也在执行,主线程执行完毕后,等待子线程也执行完成后,程序停止。
join(秒):表示主线程到此,会等待子线程执行,参数表示主线程在此最多等待N秒后,继续往下执行;
run:线程被CPU调度后自动执行线程对象的run方法。
    下面我们来介绍一下线程的锁机制,由于线程之间是进行随机调度,并且每个线程可能只执行N条操作,当多个线程同时修改同一条数据时可能会出现脏数据,所以出现了线程锁。在python中分为三种线程锁:互斥锁(lock,Rlock)、信号量(Semaphore)、事件(event),还有一个条件(Condition)配合线程锁来使用,下面分别介绍这几种锁:
(1)、互斥锁(lock,Rlock)
我们先看一下不加线程锁的程序的执行结果:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | importthreadingimporttimeNUM =10deffunc(i):    globalNUM    NUM -=1    time.sleep(1)    print(NUM)fori inrange(10):                #创建10个线程,去执行上面的函数    t =threading.Thread(target=func,args=(i,))    t.start()#因为没有线程锁,10个线程同时去修改上面的NUM,导致出现脏数据,结果:0000000000 | 
当我们加上线程锁后,效果就会避免上面现象的发生:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | importthreadingimporttimeNUM =10deffunc(i,l):    globalNUM    l.acquire()   #加锁,    NUM -=1    time.sleep(2)    print(NUM,i)    l.release()   #开锁# lock = threading.Lock()    #只能锁一次,一般不推荐使用lock =threading.RLock()     #推荐使用Rlock,可以在程序中锁一次或多次,一次性只能允许一个线程操作fori inrange(10):    t =threading.Thread(target=func,args=(i,lock,))    t.start()#结果:90817263544536271809 | 
(2)、信号量(Semaphore)
上面我们介绍了互斥锁,我们发现,互斥锁同时只能允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如肯德基有3个购餐的窗口,那最多只允许3个人购买,后面的人只能等前面的人买完才能购买。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | importthreadingimporttimeNUM =10deffunc(i,l):    globalNUM    l.acquire()   #加锁,    NUM -=1    time.sleep(1)    print(NUM,i)    l.release()   #开锁lock =threading.BoundedSemaphore(5)    #一次可以允许多个线程更改数据fori inrange(10):    t =threading.Thread(target=func,args=(i,lock,))    t.start()#结果5个线程同时修改数据:50413332140605070908 | 
(3)、事件(event)
Python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。
事件处理的机制:全局定义了一个"Flag",如果"Flag"值为Flase,那么当程序执行event.wait方法时就会阻塞,如果"Flag"值为True,那么event.wait方法时便不再阻塞。
event.clear:将"Flag"设置成False,(加锁);
event.set:将"Flag"设置成True,(解锁)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | importthreadingdeffunc(i,e):    print(i)    e.wait()           #检测是什么状态,如果是锁状态,会在此等待,如果无锁状态,直接执行下面操作,默认是锁状态    print(i+100)event =threading.Event()fori inrange(10):    t =threading.Thread(target=func,args=(i,event,))    t.start()event.clear()          #主动设置成锁状态inp =input(">>>:")ifinp ==‘1‘:    event.set()        #解锁#结果:0123456789>>>:1100102103104105107108109101106 | 
(4)、条件(Condition)
使得线程等待,只有满足条件的时候,才释放N个线程去更改数据,下面通过两种方法来演示加条件的线程锁操作:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | importthreadingdeffunc(i,con):    print(i)    con.acquire()    con.wait()             #代码执行到这会阻塞,当主线程条件成立后,才会继续往下执行    print(i+100)    con.release()c =threading.Condition()  #创建条件,满足这个条件会执行线程fori inrange(10):    t =threading.Thread(target=func,args=(i,c,))    t.start()whileTrue:    inp =input(‘>>>:‘)  #获取用户输入,输入几,允许几个线程操作    ifinp ==‘q‘:        break    c.acquire()    c.notify(int(inp))    #notify:通知其他线程,那些挂起的线程接到这个通知之后会开始运行。通常三个方法放一起,代码格式规定    c.release()#结果:0123456789>>>:2>>>:1001013>>>:103102104 | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | importthreadingdefcondition():    ret =False    r =input(‘>>>:‘)  #获取用户输入,如果是true,就允许一个线程执行    ifr ==‘true‘:        ret =True    else:        ret =False    returnretdeffunc(i,con):    print(i)    con.acquire()    con.wait_for(condition)    print(i+100)    con.release()c =threading.Condition()fori inrange(10):    t =threading.Thread(target=func,args=(i,c,))    t.start()#结果:>>>:123456789true100>>>: | 
(5)、Timer
Timer:定时器,指定N秒之后执行某操作。
| 1 2 3 4 5 6 7 8 | fromthreading importTimerdefhello():    print("hello, world")t =Timer(1, hello)   #线程等待1秒,执行后面的函数t.start() | 
    Queue模块实现了多生产者、多消费者队列,它特别适用于多线程编程。Queue类中实现了所有需要的锁语义,Queue模块实现了四种类型的队列:
queue.Queue:先进先出队列(FIFO),第一加入队列的任务,被第一个取出;
queue.LifoQueue:后进先出队列(LIFO),最后加入队列的任务,被第一个取出
queue.PriorityQueue:优先级队列,保持队列数据有序,是根据权重判断取出顺序,最小值被先取出。
queue.deque:双向队列,一种支持向两端高效地插入数据、支持随机访问的容器
下面通过例子来详细介绍一下先进先出队列的使用方法:
queue.Queue(先进先出):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | importqueueq =queue.Queue(2)    #队列最大支持两个链接q.put(11)             #向队列中放入元素q.put(12)print(q.qsize())      #输出队列的的大小print(q.get())        #移除列队元素并将元素返回print(q.get())#结果:2#表示队列中有两个元素1112 | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | importqueueq =queue.Queue(2)                #队列最大支持两个链接     q.put(11)                         #向队列中放入元素q.put(12)print(q.empty())                  #判断队列是否为空#q.put(22)                        #如果队列里满了,会在此阻塞,因为队列最大支持两个链接#q.put(22,timeout=2)              #如果我们使用这种方式会在这阻塞2秒然后报错q.put(33,block=False,timeout=2)   #block= False 设置程序不阻塞,直接报错print(q.get())print(q.get())# print(q.get())        #同样在移除元素的时候也有相同的方法,可以设置超时时间print(q.get(timeout=2))#结果,报错:  File"E:/project/Day11/线程/s1.py", line 51, in<module>    q.put(33,block=False,timeout=2)  File"C:\Users\Henry\AppData\Local\Programs\Python\Python35\lib\queue.py", line 130, input    raiseFullqueue.Full | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | importqueueq =queue.Queue(5)q.put(123)q.put(456)print(q.get())  q.task_done()   #在完成一项工作后,会向队列发送一个确认信号,知道取完数据后,join才会终止程序,要么join会一直阻塞print(q.get())q.task_done()q.join()        #实际上意味着等到队列为空,再执行别的操作#结果:123456 | 
通过上面的例子,我们总结一下queue队列提供的公共方法:
Queue.put:向队列中放入元素,block是否阻塞(默认True),timeout阻塞时的超时时间;
Queue.get:移除队列中的元素,block是否阻塞,timeout阻塞时超时时间;
queue.Queue(Maxsize):Maxsize,设置队列支持最大的个数;
Queue.qsize:队列的真实个数;
Queue.join,Queue.task_done:阻塞进程,当队列中任务执行完毕后,不再阻塞;
Queue.empty:判断队列是否为空。
queue.LifoQueue(后进先出):
| 1 2 3 4 5 6 7 8 | importqueueq =queue.LifoQueue()       #后进先出q.put(123)q.put(456)print(q.get())#结果:456 | 
queue.PriorityQueue(优先级队列):
| 1 2 3 4 5 6 7 8 | q =queue.PriorityQueue()   #根据优先级处理q.put((1,"jack1"))    #在优先级相同的情况下,后根据顺序输出q.put((2,"jack2"))q.put((3,"jack3"))print(q.get())#结果:(1, ‘jack1‘) | 
queue.deque(高性能双向队列):
| 1 2 3 4 5 6 7 8 9 10 11 12 | importqueueq=queue.deque()          #双向队列q.append((123))q.append(234)q.appendleft(456)         #从左边去一个值print(q.pop())print(q.popleft())#结果:234456 | 
为什么说它是高性能的队列我们来对比双向队列、普通队列和列表的处理速度我们一起来看一下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | importtimeimportqueueimportcollectionsq =collections.deque()t0 =time.clock()fori inrange(1000000):    q.append(1)fori inrange(1000000):    q.popleft()print(‘deque‘, time.clock() -t0)q =queue.Queue(2000000)t0 =time.clock()fori inrange(1000000):    q.put(1)fori inrange(1000000):    q.get()print(‘Queue‘, time.clock() -t0)q =[]t0 =time.clock()fori inrange(1000000):    q.append(i)fori inrange(1000000):    q.insert(0,i)print(‘list ‘, time.clock() -t0)#结果:deque 1.2658434773287475Queue 36.728385720614725list#这个结果忽略吧,太长时间了.... | 
下面结合上面的知识来写一个生产者消费者模型:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | #生产者消费者模型,解耦的意思就是两个程序之间,互相没有关联了,互不影响。importqueueimportthreadingimporttimeq =queue.Queue(20)      #队列里最多存放20个元素defproductor(arg):            #生成者,创建30个线程来请求吃包子,往队列里添加请求元素    q.put(str(arg) +‘- 包子‘) fori inrange(30):    t =threading.Thread(target=productor,args=(i,))    t.start()defconsumer(arg):       #消费者,接收到队列请求以后开始生产包子,来消费队列里的请求    whileTrue:        print(arg,q.get())        time.sleep(2)forj inrange(3):    t =threading.Thread(target=consumer,args=(j,))    t.start() | 
    在使用多线程处理任务也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成CPU的大量开销。为了解决这个问题,就引出了线程池的概念。预先创建一个批线程,然过来的任务立刻能够使用,使用完以后自动释放来去处理新的任务,在Python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块,下面我们尝试来自定义一个线程池:
初级版:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | importqueueimportthreadingimporttimeclassThreadPool:    def__init__(self,maxsize=5):        self.maxsize =maxsize            #线程池大小为5        self._q =queue.Queue(maxsize)        fori inrange(maxsize):            self._q.put(threading.Thread) #先往队列里插入的线程池大小的元素,元素为Threading.Thread类,等待处理请求    defget_thread(self):        returnself._q.get()    defadd_thread(self):        self._q.put(threading.Thread)pool =ThreadPool(5)             #创建线程池deftask(arg,p):    """    在队列里添加一个元素    :param arg: 循环的数值    :param p: 线程池的对象    :return:     """    print(arg)    time.sleep(1)    p.add_thread()fori inrange(100):    t =pool.get_thread()         #get,threading.Thread类去消费队列里的一个线程    obj =t(target=task,args=(i,pool,))      obj.start() | 
进阶版:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | #!/usr/bin/env python# -*- coding:utf-8 -*-importqueueimportthreadingimportcontextlibimporttimeStopEvent =object()classThreadPool(object):    def__init__(self, max_num, max_task_num =None):        ifmax_task_num:            self.q =queue.Queue(max_task_num)        else:            self.q =queue.Queue()        self.max_num =max_num        self.cancel =False        self.terminal =False        self.generate_list =[]        self.free_list =[]    defrun(self, func, args, callback=None):        """        线程池执行一个任务        :param func: 任务函数        :param args: 任务函数所需参数        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)        :return: 如果线程池已经终止,则返回True否则None        """        ifself.cancel:            return        iflen(self.free_list) ==0andlen(self.generate_list) < self.max_num:            self.generate_thread()        w =(func, args, callback,)        self.q.put(w)    defgenerate_thread(self):        """        创建一个线程        """        t =threading.Thread(target=self.call)        t.start()    defcall(self):        """        循环去获取任务函数并执行任务函数        """        current_thread =threading.currentThread        self.generate_list.append(current_thread)        event =self.q.get()        whileevent !=StopEvent:            func, arguments, callback =event            try:                result =func(*arguments)                success =True            exceptException as e:                success =False                result =None            ifcallback isnotNone:                try:                    callback(success, result)                exceptException as e:                    pass            with self.worker_state(self.free_list, current_thread):                ifself.terminal:                    event =StopEvent                else:                    event =self.q.get()        else:            self.generate_list.remove(current_thread)    defclose(self):        """        执行完所有的任务后,所有线程停止        """        self.cancel =True        full_size =len(self.generate_list)        whilefull_size:            self.q.put(StopEvent)            full_size -=1    defterminate(self):        """        无论是否还有任务,终止线程        """        self.terminal =True        whileself.generate_list:            self.q.put(StopEvent)        self.q.empty()    @contextlib.contextmanager    defworker_state(self, state_list, worker_thread):        """        用于记录线程中正在等待的线程数        """        state_list.append(worker_thread)        try:            yield        finally:            state_list.remove(worker_thread)# How to usepool =ThreadPool(5)defcallback(status, result):    # status, execute action status    # result, execute action return value    passdefaction(i):    print(i)fori inrange(30):    ret =pool.run(action, (i,), callback)time.sleep(5)print(len(pool.generate_list), len(pool.free_list))print(len(pool.generate_list), len(pool.free_list))# pool.close()# pool.terminate() | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env python# -*- coding: utf-8 -*-#Author:HaiFeng Difrommultiprocessing importProcess   #所有进程相关的模块都在multiprocessing模块中调用importthreadingimporttimedeffoo(i):    print(‘say hi:‘,i)if__name__==‘__main__‘:      #注意:进程在windows创建需要加上__name__函数,Linux环境下不用    fori inrange(10):       #创建10个进程        p =Process(target=foo,args=(i,))        p.start()#结果:say hi: 4say hi: 3say hi: 1say hi: 2say hi: 7say hi: 0say hi: 5say hi: 8say hi: 9say hi: 6 | 
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要非常大的开销。
进程各自持有一份数据,默认是无法共享数据的。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | #!/usr/bin/env python# -*- coding: utf-8 -*-frommultiprocessing importProcessli =[]deffoo(i):    li.append(i)    print(‘say hi‘,li)if__name__==‘__main__‘:    fori inrange(10):        p =Process(target=foo,args=(i,))        p.start()    print(‘ending‘,li)#结果:ending []say hi [5]say hi [9]say hi [2]say hi [1]say hi [3]say hi [6]say hi [7]say hi [0]say hi [8]say hi [4] | 
通过上面的例子可以看出,每个进程都有自己的一份数据,没有共享数据,在Python中我们通常通过调用第三方模块的方式来实现进程之间的数据共享,主要是调用multiprocessing的Queues、Array、Manager这三个模块。下面我们通过例子类看一下具体用法:
方法一:通过调用queues共享数据
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | frommultiprocessing importProcessfrommultiprocessing importqueuesimportmultiprocessingdeffoo(i,arg):    arg.put(i)    print(‘say hi‘,i,arg.qsize())if__name__==‘__main__‘:    li =queues.Queue(20,ctx=multiprocessing)    fori inrange(10):        p =Process(target=foo,args=(i,li,))        p.start()#结果:say hi 45say hi 26say hi 36say hi 66say hi 06say hi 16say hi 78say hi 58say hi 99say hi 810 | 
方法二:通过调用数组Array来共享数据
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | frommultiprocessing importProcessfrommultiprocessing importArrayimportmultiprocessingdeffoo(i,arg):    arg[i] =i +100    foritem inarg:        print(item)    print(‘============‘)if__name__==‘__main__‘:    li =Array(‘i‘,10)    fori inrange(10):        p =Process(target=foo,args=(i,li,))        p.start() | 
    当看到这端代码时有个疑问就是Array数组中的‘i‘,是什么?在Array类在实例化的时候必须指定数组的数据类型和数组的大小,具体数据类型的对照请参考下面的对应关系:
| 1 2 3 4 5 6 | ‘c‘: ctypes.c_char,    ‘u‘: ctypes.c_wchar,‘b‘: ctypes.c_byte,    ‘B‘: ctypes.c_ubyte,‘h‘: ctypes.c_short,   ‘H‘: ctypes.c_ushort,‘i‘: ctypes.c_int,     ‘I‘: ctypes.c_uint,‘l‘: ctypes.c_long,    ‘L‘: ctypes.c_ulong,‘f‘: ctypes.c_float,   ‘d‘: ctypes.c_double | 
方法三:通过调用Manager字典来共享数据
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | frommultiprocessing importProcessfrommultiprocessing importManagerimportmultiprocessingdeffoo(i,arg):    arg[i] =i +100    print(arg.values())if__name__==‘__main__‘:    obj =Manager()    li =obj.dict()    fori inrange(10):        p =Process(target=foo,args=(i,li,))        p.start()        p.join()    importtime    time.sleep(0.1)#结果:[100][100, 101][100, 101, 102][100, 101, 102, 103][100, 101, 102, 103, 104][100, 101, 102, 103, 104, 105][100, 101, 102, 103, 104, 105, 106][100, 101, 102, 103, 104, 105, 106, 107][100, 101, 102, 103, 104, 105, 106, 107, 108][100, 101, 102, 103, 104, 105, 106, 107, 108, 109] | 
为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。在multprocessing里也有与线程一样支持Rlock,Lock,Event,Condition,Semaphore几种锁,用法也相同,我们来看一下进程数的例子:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | #!/usr/bin/env python# -*- coding: utf-8 -*-frommultiprocessing importProcess, Array, RLockdefFoo(lock,temp,i):    """    将第0个数加100    """    lock.acquire()    temp[0] =100+i    foritem intemp:        print(i,‘----->‘,item)    lock.release()lock =RLock()temp =Array(‘i‘, [11, 22, 33, 44])fori inrange(20):    p =Process(target=Foo,args=(lock,temp,i,))    p.start()#结果:[100][100, 101][100, 101, 102][100, 101, 102, 103][100, 101, 102, 103, 104][100, 101, 102, 103, 104, 105][100, 101, 102, 103, 104, 105, 106][100, 101, 102, 103, 104, 105, 106, 107][100, 101, 102, 103, 104, 105, 106, 107, 108][100, 101, 102, 103, 104, 105, 106, 107, 108, 109] | 
4、进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可使用的进程,那么程序就会等待,知道进程池中有可用进程为止。
进程池中有两个方法:
apply:子进程串行执行任务,达不到并发的效果;
apply_async:apply的异步版本,支持并发,推荐使用这个。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | frommultiprocessing importPoolimporttimedeff1(arg):    time.sleep(1)    print(arg)if__name__==‘__main__‘:    pool =Pool(5)    fori inrange(30):        #pool.apply(func=f1,args=(i,))      #子进程串行执行任务,达不到并发        pool.apply_async(func=f1,args=(i,)) #支持并发    pool.close()         #等待所有进程结束后,才关闭进程池    # time.sleep(2)       # pool.terminate()   #立即关闭进程池    pool.join()          #主进程等待所有子进程执行完毕,必须在close或terminate之后 | 
总结一句话:IO密集型使用多线程,计算密集型使用多进程。
线程和进程的操作是由程序触发系统接口,最后的执行者是系统,而协程的操作则是程序员自己。
协程的原理:利用一个线程,分解一个线程成为多个"微线程",程序级别。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定摸个代码块执行顺序。
协程的适用场景:当程序存在大量不需要CPU的操作时(IO),适用于协程。
使用协协程需要调用两个模块:greenlet模块(底层)、gevent模块(高性能)。
使用greenlet模块:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | fromgreenlet importgreenlet deftest1():    print(12)    gr2.switch()    print(34)    gr2.switch()  deftest2():    print(56)    gr1.switch()    print(78) gr1 =greenlet(test1)gr2 =greenlet(test2)gr1.switch()     #greenlet通过switch方法在不同任务之间进行切换 #结果:12563478 | 
使用gevent模块:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | importgeventdeffoo():    print(‘Running in foo‘)    gevent.sleep(0)    print(‘Explicit context switch to foo again‘)defbar():    print(‘Explicit context to bar‘)    gevent.sleep(0)    print(‘Implicit context switch back to bar‘)gevent.joinall([    gevent.spawn(foo),    gevent.spawn(bar),])#结果:Running infooExplicit context to barExplicit context switch to foo againImplicit context switch back to bar | 
 下面的例子我们去分别请求多个网站,遇到IO操作来实现自动切换:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | fromgevent importmonkey; monkey.patch_all()importgeventimportrequestsdeff(url):    print(‘GET: %s‘%url)    resp =requests.get(url)    data =resp.text    print(‘%d bytes received from %s.‘%(len(data), url))gevent.joinall([])#结果:GET: https://www.python.org/GET: https://www.yahoo.com/GET: https://github.com/47394bytes received fromhttps://www.python.org/.25534bytes received fromhttps://github.com/.449991bytes received fromhttps://www.yahoo.com/. | 
? 今天的内容就到这里了,例子中少了不少的注释,还有一些自己不太理解,见谅。
标签:
原文地址:http://www.cnblogs.com/phennry/p/5693630.html