I. Background
I often need to crawl and process data, and I had always used the multithreading + requests approach, but it has two problems:
1. With many requests and even moderately high concurrency, the machine gets very sluggish.
2. Whenever the request packet changes, e.g. swapping a POST for a GET, the headers and other fields have to be parsed out in a different way each time, which is inconvenient.
Mainly for these two reasons, I put together an efficient concurrent crawler based on raw sockets + coroutines.
II. The code
import asyncio
import binascii
import gzip
import socket
import ssl


class Spider(object):
    def __init__(
        self,
        parameter_iter,
        concurrent_num=1,
        request_data="",
        protocol="https",
        timeout=3,
        host="",
        port=0,
        data_type="hex",
    ):
        self.parameter_iter = parameter_iter
        self.concurrent_num = concurrent_num
        self.request_data = request_data
        self.encrypt = 1 if protocol == "https" else 0
        self.timeout = timeout
        self.host = host
        self.port = port
        self.data_type = data_type
        self.result = []
        # Decode the raw packet depending on whether it is hex or a plain
        # string (the hex form is rarely needed).
        if self.data_type == "hex":
            self.request_data = self.request_data.strip()
            self.request_data = binascii.unhexlify(self.request_data).strip() + b"\r\n\r\n"
        else:
            self.request_data = self.request_data.strip() + "\r\n\r\n"
            self.request_data = self.request_data.encode(
                encoding="UTF-8", errors="strict"
            )
        # If host/port were not given, extract them from the packet's Host header.
        if not self.host or not self.port:
            try:
                for header in self.request_data.split(b"\n"):
                    if header.startswith(b"Host:"):
                        host = header.split(b" ")[1].strip()
                        if host.find(b":") > -1:
                            host, port = host.split(b":")
                            self.port = int(port)
                        else:
                            self.port = 443 if self.encrypt else 80
                        self.host = host.decode()
            except Exception as e:
                print("parse error %s" % e)

    async def send_request(self, parameter=""):
        if self.encrypt:
            # For https, wrap the socket with TLS. ssl.wrap_socket() is
            # deprecated (removed in Python 3.12), so build a context instead;
            # verification is disabled to match the original behaviour.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            sock = context.wrap_socket(socket.socket(), server_hostname=self.host)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            sock.connect((self.host, int(self.port)))
        except Exception as e:
            print("connect error %s" % e)
            sock.close()
            return
        # Substitute the current parameter for the *##* placeholder and send.
        sock.send(self.request_data.replace(b"*##*", str(parameter).encode("utf-8")))
        response = b""
        response_code = -1
        compression_flag = 0
        chunk_flag = 1
        content_length = -1
        parse_header_flag = 1
        # NOTE: these socket calls are blocking, so the coroutines here do not
        # actually overlap their I/O; see the asyncio-streams sketch below.
        while True:
            try:
                data = sock.recv(1024)
            except socket.timeout:
                break  # no more data within the timeout; treat as complete
            if not data:
                break  # server closed the connection
            response += data
            if parse_header_flag and response.find(b"\r\n\r\n") > -1:
                parse_header_flag = 0
                response_code = response.split(b"\r\n")[0].split(b" ")[1]
                header_list, body = response.split(b"\r\n\r\n", 1)
                for header in header_list.split(b"\r\n"):
                    if header.startswith(b"Content-Length:"):
                        # The body length tells us when to stop receiving.
                        chunk_flag = 0
                        content_length = int(header.split(b" ")[1])
                    elif header == b"Content-Encoding: gzip":
                        # The body is gzip-compressed.
                        compression_flag = 1
                response = body
            # Two cases: 1) chunked transfer (no Content-Length) ends with a
            # zero-length chunk; 2) with Content-Length, stop once the body
            # reaches that length.
            if not parse_header_flag:
                if chunk_flag and response.strip().endswith(b"\r\n0"):
                    break
                if content_length != -1 and len(response) >= content_length:
                    break
        sock.close()
        # Decode chunked transfer-encoding: each chunk is
        # "<hex size>\r\n<data>\r\n", terminated by a zero-size chunk.
        if chunk_flag:
            parse_data = b""
            try:
                while response:
                    data_len, left_data = response.split(b"\r\n", 1)
                    data_len = int(data_len, 16)
                    if data_len == 0:
                        break
                    parse_data += left_data[:data_len]
                    response = left_data[data_len + 2:]  # skip the trailing \r\n
            except Exception as e:
                print("chunk parse error %s" % e)
            response = parse_data
        # Decompress the body if it was gzip-compressed.
        if compression_flag:
            response = gzip.decompress(response)
        self.result.append((response_code, response))

    async def spider_request(self):
        # Each worker pulls parameters off the shared iterator until exhausted.
        while True:
            try:
                data = next(self.parameter_iter)
                await self.send_request(parameter=data)
            except StopIteration:
                return

    async def main(self):
        task_list = []
        for _ in range(self.concurrent_num):
            task_list.append(asyncio.create_task(self.spider_request()))
        for task in task_list:
            await task

    def run(self):
        asyncio.run(self.main())
        return self.result


if __name__ == "__main__":
    # The two _request_data forms below are equivalent; the hex one would be
    # used together with _data_type = "hex".
    _request_data = """GET / HTTP/1.1
Host: www.baidu.com
User-Agent: curl/7.64.1
Accept: */*"""
    # _request_data = "474554202f20485454502f312e310a486f73743a207777772e62616964752e636f6d0a557365722d4167656e743a206375726c2f372e36342e310a4163636570743a202a2f2a0d0a"
    _concurrent_num = 2  # number of concurrent requests, controls crawl speed
    _parameter_iter = iter(range(10))  # values substituted for the *##* placeholder
    _protocol = "http"  # protocol to use for the requests
    _timeout = 3  # per-request timeout in seconds
    _data_type = "str"  # request packet encoding, "hex" or "str"
    result = Spider(
        parameter_iter=_parameter_iter,
        concurrent_num=_concurrent_num,
        request_data=_request_data,
        protocol=_protocol,
        timeout=_timeout,
        data_type=_data_type,
    ).run()
    for info in result:
        print(info)
III. Advantages
1. For http/https crawling there is no need to build layer-7 packet parsing; you only replace _request_data, so the approach generalizes well (see the example after this list).
2. Coroutines give somewhat better performance than threads.
3. Because it sits directly on sockets, it should in principle be easy to extend to other protocols.
4. Request packets can be supplied as strings or as hex.
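As an example of point 1, switching to a different request is just a matter of editing the packet template. A hypothetical POST packet for the class above (example.com and the form field are made up; *##* is where each value from parameter_iter lands):

# Hypothetical POST packet; *##* is replaced per request by Spider.send_request.
# With single-digit parameters (0-9) the substituted body "q=5" is 3 bytes, so
# Content-Length: 3 stays correct; variable-length parameters would require
# recomputing it. Strict HTTP wants CRLF line endings, so like the GET example
# above this relies on the server tolerating bare-LF header lines.
_request_data = """POST /search HTTP/1.1
Host: example.com
User-Agent: curl/7.64.1
Content-Type: application/x-www-form-urlencoded
Content-Length: 3

q=*##*"""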
IV. Key points involved
1. How to tell that sock.recv has received the complete response (this covers only HTTP/HTTPS chunked transfer and Content-Length framing)
2. Decoding chunked transfer-encoding (see the sketch after this list)
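To make both points concrete, here is a small standalone sketch; the helper names are mine, but the zero-length terminal chunk and the Content-Length rule are standard HTTP/1.1 framing:

def body_complete(body, content_length, chunked):
    # Point 1: is the response body fully received?
    if chunked:
        # Chunked transfer ends with a zero-length chunk: b"0\r\n\r\n".
        return body.endswith(b"0\r\n\r\n")
    return content_length != -1 and len(body) >= content_length


def decode_chunked(body):
    # Point 2: each chunk is "<hex size>\r\n<size bytes>\r\n";
    # a zero-size chunk terminates the stream.
    decoded = b""
    while body:
        size_line, rest = body.split(b"\r\n", 1)
        size = int(size_line, 16)
        if size == 0:
            break
        decoded += rest[:size]
        body = rest[size + 2:]  # skip the \r\n that trails every chunk
    return decoded


raw = b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"
assert body_complete(raw, -1, chunked=True)
assert decode_chunked(raw) == b"Wikipedia"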
Original post: https://www.cnblogs.com/xugongzi007/p/14453711.html