1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 # __author__ = "JieYao" 4 5 import os 6 import string 7 import argparse 8 import shutil 9 import json 10 import time 11 import subprocess 12 13 usage = "该程序为框架下Tool的一键化测试工具,支持连续测试多组数据。会提取对应的测试结果文件放到指定目录。\n" 14 parser = argparse.ArgumentParser(description = usage) 15 parser.add_argument("-c", "--code_file", help="源码文件路径", required = True) 16 parser.add_argument("-i", "--id", help="用户ID", required = True) 17 parser.add_argument("-type", "--type", help="工具所属流程和模块,格式如\"meta.beta_diversity.pca\",\"meta.otunetwork\"", required = True) 18 parser.add_argument("-p", "--para_file", help="参数文件路径,文件中每行代表一次测试,各个参数用tab隔开,以name:value的形式写,不需要引号", required = True) 19 parser.add_argument("-o", "--output_dir", help="重设输出文件路径", required = False, default="./") 20 args = vars(parser.parse_args()) 21 22 23 args[‘code_file‘] = os.path.abspath(args[‘code_file‘]) 24 args[‘para_file‘] = os.path.abspath(args[‘para_file‘]) 25 args[‘output_dir‘] = os.path.abspath(args[‘output_dir‘]) 26 27 try: 28 shutil.copyfile(args[‘code_file‘], "/mnt/ilustre/users/sanger-dev/biocluster/src/mbio/tools/" + args[‘type‘].replace(".", "/") + ".py") 29 except: 30 print "代码文件copy到%s下时出现错误,请检查type参数" % "/mnt/ilustre/users/sanger-dev/biocluster/src/mbio/tools/" + args[‘type‘].replace(".", "/") + ".py" 31 exit(0) 32 33 if args[‘output_dir‘] and not os.path.exists(args[‘output_dir‘]): 34 os,mkdir(args[‘output_dir‘]) 35 36 para_data = [] 37 for s in open(args[‘para_file‘]).readlines(): 38 s = s.strip().split() 39 if not s: 40 continue 41 tmp_dict = dict() 42 for data in s: 43 position = data.find(":") 44 tmp_dict[data[:position]] = data[position+1 : len(data)] 45 para_data += [tmp_dict] 46 47 option_type = dict() 48 with open(args[‘code_file‘], "r") as tmp_file: 49 for text in tmp_file.readlines(): 50 if "name" in text and "type" in text: 51 text = text.strip() 52 if text[-1] != "}": 53 text = text[0:-1] 54 try: 55 data = json.loads(text) 56 if "type" in data.keys() and "name" in data.keys(): 57 option_type[data["name"]] = data["type"] 58 except: 59 continue 60 61 std_option = dict() 62 63 file_name = os.path.split(args[‘code_file‘])[1] 64 json_name = "single_" + os.path.splitext(file_name)[0] + ".json" 65 py_name = args["output_dir"] + ‘/test_single_‘ + file_name 66 date = time.strftime(‘%Y%m%d‘,time.localtime(time.time())) 67 log_file = "/mnt/ilustre/users/sanger-dev/workspace/%s/Single_%s" %(date, args[‘id‘]) 68 69 with open(py_name, "w") as tmp_file: 70 tmp_file.write("#!/mnt/ilustre/users/sanger-dev/app/program/Python/bin/python\n") 71 tmp_file.write("from mbio.workflows.single import SingleWorkflow\n") 72 tmp_file.write("from biocluster.wsheet import Sheet\n") 73 tmp_file.write("wsheet = Sheet(\"%s\")\n" % json_name) 74 tmp_file.write("wf = SingleWorkflow(wsheet)\n") 75 tmp_file.write("wf.run()\n") 76 77 for i in range(len(para_data)): 78 for keys in para_data[i].keys(): 79 if keys in option_type.keys(): 80 if option_type[keys] == "bool": 81 para_data[i][keys] = bool(para_data[i][keys]) 82 elif option_type[keys] == "int": 83 para_data[i][keys] = int(para_data[i][keys]) 84 elif option_type[keys] == "float": 85 para_data[i][keys] = float(para_data[i][keys]) 86 else: 87 continue 88 std_option[‘type‘] = "tool" 89 std_option[‘name‘] = args[‘type‘] 90 std_option[‘options‘] = para_data[i] 91 std_option[‘id‘] = args[‘id‘] 92 93 with open(json_name, "w") as tmp_file: 94 tmp_file.write(json.dumps(std_option, indent=4)) 95 96 task_ID = "" 97 print "开始第%d组参数的测试。" %(i+1) 98 try: 99 os.system("python %s > /dev/null" % py_name) 100 print "第%d组参数测试运行完成" %(i+1) 101 except: 102 print "第%d组参数测试运行失败" %(i+1) 103 continue 104 for text in open(log_file + "/log.txt", "r").readlines(): 105 if "ID" not in text: 106 continue 107 text = text.strip()[text.find("ID")+3:] 108 while len(text)>0 and text[0] not in "1234567890": 109 text = text[1:] 110 task_ID = text[:5] 111 group_name = args[‘output_dir‘] + "/Option_Group_%d_" %(i+1) 112 if os.path.exists(group_name + task_ID): 113 shutil.rmtree(group_name + task_ID) 114 os.mkdir(group_name + task_ID) 115 calc_path = log_file + "/" + os.path.splitext(file_name)[0].capitalize() 116 for tmp_name in os.listdir(calc_path): 117 files = os.path.join(calc_path, tmp_name) 118 if os.path.isdir(files): 119 os.system("cp -r %s %s" %(files, group_name + task_ID)) 120 if task_ID in files: 121 os.system("cp %s %s" %(files, group_name + task_ID)) 122 shutil.move(json_name, group_name + task_ID)