.Net5开发MQTT服务器主要借助MQTTnet包,自主开发MQTT服务器,经测试,非常稳定。
using IoT; using JieYun.Admin.Net5; using JieYun.IoT.Common.Models; using JieYun.IoT.Server.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Protocol; using MQTTnet.Server; using System; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using static IoT.IoTRpc; namespace JieYun.IoT.Server { public class ServerWorker : BackgroundService { public static IMqttServer mqttServer; private readonly ILogger<ServerWorker> _logger; private readonly IoTRpcClient _client; public ServerWorker(ILogger<ServerWorker> logger, IoTRpcClient client, IServiceProvider provider) { _logger = logger; _client = client; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await StartMqttServer(); } //启动Mqtt服务器 private async Task StartMqttServer() { try { //验证客户端信息 string hostIp = AppConfigProvider.AppConfig.IoTServerAddress;//IP地址 int hostPort = AppConfigProvider.AppConfig.IoTServerPort;//端口号 int timeout = 5;//超时时间 string username = "admin";//用户名 string password = "admin";//密码 var optionBuilder = new MqttServerOptionsBuilder() // .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(hostIp)) .WithDefaultEndpointPort(hostPort) .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(timeout)) .WithConnectionValidator(t => { if (t.Username != username || t.Password != password) { t.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword; } t.ReasonCode = MqttConnectReasonCode.Success; }); var options = optionBuilder.Build(); //创建Mqtt服务器 mqttServer = new MqttFactory().CreateMqttServer(); //开启订阅事件 mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic); //取消订阅事件 mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic); //客户端消息事件 mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceivedAsync); //客户端连接事件 mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected); //客户端断开事件 mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected); //启动服务器 await mqttServer.StartAsync(options); _logger.LogInformation("MQTT服务器已启动."); } catch (Exception e) { _logger.LogError($"MQTT服务启动失败:{e}"); } } /// <summary> /// 客户订阅 /// </summary> private void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) { //客户端Id var ClientId = e.ClientId; var Topic = e.TopicFilter.Topic; _logger.LogInformation($"客户端【{ClientId}】订阅:{Topic}"); } /// <summary> /// 客户取消订阅 /// </summary> private void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) { //客户端Id var ClientId = e.ClientId; var Topic = e.TopicFilter; _logger.LogInformation($"客户端【{ClientId}】取消订阅:{Topic}"); } /// <summary> /// 接收消息 /// </summary> private async Task MqttServe_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e) { _logger.LogInformation(e.ApplicationMessage.ToConsoleMessage()); //转发消息到WebClient var msgStr = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); var msg = JsonSerializer.Deserialize<MQTTMessage>(msgStr); if(msg.To == "SERVER"&&e.ApplicationMessage.Topic == MQTTTOPIC.UPDATE) { var msgSend = new MQTTMessage() { From = msg.From, To = "WebClient", Msg = msg.Msg }; await SendMessageToWebClient(msgSend); } } /// <summary> /// 客户连接 /// </summary> private async Task MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e) { var ClientId = e.ClientId; _logger.LogInformation($"{DateTime.Now} 客户端【{ClientId}】已连接"); //通知服务器,客户端连接了 var msg = new MQTTMessage() { From = ClientId, To = "SERVER", Msg = ClientStatus.Connected }; await SendMessageToWebClient(msg); } /// <summary> /// 客户连接断开 /// </summary> private async Task MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e) { var ClientId = e.ClientId; _logger.LogInformation($"{DateTime.Now} 客户端【{ClientId}】已断开"); //通知服务器,客户端断开了 var msg = new MQTTMessage() { From = ClientId, To = "SERVER", Msg = ClientStatus.Disconnected }; await SendMessageToWebClient(msg); } private async Task SendMessageToWebClient(MQTTMessage msg) { var msgStr = JsonSerializer.Serialize(msg); var payload = Encoding.UTF8.GetBytes(msgStr); MqttApplicationMessage mm = new MqttApplicationMessage() { Topic = MQTTTOPIC.UPDATE, Payload = payload }; await mqttServer.PublishAsync(mm); } } }
消息类MQTTMessage
using System; using System.Collections.Generic; using System.Text; using System.Text.Json; namespace JieYun.IoT.Common.Models { public class MQTTMessage { public string Msg { get; set; } public string From { get; set; } = "SERVER"; public string To { get; set; } = "e098060e71ef"; public override string ToString() { return JsonSerializer.Serialize(this); } } }
using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; namespace JieYun.IoT.Server { public class Program { public static void Main(string[] args) { var config = new ConfigurationBuilder() .AddCommandLine(args) .Build(); var host = new WebHostBuilder() .UseConfiguration(config) .UseKestrel() .UseContentRoot(Directory.GetCurrentDirectory()) .UseStartup<Startup>() .ConfigureServices((hostContext, services) => { services.AddHostedService<ServerWorker>(); }) .ConfigureLogging(logging => { logging.ClearProviders(); logging.SetMinimumLevel(LogLevel.Trace); logging.AddConsole(); }) .Build(); host.Run(); } } }