标签:正则表达式 异步 executor content 协程 list app select status
一、多线程实现并发
from concurrent.futures import ThreadPoolExecutor
import requests
import time

# How many steps does it take to put an elephant into a fridge?
#   1. Find a fridge:        import and create a thread pool (ThreadPoolExecutor)
#   2. Put the elephant in:  pool.submit(task, url)
#   3. Close the door:       pool.shutdown(wait=True) -- done here by leaving ``with``


def task(url, timeout=10):
    """Send a GET request to *url* and print the URL together with the response.

    :param url: address to fetch.
    :param timeout: seconds to wait for the server before giving up.
    """
    try:
        # Without a timeout an unresponsive host could block this worker forever.
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Nobody reads the futures returned by submit(), so an uncaught error
        # would disappear silently; report it instead.
        print(url, exc)
        return
    print(url, response)


if __name__ == "__main__":
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    # A pool of 7 worker threads; leaving the block calls shutdown(wait=True).
    with ThreadPoolExecutor(7) as pool:
        for url in url_list:
            pool.submit(task, url)
# Concurrency -- threads
from concurrent.futures import ThreadPoolExecutor
import requests
import time

# "Tell me once the elephant is in": the same three steps, plus a callback
# that runs when each task finishes.
#   1. Find a fridge:        import and create a thread pool (ThreadPoolExecutor)
#   2. Put the elephant in:  pool.submit(task, url)
#   3. Close the door:       pool.shutdown(wait=True) -- done here by leaving ``with``


def task(url, timeout=10):
    """Send a GET request to *url* and return the response.

    Errors are not caught here; they are stored in the future and reported
    by :func:`done`.
    """
    return requests.get(url, timeout=timeout)


def done(future, *args, **kwargs):
    """Callback attached with ``add_done_callback``: print status code and body.

    :param future: the finished future; its result is the value returned by task().
    """
    error = future.exception()
    if error is not None:
        # An exception escaping a done-callback is only logged and ignored by
        # concurrent.futures, so report the failed request explicitly.
        print('request failed:', error)
        return
    response = future.result()
    print(response.status_code, response.content)


if __name__ == "__main__":
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    with ThreadPoolExecutor(7) as pool:
        for url in url_list:
            v = pool.submit(task, url)
            # done() is invoked when this task finishes (immediately if it
            # already has).
            v.add_done_callback(done)
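二、多进程实现并发:代码与多线程版本基本相同,只是改用 ProcessPoolExecutor;另外,创建进程池的代码必须放在 `if __name__ == "__main__":` 之下,因为以 spawn 方式启动的子进程(Windows、macOS 上的默认方式)会重新导入主模块。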
from concurrent.futures import ProcessPoolExecutor
import requests
import time


def task(url, timeout=10):
    """Send a GET request to *url* (inside a worker process) and print the URL and response.

    :param url: address to fetch.
    :param timeout: seconds to wait for the server before giving up.
    """
    try:
        # Without a timeout an unresponsive host could block this worker forever.
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # The futures are never inspected, so report failures here.
        print(url, exc)
        return
    print(url, response)


# The guard is required: with the "spawn" start method (Windows, macOS) each
# worker imports the main module, and unguarded pool creation would run again
# in the child.
if __name__ == "__main__":
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    # A pool of 7 worker processes; leaving the block calls shutdown(wait=True).
    with ProcessPoolExecutor(7) as pool:
        for url in url_list:
            pool.submit(task, url)
from concurrent.futures import ProcessPoolExecutor
import requests
import time


def task(url, timeout=10):
    """Send a GET request to *url* in a worker process and return the response.

    The returned object is pickled to send it back to the parent process.
    Errors propagate into the future and are reported by :func:`done`.
    """
    return requests.get(url, timeout=timeout)


def done(future, *args, **kwargs):
    """Callback attached with ``add_done_callback``: print status code and body.

    :param future: the finished future; its result is the value returned by task().
    """
    error = future.exception()
    if error is not None:
        # An exception escaping a done-callback is only logged and ignored by
        # concurrent.futures, so report the failed request explicitly.
        print('request failed:', error)
        return
    response = future.result()
    print(response.status_code, response.content)


# Required for process pools: with the "spawn" start method each worker
# imports the main module, so the pool must only be created in the parent.
if __name__ == "__main__":
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    with ProcessPoolExecutor(7) as pool:
        for url in url_list:
            v = pool.submit(task, url)
            # done() runs in the parent process once this task finishes.
            v.add_done_callback(done)
三、协程 + 异步IO
需要安装以下包:pip install aiohttp requests greenlet gevent grequests
import asyncio


async def task(delay=5):
    """Print a marker, wait *delay* seconds without blocking the loop, print another.

    ``async def``/``await`` replace the ``@asyncio.coroutine``/``yield from``
    style, which was deprecated in Python 3.8 and removed in 3.11.

    :param delay: seconds to sleep between the two messages.
    """
    print('before...task...')
    # Awaiting the sleep hands control back to the event loop, so other
    # coroutines run in the meantime.
    await asyncio.sleep(delay)
    print('end...task...')


async def main(delay=5):
    """Run two task() coroutines concurrently (about *delay* seconds in total)."""
    await asyncio.gather(task(delay), task(delay))


if __name__ == "__main__":
    # asyncio.run creates the loop ("open the fridge door"), runs the
    # coroutines ("put the elephant in") and closes the loop ("close the door").
    asyncio.run(main())
import asyncio


async def task(host, url='/'):
    """Send a hand-written HTTP/1.0 GET for *url* to *host* and print the raw reply.

    :param host: server name; used for the TCP connection (port 80) and the Host header.
    :param url: request path.
    """
    print('before...task...', host, url)
    # asyncio itself only provides TCP streams; the HTTP request is built by hand.
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
        writer.write(request_header_content.encode('utf-8'))
        await writer.drain()
        # read() with no size reads until EOF; an HTTP/1.0 server closes the
        # connection after sending the response.
        text = await reader.read()
        print('end', host, url, text)
    finally:
        # Close the connection even if sending or reading failed.
        writer.close()
        await writer.wait_closed()


async def main():
    """Fetch both pages concurrently."""
    await asyncio.gather(task('www.cnblogs.com', '/Yk2012/'), task('www.baidu.com'))


if __name__ == "__main__":
    # asyncio.run opens the loop, runs the coroutines and closes the loop.
    asyncio.run(main())
import aiohttp
import asyncio


async def fetch_async(url):
    """Fetch *url* with aiohttp and print the response object."""
    print(url)
    # Used as an async context manager, aiohttp.request releases the response
    # when the block ends (replacing the manual response.close()).
    async with aiohttp.request('GET', url) as response:
        print(url, response)


async def main():
    """Fetch all URLs concurrently, reporting failures instead of aborting the run."""
    urls = ['http://www.baidu.com/', 'http://www.chouti.com/']
    # return_exceptions=True: one unreachable site does not abort the whole gather.
    results = await asyncio.gather(*(fetch_async(url) for url in urls),
                                   return_exceptions=True)
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(url, result)
    return results


if __name__ == "__main__":
    results = asyncio.run(main())
import asyncio
import requests


async def task(func, *args):
    """Run the blocking call ``func(*args)`` in a thread pool and print its result.

    The result is expected to have ``url`` and ``content`` attributes, as the
    ``requests.Response`` returned by ``requests.get`` does.
    """
    print(func, args)
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor, so the blocking call
    # (e.g. requests.get(...)) runs in a worker thread instead of stalling the loop.
    response = await loop.run_in_executor(None, func, *args)
    print(response.url, response.content)


async def main():
    """Issue both blocking requests concurrently via the executor."""
    return await asyncio.gather(
        task(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
    )


if __name__ == "__main__":
    results = asyncio.run(main())
import grequests


def exception_handler(request, exception):
    """Print why *request* failed; its entry in the mapped result list stays ``None``."""
    print(request.url, exception)


if __name__ == "__main__":
    request_list = [
        # Extra keyword arguments are passed through to requests.
        grequests.get('https://baidu.com/', timeout=10),
    ]
    # ##### Send the requests (at most 5 at a time) and collect the responses #####
    response_list = grequests.map(request_list, size=5, exception_handler=exception_handler)
    print(response_list)
四、自定义异步IO
import socket
import select

# Blocking version: each marked step stalls the whole program until it finishes.
if __name__ == "__main__":
    with socket.socket() as sk:
        # 1. Connect -- blocks (IO wait). The address must be a (host, port)
        #    tuple; the original passed a set, which connect() rejects.
        sk.connect(('www.baidu.com', 80))
        # 2. Send the request; sendall retries until every byte is written.
        sk.sendall(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
        # 3. Wait for the server's reply -- blocks (IO wait).
        data = sk.recv(8096)
    # 4. Leaving the ``with`` block closes the connection, even on error.
import socket
import select


class HttpResponse:
    """Minimal parser for a raw HTTP response.

    Attributes:
        recv_data: the raw bytes received from the server.
        header_dict: header name -> value (str, surrounding whitespace stripped).
        body: the bytes after the first blank line.
    """

    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()

    def initialize(self):
        """Split ``recv_data`` into headers and body and fill ``header_dict``.

        Data without a blank-line separator (an empty or truncated reply) is
        treated as headers only, with an empty body, instead of raising.
        """
        headers, _, body = self.recv_data.partition(b'\r\n\r\n')
        self.body = body
        for line in headers.split(b'\r\n'):
            # errors='replace' keeps one undecodable header byte from aborting the parse.
            name, sep, value = line.decode('utf-8', errors='replace').partition(':')
            if sep:  # lines without ':' (such as the status line) are skipped
                self.header_dict[name.strip()] = value.strip()


class HttpRequest:
    """One in-flight request: its socket, target host and completion callback."""

    def __init__(self, socket, host, callback):
        self.socket = socket
        self.host = host
        self.callback = callback  # called with an HttpResponse once the reply is complete
        self.buffer = bytearray()  # reply bytes so far; they may arrive over several select() rounds

    def fileno(self):
        """Return the socket's descriptor so select() can watch this object directly."""
        return self.socket.fileno()


class AsyncRequest:
    """select()-based loop that sends ``GET /`` over HTTP/1.0 to several hosts concurrently."""

    def __init__(self):
        self.conn = []        # requests whose response has not been fully received yet
        self.connection = []  # requests whose non-blocking connect has not completed yet

    def add_request(self, host, callback):
        """Start a non-blocking connect to *host* on port 80 and register the request.

        Note: resolving *host* inside connect() is itself still a blocking call.

        :param host: server name; also sent as the Host header.
        :param callback: called with an :class:`HttpResponse` when the reply is complete.
        """
        sk = socket.socket()
        sk.setblocking(False)
        try:
            sk.connect((host, 80,))
        except BlockingIOError:
            # Expected for a non-blocking socket: the handshake continues and
            # select() reports the socket writable once it has finished.
            pass
        except OSError:
            # e.g. the host name could not be resolved: don't leak the socket.
            sk.close()
            raise
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        """Drive all registered requests until every one has completed or failed."""
        while self.conn:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                self._send_request(w)
            for e in elist:
                if e in self.connection:
                    # Windows reports a failed non-blocking connect here instead
                    # of making the socket writable.
                    print(e.host, '连接失败...')
                    self._close(e)
            for r in rlist:
                # The request may already have been closed earlier in this round.
                if r in self.conn:
                    self._receive(r)

    def _send_request(self, w):
        """Handle a writable socket: check the connect outcome, then send the GET request."""
        self.connection.remove(w)  # connected or failed: stop watching for writability
        error = w.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error:
            # Writable with a pending SO_ERROR means the connect failed.
            print(w.host, '连接失败...', error)
            self._close(w)
            return
        print(w.host, '连接成功...')
        tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
        w.socket.sendall(bytes(tpl, encoding='utf-8'))

    def _receive(self, r):
        """Read all available data; at end of stream, parse the reply and run the callback."""
        while True:
            try:
                chunk = r.socket.recv(8096)
            except BlockingIOError:
                # Nothing more right now, but the server has not closed the
                # connection: the reply is incomplete, so wait for the next round.
                return
            except OSError as exc:
                print(r.host, exc)
                self._close(r)
                return
            if not chunk:
                # b'' means the server closed the connection, which for
                # HTTP/1.0 marks the end of the response.
                break
            r.buffer += chunk
        response = HttpResponse(bytes(r.buffer))
        self._close(r)
        r.callback(response)

    def _close(self, request):
        """Close *request*'s socket and stop tracking it."""
        request.socket.close()
        self.conn.remove(request)
        if request in self.connection:
            self.connection.remove(request)


def f1(response):
    """Demo callback: print the parsed headers labelled 'save to file'."""
    print('保存到文件', response.header_dict)


def f2(response):
    """Demo callback: print the parsed headers labelled 'save to database'."""
    print('保存到数据库', response.header_dict)


if __name__ == "__main__":
    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f2},
        {'host': 'www.cnblogs.com', 'callback': f2},
    ]
    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'], item['callback'])
    req.run()
标签:正则表达式 异步 executor content 协程 list app select status
原文地址:https://www.cnblogs.com/YK2012/p/11881652.html