python+pcap+dpkt 抓包小实例

  1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*- 3 4 """ 网络数据包捕获与分析程序 """ 5 6 import pcap 7 import dpkt 8 import json 9 import re 10 import time 11 from urllib import unquote 12 13 # 过滤输出目标ip 14 dst_lists = [ 15, # nslookup dpdcs.4399sy.com.hk 16, # nslookup dpdcs.4399en.com 17, # nslookup dpdcs.4399sy.ru 18, # nslookup dpdcs.4399th.com 19, # nslookup sdkdcs.4399sy.com 20, # nslookup udpdcs.4399sy.com 21, # nslookup udpdcs.4399sy.com 22 ] 23 24 req_data = "" 25 times = 0 26 27 28 def capt_data(eth_name="eth0", p_type=None): 29 """ 30 捕获网卡数据包 31 :param eth_name 网卡名,eg. eth0,eth3... 32 :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 33 :return: 34 """ 35 36 pc = pcap.pcap(eth_name) 37 pc.setfilter(tcp port 80) # 设置监听过滤器 38 print start capture.... 39 if pc: 40 for p_time, p_data in pc: # p_time为收到时间,p_data为收到数据 41 anly_capt(p_time, p_data, p_type) 42 43 44 def anly_capt(p_time, p_data, p_type): 45 """ 46 解析数据包 47 :param p_data 收到数据 48 :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 49 :return: 50 """ 51 52 p = dpkt.ethernet.Ethernet(p_data) 53 if p.data.__class__.__name__ == IP: 54 ip_data = p.data 55 src_ip = %d.%d.%d.%d % tuple(map(ord, list(ip_data.src))) 56 dst_ip = %d.%d.%d.%d % tuple(map(ord, list(ip_data.dst))) 57 if p.data.data.__class__.__name__ == TCP: 58 tcp_data = p.data.data 59 if tcp_data.dport == 80: 60 # print tcp_data.data 61 if tcp_data.data: 62 # 调用日志模块,对日志进行处理 63 if p_type == 1: 64 # sdk日志用例分析 65 if dst_ip in dst_lists: 66 tmp = tcp_data.data.strip() 67 global req_data, times 68 if tmp.startswith("POST") or tmp.startswith("GET"): # or times > 0 69 if req_data: 70 haiwai_log_case(req_data) 71 req_data = tmp + "\n" 72 # times = 0 73 else: 74 req_data = req_data + tmp 75 # times = times + 1 76 77 elif p_type == 2: 78 # 目标域名过滤输出 79 if dst_ip in dst_lists: 80 print "tcp_data:", tcp_data.data 81 82 else: 83 # 无过滤条件输出 84 print "tcp_data:", tcp_data.data 85 86 87 # android 日誌類型,从data中获取 88 log_type_from_data = { 89 90 open_game: u[打开游戏], 91 network_check: u[网络监测], 92 open_login: u[登录界面前], 93 select_server: u[选服日志], 94 create_role: u[创角日志], 95 role_level_change: u[等级日志], 96 97 # 海外,俄语 98 activity_open: u[打开游戏], 99 load_start_before_login: u[加载开始], 100 load_finish_before_login: u[加载结束], 101 activity_before_login: u[登录界面前], 102 click_enter: u[进入游戏], 103 get_user_server_login: u[选服日志], 104 user_create_role: u[创角日志], 105 role_login: u[角色登录], 106 enter_success: u[成功进入游戏], 107 role_level: u[等级日志], 108 user_online: u[在线日志], 109 exit_success: u[退出游戏], 110 111 } 112 113 # ios日誌類型,从请求资源路径获取 114 log_type_from_path = { 115 activity_open.php: u[打开游戏], 116 load_start_before_login.php: u[加载开始], 117 load_finish_before_login.php: u[加载结束], 118 activity_before_login.php: u[登录界面前], 119 click_enter.php: u[进入游戏], 120 get_user_server_login.php: u[选服日志], 121 user_create_role.php: u[创角日志], 122 role_login.php: u[角色登录], 123 enter_success.php: u[成功进入游戏], 124 user_online.php: u[在线日志], 125 role_level.php: u[等级日志], 126 exit_success.php: u[退出游戏], 127 share.php: u[分享日志], 128 init_info.php: u[初始化日志], 129 event.php: u[事件日志], 130 user_login.php: u[user_login], 131 user_server_login.php: u[user_server_login], 132 enter_game.php: u[enter_game], 133 } 134 135 # 过滤path 136 filter_out_list = [ 137 u/, 138 plugin/error/check, 139 service/version/get_info, 140 ] 141 142 # 过滤打印出属于列表中的host的日志。 143 host_list = [ 144 dpdcs.4399sy.com.hk, 145 dpdcs.4399en.com, 146 dpdcs.4399sy.ru, 147 dpdcs.4399th.com, 148 sdkdcs.4399sy.com, 149 udpdcs.4399sy.com, 150 ] 151 152 153 def formattime(t): # 日期字段格式化 154 return time.strftime(%c, time.gmtime(t + 8 * 3600)) 155 156 157 def req_to_dict(req_string): 158 """ 159 将请求数据转换为dic 160 :param req_string: 161 :return: 162 """ 163 req_dict = {} 164 req_string = req_string.strip() 165 if len(req_string) > 0: 166 req_string = unquote(req_string) 167 # print "req_string_after_unquote:",req_string 168 m1 = re.search("(GET|POST)(.*)\?(.*)HTTP/1.1", req_string) # (method,path,param) 169 m2 = re.search("Host:(.*)", req_string) # (host,) 170 # m3 = re.search("\sdata=(.*)\s", req_string) # (body,) 171 m4 = re.search("\sdata=([\s\S]*)", req_string) # (body,) 172 # m5 = re.search("eventTime\":\"(\d+)", req_string) # (eventTime,) 173 m5 = re.search("eventTime\"\s*:\s*\"(\d+)", req_string) 174 if m1: 175 req_dict["method"] = m1.group(1).strip() 176 req_dict["path"] = m1.group(2).strip()[1:] 177 param_string = m1.group(3).strip() 178 if param_string: 179 param_string = param_string.split("&") 180 param_dict = {} 181 for item in param_string: 182 tmp_list = item.split("=") 183 if len(tmp_list) > 1: 184 param_dict[tmp_list[0]] = tmp_list[1] 185 req_dict["param"] = param_dict 186 if m2: 187 req_dict["host"] = m2.group(1).strip() 188 if m4: 189 try: 190 body = m4.group(1).replace("\n", "") 191 body = json.loads(body) 192 except ValueError: 193 print "\033[1;31;40m" 194 print "m4:Error:body ValueError,req_string-->%s" % req_string 195 print "\033[0m" 196 body = {} 197 req_dict["body"] = body 198 199 if m5: 200 req_dict["eventTime"] = formattime(int(m5.group(1))) 201 202 return req_dict 203 204 205 def haiwai_log_assert(req_dict): 206 """ 207 日志断言处理,输出分析结果 208 :param req_dict: 209 :return: 210 """ 211 212 # 从 data 中获取日志类型 213 if isinstance(req_dict, dict) and req_dict.get("body"): 214 if req_dict.get("body").get("data"): 215 data_type = req_dict.get("body").get("data").keys() 216 data_type_set = set(data_type) 217 types_key_set = set(log_type_from_data.keys()) 218 intersect = data_type_set.intersection(types_key_set) 219 if intersect: 220 log_type = intersect.pop() 221 print "\033[1;31;40m %s log pass!--from body data || EventTime:-->[%s] \033[0m" % ( 222 log_type_from_data.get(log_type), req_dict.get("eventTime")) 223 print req_dict 224 else: 225 if common in data_type and len(data_type) == 2: 226 data_type.remove(common) 227 print "\033[1;31;40m %s log not register!--from body data \033[0m" % data_type 228 229 # 从 path 中获取日志类型 230 path = req_dict.get("path") 231 host = req_dict.get("host") 232 if host in host_list: 233 if path in log_type_from_path.keys(): 234 eventTime = "" 235 if req_dict.get("eventTime"): 236 eventTime = req_dict.get("eventTime") 237 else: 238 if req_dict.get("param"): 239 eventTime = req_dict.get("param").get("time") 240 if eventTime: 241 eventTime = formattime(int(eventTime)) 242 print "\033[1;31;40m %s log pass--from url || EventTime:-->[%s] \033[0m" % ( 243 log_type_from_path.get(path), eventTime) 244 print req_dict 245 elif path and path not in filter_out_list: 246 print "\033[1;31;40m %s log not register!--from url! \033[0m" % path 247 print req_dict 248 249 250 def client_log_check(log_type, req_dict, platform="sy"): 251 """ 252 检查SDK客户端请求字段,返回测试结果集 253 :param log_type: 日志类型 254 :param req_dict: 日志字典 255 :param platform: 测试平台 256 :return: 257 """ 258 pass 259 260 261 def haiwai_log_case(req_string): 262 """ 263 日志用例集 264 一:将数据包转换为dict 265 二:对日志分析处理,输出测试结果 266 """ 267 268 req_dict = req_to_dict(req_string) 269 haiwai_log_assert(req_dict) 270 271 272 if __name__ == __main__: 273 try: 274 capt_data("eth3", 1) 275 except TypeError: 276 capt_data("eth3", 1)


