封装ADB Pipe 模块
import os import subprocess import sys from subprocess import * import threading import setting class commandPipe(): def __init__(self,command,func,exitFunc,readyFunc=None,shell=True,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE,code="GBK"): ‘‘‘ :param command:命令 :param func:正常的输出函数 :param exitFunc:异常反馈函数 :param readyFunc:当管道创建完毕后调用 ‘‘‘ self._thread = threading.Thread(target=self.__run,args=(command,shell,stdin,stdout,stderr,readyFunc)) self._code=code self._func=func self._exitFunc=exitFunc self._readyFunc=readyFunc def __run(self,command,shell,stdin,stdout,stderr,readyFunc): ‘‘‘私有函数‘‘‘ global tmp try: self._process=subprocess.Popen( command, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,) except OSError as e: self._exitFunc(e) fun=self._process.stdout.readline if readyFunc!=None: threading.Thread(target=readyFunc).start() while True: line=fun() if not line: break try: tmp=line.decode(self._code) except Exception as e: print(e) self._func(tmp) self._process.stdout.close() def start(self): self._thread.start() def close(self): #self._process.stdout.close() self._thread.join() del self # def run_cmd(self,command): # ret=subprocess.Popen(command,shell=True,stdout=subprocess.PIPE).stdout.read() # ret=ret.decode(‘gbk‘) # if ret.strip()=="": # print("error:{}".format(str(ret))) # else: # print("Success:{}".format(str(ret))) # return ret if __name__==‘__main__‘: #cmd="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘" cmd=setting.deviceId.cmd def exitFunc(exception): print("Exception is {}".format(exception)) def readyFunc(): print("Start running the command: {}".format(setting.deviceId.cmd_name)) def func(line): print("The standard output for {} is {}".format(setting.deviceId.cmd_name,line)) # E=commandPipe(cmd,func,exitFunc,readyFunc) # E.start() # E.close()
调用
import setting from commandPipe import commandPipe class runCmd(): instance=None def __new__(cls, *args, **kwargs): if cls.instance is None: cls.instance = super().__new__(cls) return cls.instance def __init__(self,name): self.name=name def exitFunc(self,exception): print("Exception is {}".format(exception)) def readyFunc(self): print("Start running the command: {}".format(self.name)) def func(self,line): self.line=line print("The standard output for {} is {}".format(self.name,line)) def run(self): #tmp=self.name s="setting.{}.cmd".format(self.name) #cmd=(setting.{}.cmd) E= commandPipe(eval(s),self.func,self.exitFunc,self.readyFunc) E.start() E.close() return self.line if __name__==‘__main__‘: test=runCmd("deviceId").run()