好久也没写过博客了,距离上一写的博文到现在也过去了四年。这段时间Urumqi yq突然爆发,单位暂时也不让回。一个人宅着没事就刷刷抖音看看短视频,作为一位有故事的男人【狗头】,抖音推荐的视频还是挺符合个人口味的,于是就萌生了把这些好看的视频全部保存的想法。之前喜欢用一些免费的公众号小程序去下载无水印的视频,可是没过多久这些小程序不是失效就是需要变相付费下载,最为一名资深白嫖党【狗头】岂是能忍的,然后就各种查资料,一顿操作猛如虎后,也没有什么简便方法。还是自己写程序把,可是自从工作以来最多就用用office三件套,编程都荒废了,用python写个hello world都会把单词拼错【汗】,没关系我们可以学,大不了从头再来,个人自学、在B站上学、面向github编程学。以下省略千余字学习过程。
mitmproxy is a free and open source interactive HTTPS proxy.
安装:pip install mitmproxy
安装后有3个命令行工具:mitmproxy, mitmdump, mitmweb
post_api = ‘https://aweme-lq.snssdk.com/aweme/v1/aweme/post/......‘ feed_api = ‘https://aweme-lq.snssdk.com/aweme/v2/feed/......‘ favo_api = ‘https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/......‘
然后又经过一番各种Google,结果发现:protobuf 有一套自己的语法。不了解 Protobuf 协议语法和用法的话也无法反解数据。也就是说在没有那个抖音自定义的( .proto 文件)情况下,基本上是无法逆序列化解析。经过一番资料查找各种踩坑之后,借助工具,使用google提供的的protoc编译工具,这个工具提供反解析参数。这样就能获取protobuf数据包的大致信息。
protoc –decode_raw < douyin_feed.bin > 1.txt
无果。好吧在网上找了个某第三方抖音分享视频下载网站,简单分析了它的接口,照着网站js加密参数 。后来偶然间发现了一篇帖子(不好意思
ok,主体框架搭好了,我们开始吧。talk is cheap show me the code…
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 12:03 FileName :mitmproxy_douyin_get_url_scripts.py ==================================================== """ import mitmproxy.http import json import time import struct from socket import * post_api = ‘https://aweme-lq.snssdk.com/aweme/v1/aweme/post/‘ feed_api = ‘https://aweme-lq.snssdk.com/aweme/v2/feed/‘ favo_api = ‘https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/‘ def send_data_to_server(header_dict, type): ‘‘‘ :param header_dict 获取到的数据包字典 :param type 原视频类型,feed,post,favo 与服务端通信发送数据,使用自定义协议 每次调用就创建一个套接字,用完就关闭 ‘‘‘ tcp_client_socket = None host = ‘‘ port = 9527 address = (host, port) try: tcp_client_socket = socket(AF_INET, SOCK_STREAM) tcp_client_socket.connect(address) if type == ‘post‘ or type == ‘favo‘: json_data = json.dumps(header_dict) json_bytes = json_data.encode(‘utf-8‘) tcp_client_socket.send(struct.pack(‘i‘, len(json_bytes))) tcp_client_socket.send(json_bytes) #print(header_dict) elif type == ‘feed‘: #先发送协议头用struct打包,包含要发送的数据大小 data_len = header_dict[‘size‘] byte_arr = header_dict[‘content‘] new_dict = { ‘type‘: ‘feed‘, ‘size‘: data_len } json_data = json.dumps(new_dict) json_bytes = json_data.encode(‘utf-8‘) tcp_client_socket.send(struct.pack(‘i‘, len(json_bytes))) tcp_client_socket.send(json_bytes) chunk_size = 1024 start = 0 end = 1 * chunk_size #print(‘new_dict...........................:‘, new_dict) #发送protubuf数据,每次发送1024个字节 while True: if data_len // chunk_size > 0: read_bytes = byte_arr[start : end] start = end end += chunk_size data_len -= chunk_size tcp_client_socket.send(read_bytes) #print(read_bytes) else: read_bytes = byte_arr[start : ] tcp_client_socket.send(read_bytes) break except: pass if tcp_client_socket: tcp_client_socket.close() def get_local_time(create_time): ‘‘‘ :param create_time 原视频的发布时间,linux时间戳 :return: 返回年月日格式的日期 ‘‘‘ time_local = time.localtime(int(create_time)) pub_date = time.strftime("%Y-%m-%d", time_local) return pub_date def response(flow: mitmproxy.http.HTTPFlow): if flow.request.url.startswith(post_api) or flow.request.url.startswith(favo_api): if flow.response.status_code == 200: url_json = json.loads(flow.response.text) if url_json and url_json[‘aweme_list‘]: for aweme_list in url_json[‘aweme_list‘]: aweme_id = aweme_list[‘aweme_id‘] create_time = aweme_list[‘create_time‘] create_time = get_local_time(create_time) type = ‘post‘ if flow.request.url.startswith(post_api) else ‘favo‘ header_dict = { ‘type‘: type, ‘aweme_id_create_time‘: aweme_id + ‘_‘ + create_time, ‘nickname‘: aweme_list[‘author‘][‘nickname‘], ‘play_url‘: aweme_list[‘video‘][‘play_addr‘][‘url_list‘][0] } send_data_to_server(header_dict, type) elif flow.request.url.startswith(feed_api): if flow.response.status_code == 200: procbuf = flow.response.content feed_dict = { ‘type‘: "feed", ‘content‘: procbuf, ‘size‘: len(procbuf) } #print(‘procbuf len................‘, len(procbuf)) send_data_to_server(feed_dict, ‘feed‘) addons = { response() }
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 FileName :parase_data.py ==================================================== """ import os import re import json import time import requests import random import hashlib from lxml import etree import warnings warnings.filterwarnings(‘ignore‘) ‘‘‘ class Get_real_play_addr(object): def __init__(self): self.request_url = ‘http://3g.gljlw.com/diy/ttxs_dy2.php?‘ self.headers = { ‘User-Agent‘: ‘Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36‘ } def parase_play_addr(self, url): paly_url = ‘‘ r = str(random.random())[2:] s = hashlib.md5((url + ‘@&^‘ + r).encode()).hexdigest() params = { ‘url‘: url, ‘r‘: r, ‘s‘: s } try: response = requests.get(url = self.request_url, headers = self.headers, params = params) if response.status_code == 200: content = response.content.decode(‘utf-8‘) html = etree.HTML(content) paly_url = html.xpath(‘//source/@src‘)[0] if paly_url: return paly_url except: print("network error cannot parase play_addr...") return None ‘‘‘ # 打开protobuf文件,用正则表达式匹配出所有的分享链接地址,用set去重 class Get_url_from_protobuf(object): def __init__(self): self.pat = r‘(?<=\")https://www.iesdouyin.com/share/video/.*(?=\")‘ self.command = r‘ --decode_raw <‘ def get_url(self, exe_path, file_path): try: fp = os.popen(exe_path + self.command + file_path) if fp: src = fp.read() fp.close() url_list = re.findall(self.pat, src) url_list = set(url_list) return url_list except: print(‘decode protobuf failed...‘) return None def get_local_time(create_time): time_local = time.localtime(int(create_time)) pub_date = time.strftime("%Y-%m-%d", time_local) return pub_date # 获取分享视频的下载地址 def Get_real_play_addr_by_web(aweme_id): headers = { ‘User-Agent‘: ‘Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36‘ } api_url = ‘https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids=‘ + aweme_id response = requests.get(url = api_url, headers = headers, verify = False) if response.status_code == 200: response_json = response.json() play_addr = response_json[‘item_list‘][0][‘video‘][‘play_addr‘][‘url_list‘][0] create_time = response_json[‘item_list‘][0][‘create_time‘] create_time = get_local_time(create_time) play_addr = play_addr.replace(‘playwm‘, ‘play‘, 1) # 返回下载地址和视频的发布时间 return (play_addr, create_time) return None, None #‘https://www.iesdouyin.com/share/video/6854870744690625805/?region=CN&mid=6854870758414781191‘ #print(Get_real_play_addr_by_web("6854870744690625805"))
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 FileName :douyin_video_downloads.py ==================================================== """ import requests import json import os import time import sys import threading import struct from queue import Queue from socket import * from parase_data import Get_url_from_protobuf from parase_data import Get_real_play_addr_by_web import warnings warnings.filterwarnings("ignore") headers = { ‘User-Agent‘: ‘Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36‘ } que = Queue() chunk_size = 1024 #下载线程 def Download(path, index): while True: global que if que.empty(): print("No.{} thread is waiting for data...".format(index)) data = que.get() dir_name = data[‘type‘] file_name = data[‘aweme_id_create_time‘] if dir_name == ‘feed‘: play_url, create_time = Get_real_play_addr_by_web(data[‘aweme_id_create_time‘]) file_name = file_name + ‘_‘ + create_time if not play_url: continue dir_path = os.path.join(path, dir_name) else: dir_path = os.path.join(path, dir_name, data[‘nickname‘]) play_url = data[‘play_url‘] if not os.path.exists(dir_path): os.makedirs(dir_path) file_path = os.path.join(dir_path, file_name + ‘.mp4‘) read_size = 0 if os.path.exists(file_path): continue try: response = requests.get(url = play_url, headers = headers, verify = False) if response.status_code == 200: #print(response.headers) total_szie = int(response.headers[‘Content-Length‘]) print("NO.{} thread is downloading... {} filesize {} bytes".format(index, data[‘aweme_id_create_time‘] + ‘.mp4‘, total_szie)) t_1 = time.time() with open(file_path, "wb") as fp: for data in response.iter_content(chunk_size = chunk_size): if data: fp.write(data) read_size += chunk_size #print(‘No.{} threading is downloading: {} ...: {}%‘.format(index, file_path, str(round(read_size / total_szie * 100, 2)))) print("No.{} thread finshed! total cost: {}s".format(index, str(round(time.time() - t_1, 2)))) time.sleep(0.2) else: print("cannot conneted with the servers...") except: print("downloading %s failed... network error please try againg"%play_url) #que.put(data) # 服务端用于接收mitm脚本发送的数据 def run(exe_path, file_path): PORT = 9527 HOST = ‘‘ address = (HOST, PORT) tcp_server_socket = socket(AF_INET, SOCK_STREAM) tcp_server_socket.bind(address) print("the server is lunching, listeing the port {}...".format(address[1])) tcp_server_socket.listen(5) while True: try: client_socket, client_address = tcp_server_socket.accept() print(‘the client{} linked:{}‘.format(client_address, time.asctime(time.localtime(time.time())))) data = client_socket.recv(4) header_size = struct.unpack(‘i‘, data)[0] header_bytes = client_socket.recv(header_size) header_json = json.loads(header_bytes.decode(‘utf-8‘)) if header_json[‘type‘] == ‘post‘ or header_json[‘type‘] == ‘favo‘: que.put(header_json) else: chunk_size = 1024 read_size = 0 total_size = header_json[‘size‘] with open(file_path, ‘wb‘) as fp: while read_size < total_size: data = client_socket.recv(chunk_size) if data: fp.write(data) read_size += len(data) probuf = Get_url_from_protobuf() url_list = probuf.get_url(exe_path, file_path) for url in url_list: try: feed_dict = { ‘type‘: ‘feed‘, ‘feed_url‘: url, ‘aweme_id_create_time‘: url[38: 57] #只是视频的id,并没有发布时间 } que.put(feed_dict) except: continue client_socket.close() except: tcp_server_socket.close() print("never run here...") if __name__ == "__main__": dir_path = sys.argv[1] if dir_path.endswith(‘/‘): dir_path += ‘/‘ if not os.path.exists(dir_path): os.makedirs(dir_path) #dir_path = r‘C:\Users\Administrator\Desktop\pytho_src\douyin\videos‘ thread_list = [] for i in range(4): if i == 0: thread_list.append(threading.Thread(target = run, args = (r‘.\protobuf\protoc.exe‘, r‘.\probuf.bin‘, ))) else: thread_list.append(threading.Thread(target = Download, args = (dir_path , i + 1, ))) thread_list[i].setDaemon = True [i.start() for i in thread_list] [i.join() for i in thread_list] # #run(r‘.\protobuf\protoc.exe‘, r‘.\probuf.bin‘) # print("finish!")
