# 标签:类型 for use comm 指定 同文件 win turn 存在
import os
import tarfile
import zipfile
import threading
import winrm
# Local directory that archives are extracted into.
LOCAL_FILE_PATH = "C:\\"
# Semaphore allowing at most 10 concurrent holders. The only acquire/release
# calls visible in this file are commented out, so it is presumably meant to
# cap the number of upload threads -- TODO confirm before relying on it.
THREAD_MAX = threading.BoundedSemaphore(10)
# Module-level flag, initially False; not referenced elsewhere in the visible code.
CONN_FLAG = False
class Trans(object):
    """Unpack a local archive and replay the extracted tree onto Windows hosts.

    Typical flow: set ``ip_list``, call :meth:`connection` to build one WinRM
    session per host, call :meth:`unzip_package` to extract an archive into
    ``LOCAL_FILE_PATH``, then call :meth:`mul_thread` to upload the extracted
    tree to every host, one thread per session.
    """

    def __init__(self, username, password):
        """Store the WinRM credentials and initialise the bookkeeping lists."""
        self.username = username
        self.password = password
        # Set by unzip_package(); read by get_md5() and mul_thread().
        self.unzip_file_path = None
        self.ip_list = []
        self.conns = []
        self.failed_ip_list = []
        self.thread_pool = list()

    def connection(self):
        """Create a WinRM session for every address in ``self.ip_list``.

        Sessions are appended to ``self.conns``; addresses whose session could
        not be created are appended to ``self.failed_ip_list``.

        :return: ``(self.conns, self.failed_ip_list)``, or ``None`` when
            ``self.ip_list`` is empty.
        """
        if not self.ip_list:
            print("ip不能为空")
            return
        for ip in self.ip_list:
            try:
                # NOTE(review): building a Session presumably does not contact
                # the host, so unreachable hosts may only fail on the first
                # run_cmd() -- confirm against the pywinrm documentation.
                conn_obj = winrm.Session(
                    'http://' + ip + ':5985/wsman',
                    auth=(self.username, self.password),
                )
            except Exception as e:
                # Best effort: remember the failing host and keep going.
                self.failed_ip_list.append(ip)
                print(e)
            else:
                self.conns.append(conn_obj)
        return self.conns, self.failed_ip_list

    @staticmethod
    def _archive_base_name(archive_path, suffix):
        """Return the file name of *archive_path* with *suffix* removed.

        Both ``\\`` and ``/`` are accepted as path separators. The suffix is
        removed as a whole string; ``str.rstrip`` would instead strip any
        trailing characters contained in the suffix.
        """
        file_name = archive_path.replace('\\', '/').rsplit('/', 1)[-1]
        if suffix and file_name.endswith(suffix):
            return file_name[:-len(suffix)]
        return file_name

    def unzip_package(self, zip_file_path):
        """Extract a ``.zip`` or ``.tar.gz`` archive into ``LOCAL_FILE_PATH``.

        On success ``self.unzip_file_path`` is updated: for a zip archive it
        becomes the first member name of the archive, for a tar.gz archive it
        becomes ``LOCAL_FILE_PATH`` joined with the archive's base name.
        Errors are printed rather than raised, and ``self.unzip_file_path`` is
        then left unchanged.

        :param zip_file_path: path of the archive to extract
        """
        # SECURITY NOTE: members are extracted as-is. Only use archives from a
        # trusted source, since member names could point outside the target
        # directory.
        if zip_file_path.endswith('.zip'):
            try:
                # The context manager closes the archive even if extraction fails.
                with zipfile.ZipFile(zip_file_path) as zip_file:
                    member_names = zip_file.namelist()
                    zip_file.extractall(LOCAL_FILE_PATH)
            except Exception as e:
                # Best effort, as in the rest of this class: report and continue.
                print(e)
            else:
                # NOTE(review): this is a member name relative to the archive,
                # not a path under LOCAL_FILE_PATH as in the tar.gz branch --
                # confirm which form the consumers expect.
                if member_names:
                    self.unzip_file_path = member_names[0]
        elif zip_file_path.endswith('.tar.gz'):
            try:
                with tarfile.open(zip_file_path, "r:gz") as tar:
                    tar.extractall(LOCAL_FILE_PATH)
            except Exception as e:
                print(e)
            else:
                base_name = self._archive_base_name(zip_file_path, '.tar.gz')
                self.unzip_file_path = os.path.join(LOCAL_FILE_PATH, base_name)
        else:
            print("请检查文件格式")

    def check_md5(self, local_md5_file, remote_md5_file):
        """Placeholder for comparing local and remote MD5 listings; not implemented."""
        pass

    def mul_thread(self, file_path=None):
        """Upload a local directory tree to every connected host in parallel.

        One thread per session in ``self.conns`` runs :meth:`list_dir`; the
        call returns after all threads have finished. Sessions are created
        via :meth:`connection` first when none exist yet.

        :param file_path: local directory to upload; defaults to
            ``self.unzip_file_path`` (the result of :meth:`unzip_package`)
        """
        upload_root = file_path or self.unzip_file_path
        if not upload_root:
            print("上传路径为空,请先解压文件或指定路径")
            return
        if not self.conns:
            self.connection()
        for conn_obj in self.conns:
            _thread = threading.Thread(
                target=self.list_dir, args=(conn_obj, upload_root)
            )
            _thread.start()
            self.thread_pool.append(_thread)
        for th in self.thread_pool:
            th.join()

    def get_md5(self):
        """Write and run a batch file that hashes the extracted files.

        The generated ``test4.bat`` walks ``self.unzip_file_path`` recursively
        and appends the ``certutil`` MD5 output of every file to ``test3.txt``.
        Both files are created relative to the current working directory.
        """
        unzip_file_path = self.unzip_file_path
        if not unzip_file_path:
            print("解压路径为空,请先解压文件")
            return
        bat_command = (
            "\n"
            "set DIR={}\n"
            "for /R %DIR% %%f in (*.*) do (\n"
            "echo | certutil -hashfile %%f MD5 >> test3.txt\n"
            ")\n"
        ).format(unzip_file_path)
        # Overwrite instead of append so repeated calls do not stack several
        # copies of the script in the same batch file.
        with open('test4.bat', 'w', encoding='utf-8') as bat_file:
            bat_file.write(bat_command)
        os.system("test4.bat")

    @staticmethod
    def _send_file(conn_obj, file_name, remote_file_path):
        """Append the lines of a local UTF-8 text file to a remote file.

        Each non-empty line is sent as one ``echo <line> >> <remote>`` command.
        Empty lines are skipped. Lines are passed to the remote shell
        unescaped, so shell metacharacters in the content are interpreted by
        it, and files that are not valid UTF-8 raise ``UnicodeDecodeError``.
        """
        with open(file_name, "rb") as f:
            for line in f:
                text = line.decode('utf-8').strip("\r\n")
                if not text:
                    continue
                conn_obj.run_cmd(
                    "echo {} >> {}".format(text, remote_file_path)
                )

    def list_dir(self, conn_obj, file_path):
        """Recursively mirror the local directory *file_path* onto a host.

        Remote paths are identical to the local ones. Sub-directories are
        created with ``mkdir``; files are created empty and then filled line
        by line. No ``mkdir`` is issued for *file_path* itself.

        :param conn_obj: session object exposing ``run_cmd(command)``
        :param file_path: local directory to walk
        """
        for entry in os.listdir(file_path):
            local_path = os.path.join(file_path, entry)
            if os.path.isdir(local_path):
                conn_obj.run_cmd('mkdir %s' % local_path)
                self.list_dir(conn_obj, local_path)
            else:
                # "cd. > file" creates (or truncates) an empty file in cmd.exe.
                conn_obj.run_cmd('cd. > %s' % local_path)
                self._send_file(conn_obj, local_path, local_path)
# Example run: connect to one host, extract the archive, then upload the tree.
if __name__ == '__main__':
    t = Trans("test", "test")
    t.ip_list = ['10.10.10.10']
    t.connection()
    t.unzip_package('./apache-zookeeper-3.6.1-bin.tar.gz')
    t.mul_thread()
# python实现文件解压,遍历及使用winrm模块上传到Windows主机
# 标签:类型 for use comm 指定 同文件 win turn 存在
# 原文地址:https://www.cnblogs.com/liuhuan086/p/13298239.html