public void Subscribe(Action<MarketData> action) { SubscriberSocket sub = null; try { var setting = new AppSetting(); using (sub = new SubscriberSocket()) { //sub.Options.ReconnectInterval = new TimeSpan(0, 0, 1); //sub.Options.ReconnectIntervalMax = new TimeSpan(24, 0, 0); sub.Options.TcpKeepalive = true; //sub.Options.TcpKeepaliveIdle = new TimeSpan(24, 0, 0); //sub.Options.TcpKeepaliveInterval = new TimeSpan(0, 0, 1); sub.Connect(setting.TicketHostPort()); sub.Subscribe(setting.TicketSubscribeTopic()); var timeout = new TimeSpan(0, 0, 3); while (true) { List<string> contents = new List<string>(); if (sub.TryReceiveMultipartStrings(timeout, ref contents)) { if (contents != null && contents.Count > 0) { var content = contents[1]; if (content.IsNullOrEmpty() == false) { var data = content.Deserialize<MarketData>(); if (data != null) { if (data.securityCode.IsNullOrEmpty() == false) { action(data); } else { Log.Info($"NetMqTicketSubscribe Subscribe empty receive content {content} "); } } else { Log.Info($"NetMqTicketSubscribe Subscribe error receive content {content} no as MarketData"); } } else { Log.Info($"NetMqTicketSubscribe Subscribe error receive content {content}"); } } else { Log.Info("NetMqTicketSubscribe Subscribe error receive count !=2 "); } } else { ReConnection(action, sub, null); } } //while (true) //{ // var contents = sub.ReceiveMultipartStrings(System.Text.Encoding.UTF8); // if (contents != null && contents.Count > 0) // { // var content = contents[1]; // if (content.IsNullOrEmpty() == false) // { // var data = content.Deserialize<MarketData>(); // if (data != null && data.securityCode.IsNullOrEmpty() == false) // { // action(data); // } // else // { // Log.Info($"NetMqTicketSubscribe Subscribe error receive content {content} no as MarketData"); // } // } // else // { // Log.Info($"NetMqTicketSubscribe Subscribe error receive content {content}"); // } // } // else // { // Log.Info("NetMqTicketSubscribe Subscribe error receive count !=2 "); // } //} } } catch (Exception ex) { ReConnection(action, sub, ex); } } private void ReConnection(Action<MarketData> action, SubscriberSocket sub, Exception ex) { if (sub != null) { { try { sub.Dispose(); } catch (Exception e) { Log.Error("获取行情异常 关闭连接异常", e); } } } Log.Error("获取行情异常 准备重连", ex); Subscribe(action); }
sub.TryReceiveMultipartStrings(timeout, ref contents) 使用该方法一次获取多条数据,设置超时时间,断线后自动重连