标签:单元测试 异步编程 现在 解释 ken 交互 fir 忽略 调用
class concurrent.futures.Executor
Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。
我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
1.多线程ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
import requests
import time
urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
def load_url(url, timeout):
print(f"{url} start 时间 %s"%time.asctime())
s=requests.request("get",url, timeout=timeout)
print(f"{url} end时间 %s"%time.asctime())
return str(s.json())
wokers=ThreadPoolExecutor(max_workers=5)
tasks=[wokers.submit(load_url,i,timeout=10) for i in urls]
for task in as_completed(tasks):
print(task.result())
excutor对象互相引用引发死锁
ThreadPoolExecutor是Executor使用线程池异步执行调用的子类。
当与之关联的可调用对象Future等待另一个对象的结果时,就会发生死锁Future。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:excutor 调用woker不足引发无法返回
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
2.as_completed
as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。
参数是任务(submit的返回值)列表
例子见上。
3.
wait方法可以让主线程阻塞,直到满足设定的要求。
wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都结束。等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待。
from concurrent.futures import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
import requests
import time
urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
def load_url(url, timeout):
print(f"{url} start 时间 %s"%time.asctime())
s=requests.request("get",url, timeout=timeout)
print(f"{url} end时间 %s"%time.asctime())
return str(s.json())
wokers=ThreadPoolExecutor(max_workers=5)
tasks=[wokers.submit(load_url,i,timeout=10) for i in urls]
# for task in as_completed(tasks):
# print(task.result())
res=wait(tasks,timeout=10,return_when=FIRST_COMPLETED)
print(res.done)
for i in tasks:
print(i.result(),i.done(),i.cancelled())
结果分析:
return_when=FIRST_COMPLETED
http://127.0.0.1:8000/index start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/stuTable/ start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/index end时间 Sat Dec 14 12:17:48 2019
{<Future at 0x33a67b0 state=finished returned str>}
{‘user‘: ‘test001‘, ‘msg‘: ‘this is test index view ‘} True False
http://127.0.0.1:8000/stuTable/ end时间 Sat Dec 14 12:17:51 2019
{‘A‘: 888, ‘NN‘: 899} True False
第一个完成后就直接返回了完成的对象,即使后面通过打印获取到后完成的task的结果,
concurrent.futures.wait(fs,timeout = None,return_when = ALL_COMPLETED )等待fs给定的Future实例(可能由其他Executor实例创建 )完成。返回一组命名的2元组。第一组名为,包含在等待完成之前完成的期货(完成或取消的期货)。第二组名为,包含未完成的期货(待定或正在运行的期货)。donenot_done
超时可用于控制返回之前等待的最大秒数。 超时可以是int或float。如果未指定timeout或None,则等待时间没有限制。
return_when指示该函数何时应返回。它必须是以下常量之一:
|
不变 |
描述 |
|---|---|
|
|
以后完成或取消操作时,该函数将返回。 |
|
|
当将来通过引发异常结束时,该函数将返回。如果没有未来引发例外,则等同于 |
|
|
当所有期货结束或被取消时,该函数将返回。 |
concurrent.futures.as_completed(fs,timeout = None )返回由fs给定的Future实例(可能由不同的Executor实例创建)的迭代器,该迭代器将在完成时生成期货(完成或取消的期货)。由fs给定的任何重复的期货将被退回一次。之前完成的任何期货 都将首先产生。返回的迭代器引发if 调用,并且从原始调用到超时秒后,结果不可用。 超时可以是int或float。如果 未指定timeout或,则等待时间没有限制。as_completed()concurrent.futures.TimeoutError__next__()as_completed()None
4.ProcessPoolExecutor对象
本ProcessPoolExecutor类是Executor使用的过程池异步执行调用子类。 ProcessPoolExecutor使用该multiprocessing模块,它可以避开全局解释器锁定,但也意味着只能执行和返回可拾取对象。
该__main__模块必须可由工作程序子进程导入。这意味着ProcessPoolExecutor在交互式解释器中将不起作用。
从可调用对象提交到的调用Executor或Future方法ProcessPoolExecutor将导致死锁。
concurrent.futures.ProcessPoolExecutor(max_workers = None,mp_context = None,initializer = None,initargs =())Executor使用最多max_workers进程池异步执行调用的子类。如果max_workers是None或者没有给出,将默认为机器上的处理器数量。如果max_workers小于或等于0,则将ValueError 引发a。在Windows上,max_workers必须等于或小于61。如果不是,ValueError则将被引发。如果max_workers是None,那么61即使有更多处理器可用,默认选择也将是最多。 mp_context可以是多处理上下文,也可以是“无”。它将用来发动工人。如果mp_context是None 如果未指定,则使用默认的多处理上下文。
初始化程序是一个可选的可调用对象,它在每个工作进程开始时被调用;initargs是传递给初始化程序的参数的元组。如果初始化器引发异常,则所有当前暂挂的作业都会引发BrokenProcessPool,以及任何尝试向池中提交更多作业的尝试。
在版本3.3中进行了更改:当其中一个工作进程突然终止时, BrokenProcessPool现在会引发错误。以前,行为是不确定的,但是对执行器或其期货的操作通常会冻结或死锁。
from multiprocessing import Process,Lock
def func_mutiprocess(i):
def ss():
l = []
for i in range(1000000000000000000):
l.append(i)
loc=Lock()
loc.acquire()
ss()
print(f"this process {i},{time.asctime()}")
loc.release()
def process_input():
pool=[]
pools=[Process(target=func_mutiprocess,args=(i,) ) for i in range(multiprocessing.cpu_count())]
for p in pools:
p.start()
pool.append(p)
for j in pool:
j.join()
if __name__ == ‘__main__‘:
while True:
try:
process_input()
except Exception as e:
pass
这个一个多进程引发内存占用100%爆掉的反面案例,原因是利用多进程创建超大列表容器
关于Future:
所述Future类封装一个可调用的异步执行。 Future实例由创建Executor.submit()。
concurrent.futures.Future封装可调用对象的异步执行。 Future 实例是由Executor.submit()测试人员创建的,除了测试外,不应直接创建。
cancel()尝试取消呼叫。如果该调用当前正在执行或正在运行,并且无法取消,则该方法将返回
False,否则,该调用将被取消并且该方法将返回True。
cancelled()
True如果呼叫已成功取消,则返回。
running()
True如果当前正在执行该调用且无法取消该调用,则返回。
done()返回
True如果调用成功取消或结束运行。
result(timeout = None )返回调用返回的值。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将
concurrent.futures.TimeoutError引发a。超时可以是int或float。如果未指定timeout或None,则等待时间没有限制。如果在完成之前取消了未来,
CancelledError则将被提出。如果调用引发,则此方法将引发相同的异常。
exception(timeout = None )返回调用引发的异常。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将
concurrent.futures.TimeoutError引发a。 超时可以是int或float。如果未指定timeout或None,则等待时间没有限制。如果在完成之前取消了未来,
CancelledError则将被提出。如果呼叫完成而没有加注,
None则返回。
add_done_callback(fn )将可调用的fn附加到将来。 当取消未来或完成运行时,将调用fn,并将future作为唯一参数。
添加的可调用对象按添加顺序被调用,并且始终在属于添加它们的进程的线程中调用。如果可调用对象引发
Exception子类,则将其记录并忽略。如果callable引发BaseException子类,则该行为未定义。如果将来已经完成或被取消,则将立即调用fn。
以下Future方法适用于单元测试和 Executor实现。
set_running_or_notify_cancel()仅
Executor在执行与Future和单元测试相关的工作之前,实现应调用此方法。如果方法返回
False,Future则取消,Future.cancel()即被调用并返回True。等待Future完成的所有线程(即通过as_completed()或wait())都将被唤醒。如果该方法返回,
True则该Future不会被取消并已处于运行状态,即对的调用Future.running()将返回True。该方法只能被调用一次,不能在调用之后
Future.set_result()或Future.set_exception()已经被调用。
set_result(结果)设置与
Futureto 结果关联的工作结果。此方法仅应由
Executor实现和单元测试使用。在版本3.8中更改:
concurrent.futures.InvalidStateError如果Future已经完成,则引发此方法 。
set_exception(例外)设置与相关的工作结果
Future的 异常。Exception此方法仅应由
Executor实现和单元测试使用。在版本3.8中更改:
concurrent.futures.InvalidStateError如果Future已经完成,则引发此方
标签:单元测试 异步编程 现在 解释 ken 交互 fir 忽略 调用
原文地址:https://www.cnblogs.com/SunshineKimi/p/12038700.html