串口代码:
import serial
import serial.tools.list_ports
class Communication():
#初始化
def __init__(self,com,bps,timeout):
self.port = com
self.bps = bps
self.timeout = timeout
global Ret
try:
# 打开串口,并得到串口对象
self.main_engine= serial.Serial(self.port,self.bps,timeout=self.timeout)
# 判断是否打开成功
if (self.main_engine.is_open):
Ret = True
except Exception as e:
print("---异常---:", e)
# 打印设备基本信息
def Print_Name(self):
print(self.main_engine.name) #设备名字
print(self.main_engine.port)#读或者写端口
print(self.main_engine.baudrate)#波特率
‘‘‘print(self.main_engine.bytesize)#字节大小
print(self.main_engine.parity)#校验位
print(self.main_engine.stopbits)#停止位
print(self.main_engine.timeout)#读超时设置
print(self.main_engine.writeTimeout)#写超时
print(self.main_engine.xonxoff)#软件流控
print(self.main_engine.rtscts)#软件流控
print(self.main_engine.dsrdtr)#硬件流控
print(self.main_engine.interCharTimeout)#字符间隔超时‘‘‘
#打开串口
def Open_Engine(self):
self.main_engine.open()
#关闭串口
def Close_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open) # 检验串口是否打开
# 打印可用串口列表
@staticmethod
def Print_Used_Com():
port_list = list(serial.tools.list_ports.comports())
print(port_list)
#接收指定大小的数据
#从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def Read_Size(self,size):
return self.main_engine.read(size=size)
#接收一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def Read_Line(self):
return self.main_engine.readline()
#发数据
def Send_data(self,data):
self.main_engine.write(data)
#接收数据
#一个整型数据占两个字节
#一个字符占一个字节
def Seed_SELF(self,data):
self.main_engine.write(data.encode("utf-8")) # 写数据
print("写总字节数:", data)
def Recive_data(self):
# 循环接收数据,此为死循环,可用线程实现
print("开始接收数据:")
#self.main_engine.write(‘11111‘.encode("utf-8"))
while True:
if self.main_engine.in_waiting:
str = self.main_engine.read(self.main_engine.in_waiting).decode("utf-8")
if (str[0:5] == "exit"):
break
else:
print("收到数据:", str)
break
Communication.Print_Used_Com()
Ret =False #是否创建成功标志
Engine1 = Communication("com4", 115200, 5)
Engine1.Print_Name
if __name__ == "__main__":
if (Ret):
#rett = Engine1.Send_data(‘1111‘)
Engine1.Send_data("00055".encode("utf-8"))
Engine1.Recive_data()
Engine1.Send_data("1110".encode("utf-8"))
Engine1.Recive_data()
Engine1.Close_Engine()
文件名为qq2021_7_7通过串口的短接(tx和rx相连)实现串口的实现和测试
import unittest
from unittest import mock
from qq2021_7_7 import Communication
class MyTestCase(unittest.TestCase):
def test_COM(self):
com01 = Communication("com4", 115200, 5)
com01.com_test = mock.Mock(
return_value={"Ret": True, "COM": ‘COM4‘, "timeout": 5})
com = ‘COM3‘
bps = 9600
timeout = 4
res = com01.com_test(com, bps, timeout)
print(res)
self.assertEqual(res["Ret"], True)
def test_Open_com(self):
self.Open_Engine = mock.Mock(
return_value=True)
self.assertEqual(self.Open_Engine(), True)
def test_Send_com(self):
self.Send_data = mock.Mock(
return_value = 111)
self.assertEqual(self.Send_data(), 111)
def test_Recive_data(self):
self.Recive_data = mock.Mock(
return_value = 111)
res = self.Recive_data(111)
self.assertEqual(res, 111)
if __name__ == ‘__main__‘:
unittest.main()
通过简单的单元测试实现串口的检测,其中send的返回值和recive的返回值是自己臆想的。后面会改进。