基于Python的HTTP接口自动化测试框架实现

基于Python的HTTP接口自动化测试框架实现

今天我们来讲一下基于Python的HTTP接口自动化测试框架的实现,范例如下:

一、测试需求描述

对服务后台一系列的http接口功能测试。

输入:根据接口描述构造不同的参数输入值

输出:XML文件

eg:http://xxx.com/xxx_product/test/content_book_list.jsp?listid=1

二、实现方法

1、选用Python脚本来驱动测试

2、采用Excel表格管理测试数据,包括用例的管理、测试数据录入、测试结果显示等等,这个需要封装一个Excel的类即可。

3、调用http接口采用Python封装好的API即可

4、测试需要的http组装字符转处理即可

5、设置2个检查点,XML文件中的返回值字段(通过解析XML得到);XML文件的正确性(文件对比)

6、首次执行测试采用半自动化的方式,即人工检查输出的XML文件是否正确,一旦正确将封存XML文件,为后续回归测试的预期结果,如果发现错误手工修正为预期文件。(注意不是每次测试都人工检查该文件,只首次测试的时候才检查)

三、Excel表格样式

基于Python的HTTP接口自动化测试框架实现

四、实现代码(代码才是王道,有注释很容易就能看明白的)

1、测试框架代码


import os,sys, urllib, httplib, profile, datetime, time

from xml2dict import XML2Dict

import win32com.client

from win32com.client import Dispatch

#Excel表格中测试结果底色

OK_COLOR=0xffffff

NG_COLOR=0xff

#NT_COLOR=0xffff

NT_COLOR=0xC0C0C0

#Excel表格中测试结果汇总显示位置

TESTTIME=[1, 14]

TESTRESULT=[2, 14]

#Excel模版设置

#self.titleindex=3        #Excel中测试用例标题行索引

#self.casebegin =4        #Excel中测试用例开始行索引

#self.argbegin   =2       #Excel中参数开始列索引

#self.argcount  =5        #Excel中支持的参数个数

#self.resultcol =28       #TestResult列

class create_excel:

    def __init__(self, sFile, dtitleindex=3, dcasebegin=4, dargbegin=3, dargcount=5, dresultcol=28):

        self.xlApp = win32com.client.Dispatch('et.Application')   #MS:Excel  WPS:et

        try:

            self.book = self.xlApp.Workbooks.Open(sFile)

        except:

            print_error_info()

            print "打开文件失败"

            exit()

        self.file=sFile

        self.titleindex=dtitleindex

        self.casebegin=dcasebegin

        self.argbegin=dargbegin

        self.argcount=dargcount

        self.resultcol=dresultcol

        self.allresult=[]

    def close(self):

        #self.book.Close(SaveChanges=0)

        self.book.Save()

        self.book.Close()

        #self.xlApp.Quit()

        del self.xlApp

    def read_data(self, iSheet, iRow, iCol):

        try:

            sht = self.book.Worksheets(iSheet)

            sValue=str(sht.Cells(iRow, iCol).Value)

        except:

            self.close()

            print('读取数据失败')

            exit()

        #去除'.0'

        if sValue[-2:]=='.0':

            sValue = sValue[0:-2]

        return sValue

    def write_data(self, iSheet, iRow, iCol, sData, color=OK_COLOR):

        try:

            sht = self.book.Worksheets(iSheet)

            sht.Cells(iRow, iCol).Value = sData.decode("utf-8")

            sht.Cells(iRow, iCol).Interior.Color=color

            self.book.Save()

        except:

            self.close()

            print('写入数据失败')

            exit()

    #获取用例个数    

    def get_ncase(self, iSheet):

        try:

            return self.get_nrows(iSheet)-self.casebegin+1

        except:

            self.close()

            print('获取Case个数失败')

            exit()

    def get_nrows(self, iSheet):

        try:

            sht = self.book.Worksheets(iSheet)

            return sht.UsedRange.Rows.Count

        except:

            self.close()

            print('获取nrows失败')

            exit()

    def get_ncols(self, iSheet):

        try:

            sht = self.book.Worksheets(iSheet)

            return sht.UsedRange.Columns.Count

        except:

            self.close()

            print('获取ncols失败')

            exit()

    def del_testrecord(self, suiteid):

        try:

            #为提升性能特别从For循环提取出来

            nrows=self.get_nrows(suiteid)+1

            ncols=self.get_ncols(suiteid)+1

            begincol=self.argbegin+self.argcount

            #提升性能

            sht = self.book.Worksheets(suiteid)

            for row in range(self.casebegin, nrows):

                for col in range(begincol, ncols):

                    str=self.read_data(suiteid, row, col)

                    #清除实际结果[]

                    startpos = str.find('[')

                    if startpos>0:

                        str = str[0:startpos].strip()

                        self.write_data(suiteid, row, col, str, OK_COLOR)

                    else:

                        #提升性能

                        sht.Cells(row, col).Interior.Color = OK_COLOR

                #清除TestResul列中的测试结果,设置为NT

                self.write_data(suiteid, row, self.resultcol, 'NT', NT_COLOR)

        except:

            self.close()

            print('清除数据失败')

            exit()

#执行调用

def run_case(IPPort, url):

    conn = httplib.HTTPConnection(IPPort)

    conn.request("GET", url)

    rsps = conn.getresponse()

    data = rsps.read()

    conn.close()

    return data

#获取用例基本信息[Interface,argcount,[ArgNameList]]

def get_caseinfo(Data, SuiteID):

    caseinfolist=[]

    sInterface=Data.read_data(SuiteID, 1, 2) 

    argcount=int(Data.read_data(SuiteID, 2, 2)) 

    #获取参数名存入ArgNameList 

    ArgNameList=[]

    for i in range(0, argcount):

        ArgNameList.append(Data.read_data(SuiteID, Data.titleindex, Data.argbegin+i))  

    caseinfolist.append(sInterface)

    caseinfolist.append(argcount)

    caseinfolist.append(ArgNameList)

    return caseinfolist

#获取输入

def get_input(Data, SuiteID, CaseID, caseinfolist):

    sArge=''

    #参数组合

    for j in range(0, caseinfolist[1]):

        sArge=sArge+caseinfolist[2][j]+'='+Data.read_data(SuiteID, Data.casebegin+CaseID, Data.argbegin+j)+'&' 

    #去掉结尾的&字符

    if sArge[-1:]=='&':

        sArge = sArge[0:-1]   

    sInput=caseinfolist[0]+sArge    #组合全部参数

    return sInput

#结果判断 

def assert_result(sReal, sExpect):

    sReal=str(sReal)

    sExpect=str(sExpect)

    if sReal==sExpect:

        return 'OK'

    else:

        return 'NG'

#将测试结果写入文件

def write_result(Data, SuiteId, CaseId, resultcol, *result):

    if len(result)>1:

        ret='OK'

        for i in range(0, len(result)):

            if result[i]=='NG':

                ret='NG'

                break

        if ret=='NG':

            Data.write_data(SuiteId, Data.casebegin+CaseId, resultcol,ret, NG_COLOR)

        else:

            Data.write_data(SuiteId, Data.casebegin+CaseId, resultcol,ret, OK_COLOR)

        Data.allresult.append(ret)

    else:

        if result[0]=='NG':

            Data.write_data(SuiteId, Data.casebegin+CaseId, resultcol,result[0], NG_COLOR)

        elif result[0]=='OK':

            Data.write_data(SuiteId, Data.casebegin+CaseId, resultcol,result[0], OK_COLOR)

        else:  #NT

            Data.write_data(SuiteId, Data.casebegin+CaseId, resultcol,result[0], NT_COLOR)

        Data.allresult.append(result[0])

    #将当前结果立即打印

    print 'case'+str(CaseId+1)+':', Data.allresult[-1]

#打印测试结果

def statisticresult(excelobj):

    allresultlist=excelobj.allresult

    count=[0, 0, 0]

    for i in range(0, len(allresultlist)):

        #print 'case'+str(i+1)+':', allresultlist[i]

        count=countflag(allresultlist[i],count[0], count[1], count[2])

    print 'Statistic result as follow:'

    print 'OK:', count[0]

    print 'NG:', count[1]

    print 'NT:', count[2]

#解析XmlString返回Dict

def get_xmlstring_dict(xml_string):

    xml = XML2Dict()

    return xml.fromstring(xml_string)

#解析XmlFile返回Dict 

def get_xmlfile_dict(xml_file):

    xml = XML2Dict()

    return xml.parse(xml_file)

#去除历史数据expect[real]

def delcomment(excelobj, suiteid, iRow, iCol, str):

    startpos = str.find('[')

    if startpos>0:

        str = str[0:startpos].strip()

        excelobj.write_data(suiteid, iRow, iCol, str, OK_COLOR)

    return str

#检查每个item (非结构体)

def check_item(excelobj, suiteid, caseid,real_dict, checklist, begincol):

    ret='OK'

    for checkid in range(0, len(checklist)):

        real=real_dict[checklist[checkid]]['value']

        expect=excelobj.read_data(suiteid, excelobj.casebegin+caseid, begincol+checkid)

        #如果检查不一致测将实际结果写入expect字段,格式:expect[real]

        #将return NG

        result=assert_result(real, expect)

        if result=='NG':

            writestr=expect+'['+real+']'

            excelobj.write_data(suiteid, excelobj.casebegin+caseid, begincol+checkid, writestr, NG_COLOR)

            ret='NG'

    return ret

#检查结构体类型

def check_struct_item(excelobj, suiteid, caseid,real_struct_dict, structlist, structbegin, structcount):

    ret='OK'

    if structcount>1:  #传入的是List

        for structid in range(0, structcount):

            structdict=real_struct_dict[structid]

            temp=check_item(excelobj, suiteid, caseid,structdict, structlist, structbegin+structid*len(structlist))

            if temp=='NG':

                ret='NG'

    else: #传入的是Dict

        temp=check_item(excelobj, suiteid, caseid,real_struct_dict, structlist, structbegin)

        if temp=='NG':

            ret='NG'

    return ret

#获取异常函数及行号

def print_error_info():

    """Return the frame object for the caller's stack frame."""

    try:

        raise Exception

    except:

        f = sys.exc_info()[2].tb_frame.f_back

    print (f.f_code.co_name, f.f_lineno)  

#测试结果计数器,类似Switch语句实现

def countflag(flag,ok, ng, nt): 

    calculation  = {'OK':lambda:[ok+1, ng, nt],  

                         'NG':lambda:[ok, ng+1, nt],                      

                         'NT':lambda:[ok, ng, nt+1]}     

    return calculation[flag]()

  

2、项目测试代码

from testframe import *

STRUCTCOL=14  #结构体开始列

#content_book_list用例集

def content_book_list_suite():

    print 'Test Begin,please waiting...'

    suiteid=4   #设置测试输入Sheetindex=1

    excelobj.del_testrecord(suiteid)  #清除历史测试数据

    casecount=excelobj.get_ncase(suiteid)

    checklist=['resultCode', 'listid', 'name', 'total', 'from', 'size']

    structitem=['bookid', 'name', 'title', 'progress', 'words', 'authorid', 'authorName']

    #获取Case基本信息

    caseinfolist=get_caseinfo(excelobj, suiteid)

    for caseid in range(0, casecount):

        #检查是否执行该Case

        if excelobj.read_data(suiteid,excelobj.casebegin+caseid, 2)=='N':

            write_result(excelobj, suiteid, caseid, excelobj.resultcol, 'NT')

            continue #当前Case结束,继续执行下一个Case

        #获取测试数据

        sInput=get_input(excelobj, suiteid, caseid, caseinfolist)   

        XmlString=run_case(com_ipport, sInput)       #执行调用

        dict=get_xmlfile_dict(os.getcwd()+'\doc\Test.xml')

        itemdict=dict['root']

        ret1=check_item(excelobj, suiteid, caseid,itemdict, checklist, excelobj.argbegin+excelobj.argcount)

        structdict=dict['root']['books']['book']

        ret2=check_struct_item(excelobj, suiteid, caseid,structdict, structitem, STRUCTCOL, 2)

        write_result(excelobj, suiteid, caseid, excelobj.resultcol, ret1, ret2)

    print 'Test End!'

  

3、测试入口

from testframe import *

from xxx_server_case import *

from xxx_server_case import *

import xxx_server_case

#服务系统接口测试

#设置测试环境

xxx_server_case.excelobj=create_excel(os.getcwd()+'\doc\TestDemo_testcase.xls')

xxx_server_case.com_ipport='XXX.16.1.35:8080'

begin =datetime.datetime.now()

#Add testsuite begin

content_book_list_suite()

#Add other suite from here

#Add testsuite end

#profile.run("content_book_list_suite()")

print 'Total Time:', datetime.datetime.now()-begin

statisticresult(xxx_server_case.excelobj)

xxx_server_case.excelobj.close()

  

基于Python的HTTP接口自动化测试框架实现

如需了解更新更多的干货文章请关注【python全自动化测试】或者私信我

 

上一篇:基于Selenium+Python的web自动化测试框架


下一篇:MySQL重置密码(OSX)