.net core使用rabbitmq消息队列

  看博文的朋友,本文有些过时了,还有些BUG,如果想了解更多用法,看看这篇吧:.net core使用rabbitmq消息队列 (二)

  

  首先,如果你还没有安装好rabbitmq,可以参考我的博客:

  Ubuntu16.04下,erlang安装和rabbitmq安装步骤

  Ubuntu16.04下,rabbimq集群搭建

  另外,我的另外一篇博客有介绍rabbitmq的基础用法以及使用C#操作rabbitmq,并且对rabbitmq有一个简单的封装,这里使用.net core操作rabbitmq也会使用到这些封装类,所以感兴趣的可以看看:

  C# .net 环境下使用rabbitmq消息队列

  好了现在开始我们的正文

  Rabbitmq一般使用

  说明一下,这里我们使用的是.net core 2.1

  我们先创建一个RabbitMQDemo项目(我创建的是MVC项目),然后使用nuget安装RabbitMQ.Client:

  .net core使用rabbitmq消息队列

  将上面封装的rabbitmq操作类添加到项目中:   

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Text; namespace RabbitMQDemo
{
public class QueueOptions
{
/// <summary>
/// 是否持久化
/// </summary>
public bool Durable { get; set; } = true;
/// <summary>
/// 是否自动删除
/// </summary>
public bool AutoDelete { get; set; } = false;
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
}
public class ConsumeQueueOptions : QueueOptions
{
/// <summary>
/// 是否自动提交
/// </summary>
public bool AutoAck { get; set; } = false;
/// <summary>
/// 每次发送消息条数
/// </summary>
public ushort? FetchCount { get; set; }
}
public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
{
/// <summary>
/// 路由值
/// </summary>
public string[] RoutingKeys { get; set; }
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
public class ExchangeQueueOptions : QueueOptions
{
/// <summary>
/// 交换机类型
/// </summary>
public string Type { get; set; }
/// <summary>
/// 队列及路由值
/// </summary>
public (string, string)[] QueueAndRoutingKey { get; set; }
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
}

QueueOptions

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Text; namespace RabbitMQDemo
{
public static class RabbitMQExchangeType
{
/// <summary>
/// 普通模式
/// </summary>
public const string Common = "";
/// <summary>
/// 路由模式
/// </summary>
public const string Direct = "direct";
/// <summary>
/// 发布/订阅模式
/// </summary>
public const string Fanout = "fanout";
/// <summary>
/// 匹配订阅模式
/// </summary>
public const string Topic = "topic";
}
}

RabbitMQDemo

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text; namespace RabbitMQDemo
{
public abstract class RabbitBase : IDisposable
{
List<AmqpTcpEndpoint> amqpList;
IConnection connection; protected RabbitBase(params string[] hosts)
{
if (hosts == null || hosts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hosts));
} this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
}
protected RabbitBase(params (string, int)[] hostAndPorts)
{
if (hostAndPorts == null || hostAndPorts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
} this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
} /// <summary>
/// 端口
/// </summary>
public int Port { get; set; } = 5672;
/// <summary>
/// 账号
/// </summary>
public string UserName { get; set; } = ConnectionFactory.DefaultUser;
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; } = ConnectionFactory.DefaultPass;
/// <summary>
/// 虚拟机
/// </summary>
public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost; /// <summary>
/// 释放
/// </summary>
public virtual void Dispose()
{
//connection?.Close();
//connection?.Dispose();
}
/// <summary>
/// 关闭连接
/// </summary>
public void Close()
{
connection?.Close();
connection?.Dispose();
} #region Private
/// <summary>
/// 获取rabbitmq的连接
/// </summary>
/// <returns></returns>
protected IModel GetChannel()
{
if (connection == null)
{
lock (this)
{
if (connection == null)
{
var factory = new ConnectionFactory();
factory.Port = Port;
factory.UserName = UserName;
factory.VirtualHost = VirtualHost;
factory.Password = Password;
connection = factory.CreateConnection(this.amqpList);
}
}
}
return connection.CreateModel();
} #endregion
}
}

RabbitBase

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text; namespace RabbitMQDemo
{
public class RabbitMQProducer : RabbitBase
{
public RabbitMQProducer(params string[] hosts) : base(hosts)
{ }
public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{ } #region 普通模式、Work模式
/// <summary>
/// 发布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string queue, string message, QueueOptions options = null)
{
options = options ?? new QueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queue, null, buffer);
channel.Close();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string queue, string message, Action<QueueOptions> configure)
{
QueueOptions options = new QueueOptions();
configure?.Invoke(options);
Publish(queue, message, options);
}
#endregion
#region 订阅模式、路由模式、Topic模式
/// <summary>
/// 发布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
{
options = options ?? new ExchangeQueueOptions();
var channel = GetChannel();
channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.QueueAndRoutingKey != null)
{
foreach (var t in options.QueueAndRoutingKey)
{
if (!string.IsNullOrEmpty(t.Item1))
{
channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
}
}
}
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange, routingKey, null, buffer);
channel.Close();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
{
ExchangeQueueOptions options = new ExchangeQueueOptions();
configure?.Invoke(options);
Publish(exchange, routingKey, message, options);
}
#endregion
}
}

RabbitMQProducer

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; namespace RabbitMQDemo
{
public class RabbitMQConsumer : RabbitBase
{
public RabbitMQConsumer(params string[] hosts) : base(hosts)
{ }
public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{ } public event Action<RecieveResult> Received; /// <summary>
/// 构造消费者
/// </summary>
/// <param name="channel"></param>
/// <param name="options"></param>
/// <returns></returns>
private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
if (!options.AutoAck)
{
cancellationTokenSource.Token.Register(() =>
{
channel.BasicAck(e.DeliveryTag, false);
});
}
Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
}
catch { }
};
if (options.FetchCount != null)
{
channel.BasicQos(0, options.FetchCount.Value, false);
}
return consumer;
} #region 普通模式、Work模式
/// <summary>
/// 消费消息
/// </summary>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
{
options = options ?? new ConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消费消息
/// </summary>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
{
ConsumeQueueOptions options = new ConsumeQueueOptions();
configure?.Invoke(options);
return Listen(queue, options);
}
#endregion
#region 订阅模式、路由模式、Topic模式
/// <summary>
/// 消费消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
{
options = options ?? new ExchangeConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
{
foreach (var key in options.RoutingKeys)
{
channel.QueueBind(queue, exchange, key, options.BindArguments);
}
}
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消费消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
{
ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
configure?.Invoke(options);
return Listen(exchange, queue, options);
}
#endregion
}
public class RecieveResult
{
CancellationTokenSource cancellationTokenSource;
public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
{
this.Body = Encoding.UTF8.GetString(arg.Body);
this.ConsumerTag = arg.ConsumerTag;
this.DeliveryTag = arg.DeliveryTag;
this.Exchange = arg.Exchange;
this.Redelivered = arg.Redelivered;
this.RoutingKey = arg.RoutingKey;
this.cancellationTokenSource = cancellationTokenSource;
} /// <summary>
/// 消息体
/// </summary>
public string Body { get; private set; }
/// <summary>
/// 消费者标签
/// </summary>
public string ConsumerTag { get; private set; }
/// <summary>
/// Ack标签
/// </summary>
public ulong DeliveryTag { get; private set; }
/// <summary>
/// 交换机
/// </summary>
public string Exchange { get; private set; }
/// <summary>
/// 是否Ack
/// </summary>
public bool Redelivered { get; private set; }
/// <summary>
/// 路由
/// </summary>
public string RoutingKey { get; private set; } public void Commit()
{
if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return; cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
}
}
public class ListenResult
{
CancellationTokenSource cancellationTokenSource; /// <summary>
/// CancellationToken
/// </summary>
public CancellationToken Token { get { return cancellationTokenSource.Token; } }
/// <summary>
/// 是否已停止
/// </summary>
public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } } public ListenResult()
{
cancellationTokenSource = new CancellationTokenSource();
} /// <summary>
/// 停止监听
/// </summary>
public void Stop()
{
cancellationTokenSource.Cancel();
}
}
}

RabbitMQConsumer

  修改Startup,在ConfigureServices中将rabbitmq的生产类RabbitMQProducer加入到DI容器中:   

  //将rabbitmq的生产类加入到DI容器中
  var producer = new RabbitMQProducer("192.168.187.129");
  producer.Password = "123456";
  producer.UserName = "admin";
  services.AddSingleton(producer);//这里我没有使用集群

  至于消息的消费,我们可以将消费者使用一个线程启动并监听,但是这里我个人推荐使用.net core 自带的HostedService去实现,至于消费者的功能,我们就简单的将消息记录都文本文档中,类似日志记录。

  我们创建一个RabbitHostedService类:   

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO; namespace RabbitMQDemo
{
public class RabbitHostedService : IHostedService
{
RabbitMQConsumer consumer; public RabbitHostedService()
{
consumer = new RabbitMQConsumer("192.168.187.129");
consumer.Password = "123456";
consumer.UserName = "admin";
} /// <summary>
/// 服务启动
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
await Task.Run(() =>
{
consumer.Received += new Action<RecieveResult>(result =>
{
//文件路径
string path = Path.Combine(Directory.GetCurrentDirectory(), "logs");
if (!Directory.Exists(path))
{
Directory.CreateDirectory(path);
} //文件
string fileName = Path.Combine(path, $"{DateTime.Now.ToString("yyyyMMdd")}.log");
string message = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}接收到消息:{result.Body}{Environment.NewLine}";
File.AppendAllText(fileName, message); //提交
result.Commit();
}); consumer.Listen("queue1", options =>
{
options.AutoAck = false;
options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
options.AutoDelete = false;
options.Durable = true;
});
});
}
/// <summary>
/// 服务停止
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
var task = Task.Run(() =>
{
consumer.Close();
}); await Task.WhenAny(task, Task.Delay(-1, cancellationToken));
cancellationToken.ThrowIfCancellationRequested();
}
}
}

RabbitHostedService

  同时,我们需要在Startup中将RabbitHostedService注入到容器中:  

  //注入消费者
services.AddSingleton<IHostedService, RabbitHostedService>();

  得到Startup的代码如下:  

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; namespace RabbitMQDemo
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
} public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.Configure<CookiePolicyOptions>(options =>
{
// This lambda determines whether user consent for non-essential cookies is needed for a given request.
options.CheckConsentNeeded = context => true;
options.MinimumSameSitePolicy = SameSiteMode.None;
}); //将rabbitmq的生产类加入到DI容器中
var producer = new RabbitMQProducer("192.168.187.129");
producer.Password = "123456";
producer.UserName = "admin";
services.AddSingleton(producer);//这里我没有使用集群
//注入消费者
services.AddSingleton<IHostedService, RabbitHostedService>(); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
} // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
else
{
app.UseExceptionHandler("/Home/Error");
app.UseHsts();
} app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseCookiePolicy(); app.UseMvc(routes =>
{
routes.MapRoute(
name: "default",
template: "{controller=Home}/{action=Index}/{id?}");
});
}
}
}

Startup

  到这里,.net core 集成rabbitmq就写好了,然后就是发送消息使用了,我们添加一个名为RabbitController的控制器,里面代码如下:   

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using RabbitMQDemo.Models; namespace RabbitMQDemo.Controllers
{
public class RabbitController : Controller
{
RabbitMQProducer producer;
public RabbitController(RabbitMQProducer producer)
{
this.producer = producer;
} /// <summary>
/// 首页
/// </summary>
/// <returns></returns>
public IActionResult Index()
{
return View();
}
/// <summary>
/// 消息提交
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public IActionResult Submit(string message)
{
if (!string.IsNullOrEmpty(message))
{
//发送消息
producer.Publish("queue1", message, options =>
{
options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
options.AutoDelete = false;
options.Durable = true;
});
}
return View("Index");
}
}
}

RabbitController

  RabbitController里面有两个Action,它们返回同一个视图:    

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
@{
Layout = null;
} @using (Html.BeginForm("Submit", "Rabbit"))
{
@Html.DisplayName("消息:");
<br />
@Html.TextArea("message", new { @class = "multieditbox" })
<br />
<input type="submit" class="buttoncss" value="确定" />
}
<style type="text/css">
.buttoncss {
font-family: "tahoma", "宋体"; /*www.52css.com*/
font-size: 9pt;
color: #003399;
border: 1px #003399 solid;
color: #006699;
border-bottom: #93bee2 1px solid;
border-left: #93bee2 1px solid;
border-right: #93bee2 1px solid;
border-top: #93bee2 1px solid;
background-image: url(../images/bluebuttonbg.gif);
background-color: #e8f4ff;
font-style: normal;
width: 60px;
height: 22px;
} .multieditbox {
background: #f8f8f8;
border-bottom: #b7b7b7 1px solid;
border-left: #b7b7b7 1px solid;
border-right: #b7b7b7 1px solid;
border-top: #b7b7b7 1px solid;
color: #000000;
cursor: text;
font-family: "arial";
font-size: 9pt;
padding: 1px; /*www.52css.com*/
width: 200px;
height: 80px;
}
</style>

Index

  现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了:

  .net core使用rabbitmq消息队列

  点击上面确定之后,你会发现在项目根目录下生成了一个logs目录,里面有一个文件,文件里面就是我们发送的消息了

    注:如果报错,可以登录rabbitmq后台查看账号虚拟机权限是否存在

  .net core使用rabbitmq消息队列

  Rabbitmq日志记录

  上面是我们使用.net core集成rabbitmq的一种简单方式,但是不建议在开发时这么使用

  可以注意到,上面的例子我直接将RabbitMQProducer注入到容器中,但开发时应该按自己的需求对RabbitMQProducer做一层封装,然后将封装类注入到容器中,比如我们要使用rabbitmq做日志记录,可以记录到数据库,也可以记录到文件中去,但是.net core为我们提供了一整套的日志记录功能,因此我们只需要将rabbitmq集成进去就可以了

  首先,我们需要创建几个类,将rabbitmq继承到日志记录功能中去:  

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; namespace RabbitMQDemo
{
public abstract class RabbitMQOptions
{
/// <summary>
/// 服务节点
/// </summary>
public string[] Hosts { get; set; }
/// <summary>
/// 端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 账号
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; }
/// <summary>
/// 虚拟机
/// </summary>
public string VirtualHost { get; set; }
/// <summary>
/// 是否持久化
/// </summary>
public bool Durable { get; set; } = true;
/// <summary>
/// 是否自动删除
/// </summary>
public bool AutoDelete { get; set; } = false;
/// <summary>
/// 队列
/// </summary>
public string Queue { get; set; }
/// <summary>
/// 交换机
/// </summary>
public string Exchange { get; set; }
/// <summary>
/// 交换机类型,放空则为普通模式
/// </summary>
public string Type { get; set; }
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
}
public class RabbitMQLoggerOptions : RabbitMQOptions
{
/// <summary>
/// 最低日志记录
/// </summary>
public LogLevel MinLevel { get; set; } = LogLevel.Information;
/// <summary>
/// 分类
/// </summary>
public string Category { get; set; } = "Rabbit";
}
public class RabbitMQConsumerOptions : RabbitMQOptions
{
/// <summary>
/// 是否自动提交
/// </summary>
public bool AutoAck { get; set; } = false;
/// <summary>
/// 每次发送消息条数
/// </summary>
public ushort? FetchCount { get; set; }
}
}

RabbitMQOptions

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; namespace RabbitMQDemo
{
public class RabbitLoggerProvider : ILoggerProvider
{
RabbitMQLoggerOptions loggerOptions; public RabbitLoggerProvider(IOptionsMonitor<RabbitMQLoggerOptions> options)
{
loggerOptions = options.CurrentValue;
} /// <summary>
/// 创建Logger对象
/// </summary>
/// <param name="categoryName"></param>
/// <returns></returns>
public ILogger CreateLogger(string categoryName)
{
//可缓存实例,这里略过了
return new RabbitLogger(categoryName, loggerOptions);
} /// <summary>
/// 释放
/// </summary>
public void Dispose()
{ }
}
}

RabbitLoggerProvider

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; namespace RabbitMQDemo
{
public class RabbitLogger : ILogger, IDisposable
{
string category;
RabbitMQLoggerOptions loggerOptions;
RabbitMQProducer producer; public RabbitLogger(string category, RabbitMQLoggerOptions options)
{
this.category = category;
this.loggerOptions = options; producer = new RabbitMQProducer(options.Hosts);
producer.Password = options.Password;
producer.UserName = options.UserName;
producer.Port = options.Port;
producer.VirtualHost = options.VirtualHost;
} public IDisposable BeginScope<TState>(TState state)
{
return this;
}
/// <summary>
/// 释放
/// </summary>
public void Dispose()
{
GC.Collect();
}
/// <summary>
/// 是否记录日志
/// </summary>
/// <param name="logLevel"></param>
/// <returns></returns>
public bool IsEnabled(LogLevel logLevel)
{
//只记录日志等级大于指定最小等级且属于Rabbit分类的日志
return logLevel >= loggerOptions.MinLevel && category.Contains(loggerOptions.Category, StringComparison.OrdinalIgnoreCase);
} /// <summary>
/// 日志记录
/// </summary>
/// <typeparam name="TState"></typeparam>
/// <param name="logLevel"></param>
/// <param name="eventId"></param>
/// <param name="state"></param>
/// <param name="exception"></param>
/// <param name="formatter"></param>
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
if (IsEnabled(logLevel))
{
string message = "";
if (state != null)
{
message = state.ToString();
}
if (exception != null)
{
message += Environment.NewLine + formatter?.Invoke(state, exception);
}
//发送消息
producer.Publish(loggerOptions.Queue, message, options =>
{
options.Arguments = loggerOptions.Arguments;
options.AutoDelete = loggerOptions.AutoDelete;
options.Durable = loggerOptions.Durable;
});
}
}
}
}

RabbitLogger

  接着,我们修改Startup的服务对象:  

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; namespace RabbitMQDemo
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
} public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.Configure<CookiePolicyOptions>(options =>
{
// This lambda determines whether user consent for non-essential cookies is needed for a given request.
options.CheckConsentNeeded = context => true;
options.MinimumSameSitePolicy = SameSiteMode.None;
}); //配置消息发布
services.Configure<RabbitMQLoggerOptions>(options =>
{
options.Category = "Rabbit";
options.Hosts = new string[] { "192.168.187.129" };
options.MinLevel = Microsoft.Extensions.Logging.LogLevel.Information;
options.Password = "123456";
options.Port = 5672;
options.Queue = "queue1";
options.UserName = "admin";
options.VirtualHost = "/";
options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
options.AutoDelete = false;
options.Durable = true;
});
//将RabbitLoggerProvider加入到容器中
services.AddSingleton<ILoggerProvider, RabbitLoggerProvider>(); //配置消息消费
services.Configure<RabbitMQConsumerOptions>(options =>
{
options.Hosts = new string[] { "192.168.187.129" };
options.Password = "123456";
options.Port = 5672;
options.Queue = "queue1";
options.UserName = "admin";
options.VirtualHost = "/";
options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
options.AutoDelete = false;
options.Durable = true;
options.AutoAck = false;
});
//注入消费者
services.AddSingleton<IHostedService, RabbitHostedService>(); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
} // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
else
{
app.UseExceptionHandler("/Home/Error");
app.UseHsts();
} app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseCookiePolicy(); app.UseMvc(routes =>
{
routes.MapRoute(
name: "default",
template: "{controller=Home}/{action=Index}/{id?}");
});
}
}
}

Startup

  顺带提一下,这个Startup中服务最好使用拓展方法去实现,这里我是为了简单没有采用拓展方法,所以在开发时多注意一下吧

  另外,我们也可以调整一下RabbitHostedService:  

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Microsoft.Extensions.Options; namespace RabbitMQDemo
{
public class RabbitHostedService : IHostedService
{
RabbitMQConsumerOptions consumerOptions;
RabbitMQConsumer consumer; public RabbitHostedService(IOptions<RabbitMQConsumerOptions> options)
{
consumerOptions = options.Value; consumer = new RabbitMQConsumer(consumerOptions.Hosts);
consumer.Password = consumerOptions.Password;
consumer.UserName = consumerOptions.UserName;
consumer.VirtualHost = consumerOptions.VirtualHost;
consumer.Port = consumerOptions.Port;
} /// <summary>
/// 服务启动
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
await Task.Run(() =>
{
consumer.Received += new Action<RecieveResult>(result =>
{
//文件路径
string path = Path.Combine(Directory.GetCurrentDirectory(), "logs");
if (!Directory.Exists(path))
{
Directory.CreateDirectory(path);
} //文件
string fileName = Path.Combine(path, $"{DateTime.Now.ToString("yyyyMMdd")}.log");
string message = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}接收到消息:{result.Body}{Environment.NewLine}";
File.AppendAllText(fileName, message); //提交
result.Commit();
}); consumer.Listen(consumerOptions.Queue, options =>
{
options.AutoAck = consumerOptions.AutoAck;
options.Arguments = consumerOptions.Arguments;
options.AutoDelete = consumerOptions.AutoDelete;
options.Durable = consumerOptions.Durable;
});
});
}
/// <summary>
/// 服务停止
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
var task = Task.Run(() =>
{
consumer.Close();
}); await Task.WhenAny(task, Task.Delay(-1, cancellationToken));
cancellationToken.ThrowIfCancellationRequested();
}
}
}

RabbitHostedService

  到这里,rabbitmq就被继承到.net core的日志记录功能中去了,但是这里面为了避免记录不要要的日志,我在其中添加了一个限制,rabbitmq只记录categoryName包含Rabbit的日志,这样一来,我们就可以在我们的控制器中使用:  

  .net core使用rabbitmq消息队列.net core使用rabbitmq消息队列
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using RabbitMQDemo.Models; namespace RabbitMQDemo.Controllers
{
public class RabbitController : Controller
{
ILogger logger;
public RabbitController(ILoggerFactory loggerFactory)
{
//下面的logger的categoryName=typeof(RabbitController).FullName=RabbitMQDemo.Controllers.RabbitController
logger = loggerFactory.CreateLogger<RabbitController>();
} /// <summary>
/// 首页
/// </summary>
/// <returns></returns>
public IActionResult Index()
{
return View();
}
/// <summary>
/// 消息提交
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public IActionResult Submit(string message)
{
if (!string.IsNullOrEmpty(message))
{
//发送消息,这里也可以检验日志级别的过滤
logger.LogCritical($"Log from Critical:{message}");
logger.LogDebug($"Log from Debug:{message}");
logger.LogError($"Log from Error:{message}");
logger.LogInformation($"Log from Information:{message}");
logger.LogTrace($"Log from Trace:{message}");
logger.LogWarning($"Log from Warning:{message}");
}
return View("Index");
}
}
}

RabbitController

  现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了,可以发现功能和上面是一样的

  细心的朋友可能会发现,本博文内使用的rabbitmq其实是它6中模式中最简单的一种模式:hello world模式,读者可以将上面的代码稍作修改,即可变成其他几种模式,但在现实开发中,具体使用哪种模式需要根据自己的业务需求定。 

  其实,有一个很好用的第三方封装好的插件,可以让我们很方便的操作rabbitmq,那就是EasyNetQ,可以使用一个消息总栈IBus来进行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,这些消息模式也是我们最常用的,所以以后有空再写

上一篇:RabbitMQ消息队列(二)-RabbitMQ消息队列架构与基本概念


下一篇:【原+转】用CMake代替makefile进行跨平台交叉编译