.Net5开发MQTT服务器

本文介绍如何在 .NET 5 中借助 MQTTnet 包自主开发 MQTT 服务器;经测试,该服务器运行非常稳定。

 

  

using IoT;
using JieYun.Admin.Net5;
using JieYun.IoT.Common.Models;
using JieYun.IoT.Server.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static IoT.IoTRpc;

namespace JieYun.IoT.Server
{
    /// <summary>
    /// Hosted background service that runs an embedded MQTT broker (MQTTnet).
    /// It validates connecting clients, logs subscribe/unsubscribe/connect/disconnect
    /// events, and republishes selected messages on <c>MQTTTOPIC.UPDATE</c>.
    /// </summary>
    public class ServerWorker : BackgroundService
    {
        // NOTE(review): credentials are hard-coded in source; they should come from
        // configuration or a secret store. Values are kept unchanged here so existing
        // clients continue to work.
        private const string BrokerUsername = "admin";
        private const string BrokerPassword = "admin";

        // Communication timeout applied to the default endpoint, in seconds.
        private const int CommunicationTimeoutSeconds = 5;

        /// <summary>
        /// The running MQTT server instance. Assigned in <see cref="StartMqttServer"/>;
        /// null until the broker has been created.
        /// </summary>
        public static IMqttServer mqttServer;

        private readonly ILogger<ServerWorker> _logger;
        private readonly IoTRpcClient _client;

        /// <summary>
        /// Creates the worker.
        /// </summary>
        /// <param name="logger">Logger used for all broker events.</param>
        /// <param name="client">RPC client; stored but not used by the code in this class.</param>
        /// <param name="provider">Service provider; accepted for DI compatibility, currently unused.</param>
        public ServerWorker(ILogger<ServerWorker> logger, IoTRpcClient client, IServiceProvider provider)
        {
            _logger = logger;
            _client = client;
        }

        /// <summary>
        /// Starts the MQTT broker when the host starts this background service.
        /// </summary>
        /// <param name="stoppingToken">Host shutdown token; shutdown itself is handled in <see cref="StopAsync"/>.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await StartMqttServer();
        }

        /// <summary>
        /// Stops the MQTT broker (if one was created) before the base service is stopped,
        /// so the listener is released when the host shuts down.
        /// </summary>
        /// <param name="cancellationToken">Token signalling that shutdown should no longer be graceful.</param>
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            var server = mqttServer;
            if (server != null)
            {
                try
                {
                    await server.StopAsync();
                    _logger.LogInformation("MQTT服务器已停止.");
                }
                catch (Exception e)
                {
                    // Best effort: a failure to stop the broker must not block host shutdown.
                    _logger.LogError(e, "MQTT服务停止失败");
                }
            }

            await base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Builds the broker options, wires up all event handlers and starts the server.
        /// Start-up failures are logged and swallowed so the host keeps running.
        /// </summary>
        private async Task StartMqttServer()
        {
            try
            {
                int hostPort = AppConfigProvider.AppConfig.IoTServerPort;

                // Only the port is configured; the configured server address
                // (AppConfig.IoTServerAddress) is not applied as a bound IP address.
                var optionBuilder = new MqttServerOptionsBuilder()
                    .WithDefaultEndpointPort(hostPort)
                    .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(CommunicationTimeoutSeconds))
                    .WithConnectionValidator(context =>
                    {
                        // Exactly one reason code is assigned. Previously the rejection
                        // was overwritten by an unconditional Success, letting any
                        // client connect regardless of credentials.
                        bool authenticated = context.Username == BrokerUsername
                                             && context.Password == BrokerPassword;

                        context.ReasonCode = authenticated
                            ? MqttConnectReasonCode.Success
                            : MqttConnectReasonCode.BadUserNameOrPassword;
                    });
                var options = optionBuilder.Build();

                // Create the MQTT server.
                mqttServer = new MqttFactory().CreateMqttServer();

                // Topic subscribed.
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                // Topic unsubscribed.
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                // Application message received.
                mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceivedAsync);

                // Client connected.
                mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                // Client disconnected.
                mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                // Start the server.
                await mqttServer.StartAsync(options);

                _logger.LogInformation("MQTT服务器已启动.");
            }
            catch (Exception e)
            {
                // Pass the exception as the exception argument so logging providers
                // receive it (with stack trace) instead of a flattened string.
                _logger.LogError(e, "MQTT服务启动失败");
            }
        }

        /// <summary>
        /// Logs a client subscription.
        /// </summary>
        private void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            var clientId = e.ClientId;
            var topic = e.TopicFilter.Topic;
            _logger.LogInformation($"客户端【{clientId}】订阅:{topic}");
        }

        /// <summary>
        /// Logs a client unsubscription.
        /// </summary>
        private void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            var clientId = e.ClientId;
            var topic = e.TopicFilter;
            _logger.LogInformation($"客户端【{clientId}】取消订阅:{topic}");
        }

        /// <summary>
        /// Handles a received application message: logs it and, when it is addressed to
        /// "SERVER" on <c>MQTTTOPIC.UPDATE</c>, republishes it addressed to "WebClient".
        /// Messages with an empty or non-JSON payload are ignored.
        /// </summary>
        private async Task MqttServe_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            _logger.LogInformation(e.ApplicationMessage.ToConsoleMessage());

            // MQTT allows messages without a payload; decoding null would throw.
            var payload = e.ApplicationMessage.Payload;
            if (payload == null || payload.Length == 0)
            {
                return;
            }

            MQTTMessage msg;
            try
            {
                msg = JsonSerializer.Deserialize<MQTTMessage>(Encoding.UTF8.GetString(payload));
            }
            catch (JsonException ex)
            {
                // Clients may publish arbitrary data; a malformed payload must not
                // fault the handler.
                _logger.LogWarning(ex, "收到无法解析的MQTT消息");
                return;
            }

            // A JSON literal "null" deserializes to a null reference.
            if (msg == null)
            {
                return;
            }

            if (msg.To == "SERVER" && e.ApplicationMessage.Topic == MQTTTOPIC.UPDATE)
            {
                var msgSend = new MQTTMessage()
                {
                    From = msg.From,
                    To = "WebClient",
                    Msg = msg.Msg
                };
                await SendMessageToWebClient(msgSend);
            }
        }

        /// <summary>
        /// Logs a client connection and publishes a "connected" status message.
        /// </summary>
        private async Task MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            var clientId = e.ClientId;
            _logger.LogInformation($"{DateTime.Now} 客户端【{clientId}】已连接");

            // Notify the server side that the client has connected.
            var msg = new MQTTMessage()
            {
                From = clientId,
                To = "SERVER",
                Msg = ClientStatus.Connected
            };
            await SendMessageToWebClient(msg);
        }

        /// <summary>
        /// Logs a client disconnection and publishes a "disconnected" status message.
        /// </summary>
        private async Task MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
        {
            var clientId = e.ClientId;

            _logger.LogInformation($"{DateTime.Now} 客户端【{clientId}】已断开");

            // Notify the server side that the client has disconnected.
            var msg = new MQTTMessage()
            {
                From = clientId,
                To = "SERVER",
                Msg = ClientStatus.Disconnected
            };
            await SendMessageToWebClient(msg);
        }

        /// <summary>
        /// Serializes <paramref name="msg"/> to UTF-8 JSON and publishes it on
        /// <c>MQTTTOPIC.UPDATE</c> through the broker.
        /// </summary>
        /// <param name="msg">The message to publish.</param>
        private async Task SendMessageToWebClient(MQTTMessage msg)
        {
            var msgStr = JsonSerializer.Serialize(msg);
            var payload = Encoding.UTF8.GetBytes(msgStr);
            MqttApplicationMessage mm = new MqttApplicationMessage()
            {
                Topic = MQTTTOPIC.UPDATE,
                Payload = payload
            };
            await mqttServer.PublishAsync(mm);
        }
    }
}
消息类MQTTMessage
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace JieYun.IoT.Common.Models
{
    /// <summary>
    /// Envelope exchanged over MQTT: a text body plus sender and recipient identifiers.
    /// </summary>
    public class MQTTMessage
    {
        /// <summary>Message body.</summary>
        public string Msg { get; set; }

        /// <summary>Sender identifier; defaults to "SERVER".</summary>
        public string From { get; set; } = "SERVER";

        /// <summary>Recipient identifier; defaults to "e098060e71ef".</summary>
        public string To { get; set; } = "e098060e71ef";

        /// <summary>Returns this message serialized as JSON.</summary>
        public override string ToString() => JsonSerializer.Serialize(this);
    }
}
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace JieYun.IoT.Server
{
    /// <summary>
    /// Application entry point: builds a Kestrel web host configured from the command
    /// line, registers <see cref="ServerWorker"/> as a hosted service, and runs it.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Builds and runs the web host.
        /// </summary>
        /// <param name="args">Command-line arguments, used as the host configuration source.</param>
        public static void Main(string[] args)
        {
            // Command-line arguments are the only configuration source added here.
            var commandLineConfig = new ConfigurationBuilder()
                .AddCommandLine(args)
                .Build();

            var builder = new WebHostBuilder()
                .UseConfiguration(commandLineConfig)
                .UseKestrel()
                .UseContentRoot(Directory.GetCurrentDirectory())
                .UseStartup<Startup>();

            // Register the MQTT broker worker as a hosted service.
            builder = builder.ConfigureServices((context, serviceCollection) =>
            {
                serviceCollection.AddHostedService<ServerWorker>();
            });

            // Console-only logging at Trace level.
            builder = builder.ConfigureLogging(loggingBuilder =>
            {
                loggingBuilder.ClearProviders();
                loggingBuilder.SetMinimumLevel(LogLevel.Trace);
                loggingBuilder.AddConsole();
            });

            builder.Build().Run();
        }
    }
}

 

.Net5开发MQTT服务器

上一篇:42种常见的HTTP响应代码


下一篇:js {}与class属性描述符的区别