1. Background
I regularly need to scrape and process data, and had been using the usual multithreading + requests approach, which has two problems:
1. Once there are many requests and the concurrency rises even a little, the machine becomes very sluggish.
2. Every time the request changes, e.g. switching a POST to a GET, the headers and other fields have to be re-parsed in a different way, which is inconvenient.
Mainly for these two reasons, I put together a socket + coroutine based crawler to handle this kind of concurrent scraping efficiently.
2. The code
import asyncio
import binascii
import gzip
import socket
import ssl
class Spider(object):
    def __init__(
        self,
        parameter_iter,
        concurrent_num=1,
        request_data="",
        protocol="https",
        timeout=3,
        host="",
        port=0,
        data_type="hex",
    ):
        self.parameter_iter = parameter_iter
        self.concurrent_num = concurrent_num
        self.request_data = request_data
        self.encrypt = 1 if protocol == "https" else 0
        self.timeout = timeout
        self.host = host
        self.port = port
        self.data_type = data_type
        self.result = []
        # Decode the raw request depending on whether it is hex or plain text (hex is rarely needed)
        if self.data_type == "hex":
            self.request_data = self.request_data.strip()
            self.request_data = binascii.unhexlify(self.request_data).strip() + b"\r\n\r\n"
        else:
            self.request_data = self.request_data.strip() + "\r\n\r\n"
            self.request_data = self.request_data.encode(
                encoding="UTF-8", errors="strict"
            )
        # If host/port were not given, pull them from the Host header of the request
        if not self.host or not self.port:
            try:
                for header in self.request_data.split(b"\n"):
                    if header.startswith(b"Host:"):
                        # connect() and the ssl wrapper need str, not bytes
                        host = header.split(b" ")[1].strip().decode("utf-8")
                        if ":" in host:
                            self.host, self.port = host.split(":")
                        elif self.encrypt:
                            self.host = host
                            self.port = 443
                        else:
                            self.host = host
                            self.port = 80
            except Exception as e:
                print("Host header parse error: %s" % e)
    async def send_request(self, parameter=""):
        if self.encrypt:  # for https and similar encrypted protocols, wrap the socket in TLS
            # ssl.wrap_socket was deprecated (removed in Python 3.12); use a context instead
            sock = ssl.create_default_context().wrap_socket(
                socket.socket(), server_hostname=self.host
            )
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            sock.connect((self.host, int(self.port)))
        except Exception as e:
            print("connect error: %s" % e)
            return
        # Substitute the current parameter for the *##* placeholder and send the raw request
        sock.send(self.request_data.replace(b"*##*", str(parameter).encode("utf-8")))
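        # e.g. with a request line of b"GET /item?id=*##* HTTP/1.1" (hypothetical) and
        # parameter 7, the bytes sent contain b"GET /item?id=7 HTTP/1.1"; the sample
        # request in __main__ has no placeholder, so its ten requests are identical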
        response = b""
        response_code = -1
        compression_flag = 0
        chunk_flag = 1
        content_length = -1
        parse_header_flag = 1
        while True:
            try:
                data = sock.recv(1024)
            except socket.timeout:
                break  # give up on this response if the server stalls
            if not data:  # server closed the connection
                break
            response += data
            if response.find(b"\r\n\r\n") > -1 and parse_header_flag:
                parse_header_flag = 0
                response_code = response.split(b"\r\n")[0].split(b" ")[1]
                header_list, body = response.split(b"\r\n\r\n", 1)
                for header in header_list.split(b"\r\n"):
                    if header.startswith(b"Content-Length:"):  # body length tells us when to stop reading
                        chunk_flag = 0
                        content_length = int(header.split(b" ")[1])
                    elif header == b"Content-Encoding: gzip":  # body is gzip-compressed
                        compression_flag = 1
                response = body
            # Two termination cases: 1) chunked transfer (no Content-Length) ends with a
            # zero-length chunk; 2) with Content-Length, stop once the body is that long
            if parse_header_flag == 0:
                if chunk_flag == 1 and response.strip().endswith(b"\r\n0"):
                    break
                if content_length > -1 and len(response) >= content_length:
                    break
        sock.close()
        # Decode the chunked transfer encoding
        parse_data = b""
        if chunk_flag:
            try:
                # Walk the chunks: each is a hex length line followed by that many bytes of data
                count = 20
                while count:
                    count -= 1
                    if response:
                        data_len, left_data = response.split(b"\r\n", 1)
                        data_len = int(data_len, 16)
                        if data_len == 0:  # a zero-length chunk terminates the body
                            break
                        parse_data += left_data[:data_len]
                        response = left_data[data_len + 2:]  # skip the chunk plus its trailing \r\n
            except Exception as e:
                print("chunk parse error: %s" % e)
            response = parse_data
        # Decompress the body if the server gzip-compressed it
        if compression_flag:
            response = gzip.decompress(response)
        self.result.append((response_code, response))
    async def spider_request(self):
        # Each worker pulls parameters from the shared iterator until it is exhausted
        while True:
            try:
                data = next(self.parameter_iter)
                await self.send_request(parameter=data)
            except StopIteration:
                return
    async def main(self):
        task_list = []
        for _ in range(self.concurrent_num):
            task_list.append(asyncio.create_task(self.spider_request()))
        for task in task_list:
            await task

    def run(self):
        asyncio.run(self.main())
        return self.result
if __name__ == "__main__":
    # The two _request_data definitions below are equivalent
    _request_data = """GET / HTTP/1.1
Host: www.baidu.com
User-Agent: curl/7.64.1
Accept: */*"""
# _request_data = "474554202f20485454502f312e310a486f73743a207777772e62616964752e636f6d0a557365722d4167656e743a206375726c2f372e36342e310a4163636570743a202a2f2a0d0a"
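    # (the hex form can be produced with binascii.hexlify(_request_data.encode()))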
    _concurrent_num = 2  # number of concurrent requests; controls the crawl speed
    _parameter_iter = iter(range(10))  # parameters substituted into the request at the *##* placeholder
    _protocol = "http"  # protocol to use for the request
    _timeout = 3  # timeout for a single request, in seconds
    _data_type = "str"  # request encoding, either "hex" or "str"
    result = Spider(
        parameter_iter=_parameter_iter,
        concurrent_num=_concurrent_num,
        request_data=_request_data,
        protocol=_protocol,
        timeout=_timeout,
        data_type=_data_type,
    ).run()
    for info in result:
        print(info)
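One caveat: socket.connect/recv are blocking calls, so even though send_request is a coroutine, the event loop cannot switch workers while a request is in flight, and the tasks end up taking turns rather than overlapping their I/O. Below is a minimal sketch of a genuinely non-blocking variant using asyncio's stream API (the function name send_request_async is mine; reading to EOF stands in for the header/chunk parsing above and assumes the request carries a Connection: close header):

import asyncio

async def send_request_async(host, port, request_data, use_ssl=True, timeout=3):
    # open_connection performs a non-blocking connect; ssl=True wraps the stream in TLS
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port, ssl=use_ssl), timeout
    )
    writer.write(request_data)
    await writer.drain()
    # Read until the server closes the connection (hence Connection: close);
    # real code would honor Content-Length / chunked encoding as above
    response = await asyncio.wait_for(reader.read(), timeout)
    writer.close()
    await writer.wait_closed()
    return response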
3. Advantages:
1. For http/https crawling there is no need to build layer-7 packets by hand; pasting a raw request into _request_data is all that is required, so the approach is very generic.
2. Being coroutine-based, performance is a bit better.
3. Because it sits directly on sockets, it is in principle easy to extend to other protocols (see the sketch after this list).
4. Request packets are supported in both string and hex formats.
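As an illustration of point 3, the same raw-socket pattern can speak any line-based TCP protocol. A hypothetical example against a local Redis instance (the address, port, and the presence of a Redis server are assumptions):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
sock.connect(("127.0.0.1", 6379))  # assumes Redis is listening locally
sock.send(b"PING\r\n")  # Redis accepts inline commands over raw TCP
print(sock.recv(1024))  # expected reply: b"+PONG\r\n"
sock.close()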
4. Key points involved
1. How to tell that sock.recv has received the complete response (covered here only for http/https responses delimited by chunked transfer or Content-Length)
2. Decoding chunked transfer encoding (a worked example follows)
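To make point 2 concrete, here is what the chunk parser in send_request is undoing, run on a small hand-made body (the bytes are illustrative, not from a real response):

# A chunked body is "<hex length>\r\n<data>\r\n" repeated, terminated by a zero-length chunk
raw = b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"
decoded = b""
while raw:
    length, raw = raw.split(b"\r\n", 1)
    length = int(length, 16)
    if length == 0:  # zero-length chunk ends the body
        break
    decoded += raw[:length]
    raw = raw[length + 2:]  # skip the chunk data plus its trailing \r\n
print(decoded)  # b"Wikipedia"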
Original post (in Chinese): https://www.cnblogs.com/xugongzi007/p/14453711.html