标签:单元测试 异步编程 现在 解释 ken 交互 fir 忽略 调用
class concurrent.futures.Executor
Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。
我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
1.多线程ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED import requests import time urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"] def load_url(url, timeout): print(f"{url} start 时间 %s"%time.asctime()) s=requests.request("get",url, timeout=timeout) print(f"{url} end时间 %s"%time.asctime()) return str(s.json()) wokers=ThreadPoolExecutor(max_workers=5) tasks=[wokers.submit(load_url,i,timeout=10) for i in urls] for task in as_completed(tasks): print(task.result())
excutor对象互相引用引发死锁
ThreadPoolExecutor
是Executor
使用线程池异步执行调用的子类。
当与之关联的可调用对象Future
等待另一个对象的结果时,就会发生死锁Future
。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:excutor 调用woker不足引发无法返回
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
2.as_completed
as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。
参数是任务(submit的返回值)列表
例子见上。
3.
wait方法可以让主线程阻塞,直到满足设定的要求。
wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都结束。等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待。
from concurrent.futures import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED import requests import time urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"] def load_url(url, timeout): print(f"{url} start 时间 %s"%time.asctime()) s=requests.request("get",url, timeout=timeout) print(f"{url} end时间 %s"%time.asctime()) return str(s.json()) wokers=ThreadPoolExecutor(max_workers=5) tasks=[wokers.submit(load_url,i,timeout=10) for i in urls] # for task in as_completed(tasks): # print(task.result()) res=wait(tasks,timeout=10,return_when=FIRST_COMPLETED) print(res.done) for i in tasks: print(i.result(),i.done(),i.cancelled())
结果分析:
return_when=FIRST_COMPLETED
http://127.0.0.1:8000/index start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/stuTable/ start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/index end时间 Sat Dec 14 12:17:48 2019
{<Future at 0x33a67b0 state=finished returned str>}
{‘user‘: ‘test001‘, ‘msg‘: ‘this is test index view ‘} True False
http://127.0.0.1:8000/stuTable/ end时间 Sat Dec 14 12:17:51 2019
{‘A‘: 888, ‘NN‘: 899} True False
第一个完成后就直接返回了完成的对象,即使后面通过打印获取到后完成的task的结果,
concurrent.futures.
wait
(fs,timeout = None,return_when = ALL_COMPLETED )等待fs给定的Future
实例(可能由其他Executor
实例创建 )完成。返回一组命名的2元组。第一组名为,包含在等待完成之前完成的期货(完成或取消的期货)。第二组名为,包含未完成的期货(待定或正在运行的期货)。done
not_done
超时可用于控制返回之前等待的最大秒数。 超时可以是int或float。如果未指定timeout或None
,则等待时间没有限制。
return_when指示该函数何时应返回。它必须是以下常量之一:
不变 |
描述 |
---|---|
|
以后完成或取消操作时,该函数将返回。 |
|
当将来通过引发异常结束时,该函数将返回。如果没有未来引发例外,则等同于 |
|
当所有期货结束或被取消时,该函数将返回。 |
concurrent.futures.
as_completed
(fs,timeout = None )返回由fs给定的Future
实例(可能由不同的Executor
实例创建)的迭代器,该迭代器将在完成时生成期货(完成或取消的期货)。由fs给定的任何重复的期货将被退回一次。之前完成的任何期货 都将首先产生。返回的迭代器引发if 调用,并且从原始调用到超时秒后,结果不可用。 超时可以是int或float。如果 未指定timeout或,则等待时间没有限制。as_completed()
concurrent.futures.TimeoutError
__next__()
as_completed()
None
4.ProcessPoolExecutor对象
本ProcessPoolExecutor
类是Executor
使用的过程池异步执行调用子类。 ProcessPoolExecutor
使用该multiprocessing
模块,它可以避开全局解释器锁定,但也意味着只能执行和返回可拾取对象。
该__main__
模块必须可由工作程序子进程导入。这意味着ProcessPoolExecutor
在交互式解释器中将不起作用。
从可调用对象提交到的调用Executor
或Future
方法ProcessPoolExecutor
将导致死锁。
concurrent.futures.
ProcessPoolExecutor
(max_workers = None,mp_context = None,initializer = None,initargs =())Executor
使用最多max_workers进程池异步执行调用的子类。如果max_workers是None
或者没有给出,将默认为机器上的处理器数量。如果max_workers小于或等于0
,则将ValueError
引发a。在Windows上,max_workers必须等于或小于61
。如果不是,ValueError
则将被引发。如果max_workers是None
,那么61
即使有更多处理器可用,默认选择也将是最多。 mp_context可以是多处理上下文,也可以是“无”。它将用来发动工人。如果mp_context是None
如果未指定,则使用默认的多处理上下文。
初始化程序是一个可选的可调用对象,它在每个工作进程开始时被调用;initargs是传递给初始化程序的参数的元组。如果初始化器引发异常,则所有当前暂挂的作业都会引发BrokenProcessPool
,以及任何尝试向池中提交更多作业的尝试。
在版本3.3中进行了更改:当其中一个工作进程突然终止时, BrokenProcessPool
现在会引发错误。以前,行为是不确定的,但是对执行器或其期货的操作通常会冻结或死锁。
from multiprocessing import Process,Lock def func_mutiprocess(i): def ss(): l = [] for i in range(1000000000000000000): l.append(i) loc=Lock() loc.acquire() ss() print(f"this process {i},{time.asctime()}") loc.release() def process_input(): pool=[] pools=[Process(target=func_mutiprocess,args=(i,) ) for i in range(multiprocessing.cpu_count())] for p in pools: p.start() pool.append(p) for j in pool: j.join() if __name__ == ‘__main__‘: while True: try: process_input() except Exception as e: pass
这个一个多进程引发内存占用100%爆掉的反面案例,原因是利用多进程创建超大列表容器
关于Future:
所述Future
类封装一个可调用的异步执行。 Future
实例由创建Executor.submit()
。
concurrent.futures.
Future
封装可调用对象的异步执行。 Future
实例是由Executor.submit()
测试人员创建的,除了测试外,不应直接创建。
cancel
()尝试取消呼叫。如果该调用当前正在执行或正在运行,并且无法取消,则该方法将返回
False
,否则,该调用将被取消并且该方法将返回True
。
cancelled
()
True
如果呼叫已成功取消,则返回。
running
()
True
如果当前正在执行该调用且无法取消该调用,则返回。
done
()返回
True
如果调用成功取消或结束运行。
result
(timeout = None )返回调用返回的值。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将
concurrent.futures.TimeoutError
引发a。超时可以是int或float。如果未指定timeout或None
,则等待时间没有限制。如果在完成之前取消了未来,
CancelledError
则将被提出。如果调用引发,则此方法将引发相同的异常。
exception
(timeout = None )返回调用引发的异常。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将
concurrent.futures.TimeoutError
引发a。 超时可以是int或float。如果未指定timeout或None
,则等待时间没有限制。如果在完成之前取消了未来,
CancelledError
则将被提出。如果呼叫完成而没有加注,
None
则返回。
add_done_callback
(fn )将可调用的fn附加到将来。 当取消未来或完成运行时,将调用fn,并将future作为唯一参数。
添加的可调用对象按添加顺序被调用,并且始终在属于添加它们的进程的线程中调用。如果可调用对象引发
Exception
子类,则将其记录并忽略。如果callable引发BaseException
子类,则该行为未定义。如果将来已经完成或被取消,则将立即调用fn。
以下Future
方法适用于单元测试和 Executor
实现。
set_running_or_notify_cancel
()仅
Executor
在执行与Future
和单元测试相关的工作之前,实现应调用此方法。如果方法返回
False
,Future
则取消,Future.cancel()
即被调用并返回True。等待Future
完成的所有线程(即通过as_completed()
或wait()
)都将被唤醒。如果该方法返回,
True
则该Future
不会被取消并已处于运行状态,即对的调用Future.running()
将返回True。该方法只能被调用一次,不能在调用之后
Future.set_result()
或Future.set_exception()
已经被调用。
set_result
(结果)设置与
Future
to 结果关联的工作结果。此方法仅应由
Executor
实现和单元测试使用。在版本3.8中更改:
concurrent.futures.InvalidStateError
如果Future
已经完成,则引发此方法 。
set_exception
(例外)设置与相关的工作结果
Future
的 异常。Exception
此方法仅应由
Executor
实现和单元测试使用。在版本3.8中更改:
concurrent.futures.InvalidStateError
如果Future
已经完成,则引发此方
标签:单元测试 异步编程 现在 解释 ken 交互 fir 忽略 调用
原文地址:https://www.cnblogs.com/SunshineKimi/p/12038700.html