接下来,我们实现一个OPC UA客户端。这个客户端会连接到我们刚刚创建的服务器,并读取“Sinumerik”对象下的所有变量。读取到的数据会被写入到一个CSV文件中,每5秒更新一次。
import csv
from datetime import datetime
import os
import time
from opcua import Client, ua
# 创建客户端并连接到服务器
client = Client("opc.tcp://localhost:4840/")
client.connect()
# 获取服务器对象节点
objects = client.get_objects_node()
sinumerik_node = objects.get_child(["2:Sinumerik"])
# 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node, path=""):
variables = {}
children = node.get_children()
for child in children:
new_path = f"{path}/{child.get_browse_name().Name}"
if child.get_node_class() == ua.NodeClass.Variable:
value = child.get_value()
variables[new_path] = value
else:
variables.update(get_variables(child, new_path))
return variables
# 处理CSV文件
filename = 'opcua_data.csv'
if os.path.exists(filename):
modification_time = os.path.getmtime(filename)
modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
new_filename = f"{filename}_{modification_time_str}"
os.rename(filename, new_filename)
# 写入数据到CSV文件
with open(filename, mode='w', newline='') as csvfile:
first_run = True
try:
while True:
variables = get_variables(sinumerik_node)
if first_run:
fieldnames = variables.keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
first_run = False
if writer:
writer.writerow(variables)
csvfile.flush()
time.sleep(5)
except KeyboardInterrupt:
client.disconnect()
通过以上代码,我们创建了一个简单的OPC UA服务器和客户端。服务器定期更新变量的值,而客户端则定期读取这些值并保存到CSV文件中。这只是一个起点,你可以根据实际需求对代码进行扩展和优化。希望这篇文章能帮助你快速入门OPC UA的Python编程。