标签:subprocess 标准输出 超时
当我们需要执行外部命令或自己写一个自动化执行器时,需要使用到启动进程并获取输出的操作
首先,我们启动进程采用Python的subprocess模块,为了保证标准输出和标准错误输出能够正常运行,启动两个线程来检测输出结果部分
class Daemon(threading.Thread): def __init__(self, workDir, logFunction=None, *args): threading.Thread.__init__(self) self.process = None self.args = args self.workDir = workDir self.errList = [] self.stdOut = "" self.stdErr = "" self.isEnd = False self.logFunction = logFunction def _start_process(self): if self.workDir == "": self.workDir = None allParas = [] for arg in self.args: if arg: allParas.extend(arg) commandStr = " ".join(allParas) try: commandStr = commandStr.encode("gbk") except: pass try: self.runPath = self.runPath.encode("gbk") except: pass lines = ["@echo off"] lines.append(commandStr) tempFilePath = util.get_temp_file("bat") tempScriptFile = open(tempFilePath, "w") tempScriptFile.write("\n".join(lines)) tempScriptFile.close() self.process = subprocess.Popen("\"" + tempFilePath + "\"", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.workDir, shell=True) # 循环输出标准输出内容,避免阻塞 stdoutThread = ReadLogThread(self.process.stdout) stdoutThread.start() stderrThread = ReadLogThread(self.process.stderr) stderrThread.start() while self.process.poll() is None: time.sleep(0.01) self.isEnd = True self.stdOut, self.stdErr = stdoutThread.get_log(), stderrThread.get_log() def run(self): try: self._start_process() except: log.exception(traceback.format_exc()) def stop(self): try: if self.process.pid == 0: return os.system("TaskKill /PID %s /T /F" % self.process.pid) except: log.exception(traceback.format_exc()) pass
class ReadLogThread(threading.Thread): def __init__(self, _pipe): threading.Thread.__init__(self) self._pipe = _pipe self.logList = [] def run(self): while True: try: line = self._pipe.readline() if line == "": break self.logList.append(line) except: break def get_log(self): return "".join(self.logList)
我们除了启动进程外,一般还会给进程加一个超时时间,那么代码如下:
def start_process(workDir, timeout=5 * 60, logFunction=None, *args): daemon = Daemon(workDir, logFunction, *args) daemon.start() start = 0 isTimeout = False daemonStd = "" daemonErr = "" while not daemon.isEnd: if start > timeout: daemon.stop() isTimeout = True daemonErr = "\n".join(daemon.errList) break time.sleep(0.1) start += 0.1 if daemon.stdOut != "": daemonStd = daemon.stdOut if daemon.stdErr != "": daemonErr = daemon.stdErr return util.get_unicode_str(daemonStd), util.get_unicode_str(daemonErr), isTimeout
欢迎关注“搜狗测试”公众号,每天一篇测试相关的文章与您分享,共同讨论软件测试的技术与发展
转载请注明:http://blog.csdn.net/sogouauto
标签:subprocess 标准输出 超时
原文地址:http://blog.csdn.net/sogouauto/article/details/44056823