在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。
MassTransit
先看看MassTransit是个什么宝贝(MassTransit官网的简介):
MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的*消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。
通俗描述:
MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。
github官网:https://github.com/MassTransit/MassTransit
RabbitMQ
RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:
下载与安装:http://www.rabbitmq.com/download.html
实现代码
通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。
1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。
2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:
using MassTransit; using MassTransit.RabbitMqTransport; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Lezhima.Comm { /// <summary> /// RabbitMQ公共操作类,基于MassTransit库 /// </summary> public class RabbitMQHelp { #region 交换器 /// <summary> /// 操作日志交换器 /// 同时需在RabbitMQ的管理后台创建同名交换器 /// </summary> public static readonly string actionLogExchange = "Lezhima.ActionLogExchange"; #endregion #region 声明变量 /// <summary> /// MQ联接地址,建议放到配置文件 /// </summary> private static readonly string mqUrl = "rabbitmq://192.168.1.181/"; /// <summary> /// MQ联接账号,建议放到配置文件 /// </summary> private static readonly string mqUser = "admin"; /// <summary> /// MQ联接密码,建议放到配置文件 /// </summary> private static readonly string mqPwd = "admin"; #endregion /// <summary> /// 创建连接对象 /// 不对外公开 /// </summary> private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null) { //通过MassTransit创建MQ联接工厂 return Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri(mqUrl), hst => { hst.Username(mqUser); hst.Password(mqPwd); }); registrationAction?.Invoke(cfg, host); }); } /// <summary> /// MQ生产者 /// 这里使用fanout的交换类型 /// </summary> /// <param name="obj"></param> public async static Task PushMessage(string exchange, object obj) { var bus = CreateBus(); var sendToUri = new Uri($"{mqUrl}{exchange}"); var endPoint = await bus.GetSendEndpoint(sendToUri); await endPoint.Send(obj); } /// <summary> /// MQ消费者 /// 这里使用fanout的交换类型 /// consumer必需是实现IConsumer接口的类实例 /// </summary> /// <param name="obj"></param> public static void ReceiveMessage(string exchange, object consumer) { var bus = CreateBus((cfg, host) => { //从指定的消息队列获取消息 通过consumer来实现消息接收 cfg.ReceiveEndpoint(host, exchange, e => { e.Instance(consumer); }); }); bus.Start(); } } }
3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:
using MassTransit; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Lezhima.Storage.Consumer { /// <summary> /// 从MQ接收并处理数据 /// 实现MassTransit的IConsumer接口 /// </summary> public class LogConsumer : IConsumer<ActionLog> { /// <summary> /// 实现Consume方法 /// 接收并处理数据 /// </summary> /// <param name="context"></param> /// <returns></returns> public Task Consume(ConsumeContext<ActionLog> context) { return Task.Run(async () => { //获取接收到的对象 var amsg = context.Message; Console.WriteLine($"Recevied By Consumer:{amsg}"); Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}"); }); } } }
调用代码
1、生产者调用代码如下:
/// <summary> /// 测试MQ生产者 /// </summary> /// <returns></returns> [HttpGet] public async Task<MobiResult> AddMessageTest() { //声明一个实体对象 var model = new ActionLog(); model.ActionLogId = Guid.NewGuid(); model.CreateTime = DateTime.Now; model.UpdateTime = DateTime.Now; //调用MQ await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model); return new MobiResult(1000, "操作成功"); }
2、消费者调用代码如下:
using Lezhima.Storage.Consumer; using Microsoft.Extensions.Configuration; using System; using System.IO; namespace Lezhima.Storage { class Program { static void Main(string[] args) { var conf = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json", true, true) .Build(); //调用接收者 RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange, new LogConsumer() ); Console.ReadLine(); } } }
总结
1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。
2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。
声明
本文为作者原创,转载请备注出处与保留原文地址,谢谢。如文章能给您带来帮助,请点下推荐或关注,感谢您的支持!