标签:ddr 爬取网页 spider 分配 连接失败 nec res cal std
//server import socket, select, re, queue, redis from multiprocessing import Pool, cpu_count from pymongo import MongoClient host = ‘192.168.1.107‘ ConnectionList = [] Recv_buffer = 4096000 Client_Status = {} Client_Num = {} redis1 = redis.Redis(host=‘localhost‘, port=6379, db=0) Num = 0 class Distributed_Web_Crawler: def __init__(self, port): self.url_num = 1 self.queue = queue.Queue() self.db = MongoClient().CrawSpider.content self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(10) self.pool = Pool(cpu_count() - 1) ConnectionList.append(self.server_socket) print("服务器运行在端口:" + str(port)) address = ‘https://movie.douban.com/‘ self.queue.put(address) redis1.set(address, 0) self.main() def main(self): global Num while 1: if not self.queue.empty() and ConnectionList.__len__() > 1 is not None: self.pool.apply_async(self.task_manage()) read_sockets, write_sockets, error_sockets = select.select(ConnectionList, [], []) for sock in read_sockets: if sock == self.server_socket: conn, addr = self.server_socket.accept() ConnectionList.append(conn) core_num = conn.recv(Recv_buffer).decode(‘utf8‘) Client_Status[conn] = core_num Client_Num[conn] = Client_Num.__len__() + 1 print(‘客户端 ‘ + addr[0] + ‘:‘ + str(addr[1]) + ‘已连接,核心数: ‘ + core_num + ‘\n编号为‘ + str(Client_Num[ conn])) else: data = sock.recv(Recv_buffer) if data: Contents = data.decode(‘utf8‘).split(‘Page_ContentPPPPPP///////‘) # print(‘收到‘+str(Client_Num[sock])+‘号机发来数据,正在处理‘) Client_Status[sock] = int(Client_Status[sock]) + len(Contents) print(‘编号‘+str(Client_Num[sock])+‘可用核心‘+str(Client_Status[sock])) for content in Contents: if content: self.pool.apply_async(self.web_page_resolution(content)) else: print(‘客户端 ‘ + addr[0] + ‘:‘ + str(addr[1]) + ‘断开连接‘) sock.close() Client_Status.pop(sock) Client_Num.pop(sock) ConnectionList.remove(sock) def web_page_resolution(self, content): db = MongoClient().Web.data db.insert({‘page_content‘: content}) pattern = re.compile(‘https://movie.douban.com/(.*?)"‘) urls = re.findall(string=content, pattern=pattern) for url in urls: url = ‘https://movie.douban.com/‘ + url if redis1.get(url) is None: redis1.set(url, self.url_num) self.queue.put(url) self.url_num += 1 def task_manage(self): urls = ‘‘ for socket in ConnectionList: if socket != self.server_socket: while not self.queue.empty() and int(Client_Status[socket]) != 0: urls = urls + self.queue.get() + ‘ ‘ Client_Status[socket] = int(Client_Status[socket]) - 1 # print(‘向‘ + str(Client_Num[socket]) + ‘号终端分配任务‘) socket.send(urls.encode(‘utf8‘)) if __name__ == "__main__": port = 8888 Distributed_Web_Crawler(port, )
//Client import socket, sys, select from multiprocessing import cpu_count from requests import get from multiprocessing import Pool p = Pool(cpu_count() - 1) host = ‘192.168.0.103‘ Page_contents = [] def crawler_page(url): print("正在爬取网页" + url) content = get(url).content.decode(‘utf8‘) + ‘Page_ContentPPPPPP///////‘ print(url + "爬取完成,正在向服务器发送数据") s.send(content.encode(‘utf8‘)) def listing(): while 1: rlist = [sys.stdin, s] read_list, write_list, error_list = select.select(rlist, [], []) for sock in read_list: if sock == s: data = sock.recv(4096).decode(‘utf8‘) if data != ‘quit‘ and data: urls = data.split() if len(urls) == 1: p.apply_async(crawler_page(urls[0])) else: for url in urls: p.apply_async(crawler_page(url)) urls.remove(url) elif data == ‘quit‘: print(‘接收到服务器关闭指令,客户端正在退出‘) sys.exit() else: print(‘服务器连接失败,正在退出‘) sys.exit() if __name__ == "__main__": port = 8888 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(3) try: s.connect((‘192.168.1.107‘, port)) except: print("无法连接至服务器,请检查地址后重试") sys.exit() print("已连接至服务器,开始发送机器信息\n核心数:" + str(cpu_count())) s.send(str(cpu_count()).encode(‘utf8‘)) listing()
标签:ddr 爬取网页 spider 分配 连接失败 nec res cal std
原文地址:http://www.cnblogs.com/INnoVationv2/p/6072211.html