标签:else ports with python __init__ encode count time ret
# coding=utf-8
from queue import Queue
import nmap
import threading
import requests
import chardet
import re
import json
import os
# 存储所有扫描的ip和端口服务
final_domains = []
# 存储每个ip的端口的临时列表
ports = []
# 全局锁
glock = threading.Lock()
# 端口服务扫描类
class PortScan(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
# 作为线程类,需要重写run方法
def run(self):
while not self._queue.empty():
scan_ip = self._queue.get() # 从队列出取出每一个扫描的ip
glock.acquire()
try:
self.portscan(scan_ip)
self.Scan(scan_ip)
except Exception as e:
print(e)
pass
glock.release()
# 调用masscan识别端口
def portscan(self, scan_ip):
temp_ports = [] # 设定一个临时端口列表
os.system(‘masscan.exe ‘ + scan_ip + ‘ -p 1-100 -oJ masscan.json --rate 2000‘)
# 提取json文件中的端口
with open(‘masscan.json‘, ‘r‘) as f:
for line in f:
if line.startswith(‘{ ‘):
temp = json.loads(line[:-2]) # 取出一条完整json形式的数据
temp_ports.append(str(temp["ports"][0]["port"]))# 端口取出加入临时端口中
print(temp_ports)
if len(temp_ports) > 25:
temp_ports.clear() # 如果端口数量大于30,说明可能存在防火墙,属于误报,清空列表
else:
ports.extend(temp_ports) # 小于30则放到总端口列表里
# 获取网站的web应用程序名和网站标题信息
def Title(self,scan_url_port, service_name):
try:
resp = requests.get(scan_url_port, timeout=3, verify=False)
# 获取网站的页面编码并且应用
detectencode = chardet.detect(resp.content) # 利用chardet模块检测编码
response = re.findall(r‘<title>(.*?)</title>‘, resp.content.decode(detectencode[‘encoding‘]), re.S) # re.S的作用 匹配的时候扩展到整个字符串(包括换行这些\n)
if response: # 如果访问的时候正则匹配到<title>标签
# 将页面解码为utf-8,获取中文标题
# 如果访问的时候正则匹配到title标签
res = response[0]
banner = resp.headers[‘server‘]
final_domains.append(scan_url_port + ‘\t‘ + banner + ‘\t‘ + res)
else:
final_domains.append(scan_url_port + ‘\t‘ + service_name + ‘\t‘ + "获取标题失败,请手动尝试!!!")
except:
pass
# 调用nmap识别服务
def Scan(self,scan_ip):
nm = nmap.PortScanner()
try:
for port in ports:
ret = nm.scan(scan_ip, port, arguments=‘-Pn -sS‘)
service_name = ret[‘scan‘][scan_ip][‘tcp‘][int(port)][‘name‘]
print(‘[*] 主机 ‘ + scan_ip + ‘ 的 ‘ + str(port) + ‘ 端口服务为: ‘ + service_name)
if ‘http‘ in service_name or service_name == ‘sun-answerbook‘:
if service_name == ‘https‘ or service_name == ‘https-alt‘:
scan_url_port = ‘https://‘ + scan_ip + ‘:‘ + str(port)
self.Title(scan_url_port, service_name)
else:
scan_url_port = ‘http://‘ + scan_ip + ‘:‘ + str(port)
self.Title(scan_url_port, service_name)
else:
# 形式为:["47.96.196.217:443 https","47.96.196.217:80 blackice-icecap"]....
final_domains.append(scan_ip + ‘:‘ + str(port) + ‘\t端口服务为: ‘ + service_name)
except Exception as e:
print(e)
pass
ports.clear() # 扫一次清理一次
# 启用多线程扫描
def main():
queue = Queue(1000)
try:
f = open(‘ip.txt‘, ‘r‘) # 把文本中的内容按\n分割加入到queue队列中
for line in f.readlines():
final_ip = line.strip(‘\n‘)
queue.put(final_ip)
print("加入队列---->" + final_ip)
threads = [] # 创建一个列表
thread_count = 50
for i in range(thread_count):
threads.append(PortScan(queue)) # 列表中每个都作为一个线程进行扫描
for t in threads:
t.start()
for t in threads:
t.join()
f.close()
except:
pass
if __name__ == ‘__main__‘:
main() # 主进程
tmp_domians = []
for tmp_domain in final_domains: # 循环一次 转移到tmp_domains里面
if tmp_domain not in tmp_domians:
tmp_domians.append(tmp_domain)
for url in tmp_domians:
with open(‘scan_url_port.txt‘, ‘a‘) as ff:
ff.write(url+‘\n‘)
标签:else ports with python __init__ encode count time ret
原文地址:https://www.cnblogs.com/zpchcbd/p/12609695.html