背景
在上篇中我们定义了两种通信协议基础的配置以及创建连接、重连机制、发送数据等等过程,在这篇文章中我们将重点来描述整个数据接收的过程,这个也是整个通信框架中非常重要的一个部分,在分析完整个过程后,本篇文章将贴出Communicator类的完整源码供分析文章使用。
接收过程
1.1 接收数据
我们先来看看这个TryReceive方法
/// <summary> /// Try receive data /// </summary> /// <param name="failReason"></param> /// <returns></returns> private void TryReceive() { string failReason = ""; bool isSucc = true; if (state == COMMUNICATION_STATE.IDLE || /*state == COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND ||*/ state == COMMUNICATION_STATE.WAITING_CMD_RESPONSE) { if (CommunicationType == COMMUNICATION_TYPE.SERIAL) { isSucc = TrySerialReceive(out failReason); } else { isSucc = TryTcpReceive(out failReason); } } if (!isSucc) { retryConnectCnt++; communicationFailReason = failReason; Log.Write(LogCategory.Error, ComponentFullPath, communicationFailReason); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } }
在这个过程如果当前状态为Idle或者WAITING_CMD_RESPONSE的话就进入接收数据的过程,接收数据的时候根据通信过程分为Tcp接收数据和SerialPort接收数据,如果在这个接收数据的过程中发生了失败,则会再次进入到失败重连中,这个需要注意,下面我们具体看看这两个接收数据的过程。
我们先来看看串口接收过程
/// <summary> /// Try to receive data from serial port /// </summary> /// <param name="data"></param> /// <param name="isFailed"></param> /// <param name="failReason"></param> /// <returns>true: success</returns> private bool TrySerialReceive(out string failReason) { //do nothing, let void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e) receive the data failReason = ""; return true; }
这里实际上什么也没做,这样很奇怪,其实在我们打开串口的时候订阅了串口的DataReceived事件,我们来看看这部分代码。
/// <summary> /// When serial port received something, adding the recevied buf behind the recvBuffer /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e) { try { int len = serialPort.BytesToRead; if (len > 0) { byte[] curReadingBuf = new byte[len]; serialPort.Read(curReadingBuf, 0, len); lock (recvBufferLock) { if (recvBufferSize + len < recvBuffer.Length) { Array.Copy(curReadingBuf, 0, recvBuffer, recvBufferSize, len); recvBufferSize += len; } } } } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); } }
这段接收数据的过程需要注意,由于接收的过程可能处于多线程环境中所以必须加lock锁定 recvBuffer和recvBufferSize的操作,这个需要注意。
下面我再来看看Tcp接收数据的过程,这个过程和串口接收的时候大同小异,只不过写法略有不同。
/// <summary> /// Try to receive data from tcp /// </summary> /// <param name="failReason"></param> /// <returns>true: success</returns> private bool TryTcpReceive(out string failReason) { failReason = ""; try { int len = tcpSocket.Available; if (len > 0) { byte[] curReadingBuf = new byte[len]; tcpSocket.Receive(curReadingBuf, len, SocketFlags.None); lock (recvBufferLock) { if (recvBufferSize + len < recvBuffer.Length) { Array.Copy(curReadingBuf, 0, recvBuffer, recvBufferSize, len); recvBufferSize += len; } } } } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); failReason = ex.Message; return false; } return true; }
1.2 解析数据
这个也是非常重要的一个过程,接收到数据后我们需要将数据转化然后让调用方获取到最终的数据完成整个过程,我们首先来看看这个过程的代码,然后再一步步分析。
/// <summary> /// Process recevied data with head/tail seperator /// </summary> protected virtual void ProcessReceivedData() { lock (recvBufferLock) { if (recvBufferSize == 0) return; string recvBufString = string.Empty; if (IsNeedParseNonAsciiData) { recvBufString = ASCIIEncoding.UTF7.GetString(recvBuffer, 0, recvBufferSize); //for(int i = 0;i<recvBufferSize;i++) // recvBufString += (Char)recvBuffer[i]; } else { //recvBufString = ASCIIEncoding.ASCII.GetString(recvBuffer, 0, recvBufferSize); for (int i = 0; i < recvBufferSize; i++) recvBufString += (Char)recvBuffer[i]; } if (string.IsNullOrEmpty(messageSeperator)) { TryParseMessage(recvBufString); lock (recvBufferLock) { recvBufferSize = 0; } } else { int index = recvBufString.LastIndexOf(messageSeperator); if (index < 0) return; string subStr = recvBufString.Substring(0, index + messageSeperator.Length); string[] seperatedMessages = subStr.Split(new string[] { messageSeperator }, StringSplitOptions.RemoveEmptyEntries); foreach (var msg in seperatedMessages) { try { TryParseMessage(msg); } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); } } lock (recvBufferLock) { int leftLength = recvBufString.Length - index - messageSeperator.Length; if (leftLength > 0) { string leftStr = recvBufString.Substring(index + messageSeperator.Length, leftLength); Array.Copy(ASCIIEncoding.ASCII.GetBytes(leftStr), recvBuffer, leftLength); } recvBufferSize = leftLength; } } } }
这个里面有个非常重要的过程就是接收到的数据分隔符的问题,正常和外部通信的时候,当对方接收到命令以后会在回复的消息后面加上一个标志作为结束符,我们接收数据的时候也是以这个作为标志的,常用的有 CR、CRLF......这个时候我们就需要将接收到的数据进行分组处理了,而且只能接收完整的组数据,如果数据不完整我们任然需要将这些数据保留直到再次接收到结束标志的时候为止最为新的组数据。
这个里面接收到数据以后会将字节数组转化为string类型,然后基类进行解析处理,我们来看这个TryParseMessage的方法。
/// <summary> /// Try to parse received message /// </summary> /// <param name="recvMessage"></param> /// <returns></returns> protected void TryParseMessage(string receivedMessage) { var log = "[RECV] " + FormatLoggingMessage(receivedMessage); Log.Write(LogCategory.Debug, ComponentFullPath, log); Receive(receivedMessage); if (currentCommand != null) { bool? result = currentCommand._ParseReceviedMessage(receivedMessage); if (result.HasValue) { retryConnectCnt = 0; //reset connection retry count if communicator has got something if (state == COMMUNICATION_STATE.WAITING_CMD_RESPONSE || state == COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND) state = COMMUNICATION_STATE.IDLE; currentCommand = null; } ReceivedMessageParsed?.Invoke(this, new ReceivedMsgParsedEventArgs(result)); } }
这个里面我们接收到数据的第一步就是记录当前接收信息到日志从而方便后续排查问题,后面我们调用了一个Receive的抽象方法用于子类进行重载从而根据自己的需要进行数据的解析。这里还有一个地方需要注意的就是解析数据的时候除了继承Communicator的子类通过重写Receive方法获取当前的数据之外,我们还会判断当前的成员变量currentCommand,如果存在当前命令也会调用之前定义在Command类下面的_ParseReceivedMessage方法,让当前发送命令对象也有机会解析当前读取到的这个数据,到了这里我们整个解析数据的整个过程就分析完毕了。
完整代码
这里我将整个上下篇分析的完整源码在这里展示从而方便对整个过程进行分析
#region Declarations using Pangea.Common.Attributes; using System; using System.Collections.Generic; using System.Text; using System.Threading; using Pangea.Common.Utility; using System.Net.Sockets; using System.IO.Ports; using Cimetrix.CimControlFramework.Common.Commands; using Cimetrix.Library.Logging; #endregion namespace Pangea.Core.Network { /// <summary> /// Message package structure /// </summary> public class Command { protected Communicator communicator; ManualResetEvent manualEvent; bool commandSucc; bool commandFailed; string errorCode; public Command(Communicator communicator) { this.communicator = communicator; manualEvent = new ManualResetEvent(false); } public Command(Communicator communicator, string commandString, double timeoutSec, bool needReply) { Data = ASCIIEncoding.ASCII.GetBytes(commandString); NeedReply = needReply; this.communicator = communicator; TimeoutSec = timeoutSec; manualEvent = new ManualResetEvent(false); } public Command(Communicator communicator, byte[] commandString, double timeoutSec, bool needReply) { Data = new byte[commandString.Length]; Array.Copy(commandString, Data, commandString.Length); NeedReply = needReply; this.communicator = communicator; TimeoutSec = timeoutSec; manualEvent = new ManualResetEvent(false); } public bool NeedReply { get; protected set; } public byte[] Data { get; protected set; } public double TimeoutSec { get; protected set; } public override string ToString() { if (communicator.IsNeedParseNonAsciiData) { return ASCIIEncoding.UTF7.GetString(Data); //string retString = string.Empty; //foreach (var b in Data) // retString += (Char)b; //return retString; } else return ASCIIEncoding.ASCII.GetString(Data); } public ICommandResult Execute() { communicator._EnqueueCommand(this); OnCommandExecuted(); manualEvent.WaitOne((int)(TimeoutSec * 1000)); if (commandSucc) return CommandResults.Succeeded; else if (commandFailed) return CommandResults.Failed(errorCode); return CommandResults.Failed("Command executing timeout"); } /// <summary> /// Invoked when command was push into queue and send out /// </summary> protected virtual void OnCommandExecuted() { } /// <summary> /// Parse received message /// </summary> /// <param name="message"></param> /// <returns>True: indicate current command execution success, False: indicate current command execution failed, Null: still waiting next receiving message</returns> protected virtual bool? Receive(string message, out string errorCode) { errorCode = ""; return true; } /// <summary> /// Parse received message /// </summary> /// <param name="message"></param> /// <returns>True: indicate current command execution success, False: indicate current command execution failed, Null: still waiting next receiving message</returns> internal bool? _ParseReceviedMessage(string message) { string errorCode; var result = Receive(message, out errorCode); if (result.HasValue) { if (result.Value) { commandSucc = true; manualEvent.Set(); return true; } else { commandFailed = true; this.errorCode = errorCode; manualEvent.Set(); return false; } } return null; } } [Serializable] public enum LoggingMessageType { ASCII, Binary, } public delegate void ReceivedMsgParsedEventHandler(object sender, ReceivedMsgParsedEventArgs args); public class ReceivedMsgParsedEventArgs { public bool? Result { get; set; } public ReceivedMsgParsedEventArgs(bool? result) { Result = result; } } public abstract class Communicator : Component { #region socket communciation objects /// <summary> /// Communication state definition /// </summary> private enum COMMUNICATION_STATE { DISABLED, DISCONNECTED, CONNECTING_RETRY_WAIT, IDLE, WAITING_AFTER_CMD_SEND, WAITING_CMD_RESPONSE, } // // Summary: // Specifies the parity bit for a System.IO.Ports.SerialPort object. [Serializable] public enum sParity { // // Summary: // No parity check occurs. None = 0, // // Summary: // Sets the parity bit so that the count of bits set is an odd number. Odd = 1, // // Summary: // Sets the parity bit so that the count of bits set is an even number. Even = 2, // // Summary: // Leaves the parity bit set to 1. Mark = 3, // // Summary: // Leaves the parity bit set to 0. Space = 4 } // // Summary: // Specifies the number of stop bits used on the System.IO.Ports.SerialPort object. [Serializable] public enum sStopBits { // // Summary: // No stop bits are used. This value is not supported by the System.IO.Ports.SerialPort.StopBits // property. None = 0, // // Summary: // One stop bit is used. One = 1, // // Summary: // Two stop bits are used. Two = 2, // // Summary: // 1.5 stop bits are used. OnePointFive = 3 } /// <summary> /// Communication type /// </summary> [Serializable] public enum COMMUNICATION_TYPE { SERIAL, TCPIP } /// <summary> /// Query message /// </summary> public class QueryCommand { public QueryCommand(Func<Command> commandFunc, double queryTimeIntervalSec) { LastQueryTime = DateTime.MinValue; TimeInterval = new TimeSpan(0, 0, 0, 0, (int)(queryTimeIntervalSec * 1000)); this.commandFunc = commandFunc; } Func<Command> commandFunc; public Command Command { get { return commandFunc.Invoke(); } } public TimeSpan TimeInterval { get; private set; } public DateTime LastQueryTime { get; set; } public override string ToString() { return Command == null ? "(NULL)" : Command.ToString(); } } #endregion #region Fields COMMUNICATION_STATE state = COMMUNICATION_STATE.DISCONNECTED; int retryConnectCnt = 0; Thread workThread; List<QueryCommand> QueryCommands = new List<QueryCommand>(); int currentQueryIndex = 0; Queue<Command> commandQueue = new Queue<Command>(); object commandQueueLock = new object(); Socket tcpSocket; SerialPort serialPort; protected byte[] recvBuffer = new byte[4096]; protected int recvBufferSize = 0; protected object recvBufferLock = new object(); string communicationFailReason; DeviceTimer retryConnectTimer = new DeviceTimer(); DeviceTimer commandReplyTimer = new DeviceTimer(); DeviceTimer commandPreWaitTimer = new DeviceTimer(); DeviceTimer commandSentDelayTimer = new DeviceTimer(); protected Command currentCommand; string messageSeperator; #endregion #region Constructor /// <summary> /// Default class Ctor /// </summary> public Communicator() { } #endregion #region Properties [PgVariable(VariableType.StatusVariable, DataType.Boolean, "", "Indicate whether communicator is active")] public bool IsEnabled { get; set; } = true; [PgConfig("Communication", "Select communication type", "", "", "", COMMUNICATION_TYPE.SERIAL)] public COMMUNICATION_TYPE CommunicationType { get; set; } [PgConfig("Communication", "Connection retry time interval", "sec", 0, 1000, 5)] public double ConnectionRetryTimeInterval { get; set; } [PgConfig("Communication", "Message seperator")] public List<byte> MessageSeperator { get; set; } [PgConfig("Communication", "Default command response time out", "second", 0, 1000, 1)] public double DefaultCommandReplyTimeout { get; set; } [PgConfig("Communication", "Minimal time interval between two message sending", "second", 0, 100, 0)] public double MinimalTimeIntervalBetweenTwoSending { get; set; } [PgConfig("Communication", "Waiting time after send before receive response", "second", 0, 100, 0)] public double WaitingTimeAfterSendBeforeReceive { get; set; } [PgConfig("Communication", "Specify data need parse using non ascii encoding,becase ascii only 7-bits(0~127)", "", null, null, false)] public bool IsNeedParseNonAsciiData { get; set; } [PgConfig("Logging", "Specify logging message type", "", null, null, LoggingMessageType.ASCII)] public LoggingMessageType LoggingMessageType { get; set; } [PgConfig("Tcp Port", "Tcp IP Address", "", "", "", "localhost")] public string TcpAddress { get; set; } [PgConfig("Tcp Port", "Tcp Port No", "", 0, 100000, 10000)] public int TcpPortNo { get; set; } [PgConfig("Serial Port", "Baud rate (2400,4800,9600,19200,38400,115200)", "bps", 2400, 115200, 9600)] public int SerialBaudRate { get; set; } [PgConfig("Serial Port", "Serial Port No", "", 1, 100, 3)] public int SerialPortNo { get; set; } [PgConfig("Serial Port", "Parity Selection", "", "", "", sParity.None)] public sParity Parity { get; set; } [PgConfig("Serial Port", "Stop Bits", "", "", "", sStopBits.One)] public sStopBits StopBits { get; set; } [PgConfig("Serial Port", "Data Bits", "", 1, 100, 8)] public int DataBits { get; set; } [PgConfig("Serial Port", "Reading timeout", "second", 0, 100, 5)] public double ReadingTimeout { get; set; } [PgConfig("Serial Port", "Writing timeout", "second", 0, 100, 5)] public double WritingTimeout { get; set; } [PgConfig("Serial Port", "Serial Port RTS Enabled?", "", null, null, false)] public bool SerialPortRtsEnable { get; set; } [PgConfig("Serial Port", "Serial Port DTR Enabled?", "", null, null, false)] public bool SerialPortDtrEnable { get; set; } [PgConfig("Communication", "Waiting response timeout fail count setting", "", 0, 100, 1)] public int WaitResponseFailCountSetting { get; set; } [PgAlarm(AlarmCode.ParameterControlError, "Message communication lost")] public string CommunFail { get; set; } [PgVariable(VariableType.StatusVariable, DataType.Boolean, "", "Indicate whether the communication is failed")] public bool IsCommunicationError { get { return retryConnectCnt > 0; } } [PgVariable(VariableType.StatusVariable, DataType.Boolean, "", "Indicate whether the communication is dissconnected")] public bool IsCommDissconnected { get { return retryConnectCnt >= WaitResponseFailCountSetting; } } #endregion #region Public Methods #endregion #region Protected Methods /// <summary> /// Component Initialize method /// </summary> protected override void Initialize() { base.Initialize(); messageSeperator = (MessageSeperator == null || MessageSeperator.Count == 0) ? "" : ASCIIEncoding.ASCII.GetString(MessageSeperator.ToArray()); //get query messages QueryCommands = GetQueryCommands(); if (QueryCommands == null) { QueryCommands = new List<QueryCommand>(); } //start communication thread workThread = new Thread(new ThreadStart(do_work)); workThread.Name = ComponentFullPath + " Communication Thread"; workThread.IsBackground = true; workThread.Start(); } /// <summary> /// Get query messages /// </summary> /// <returns></returns> protected abstract List<QueryCommand> GetQueryCommands(); /// <summary> /// Alarm cleared /// </summary> protected override void AlarmCleared(string alarmName) { base.AlarmCleared(alarmName); retryConnectCnt = 0; } /// <summary> /// On serial port / tcp communication connected /// </summary> protected abstract void OnConnected(); /// <summary> /// Process recevied data with head/tail seperator /// </summary> protected virtual void ProcessReceivedData() { lock (recvBufferLock) { if (recvBufferSize == 0) return; string recvBufString = string.Empty; if (IsNeedParseNonAsciiData) { recvBufString = ASCIIEncoding.UTF7.GetString(recvBuffer, 0, recvBufferSize); //for(int i = 0;i<recvBufferSize;i++) // recvBufString += (Char)recvBuffer[i]; } else { //recvBufString = ASCIIEncoding.ASCII.GetString(recvBuffer, 0, recvBufferSize); for (int i = 0; i < recvBufferSize; i++) recvBufString += (Char)recvBuffer[i]; } if (string.IsNullOrEmpty(messageSeperator)) { TryParseMessage(recvBufString); lock (recvBufferLock) { recvBufferSize = 0; } } else { int index = recvBufString.LastIndexOf(messageSeperator); if (index < 0) return; string subStr = recvBufString.Substring(0, index + messageSeperator.Length); string[] seperatedMessages = subStr.Split(new string[] { messageSeperator }, StringSplitOptions.RemoveEmptyEntries); foreach (var msg in seperatedMessages) { try { TryParseMessage(msg); } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); } } lock (recvBufferLock) { int leftLength = recvBufString.Length - index - messageSeperator.Length; if (leftLength > 0) { string leftStr = recvBufString.Substring(index + messageSeperator.Length, leftLength); Array.Copy(ASCIIEncoding.ASCII.GetBytes(leftStr), recvBuffer, leftLength); } recvBufferSize = leftLength; } } } } /// <summary> /// On received message /// </summary> /// <param name="receviedMessage"></param> protected abstract void Receive(string receviedMessage); #endregion #region Private Methods /// <summary> /// Try to parse received message /// </summary> /// <param name="recvMessage"></param> /// <returns></returns> protected void TryParseMessage(string receivedMessage) { var log = "[RECV] " + FormatLoggingMessage(receivedMessage); Log.Write(LogCategory.Debug, ComponentFullPath, log); Receive(receivedMessage); if (currentCommand != null) { bool? result = currentCommand._ParseReceviedMessage(receivedMessage); if (result.HasValue) { retryConnectCnt = 0; //reset connection retry count if communicator has got something if (state == COMMUNICATION_STATE.WAITING_CMD_RESPONSE || state == COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND) state = COMMUNICATION_STATE.IDLE; currentCommand = null; } ReceivedMessageParsed?.Invoke(this, new ReceivedMsgParsedEventArgs(result)); } } protected virtual void Monitor() { } private string FormatLoggingMessage(string message) { if (LoggingMessageType == LoggingMessageType.ASCII) { string result = message.Trim('\x0', '\x1', '\x2', '\x3', '\x4', '\x5', '\x6', '\x7', '\x8', '\x9', '\xa', '\xb', '\xc', '\xd', '\xe', '\xf', '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', '\x18', '\x19', '\x1a', '\x1b', '\x1c', '\x1d', '\x1e', '\x1f'); return result; } StringBuilder sb = new StringBuilder(); foreach (int c in message) { sb.Append(string.Format("{0:X2}", c)); sb.Append(" "); } return sb.ToString(); } /// <summary> /// Check tcp status /// </summary> /// <param name="failReason"></param> /// <returns></returns> private bool TestTcpStatus(out string failReason) { failReason = ""; // This is how you can determine whether a socket is still connected. bool blockingState = tcpSocket.Blocking; try { byte[] tmp = new byte[1]; tcpSocket.Blocking = false; tcpSocket.Send(tmp, 0, 0); //Console.WriteLine("Connected!"); } catch (SocketException e) { // 10035 == WSAEWOULDBLOCK if (e.NativeErrorCode.Equals(10035)) failReason = "Still Connected, but the Send would block"; else failReason = $"Disconnected: error code {e.NativeErrorCode}"; } finally { tcpSocket.Blocking = blockingState; } return tcpSocket.Connected; } /// <summary> /// Try establish tcp connection /// </summary> /// <param name="failReason"></param> /// <returns></returns> private bool TryTcpConnect(out string failReason) { failReason = ""; try { // release socket resource before each tcp connection if (tcpSocket != null) { try { Log.Write(LogCategory.Debug, ComponentFullPath, "Close and release socket resource"); tcpSocket.Dispose(); } catch (Exception e0) { Log.Write(LogCategory.Debug, ComponentFullPath, $"Close socket exception: {e0.Message}"); } finally { tcpSocket = null; } } tcpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); tcpSocket.Connect(this.TcpAddress, this.TcpPortNo); bool isSucc = TestTcpStatus(out failReason); if (isSucc) { /* * https://msdn.microsoft.com/en-us/library/8s4y8aff%28v=vs.110%29.aspx * If no data is available for reading, the Receive method will block until data is available, * unless a time-out value was set by using Socket.ReceiveTimeout. * If the time-out value was exceeded, the Receive call will throw a SocketException. * If you are in non-blocking mode, and there is no data available in the in the protocol stack buffer, * the Receive method will complete immediately and throw a SocketException. * You can use the Available property to determine if data is available for reading. * When Available is non-zero, retry the receive operation. */ tcpSocket.Blocking = false; } else { return false; } } catch (Exception ex) { failReason = ex.Message; return false; } return true; } private bool TrySerialPortConnect(out string failReason) { failReason = ""; try { //Close serial port if it is not null if (serialPort != null) { try { Log.Write(LogCategory.Debug, ComponentFullPath, "Close serial port"); serialPort.Close(); serialPort = null; } catch (Exception e0) { Log.Write(LogCategory.Debug, ComponentFullPath, $"Close serial port exception: {e0.Message}"); } } //Open Serial Port serialPort = new SerialPort { PortName = $"COM{SerialPortNo}", BaudRate = SerialBaudRate, RtsEnable = SerialPortRtsEnable, DtrEnable = SerialPortDtrEnable, Parity = (Parity) Enum.Parse(typeof(Parity), Parity.ToString()), StopBits = (StopBits) Enum.Parse(typeof(StopBits), StopBits.ToString()), DataBits = DataBits, ReadTimeout = (int) (ReadingTimeout * 1000), WriteTimeout = (int) (WritingTimeout * 1000) }; serialPort.DataReceived += SerialPort_DataReceived; serialPort.Open(); if (!serialPort.IsOpen) throw new Exception("Serial Port Open Failed"); } catch (Exception ex) { failReason = ex.Message; return false; } return true; } /// <summary> /// When serial port received something, adding the recevied buf behind the recvBuffer /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e) { try { int len = serialPort.BytesToRead; if (len > 0) { byte[] curReadingBuf = new byte[len]; serialPort.Read(curReadingBuf, 0, len); lock (recvBufferLock) { if (recvBufferSize + len < recvBuffer.Length) { Array.Copy(curReadingBuf, 0, recvBuffer, recvBufferSize, len); recvBufferSize += len; } } } } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); } } /// <summary> /// Try to receive data from tcp /// </summary> /// <param name="failReason"></param> /// <returns>true: success</returns> private bool TryTcpReceive(out string failReason) { failReason = ""; try { int len = tcpSocket.Available; if (len > 0) { byte[] curReadingBuf = new byte[len]; tcpSocket.Receive(curReadingBuf, len, SocketFlags.None); lock (recvBufferLock) { if (recvBufferSize + len < recvBuffer.Length) { Array.Copy(curReadingBuf, 0, recvBuffer, recvBufferSize, len); recvBufferSize += len; } } } } catch (Exception ex) { Log.WriteExceptionCatch(ComponentFullPath, ex); failReason = ex.Message; return false; } return true; } /// <summary> /// Try to receive data from serial port /// </summary> /// <param name="data"></param> /// <param name="isFailed"></param> /// <param name="failReason"></param> /// <returns>true: success</returns> private bool TrySerialReceive(out string failReason) { //do nothing, let void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e) receive the data failReason = ""; return true; } /// <summary> /// Try socket sending data /// </summary> /// <param name="msg"></param> /// <param name="failReason"></param> /// <returns>True: succ</returns> private bool TryTcpSend(Command msg, out string failReason) { failReason = ""; try { tcpSocket.Send(msg.Data); //System.Diagnostics.Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")} {ComponentName} TCP Send: {msg.ToString()}"); var log = "[SEND] " + FormatLoggingMessage(msg.ToString()); Log.Write(LogCategory.Debug, ComponentFullPath, log); } catch (Exception ex) { failReason = ex.Message; return false; } return true; } /// <summary> /// Try serial port sending data /// </summary> /// <param name="msg"></param> private bool TrySerialSend(Command msg, out string failReason) { failReason = ""; try { serialPort.Write(msg.Data, 0, msg.Data.Length); //System.Diagnostics.Debug.WriteLine($"{ComponentFullPath} Serial Send: {msg.ToString()}"); var log = "[SEND] " + FormatLoggingMessage(msg.ToString()); Log.Write(LogCategory.Debug, ComponentFullPath, log); } catch (Exception ex) { failReason = ex.Message; return false; } return true; } /// <summary> /// Try receive data /// </summary> /// <param name="failReason"></param> /// <returns></returns> private void TryReceive() { string failReason = ""; bool isSucc = true; if (state == COMMUNICATION_STATE.IDLE || /*state == COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND ||*/ state == COMMUNICATION_STATE.WAITING_CMD_RESPONSE) { if (CommunicationType == COMMUNICATION_TYPE.SERIAL) { isSucc = TrySerialReceive(out failReason); } else { isSucc = TryTcpReceive(out failReason); } } if (!isSucc) { retryConnectCnt++; communicationFailReason = failReason; Log.Write(LogCategory.Error, ComponentFullPath, communicationFailReason); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } } /// <summary> /// Communication work thread /// </summary> private void do_work() { while (true) { Thread.Sleep(50); try { if(!IsEnabled) { state = COMMUNICATION_STATE.DISABLED; continue; } else { if(state == COMMUNICATION_STATE.DISABLED) { Log.Write(LogCategory.Debug, ComponentFullPath, "Re-establish communication when disabled -> enabled"); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } } Monitor(); TryReceive(); ProcessReceivedData(); switch (state) { case COMMUNICATION_STATE.DISABLED: break; case COMMUNICATION_STATE.DISCONNECTED: { bool isSucc = false; if (CommunicationType == COMMUNICATION_TYPE.TCPIP) { Log.Write(LogCategory.Debug, ComponentFullPath, "Start tcp connection .. "); isSucc = TryTcpConnect(out communicationFailReason); } else { Log.Write(LogCategory.Debug, ComponentFullPath, "Start serial port connection .. "); isSucc = TrySerialPortConnect(out communicationFailReason); } if (isSucc) { lock (commandQueueLock) { commandQueue.Clear(); } lock (recvBufferLock) { recvBufferSize = 0; } retryConnectCnt = 0; communicationFailReason = ""; Log.Write(LogCategory.Information, ComponentFullPath, "Communicaiton established"); OnConnected(); state = COMMUNICATION_STATE.IDLE; } else { retryConnectCnt++; communicationFailReason = $"{communicationFailReason}, {retryConnectCnt} times retry, waiting {ConnectionRetryTimeInterval} sec and start next retry"; if (retryConnectCnt == 1) { RaiseAlarm(CommunFail, communicationFailReason); } Log.Write(LogCategory.Debug, ComponentFullPath, communicationFailReason); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } } break; case COMMUNICATION_STATE.CONNECTING_RETRY_WAIT: if (retryConnectTimer.IsTimeout()) { state = COMMUNICATION_STATE.DISCONNECTED; } break; case COMMUNICATION_STATE.IDLE: { if (commandSentDelayTimer.IsTimeout() || commandSentDelayTimer.IsIdle()) { if (commandQueue.Count == 0) { GenerateNextQueryCommand(); } Command nextCommand = null; lock (commandQueueLock) { if (commandQueue.Count > 0) { nextCommand = commandQueue.Dequeue(); } } if (nextCommand != null) { bool isSucc = false; commandSentDelayTimer.Start(MinimalTimeIntervalBetweenTwoSending * 1000); if (CommunicationType == COMMUNICATION_TYPE.TCPIP) { isSucc = TryTcpSend(nextCommand, out communicationFailReason); } else { isSucc = TrySerialSend(nextCommand, out communicationFailReason); } if (isSucc) { if (nextCommand.NeedReply) { currentCommand = nextCommand; commandReplyTimer.Start(currentCommand.TimeoutSec * 1000); commandPreWaitTimer.Start(WaitingTimeAfterSendBeforeReceive * 1000); state = COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND; } else { currentCommand = null; state = COMMUNICATION_STATE.IDLE; } } else { retryConnectCnt++; communicationFailReason = $"Sending data failed,{communicationFailReason},waiting {ConnectionRetryTimeInterval} sec and start next re-connection"; if (retryConnectCnt == 1) { RaiseAlarm(CommunFail, communicationFailReason); } Log.Write(LogCategory.Error, ComponentFullPath, communicationFailReason); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } } } } break; case COMMUNICATION_STATE.WAITING_AFTER_CMD_SEND: if (commandPreWaitTimer.IsTimeout()) { state = COMMUNICATION_STATE.WAITING_CMD_RESPONSE; } break; case COMMUNICATION_STATE.WAITING_CMD_RESPONSE: if(commandReplyTimer.IsTimeout()) { retryConnectCnt++; communicationFailReason = $"Waiting command response timeout"; if (retryConnectCnt >= WaitResponseFailCountSetting) { RaiseAlarm(CommunFail, communicationFailReason); Log.Write(LogCategory.Error, ComponentFullPath, communicationFailReason); currentCommand = null; retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } else { Log.Write(LogCategory.Information, ComponentFullPath, communicationFailReason + $" retry {retryConnectCnt}"); currentCommand = null; state = COMMUNICATION_STATE.IDLE; } } break; } } catch (Exception e) { retryConnectCnt++; communicationFailReason = $"Code running exception: {e.Message}, waiting {ConnectionRetryTimeInterval} sec and start next re-connection"; Log.Write(LogCategory.Debug, ComponentFullPath, communicationFailReason); Log.WriteExceptionCatch(ComponentFullPath, e); retryConnectTimer.Start(ConnectionRetryTimeInterval * 1000); state = COMMUNICATION_STATE.CONNECTING_RETRY_WAIT; } } } /// <summary> /// Periodically send robot query commands /// </summary> private void GenerateNextQueryCommand() { if (IsEnabled) { int num = 0; while (num < QueryCommands.Count) { bool isQueryCommandTriggered = false; if (currentQueryIndex >= QueryCommands.Count) currentQueryIndex = 0; var timeDiff = DateTime.Now - QueryCommands[currentQueryIndex].LastQueryTime; if (timeDiff > QueryCommands[currentQueryIndex].TimeInterval) { EnqueueCommand(QueryCommands[currentQueryIndex].Command); QueryCommands[currentQueryIndex].LastQueryTime = DateTime.Now; isQueryCommandTriggered = true; } currentQueryIndex++; num++; if (isQueryCommandTriggered) break; } } } #endregion #region Internal /// <summary> /// Enqueue message /// </summary> /// <param name="message"></param> internal void _EnqueueCommand(Command command) { EnqueueCommand(command); } protected void EnqueueCommand(Command command) { lock (commandQueueLock) { //TODO: add queue size limitation // commandQueue.Enqueue(command); } } #endregion #region Events public event ReceivedMsgParsedEventHandler ReceivedMessageParsed; #endregion } }