功能概述
提供出restful接口给前端,可以调用自动化脚本,查询脚本,展示自动化执行报告列表,执行自动化脚本时核心逻辑在于调用远程服务器上的执行脚本,将生成的报告存入数据库。报告展示利用nginx,可直接读取在服务器上的html报告文件
实现
执行远程命令模块:
import paramiko
from common.logger import logger
class LinuxExecutor():
def execute_command(self, host, username, password, command):
client = paramiko.SSHClient()
# 自动添加策略,保存服务器的主机名和密钥信息,如果不添加,那么不再本地know_hosts文件中记录的主机将无法连接
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接SSH服务端,以用户名和密码进行认证
logger.info("执行远程主机命令:"+host+": "+command)
client.connect(hostname=host, port=22, username=username, password=password, banner_timeout=200)
stdin, stdout, stderr = client.exec_command(command)
# 打印执行结果
response = stdout.read().decode('utf-8')
# logger.info("服务器返回: "+response)
print(stdout.read().decode('utf-8'))
return response
# 关闭SSHClient
client.close()
if __name__ == '__main__':
executor = LinuxExecutor()
executor.execute_command('192.168.213.65', 'root', 'password', 'ls /service')
SQLAlchemy使用:利用flask的sql组件,可以轻松的实现数据库操作,不必跟底层的sql语句打交道,但这里有些细节需要解决,当调用query.all()方法或者query.paginate()时,返回的是一个特殊的对象列表,如果想返回给前端,需要做个特殊处理,在model里加入一个to_json的方法:
class ExecuteRecords(db.Model):
"""执行记录"""
__tablename__ = "execute_records"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
execute_model = db.Column(db.String(32))
report = db.Column(db.String(128))
execute_time = db.Column(db.String(32))
executor = db.Column(db.String(16))
def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict
controller层:
from flask import Flask,request
import json
import sys
sys.path.append(r"F:\workspace\python\autotestweb")
from common.restful_response import *
from common.linux_execute import LinuxExecutor
from common.config_reader import *
import datetime
import sys
from entity.model import *
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
command = 'ls /service/autotestscript/'
executor = LinuxExecutor()
host = reader.get_value('server', 'host')
username = reader.get_value('server', 'username')
password = reader.get_value('server', 'password')
print(sys.path)
#配置数据库连接的对象
class Config(object):
"""配置参数"""
# sqlalchemy的配置参数
SQLALCHEMY_DATABASE_URI = "mysql://root:intel%40123@localhost:3306/autotestweb_development"
# 设置sqlalchemy自动更跟踪数据库(数据库表手动更新是同步跟新到对象)
SQLALCHEMY_TRACK_MODIFICATIONS = True
#查询时会显示原始SQL语句
SQLALCHEMY_ECHO = True
#
# #加载数据库配置对象
app.config.from_object(Config)
#
# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)
class ExecuteRecords(db.Model):
"""执行记录"""
__tablename__ = "execute_records"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
execute_model = db.Column(db.String(32))
report = db.Column(db.String(128))
execute_time = db.Column(db.String(32))
executor = db.Column(db.String(16))
def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict
class TestJson:
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=1)
@app.route('/queryByProject')
def query_by_project():
project = request.args["project"]
query_command = command+project
response = executor.execute_command(host, username, password, query_command)
response = str(response).lstrip().split('\n')
del response[-1]
return success('success',response)
@app.route('/queryByService')
def query_by_service():
project = request.args["project"]
service = request.args["service"]
query_command = command + project + "/" + service + "/api/TestCase"
response = executor.execute_command(host, username, password, query_command)
response = str(response).lstrip().split('\n')
del response[-1]
return success('success',response)
@app.route('/execute')
def execute_testcase():
project = request.args["project"]
service = request.args["service"]
testcase_list = request.args["testcase"]
execute_time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
report_name = execute_time+".html"
# 远程服务器上的execute脚本接收四个参数,项目,服务,自动化脚本,报告名称
cmdstr = "sh /service/autotestscript/execute.sh "+project+" "+service+" "+testcase_list+" "+report_name
response = executor.execute_command(host, username, password, cmdstr)
report_location = "http://192.168.213.65/"+project+"/"+service+"/api/output/"+execute_time+".html"
record = ExecuteRecords(execute_model=service,
report=report_location,
execute_time=execute_time, executor='system')
db.session.add(record)
db.session.commit()
return success('success','ok')
@app.route('/listReport')
def list_report():
result = []
page_size = request.args["pageSize"]
page_number = request.args["pageNum"]
t = ExecuteRecords.query.all()
f = ExecuteRecords.query.paginate(page=int(page_number), per_page=int(page_size), error_out=False)
for record in f.items:
g = record.to_json()
result.append(record.to_json())
return jsonify({"pageSize":page_size,"pageNum":page_number,"total":f.total,"data":result}), 200
if __name__ == '__main__':
app.run(host="0.0.0.0")
遇到的问题
1、SQLALCHEMY_DATABASE_URI里的密码有@特殊字符:@可以用%40替代,做转义
2、将SQLAlchemy.db类型的对象转为json
def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict
sqlalchemy查询结果转为json并通过restapi接口返回的解决方案
3、flask-sqlalchemy分页查询
4、pycharm运行正常,命令行执行时报no module named xxx的错误
在导入项目包之前在sys.path加入工程目录,比如这里本来报common找不到,
from flask import Flask,request
import json
import sys
sys.path.append(r"F:\workspace\python\autotestweb")
from common.restful_response import *
from common.linux_execute import LinuxExecutor
from common.config_reader import *
5、部署到服务器上后,端口无法调用:在app.run里加入host=0.0.0.0,但如果是本地调试时需要把这个去掉,不然无法debug到断点
参考链接
数据库操作及Flask-SQLALchemy使用
教你十分钟写出restful接口
所需依赖
aniso8601==9.0.1
bcrypt==3.2.0
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.9
click==8.0.3
colorama==0.4.4
cryptography==36.0.0
Flask==2.0.2
Flask-RESTful==0.3.9
Flask-SQLAlchemy==2.5.1
greenlet==1.1.2
idna==3.3
importlib-metadata==4.8.2
itsdangerous==2.0.1
Jinja2==3.0.3
MarkupSafe==2.0.1
mysqlclient==2.1.0
paramiko==2.8.1
pycparser==2.21
PyNaCl==1.4.0
pytz==2021.3
requests==2.26.0
six==1.16.0
SQLAlchemy==1.4.27
typing_extensions==4.0.1
urllib3==1.26.7
Werkzeug==2.0.2
wincertstore==0.2
zipp==3.6.0
服务器执行脚本
# !/bin/bash
source /etc/profile
cd /service/autotestscript
if [ $3 == "all" ];then
testCaseList=`ls $1/$2/api/TestCase`
else
testCaseList=$3
fi
time=$(date "+%Y-%m-%d-%H-%M-%S")
report=$4
dot=","
count=0
for testCase in ${testCaseList[@]}
do
testCaseList[$count]="${testCase}${dot}"
count=$((count+1))
done
testCaseList=`echo ${testCaseList[*]}|sed 's/ //g'`
bundle exec ruby /service/autotestscript/Main.rb --includelist ${testCaseList} --outputfile ${report} --env beta --project $1 --service $2 --type api --runserver 192.168.213.65 --testuser system --mailto noone