码迷,mamicode.com
首页 > 编程语言 > 详细

异步调用,线程队列,时间,协程

时间:2019-01-04 20:23:14      阅读:214      评论:0      收藏:0      [点我收藏+]

标签:ons   内核   生成器   用户   队列   之间   运行   多核   异步调用   

异步的使用场景

爬虫:

1.从目标站点下载网页数据,本质是HTML格式字符串

2.用re从字符串中提取出你所需要的数据

#使用进程池
from concurrent.futures import ProcessPoolExecutor
import requests,re,os
def get_data(url):
    print(‘%s正在请求%s‘%(os.getpid(),url))
    response = requests.get(url) #获取网页数据包含报头
    print(‘%s请求%s成功!‘%(os.getpid(),url))
    return response
def parser(response):
    htm = response.content.decode(‘utf-8‘) #拿到数据
    ls = re.findall(‘href=.*?com‘,htm)
    print(‘解析数据完成!共%s个连接‘%len(ls))
if __name__ == ‘__main__‘:
    urls = [‘https://www.baidu.com/‘,
            ‘https://www.sina.com/‘,
            ‘https://www.tmall.com/‘,
            ‘https://www.taobao.com/‘,
            ‘https://www.jd.com/‘,
            ‘https://www.python.org/‘,
            ‘https://www.apple.com/‘]
    pool = ProcessPoolExecutor(3)
    
    #下面代码将任务变为串行
    for i in urls:
        obj = pool.submit(get_data,i)
        parser(obj.result())
    
    #改为下面的代码
    objs = []
    for i in urls:
        obj = pool.submit(get_data,i)
        objs.append(obj)
    pool.shutdown() #请求依然是并发,但是请求的结果不能被立即处理
    for i in objs:  #解析数据时串行了
        parser(i.result())

进程池使用异步回调来处理结果

from concurrent.futures import ProcessPoolExecutor
import requests,re,os
def get_data(url):
    print(‘%s正在请求%s‘%(os.getpid(),url))
    response = requests.get(url) #获取网页数据包含报头
    print(‘%s请求%s成功!‘%(os.getpid(),url))
    return response
def parser(obj):
    response = obj.result()
    htm = response.content.decode(‘utf-8‘) #拿到数据
    ls = re.findall(‘href=.*?com‘,htm)
    print(‘%s解析数据完成!共%s个连接‘%(os.getpid(),len(ls)))
if __name__ == ‘__main__‘:
    urls = [‘https://www.baidu.com/‘,
            ‘https://www.sina.com/‘,
            ‘https://www.tmall.com/‘,
            ‘https://www.taobao.com/‘,
            ‘https://www.jd.com/‘,
            ‘https://www.python.org/‘,
            ‘https://www.apple.com/‘]
    pool = ProcessPoolExecutor(3)
    for i in urls:
        obj = pool.submit(get_data,i)
        obj.add_done_callback(parser)

线程池使用异步回调来处理结果

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests,re
def get_data(url):
    print(‘%s正在请求%s‘%(current_thread().name,url))
    response = requests.get(url) #获取网页数据包含报头
    print(‘%s请求%s成功!‘%(current_thread().name,url))
    return response
def parser(obj):
    response = obj.result()
    htm = response.content.decode(‘utf-8‘) #拿到数据
    ls = re.findall(‘href=.*?com‘,htm)
    print(‘%s解析数据完成!共%s个连接‘%(current_thread().name,len(ls)))


urls = [‘https://www.baidu.com/‘,
        ‘https://www.sina.com/‘,
        ‘https://www.tmall.com/‘,
        ‘https://www.taobao.com/‘,
        ‘https://www.jd.com/‘,
        ‘https://www.python.org/‘,
        ‘https://www.apple.com/‘]
pool = ThreadPoolExecutor(3)
for i in urls:
    obj = pool.submit(get_data, i)
    obj.add_done_callback(parser) 

什么是回调函数?

a 交给 b一个任务,b在执行完成后回过头调用了a的一个函数 就称之为回调函数

通常异步任务都会和回调函数一起使用

使用方法:

使用add_done_callback()函数,给Future对象绑定一个回调函数

注意:在多进程中回调函数 是交给主进程来执行的,而在多线程中,回调函数是谁有空谁来执行(不是主线程)

 

一.线程队列

from queue import Queue,LifoQueue,PriorityQueue
#队列先进先出
q = Queue()
q.put(‘a‘)
q.put(‘b‘)
print(q.get())
print(q.get())

#先进后出堆栈
q = LifoQueue()   #last in first out
q.put(‘a‘)
q.put(‘b‘)
q.put(‘c‘)
print(q.get())
print(q.get())
print(q.get())

#优先级队列,取出顺序由小到大 优先级可以是数字或者字符,只要能够比较大小即可,必须是同类型的比较
q = PriorityQueue()
q.put((1,‘a‘))
q.put((2,‘b‘))
q.put((3,‘c‘))
print(q.get())
print(q.get())
print(q.get())

二.事件

是用于协调多个线程工作的,当一个线程要执行某个操作的时候,需要获取另一个线程的状态

from threading import Thread,Event
e = Event() #创建一个事件
import time
def start():
    print(‘正在启动服务器!‘)
    time.sleep(5)
    print(‘服务器已启动!‘)
    e.set() #把事件的值设置为True
def connect():
    #重试三次
    for i in range(3):
        print(‘正在等待服务器启动...‘)
        e.wait(1) #会阻塞直到对方把时间设置为True
        if e.set():
            print(‘连接成功!‘)
            break
    else: #如果3次都没有成功,则打印这个消息
        print(‘服务器还未启动!‘)
t1 = Thread(target=start).start()
t2 = Thread(target=connect).start()

三.协程

引子:

在单线程下实现并发

如果一个线程能够检测到IO操作并且将其设置为非阻塞,并自动切换到其他的任务就可以提高CPU的利用率,指的就是在单线程下实现并发.

 

如何实现并发?

并发=切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,这就可以实现单线并发.

 

python中的生成器可以实现并发

import time
def task1():
    a = 1
    while True:
        print(‘run task1‘)
        a += 1
        print(a)
        yield
def task2():
    g = task1()
    while True:
        print(‘run task2‘)
        time.sleep(1)
        next(g)
task2()

上述虽然实现了并发,但对效率的影响是好是坏?我们测试一下!
import time
def task1():
    a = 0
    for i in range(10000000):
        a += i
        yield

def task2():
    b = 0
    g = task1()
    for i in range(10000000):
        b += i
        next(g)
s = time.time()
task2()
print(time.time()-s)
结果:3.6185712814331055

def task1():
    a = 0
    for i in range(10000000):
        a += i

def task2():
    b = 0
    for i in range(10000000):
        b += i

s = time.time()
task1()
task2()
print(time.time()-s)
结果:1.799281358718872

我们使用单线程并发是为了提高效率的,但是对于纯计算而言,这样的并发反而降低了效率

  

greenlet模块实现并发

greenlet模块是将复杂的yield进行了封装,简化了代码

import greenlet,time
def task1(name):
    print(‘%s run task1‘%name)
    time.sleep(3)
    g2.switch(name)
    print(‘task1 run‘)
    g2.switch()

def task2(name):
    print(‘%s run task2‘%name)
    g1.switch()
    print(‘task2 run‘)
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch(‘henry‘)

上述所示:无论直接使用yield还是greenlet模块都不能检测到IO操作,遇到IO同样进入阻塞状态,所以此时的并发没有任何意义

 

gevent 即可检测IO又可以实现单线程并发

什么是协程?

协程:单线程下的并发,又称微线程.

协程是一种用户态的轻量级线程,及协程是由用户程序自己来控制调节的.

1.python的线程属于内核级别的,由操作系统控制调节(如果单线程遇到IO或者执行时间过长就会被迫交出cpu执行权限,切换到其他线程运行)

2.单线程内开启协程,一旦遇到IO,就会从应用程序级别控制切换,以此来提升效率.

 

为什么要有协程?

CPython中无法并行执行任务导致效率低,而协程就是将单线程的效率最大化

 

协程的优缺点:

优点:

1.协程切换开销更小,属于程序级别的切换,操作系统完全感知不到

2.单线程内就可以实现并发的效果,最大限度利用cpu

缺点:

1.协程本质是在单线程下,无法利用多核,可以使一个程序开启多个进程,每个进程开启多个线程,每个线程内开启协程

2.协程指的是单个线程,一旦协程出现阻塞,将会阻塞整个线程

 

gevent的使用

from gevent import monkey
import gevent,time
monkey.patch_all()
def task1():
    print(‘task1 run‘)
    time.sleep(3)
    print(‘task2 run‘)
def task2():
    print(‘task2 run‘)
    print(‘task2 run‘)
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
#g1.join()
#g2.join()
# 等待所有任务结束
# 那么一旦发生io 这个任务就立马结束了
gevent.joinall([g1,g2])

 

异步调用,线程队列,时间,协程

标签:ons   内核   生成器   用户   队列   之间   运行   多核   异步调用   

原文地址:https://www.cnblogs.com/lizeqian1994/p/10222125.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!