标签:ons 内核 生成器 用户 队列 之间 运行 多核 异步调用
异步的使用场景
爬虫:
1.从目标站点下载网页数据,本质是HTML格式字符串
2.用re从字符串中提取出你所需要的数据
#使用进程池
from concurrent.futures import ProcessPoolExecutor
import requests,re,os
def get_data(url):
print(‘%s正在请求%s‘%(os.getpid(),url))
response = requests.get(url) #获取网页数据包含报头
print(‘%s请求%s成功!‘%(os.getpid(),url))
return response
def parser(response):
htm = response.content.decode(‘utf-8‘) #拿到数据
ls = re.findall(‘href=.*?com‘,htm)
print(‘解析数据完成!共%s个连接‘%len(ls))
if __name__ == ‘__main__‘:
urls = [‘https://www.baidu.com/‘,
‘https://www.sina.com/‘,
‘https://www.tmall.com/‘,
‘https://www.taobao.com/‘,
‘https://www.jd.com/‘,
‘https://www.python.org/‘,
‘https://www.apple.com/‘]
pool = ProcessPoolExecutor(3)
#下面代码将任务变为串行
for i in urls:
obj = pool.submit(get_data,i)
parser(obj.result())
#改为下面的代码
objs = []
for i in urls:
obj = pool.submit(get_data,i)
objs.append(obj)
pool.shutdown() #请求依然是并发,但是请求的结果不能被立即处理
for i in objs: #解析数据时串行了
parser(i.result())
进程池使用异步回调来处理结果
from concurrent.futures import ProcessPoolExecutor
import requests,re,os
def get_data(url):
print(‘%s正在请求%s‘%(os.getpid(),url))
response = requests.get(url) #获取网页数据包含报头
print(‘%s请求%s成功!‘%(os.getpid(),url))
return response
def parser(obj):
response = obj.result()
htm = response.content.decode(‘utf-8‘) #拿到数据
ls = re.findall(‘href=.*?com‘,htm)
print(‘%s解析数据完成!共%s个连接‘%(os.getpid(),len(ls)))
if __name__ == ‘__main__‘:
urls = [‘https://www.baidu.com/‘,
‘https://www.sina.com/‘,
‘https://www.tmall.com/‘,
‘https://www.taobao.com/‘,
‘https://www.jd.com/‘,
‘https://www.python.org/‘,
‘https://www.apple.com/‘]
pool = ProcessPoolExecutor(3)
for i in urls:
obj = pool.submit(get_data,i)
obj.add_done_callback(parser)
线程池使用异步回调来处理结果
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests,re
def get_data(url):
print(‘%s正在请求%s‘%(current_thread().name,url))
response = requests.get(url) #获取网页数据包含报头
print(‘%s请求%s成功!‘%(current_thread().name,url))
return response
def parser(obj):
response = obj.result()
htm = response.content.decode(‘utf-8‘) #拿到数据
ls = re.findall(‘href=.*?com‘,htm)
print(‘%s解析数据完成!共%s个连接‘%(current_thread().name,len(ls)))
urls = [‘https://www.baidu.com/‘,
‘https://www.sina.com/‘,
‘https://www.tmall.com/‘,
‘https://www.taobao.com/‘,
‘https://www.jd.com/‘,
‘https://www.python.org/‘,
‘https://www.apple.com/‘]
pool = ThreadPoolExecutor(3)
for i in urls:
obj = pool.submit(get_data, i)
obj.add_done_callback(parser)
a 交给 b一个任务,b在执行完成后回过头调用了a的一个函数 就称之为回调函数
通常异步任务都会和回调函数一起使用
使用方法:
使用add_done_callback()函数,给Future对象绑定一个回调函数
注意:在多进程中回调函数 是交给主进程来执行的,而在多线程中,回调函数是谁有空谁来执行(不是主线程)
一.线程队列
from queue import Queue,LifoQueue,PriorityQueue #队列先进先出 q = Queue() q.put(‘a‘) q.put(‘b‘) print(q.get()) print(q.get()) #先进后出堆栈 q = LifoQueue() #last in first out q.put(‘a‘) q.put(‘b‘) q.put(‘c‘) print(q.get()) print(q.get()) print(q.get()) #优先级队列,取出顺序由小到大 优先级可以是数字或者字符,只要能够比较大小即可,必须是同类型的比较 q = PriorityQueue() q.put((1,‘a‘)) q.put((2,‘b‘)) q.put((3,‘c‘)) print(q.get()) print(q.get()) print(q.get())
二.事件
from threading import Thread,Event
e = Event() #创建一个事件
import time
def start():
print(‘正在启动服务器!‘)
time.sleep(5)
print(‘服务器已启动!‘)
e.set() #把事件的值设置为True
def connect():
#重试三次
for i in range(3):
print(‘正在等待服务器启动...‘)
e.wait(1) #会阻塞直到对方把时间设置为True
if e.set():
print(‘连接成功!‘)
break
else: #如果3次都没有成功,则打印这个消息
print(‘服务器还未启动!‘)
t1 = Thread(target=start).start()
t2 = Thread(target=connect).start()
三.协程
引子:
在单线程下实现并发
如果一个线程能够检测到IO操作并且将其设置为非阻塞,并自动切换到其他的任务就可以提高CPU的利用率,指的就是在单线程下实现并发.
如何实现并发?
并发=切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,这就可以实现单线并发.
import time
def task1():
a = 1
while True:
print(‘run task1‘)
a += 1
print(a)
yield
def task2():
g = task1()
while True:
print(‘run task2‘)
time.sleep(1)
next(g)
task2()
上述虽然实现了并发,但对效率的影响是好是坏?我们测试一下!
import time
def task1():
a = 0
for i in range(10000000):
a += i
yield
def task2():
b = 0
g = task1()
for i in range(10000000):
b += i
next(g)
s = time.time()
task2()
print(time.time()-s)
结果:3.6185712814331055
def task1():
a = 0
for i in range(10000000):
a += i
def task2():
b = 0
for i in range(10000000):
b += i
s = time.time()
task1()
task2()
print(time.time()-s)
结果:1.799281358718872
我们使用单线程并发是为了提高效率的,但是对于纯计算而言,这样的并发反而降低了效率
greenlet模块是将复杂的yield进行了封装,简化了代码
import greenlet,time
def task1(name):
print(‘%s run task1‘%name)
time.sleep(3)
g2.switch(name)
print(‘task1 run‘)
g2.switch()
def task2(name):
print(‘%s run task2‘%name)
g1.switch()
print(‘task2 run‘)
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch(‘henry‘)
gevent 即可检测IO又可以实现单线程并发
什么是协程?
协程:单线程下的并发,又称微线程.
协程是一种用户态的轻量级线程,及协程是由用户程序自己来控制调节的.
1.python的线程属于内核级别的,由操作系统控制调节(如果单线程遇到IO或者执行时间过长就会被迫交出cpu执行权限,切换到其他线程运行)
2.单线程内开启协程,一旦遇到IO,就会从应用程序级别控制切换,以此来提升效率.
为什么要有协程?
CPython中无法并行执行任务导致效率低,而协程就是将单线程的效率最大化
协程的优缺点:
优点:
1.协程切换开销更小,属于程序级别的切换,操作系统完全感知不到
2.单线程内就可以实现并发的效果,最大限度利用cpu
缺点:
1.协程本质是在单线程下,无法利用多核,可以使一个程序开启多个进程,每个进程开启多个线程,每个线程内开启协程
2.协程指的是单个线程,一旦协程出现阻塞,将会阻塞整个线程
gevent的使用
from gevent import monkey
import gevent,time
monkey.patch_all()
def task1():
print(‘task1 run‘)
time.sleep(3)
print(‘task2 run‘)
def task2():
print(‘task2 run‘)
print(‘task2 run‘)
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
#g1.join()
#g2.join()
# 等待所有任务结束
# 那么一旦发生io 这个任务就立马结束了
gevent.joinall([g1,g2])
标签:ons 内核 生成器 用户 队列 之间 运行 多核 异步调用
原文地址:https://www.cnblogs.com/lizeqian1994/p/10222125.html