测试ADB Pipe的封装

封装ADB Pipe 模块

import os
import subprocess
import sys
from subprocess import *
import threading
import setting

class commandPipe():

    def __init__(self,command,func,exitFunc,readyFunc=None,shell=True,stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,stderr=subprocess.PIPE,code="GBK"):
        ‘‘‘
        :param command:命令
        :param func:正常的输出函数
        :param exitFunc:异常反馈函数
        :param readyFunc:当管道创建完毕后调用
        ‘‘‘

        self._thread = threading.Thread(target=self.__run,args=(command,shell,stdin,stdout,stderr,readyFunc))
        self._code=code
        self._func=func
        self._exitFunc=exitFunc
        self._readyFunc=readyFunc


    def __run(self,command,shell,stdin,stdout,stderr,readyFunc):
        ‘‘‘私有函数‘‘‘
        global tmp
        try:
            self._process=subprocess.Popen(
                command,
                shell=shell,
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,)
        except OSError as e:
            self._exitFunc(e)

        fun=self._process.stdout.readline
        if readyFunc!=None:
            threading.Thread(target=readyFunc).start()
        while True:
            line=fun()
            if not line:
                break
            try:
                tmp=line.decode(self._code)
            except Exception as e:
                print(e)
        self._func(tmp)
        self._process.stdout.close()

    def start(self):
        self._thread.start()

    def close(self):
        #self._process.stdout.close()
        self._thread.join()
        del self

    # def run_cmd(self,command):
    #     ret=subprocess.Popen(command,shell=True,stdout=subprocess.PIPE).stdout.read()
    #     ret=ret.decode(‘gbk‘)
    #     if ret.strip()=="":
    #         print("error:{}".format(str(ret)))
    #     else:
    #         print("Success:{}".format(str(ret)))
    #     return ret

if __name__==‘__main__‘:
    #cmd="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘"
    cmd=setting.deviceId.cmd


    def exitFunc(exception):
        print("Exception is  {}".format(exception))

    def readyFunc():
        print("Start running the command: {}".format(setting.deviceId.cmd_name))

    def func(line):
        print("The standard output for {} is {}".format(setting.deviceId.cmd_name,line))

    # E=commandPipe(cmd,func,exitFunc,readyFunc)
    # E.start()
    # E.close()

 调用

import setting
from commandPipe import commandPipe

class runCmd():
    instance=None

    def __new__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self,name):
        self.name=name

    def exitFunc(self,exception):
        print("Exception is  {}".format(exception))

    def readyFunc(self):
        print("Start running the command: {}".format(self.name))

    def func(self,line):
        self.line=line
        print("The standard output for {} is {}".format(self.name,line))


    def run(self):
        #tmp=self.name
        s="setting.{}.cmd".format(self.name)
        #cmd=(setting.{}.cmd)
        E= commandPipe(eval(s),self.func,self.exitFunc,self.readyFunc)
        E.start()
        E.close()
        return self.line


if __name__==‘__main__‘:
    test=runCmd("deviceId").run()

 

测试ADB Pipe的封装

上一篇:mysql 锁


下一篇:Oracle 实现数据表插入时主键列自增