码迷,mamicode.com
首页 > 其他好文 > 详细

高性能异步爬虫

时间:2020-01-29 14:24:48      阅读:70      评论:0      收藏:0      [点我收藏+]

标签:url   控制   cio   操作   状态码   response   添加   其他   test   

异步IO

所谓「异步 IO」,就是你发起一个 IO阻塞 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

实现异步IO的方式

单线程+异步协程实现异步IO操作

异步协程用法

从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。首先我们需要了解下面几个概念:


    • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行。程序是按照设定的顺序从头执行到尾,运行的次数也是完全按照设定。当在编写异步程序时,必然其中有部分程序的运行耗时是比较久的,需要先让出当前程序的控制权,让其在背后运行,让另一部分的程序先运行起来。当背后运行的程序完成后,也需要及时通知主程序已经完成任务可以进行下一步操作,但这个过程所需的时间是不确定的,需要主程序不断的监听状态,一旦收到了任务完成的消息,就开始进行下一步。loop就是这个持续不断的监视器。


    • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。


    • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。


    • future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。


我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

定义一个协程

import asyncio
async def execute(x):
    print(Number:, x)
coroutine = execute(1)
print(Coroutine:, coroutine)
print(After calling execute)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print(After calling loop)

-----------------
输出结果
Coroutine: 
After calling execute
Number: 1
After calling loop

我们引入了 asyncio 这个包,这样我们才可以使用 async 和 await,然后我们使用 async 定义了一个 execute() 方法,方法接收一个数字参数,方法执行之后会打印这个数字。

随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop() 方法创建了一个事件循环 loop,并调用了 loop 对象的 run_until_complete() 方法将协程注册到事件循环 loop 中,然后启动。最后我们才看到了 execute() 方法打印了输出结果。

可见,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。

task的使用:

 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。

在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明

import asyncio
async def execute(x):
    print(Number:, x)
    return x

coroutine = execute(1)
print(Coroutine:, coroutine)
print(After calling execute)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(Task:, task)
loop.run_until_complete(task)
print(Task:, task)
print(After calling loop)

------------------
输出结果
Coroutine: 
After calling execute
Task: >
Number: 1
Task:  result=1>
After calling loop

 ensure_future()的使用

定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

import asyncio

async def execute(x):
    print(Number:, x)
    return x

coroutine = execute(1)
print(Coroutine:, coroutine)
print(After calling execute)

task = asyncio.ensure_future(coroutine)
print(Task:, task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(Task:, task)
print(After calling loop)

绑定回调

也可以为某个 task 绑定一个回调方法,来看下面的例子:

import asyncio
import requests

async def request():
    url = https://www.baidu.com
    status = requests.get(url).status_code
    return status

def callback(task):
    print(Status:, task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print(Task:, task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(Task:, task)


---------------------
输出
Task:  cb=[callback() at demo.py:11]>
Status: 
Task:  result=>

我们定义了一个 request() 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print() 语句。随后我们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,

然后调用 print() 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。

那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback() 方法即可,我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,

同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了。

 

多任务协程

想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行。

import asyncio
import time


async def request(url):
    print(正在下载, url)
    # 在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步。
    # time.sleep(2)
    # 当在asyncio中遇到阻塞操作必须进行手动挂起
    await asyncio.sleep(2)
    print(下载完毕, url)
    return url

# 回调函数
def callback_func(task):
    print(callback -> + task.result())  # callback对应函数的返回值


start = time.time()
urls = [
    www.baidu.com,
    www.sogou.com,
    www.goubanjia.com
]
# 任务列表:存放多个任务对象
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    # 将回调函数绑定到对象中
    task.add_done_callback(callback_func)
    stasks.append(task)
# 创建事件循环对象
loop = asyncio.get_event_loop()

# 需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(stasks))
print(time.time() - start)

-----
输出:
正在下载 www.baidu.com
正在下载 www.sogou.com
正在下载 www.goubanjia.com
下载完毕 www.baidu.com
下载完毕 www.sogou.com
下载完毕 www.goubanjia.com
callback ->www.baidu.com
callback ->www.sogou.com
callback ->www.goubanjia.com
2.003114700317383

高性能异步爬虫---aiohttp

aiohttp可以实现单线程并发IO操作,

安装pip install aiohttp

aiohttp使用

发起请求

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(https://www.baidu.com) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

添加请求参数

params = {key: value, page: 10}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(https://www.baidu.com/s,params=params) as resposne:
            print(await resposne.url)
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

UA伪装

url = http://httpbin.org/user-agent
headers = {User-Agent: test_user_agent}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers) as resposne:
            print(await resposne.text())

自定义cookies

url = http://httpbin.org/cookies
cookies = {cookies_name: test_cookies}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,cookies=cookies) as resposne:
            print(await resposne.text())

post请求参数

url = http://httpbin.org
payload = {username: zhang, password: 123456}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as resposne:
            print(await resposne.text())

设置代理

url = "http://python.org"
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy="http://some.proxy.com") as resposne:
        print(resposne.status)

异步IO处理

# 环境安装:pip install aiohttp
# 使用该模块中的ClientSession
import requests
import asyncio
import time
import aiohttp


async def get_page(url):
    async with aiohttp.ClientSession() as session:
        # get()、post():
        # headers,params/data,proxy=‘http://ip:port‘
        async with await session.get(url) as response:
            # text()返回字符串形式的响应数据
            # read()返回的二进制形式的响应数据
            # json()返回的就是json对象
            # 注意:获取响应数据操作之前一定要使用await进行手动挂起
            page_text = await response.text()
            # print(page_text)
        return page_text

# 回调函数


def callback_func(task):
    print(callback -> + task.result()[0:50])  # callback对应函数的返回值


start = time.time()
urls = [
    https://www.baidu.com/,
    https://www.sogou.com/,
    https://www.douban.com/
]
# 任务列表:存放多个任务对象
stasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    # 将回调函数绑定到对象中
    task.add_done_callback(callback_func)
    stasks.append(task)
loop = asyncio.get_event_loop()

# 需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(stasks))
print(time.time() - start)

 

高性能异步爬虫

标签:url   控制   cio   操作   状态码   response   添加   其他   test   

原文地址:https://www.cnblogs.com/xiao-apple36/p/12240094.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!