码迷,mamicode.com
首页 > 编程语言 > 详细

【Python】【五】【asyncio】

时间:2017-10-27 18:36:21      阅读:284      评论:0      收藏:0      [点我收藏+]

标签:you   主循环   需要   country   eps   +=   top   create   异步编程   

# -*- coding:utf-8 -*-
"""
#18.1 线程&协程
#栗子18-1 threading
import sys
import time
import itertools
import threading

class Signal:
go = True

def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle(‘|/-\\‘):
status = char + ‘ ‘ + msg
write(status)
flush()
write(‘\x08‘ * len(status))
time.sleep(.1)
if not signal.go:
break
write(‘ ‘* len(status) + ‘\x08‘*len(status))

def slow_function():
time.sleep(1)
return 42



def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,args=(‘thinking!‘,signal))
print(‘spinner object:‘,spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result

def main():
result = supervisor()
print(‘Answer:‘,result)
if __name__ == ‘__main__‘:
main()


‘‘‘
spinner object: <Thread(Thread-1, initial)>
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
Answer: 42
‘‘‘


#栗子18-2 asyncio 实现

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
write,flush = sys.stdout.write,sys.stdout.flush
for char in itertools.cycle(‘|/-\\‘):
status = char + ‘ ‘ + msg
write(status)
flush()
write(‘\x08‘*len(status)) #这是显示文本式动画的诀窍所在:使用退格符(\x08)把光标移回来
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
write(‘ ‘*len(status) + ‘\x08‘*len(status)) #使用空格清除状态消息,把光标移回开头
@asyncio.coroutine
def slow_function():
# 假装等到I/O一段时间
yield from asyncio.sleep(1) #yield from asyncio.sleep(3) 表达式把控制权交给主循环,在休眠结束后恢复这个协程
return 42
@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin(‘thinking!‘))
print(‘spinner object:‘,spinner)
result = yield from slow_function() #驱动 slow_function() 函数。结束后,获取返回值。同时,事件循环继续运行,因为slow_function 函数最后使用 yield from asyncio.sleep(3) 表达式把控制权交回给了主循环。
spinner.cancel()
return result

def main():
loop = asyncio.get_event_loop() #获取事件循环的引用。
result = loop.run_until_complete(supervisor()) #驱动 supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值
loop.close()
print(‘Answer :‘,result)
if __name__ == ‘__main__‘:
main()
‘‘‘
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
Answer : 42
‘‘‘


‘‘‘
#【比较】
这两种 supervisor 实现之间的主要区别概述如下。
asyncio.Task 对象差不多与 threading.Thread 对象等效。 Victor Stinner(本章的
特约技术审校)指出, “Task 对象像是实现协作式多任务的库(例如 gevent)中的
绿色线程(green thread) ”。
Task 对象用于驱动协程, Thread 对象用于调用可调用的对象。
Task 对象不由自己动手实例化,而是通过把协程传给 asyncio.async(...) 函数或
loop.create_task(...) 方法获取。
获取的 Task 对象已经排定了运行时间(例如,由 asyncio.async 函数排
定); Thread 实例则必须调用 start 方法,明确告知让它运行。
在线程版 supervisor 函数中, slow_function 函数是普通的函数,直接由线程调
用。在异步版 supervisor 函数中, slow_function 函数是协程,由 yield from
驱动。
没有 API 能从外部终止线程,因为线程随时可能被中断,导致系统处于无效状态。
如果想终止任务,可以使用 Task.cancel() 实例方法,在协程内部抛出
CancelledError 异常。协程可以在暂停的 yield 处捕获这个异常,处理终止请
求。
supervisor 协程必须在 main 函数中由 loop.run_until_complete 方法执行。
上述比较应该能帮助你理解,与更熟悉的 threading 模型相比, asyncio 是如何编排并
发作业的。
线程与协程之间的比较还有最后一点要说明:如果使用线程做过重要的编程,你就知道写
出程序有多么困难,因为调度程序任何时候都能中断线程。必须记住保留锁,去保护程序
中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。
而协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运
行。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任
意时刻只有一个协程运行。想交出控制权时,可以使用 yield 或 yield from 把控制权
交还调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield
处取消,因此可以处理 CancelledError 异常,执行清理操作
‘‘‘



#18.1.1 故意不阻塞
‘‘‘
asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方
式不同,不可以互换。 “PEP 3156—Asynchronous IO Support Rebooted:
the‘asyncio’Module”(https://www.python.org/dev/peps/pep-3156/)对这个不幸状况是这样说
的:
未来可能会统一 asyncio.Future 和 concurrent.futures.Future 类实现的期物
(例如,为后者添加兼容 yield from 的 __iter__ 方法)。

总之,因为 asyncio.Future 类的目的是与 yield from 一起使用,所以通常不需要使
用以下方法。
无需调用 my_future.add_done_callback(...),因为可以直接把想在期物运行结
束后执行的操作放在协程中 yield from my_future 表达式的后面。这是协程的一
大优势:协程是可以暂停和恢复的函数。
无需调用 my_future.result(),因为 yield from 从期物中产出的值就是结果
(例如, result = yield from my_future)
‘‘‘




#18.2 使用asyncio和aiohttp下载

import os
import sys
import time

import requests

import asyncio
import aiohttp

BASE_URL = ‘http://images.cnblogs.com/cnblogs_com/suren2017/1102909‘

POP20_CC = ‘T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20‘.split()

DEST_DIR = ‘downloads‘

MAX_WORKERS = 20
def save_flag(img,filename):
path = os.path.join(sys.path[0],DEST_DIR,filename)
path = path.replace(‘\\‘,‘/‘)
with open(path,‘wb‘) as fp:
fp.write(img)

@asyncio.coroutine
def get_flag(cc):
url = ‘{}/{cc}.PNG‘.format(BASE_URL,cc=cc.lower())
resp = yield from aiohttp.request(‘GET‘,url)
image = yield from resp.read()
return image

def show(text):
print(text,end=‘ ‘)
sys.stdout.flush()

@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image,cc.lower() + ‘.PNG‘)
return cc


def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do) #虽然函数的名称是 wait,但它不是阻塞型函数。 wait 是一个协程,等传给它的所有协程运行完毕后结束
‘‘‘
asyncio.wait(...) 协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别
把各个协程包装进一个 Task 对象。最终的结果是, wait 处理的所有对象都通过某种方
式变成 Future 类的实例。 wait 是协程函数,因此返回的是一个协程或生成器对
象; wait_coro 变量中存储的正是这种对象。为了驱动协程,我们把协程传给
loop.run_until_complete(...) 方法
‘‘‘
res,_ = loop.run_until_complete(wait_coro) #执行事件循环,直到 wait_coro 运行结束;事件循环运行的过程中,这个脚本会在这里阻塞。我们忽略 run_until_complete 方法返回的第二个元素
‘‘‘
loop.run_until_complete 方法的参数是一个期物或协程。如果是协
程, run_until_complete 方法与 wait 函数一样,把协程包装进一个 Task 对象中。协
程、期物和任务都能由 yield from 驱动,这正是 run_until_complete 方法对 wait
函数返回的 wait_coro 对象所做的事。 wait_coro 运行结束后返回一个元组,第一个元
素是一系列结束的期物,第二个元素是一系列未结束的期物。在示例 18-5 中,第二个元
素始终为空,因此我们把它赋值给 _,将其忽略。但是 wait 函数有两个关键字参数,如
果设定了可能会返回未结束的期物;这两个参数是 timeout 和 return_when
‘‘‘
loop.close()
return len(res)

def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = ‘\n{} flags downloaded in {:.2f}s‘
print(msg.format(count,elapsed))

if __name__ == ‘__main__‘:
main(download_many) #19 flags downloaded in 0.25s
‘‘‘
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>
t_jingse7 t_jingse11 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388F128>
T_JINGSE4 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003877BE0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E8D0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388FE48>
t_jingse8 T_jingse17 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BC7B8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003872C88>
t_jingse6 t_jingse10 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BCBE0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388F5F8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388FA20>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B0B00>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E3C8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E390>
T_JINGSE3 t_jingse9 T_JINGSE5 t_jingse13 T_jingse20 T_jingse16 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003868F28>
T_JINGSE2 t_jingse12 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B02B0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BC390>
T_jingse19 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9048>
T_jingse15 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B06D8>
T_jingse18 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B0F28>
T_jingse14
19 flags downloaded in 0.45s
‘‘‘
#【小结】
‘‘‘
使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的
协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如
aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我
们编写的协程)驱动执行低层异步 I/O 操作的库函数
‘‘‘





‘‘‘
18.3 避免阻塞型调用
Ryan Dahl(Node.js 的发明者)在介绍他的项目背后的哲学时说: “我们处理 I/O 的方式彻
底错了。 ” 他把执行硬盘或网络 I/O 操作的函数定义为阻塞型函数,主张不能像对待非
阻塞型函数那样对待阻塞型函数。为了说明原因,他展示了表 18-1 中的前两列。
“Introduction to Node.js”(https://www.youtube.com/watch?v=M-sc73Y-zQA)视频 4:55 处。
表18-1:使用现代的电脑从不同的存储介质中读取数据的延迟情况;第三栏按比例换
算成具体的时间,便于人类理解
存储介质 CPU 周期 按比例换算成“人类时间”
L1 缓存 3 3 秒
L2 缓存 14 14 秒
RAM 250 250 秒
硬盘 41 000 000 1.3 年
网络 240 000 000 7.6 年
为了理解表 18-1,请记住一点:现代的 CPU 拥有 GHz 数量级的时钟频率,每秒钟能运行
几十亿个周期。假设 CPU 每秒正好运行十亿个周期,那么 CPU 可以在一秒钟内读取 L1
缓存 333 333 333 次,读取网络 4 次(只有 4 次)。表 18-1 中的第三栏是拿第二栏中的各
个值乘以固定的因子得到的。因此,在另一个世界中,如果读取 L1 缓存要用 3 秒,那么
读取网络要用 7.6 年!
有两种方法能避免阻塞型调用中止整个应用程序的进程:
在单独的线程中运行各个阻塞型操作
把每个阻塞型操作转换成非阻塞的异步调用使用
‘‘‘

#18.4 改进asyncio下载脚本
#示例 18-7 flags2_asyncio.py:脚本的前半部分;余下的代码在示例 18-8 中
from enum import Enum
HTTPStatus = Enum(‘Status‘, ‘ok not_found error‘)
import collections
from collections import namedtuple
Result = namedtuple(‘Result‘,‘status cc‘)

import os
import sys
import time

import requests

BASE_URL = ‘http://images.cnblogs.com/cnblogs_com/suren2017/1102909‘

POP20_CC = ‘T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20‘.split()

DEST_DIR = ‘downloads‘

MAX_WORKERS = 20

import asyncio
import aiohttp
from aiohttp import web
import tqdm

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
def __init__(self,country_code):
self.country_code = country_code


def save_flag(img,filename):
path = os.path.join(sys.path[0],DEST_DIR,filename)
path = path.replace(‘\\‘,‘/‘)
with open(path,‘wb‘) as fp:
fp.write(img)

@asyncio.coroutine
def get_flag(cc):
url = ‘{}/{cc}.PNG‘.format(BASE_URL,cc=cc.lower())
#resp = yield from aiohttp.request(‘GET‘,url)
resp = yield from aiohttp.ClientSession().get(url)
if ‘200‘ in resp.text:
image = yield from resp.read()
return image
elif ‘404‘ in resp.text:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(code=resp.status,message=resp.reason,headers=resp.headers)


@asyncio.coroutine
def download_one(cc,semaphore,verbose):
try:
with (yield from semaphore): #在 yield from 表达式中把 semaphore 当成上下文管理器使用,防止阻塞整个系统:如果 semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞。
image = yield from get_flag(cc)
except web.HTTPNotFound as exc:
status = HTTPStatus.not_found
msg = ‘not found‘
res = exc.response
res.status_code = 404
res.reason = ‘NOT FOUND‘
raise
except Exception as exc:
raise FetchError(cc) from exc #引入的raise X from Y 句法链接原来的异常
else:
save_flag(image,cc.lower() + ‘.PNG‘)
status = HTTPStatus.ok
msg = ‘OK‘

if verbose and msg: #如果在命令行中设定了 -v/--verbose 选项,显示国家代码和状态消息;这就是详细模式中看到的进度信息
print(cc,msg)
return Result(status,cc)


from tqdm import tqdm
@asyncio.coroutine
def download_coro(cc_list,verbose,concur_req):
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc,semaphore,verbose) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm(to_do_iter,total=len(cc_list))
for future in to_do_iter:
try:
res = yield from future
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.args[0]
except IndexError:
error_msg = exc.__cause__.__class__.__name__
if verbose and error_msg:
msg = ‘*** Error for {}: {}‘
print(msg.format(country_code,error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1

return counter

def download_many(cc_list,verbose,concur_req):
loop = asyncio.get_event_loop()
coro = download_coro(cc_list,verbose,concur_req) #download_many 函数只是实例化 downloader_coro 协程,然后通过run_until_complete 方法把它传给事件循环
counts = loop.run_until_complete(coro)
loop.close()
if loop.is_closed():
sys.exit(0)
return counts

def main():
t0 = time.time()
count = download_many(POP20_CC,verbose=False,concur_req=2)
elapsed = time.time() - t0
msg = ‘\n{} flags downloaded in {:.2f}s‘
print(msg.format(count,elapsed))

if __name__ == ‘__main__‘:
main()



#自己栗子1
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print(‘TIME: ‘,now() - start)



#自己栗子2
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print(‘TIME: ‘,now() - start)
‘‘‘
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME: 0.0010001659393310547

‘‘‘



#自己栗子3
‘‘‘
协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果‘‘‘

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print(‘TIME: ‘,now() - start)
‘‘‘
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。isinstance(task, asyncio.Future)将会输出True
‘‘‘
print(isinstance(task,asyncio.Future))
‘‘‘
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME: 0.0009999275207519531
True
‘‘‘


#自己栗子4 :绑定回调

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)
return ‘Done after {}s‘.format(x)

def callback(future):
print(‘Result: ‘,future)


start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting: 2
Result: <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=‘Done after 2s‘>
TIME: 0.002000093460083008
‘‘‘

#自己栗子5:绑定回调 ,如回调需要多个参数


import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)
return ‘Done after {}s‘.format(x)

def callback(t,future):
print(‘Result: ‘,t,future)


start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
import functools
task.add_done_callback(functools.partial(callback,2))
loop.run_until_complete(task)

print(‘TIME: ‘,now() - start)

‘‘‘
Waiting: 2
Result: 2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=‘Done after 2s‘>
TIME: 0.002000093460083008
‘‘‘


#自己栗子6: future 和 result 。回调一致是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码

import asyncio
import time

now = lambda : time.time()


async def do_some_work(x):
print(‘Waiting {}‘.format(x))
return ‘Done after {}s‘.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print(‘Task result:{}‘.format(task.result))
print(‘TIME: {}‘.format(now() - start))


‘‘‘
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73AE8>
TIME: 0.002000093460083008
‘‘‘


#自己栗子7: 阻塞和await

import asyncio
import time

now = lambda : time.time()


async def do_some_work(x):
print(‘Waiting {}‘.format(x))
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print(‘Task result:{}‘.format(task.result))
print(‘TIME: {}‘.format(now() - start))

‘‘‘
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73A60>
TIME: 2.001114845275879
‘‘‘


#自己栗子8:并发&并行
#每当有阻塞任务时候就用await

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print(‘Task result: ‘,task.result())

print(‘Time: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
Time: 3.9912283420562744
‘‘‘


#自己栗子9 协程嵌套 [一] dones, pendings = await asyncio.wait(tasks)



import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones,pendings = await asyncio.wait(tasks)

for task in dones:
print(‘Task result: ‘,task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 2s
Task result: Done after 4s
Task result: Done after 1s
TIME: 4.007229328155518
‘‘‘


#自己栗子10 协程嵌套 [二] 如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

results = await asyncio.gather(*tasks)

for result in results:
print(‘Task result: ‘,result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 3.9892282485961914
‘‘‘


#自己栗子11 协程嵌套 [三] 不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.gather(*tasks)




loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
print(‘Task result: ‘, result)

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 4.0052289962768555
‘‘‘

#自己栗子12 协程嵌套 [四 ] 不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果,使用asyncio.wait方式挂起协程。

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.wait(tasks)




loop = asyncio.get_event_loop()
dones,pendings = loop.run_until_complete(main())

for task in dones:
print(‘Task result: ‘, task.result())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 2s
Task result: Done after 4s
Task result: Done after 1s
TIME: 3.9912283420562744
‘‘‘


#自己栗子13 协程嵌套 [五]使用asyncio的as_completed方法


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

for task in asyncio.as_completed(tasks):
result = await task
print(‘Task result: {}‘.format(result))




loop = asyncio.get_event_loop()
loop.run_until_complete(main())


print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 3.9912281036376953
‘‘‘


#自己栗子14 协程停止 【一】 main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可
‘‘‘
上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:

Pending
Running
Done
Cancelled
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task‘

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。

‘‘‘


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

done,pending = await asyncio.wait(tasks)
for task in done:
print(‘Task result: ‘,task.result())


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
print(‘*******************‘)
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever() #True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常
finally:
loop.close()

print(‘TIME: ‘,now() - start)

‘‘‘
#不能再pycharm通过Ctrl+C,只能在Python交互环境里
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
*******************
True
TIME: 2.0158370780944824
‘‘‘



#自己栗子15 协程停止 【二】 tasks在外层,没有被包含在main函数里面
import asyncio

import time

now = lambda: time.time()
start = now()
async def do_some_work(x):
print(‘Waiting: ‘, x)

await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()

print(‘TIME: ‘, now() - start)


‘‘‘
打印四个True,而不是三个,原因我也不知道
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME: 0.8858370780944824
‘‘‘
"""



#自己栗子16 不同线程的时间循环
‘‘‘
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。
启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3
‘‘‘

from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()





















【Python】【五】【asyncio】

标签:you   主循环   需要   country   eps   +=   top   create   异步编程   

原文地址:http://www.cnblogs.com/suren2017/p/7744707.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!