标签:其他 用法 定义 相关 random sci 互斥锁 for 格式
一 concurrent.futures模块
开启进程池和线程池模块。
线程池和进程池的区别:进程池开启的数量范围只能在CPU对的核数以内。而线程池的数量可以自己随便的定义,默认为是核数的五倍。
相关用法:
ThreadPoolExecutor:创建一个线程池
ProcessPoolExecutor:创建一个进程池
Executor:是一个抽象类,上面的两个都是方法继承这个抽象类的
submit:异步的提交方式。
map:简化提交的方式,map自带循环,只能单纯的提交,用于int类型,并且没有放回的结果。
shutdown:提供的一个借口,等待进程池或着线程池执行完毕过后,回收调那里的资源。它里面的wait=True的时候,要等待执行解释后才会有返回结果;如果wait=False,就会理解返回结果,等到任务执行结束后才能回收调进程池或者线程池里面的资源。但是不能提交任务了。
result:拿到结果。
# import concurrent.futures # import os # import time # import random # # def work(n): # print(‘%s is working‘%os.getpid()) # time.sleep(random.random()) # return n # # if __name__==‘__main__‘: # excutor=concurrent.futures.ProcessPoolExecutor(4) # futures=[] # for i in range(20): # future=excutor.submit(work,i) # futures.append(future) # excutor.shutdown(wait=True) # print(‘%s is zhuing‘%os.getpid()) # for future in futures: # print(future.result())
简写如下:
# import concurrent.futures # import os # import time # import random # def work(n): # print(‘%s is working‘ % os.getpid()) # time.sleep(random.random()) # return n # if __name__ == ‘__main__‘: # excutor = concurrent.futures.ThreadPoolExecutor(4) # futures = [excutor.submit(work, i) for i in range(20)] # excutor.shutdown(wait=True) # print(‘%s is zhuing‘ % os.getpid()) # for future in futures: # print(future.result())
回调函数:add_done_callback后面括号里加上一个回调函数,回调函数接收的就是第一个函数返回的一个对像,使用时,还需要在回调函数的内部sunmit提交一下。
如下:
# import concurrent.futures # import requests # import time # import random # import os # def work(url): # print(‘%s is %s‘%(os.getpid(),url)) # ret=requests.get(url) # time.sleep(3) # if ret.status_code==200: # print(‘%s DONE %s‘%(os.getpid(),url)) # return {‘url‘:url,‘text‘:ret.text} # def foo(ret): # ret = ret.result() # print(‘%s FOO %s‘%(os.getpid(),ret[‘url‘])) # time.sleep(1) # res=‘%s;长度:%s‘%(ret[‘url‘],len(ret[‘text‘])) # with open(‘a.txt‘,‘a‘,encoding=‘utf-8‘)as f: # f.write(res+‘\n‘) # # if __name__==‘__main__‘: # url_list = [ # ‘http://tool.chinaz.com/regex/‘, # ‘http://www.cnblogs.com/fangjie0410/‘, # "http://www.cnblogs.com/xuanan", # "http://www.cnblogs.com/bg0131/p/6430943.html", # "http://www.cnblogs.com/wupeiqi/", # "http://www.cnblogs.com/linhaifeng/", # "http://www.cnblogs.com/Eva-J/articles/7125925.html", # "http://www.cnblogs.com/Eva-J/articles/6993515.html", # ] # excutres=concurrent.futures.ProcessPoolExecutor() # for i in url_list: # excutres.submit(work,i).add_done_callback(foo) # # print(‘主‘,os.getpid())
异常处理:exception:异常接口
concurrent.futures:自带异常,所以python内置的异常是不能识别出来的。
raise:抛出异常
import concurrent.futures import os import time import random def work(n): print(‘%s is working‘ % os.getpid()) time.sleep(random.random()) raise Exception return n if __name__ == ‘__main__‘: excutor = concurrent.futures.ThreadPoolExecutor(4) futures = [] for i in range(20): future = excutor.submit(work, i).result() futures.append(future) excutor.shutdown(wait=True) print(‘%s is zhuing‘ % os.getpid()) for future in futures: print(future)
cuncel:取消终止异常
二 死锁现象和递归锁现象
什么是死锁:各自拿着对方想要的一把锁,但是各自缺一把锁而不能释放
Lock:就是互斥锁但是容易出现死锁。只能够acquire一次,只要锁不释放(selease),就不能acquire了。
死锁现象如下:
# import threading # import time # import random # l1=threading.Lock() # l2=threading.Lock() # class Func(threading.Thread): # def run(self): # self.aa() # self.bb() # # def aa(self): # l1.acquire() # print(111) # l2.acquire() # print(222) # l2.release() # l1.release() # # def bb(self): # l2.acquire() # print(333) # time.sleep(random.random()) # l1.acquire() # print(444) # l1.release() # l2.release() # # if __name__==‘__main__‘: # for i in range(10): # ret=Func() # ret.start()
RLock:递归锁,可以赋值多个变量。
递归锁现象如下:
# import threading # import time # import random # r1=r2=threading.RLock() # class Func(threading.Thread): # def run(self): # self.aa() # self.bb() # # def aa(self): # r1.acquire() # print(111) # r2.acquire() # print(222) # r2.release() # r1.release() # # def bb(self): # r1.acquire() # print(333) # time.sleep(random.random()) # r2.acquire() # print(444) # r2.release() # r1.release() # # if __name__==‘__main__‘: # for i in range(10): # ret=Func() # ret.start()
递归锁:没加一次锁,引用技术就会加1,没减一次锁,引用技术就会减一,并且可以同时acquire多次,只要技术不为0,就不能被其他的线程抢到。
三 信号量
什么是信号量:其实就是锁,同时能够创建多个锁,实现了一个并发的效果。超出锁的范围的任务就只有等待所得释放,才能够抢到锁。相当于产生一堆新的线程和进程。
Semaphore:创建信号量,同时管理一个内置的计数器
# import threading # import time # import random # import os # def task(n): # with sm: # time.sleep(random.randint(1, 5)) # print(‘%s is tasking‘%threading.current_thread().getName()) # # if __name__==‘__main__‘: # sm=threading.Semaphore(5) # for i in range(20): # t=threading.Thread(target=task,args=(i,)) # t.start()
可以指定信号量的个数。
四 事件Event
Event:创建事件。用于线程之间的通信。Event对象实现了简单的线程之间通信机制,它提供了设置信号,清除信号,等待等用于实现线程间的通信。
set:设置Event内部的信号为真。
wait:只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。
timeout:wait里面的一个参数,时间参数,等待的时间范围。
使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假
is_set:判断是否传入了信号
isSet:返回event的状态值
# import threading # import time # import random # e=threading.Event() # def work(): # print(‘%s 正在检测‘%threading.current_thread().getName()) # time.sleep(random.randint(1,5)) # e.set() # # def foo(): # count=1 # while not e.is_set(): # if count > 3: # raise TimeoutError(‘等待超时‘) # print(‘%s 正在等待%s次连接‘%(threading.current_thread().getName(),count)) # e.wait(timeout=random.randint(1,5)) # count+=1 # print(‘%s 正在连接‘%threading.current_thread().getName()) # if __name__==‘__main__‘: # t1=threading.Thread(target=work) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
五 定时器
Timer:定时器,是thread的一个派生类,用于在指定的时间后调用调用某个方法。
# import threading # import random # def hello(n): # print(‘hello‘,n) # # if __name__==‘__main__‘: # for i in range(20): # t=threading.Timer(random.random(),hello,args=(i,)) # t.start()
六 线程queue:队列
Queue:先放入队列里的数据先读取出来
# import queue # import time # import random # import threading # q=queue.Queue(5) # def work(n): # time.sleep(random.randint(1,5)) # q.put(‘%s is working‘%n) # print(n) # # def foo(): # time.sleep(random.randint(1,3)) # print(q.get()) # # if __name__==‘__main__‘: # for i in range(20): # t1=threading.Thread(target=work,args=(i,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
PriorityQueue:先读取优先级最高的数据,传参时,以元组的格式传入,前面传入数字,后面传入内容。数字从小到大排的优先级,如果是数字相同,就会按照ascii码从小到大排序
# import queue # import time # import random # import threading # q=queue.PriorityQueue() # def work(n): # time.sleep(random.randint(1,5)) # q.put(‘%s is working‘%n) # print(n) # # def foo(): # time.sleep(random.randint(1,3)) # print(q.get()) # # if __name__==‘__main__‘: # for i in range(20): # t1=threading.Thread(target=work,args=(i,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
LifoQueue:先进后出,也叫做堆栈。读取数据时,按照时间短的先读取。
# import queue # import random # import time # import threading # q=queue.LifoQueue() # def work(n): # time.sleep(random.randint(1, 5)) # q.put((random.randint(1,20),‘jie_%s‘%n)) # print(n) # # def foo(): # print(q.get()) # time.sleep(random.randint(1,3)) # # if __name__==‘__main__‘: # for i in range(30): # t1=threading.Thread(target=work,args=(1,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
标签:其他 用法 定义 相关 random sci 互斥锁 for 格式
原文地址:http://www.cnblogs.com/fangjie0410/p/7678515.html