消息队列:消息队列是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,也就是说:消息的发送者和接收者不需要同时与消息队列交互,消息会保存在队列中,直到接收者取回它。每一个队列中的记录都包含详细说明的数据,例如发生的时间、输入设备的种类,以及特定的输入参数。借助消息队列,最终可以实现解耦的目的。
废话不多说,上代码:
1.事件模型的基类对象:
using System;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Event model: the base class that every event travelling over the bus derives from.
    /// </summary>
    public class IntegrationEvent
    {
        /// <summary>
        /// Creates an event with a newly generated <see cref="Id"/> and the current
        /// local time as <see cref="CreateTime"/>.
        /// </summary>
        public IntegrationEvent()
            : this(Guid.NewGuid(), DateTime.Now)
        {
        }

        /// <summary>
        /// Creates an event from an already known identifier and creation time.
        /// </summary>
        /// <param name="id">Identifier to assign to the event.</param>
        /// <param name="createTime">Creation time to assign to the event.</param>
        public IntegrationEvent(Guid id, DateTime createTime)
        {
            Id = id;
            CreateTime = createTime;
        }

        /// <summary>Identifier of this event instance.</summary>
        public Guid Id { get; set; }

        /// <summary>Moment at which the event was created.</summary>
        public DateTime CreateTime { get; set; }
    }
}
2.事件处理器的接口
using System.Threading.Tasks;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Non-generic marker interface that every typed event handler inherits; it declares no members.
    /// </summary>
    public interface IIntegrationEventHandler
    {
    }

    /// <summary>
    /// Handler for one kind of <see cref="IntegrationEvent"/>.
    /// </summary>
    /// <typeparam name="TIntegrationEvent">Event type accepted by the handler (contravariant).</typeparam>
    public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
        where TIntegrationEvent : IntegrationEvent
    {
        /// <summary>
        /// Processes a single event.
        /// </summary>
        /// <param name="event">The event to handle.</param>
        /// <returns>A task that completes when handling has finished.</returns>
        Task Handle(TIntegrationEvent @event);
    }
}
3.消息总线的接口
namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Contract of the event bus: publishing events and subscribing handlers to them.
    /// </summary>
    public interface IEventBus
    {
        /// <summary>
        /// Publishes an event.
        /// </summary>
        /// <param name="event">The event to publish.</param>
        void Publish(IntegrationEvent @event);

        /// <summary>
        /// Subscribes a handler type to an event type.
        /// </summary>
        /// <typeparam name="T">Event type to subscribe to.</typeparam>
        /// <typeparam name="TH">Handler type that processes <typeparamref name="T"/>.</typeparam>
        void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;
    }
}
注意订阅和发布的基类对象都是IntegrationEvent,就是我们第一步定义的对象,后续的业务实现对象都是继承此父类对象
4.RabbitMq的连接接口类(需要引入RabbitMQ.Client nuget包)
using RabbitMQ.Client;
using System;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// Abstraction over a RabbitMQ connection that can be (re)connected on demand
    /// and hands out channels.
    /// </summary>
    public interface IRabbitMQPersistentConnection : IDisposable
    {
        /// <summary>
        /// Indicates whether a usable connection is currently available.
        /// </summary>
        bool IsConnectned { get; }

        /// <summary>
        /// Attempts to establish the connection.
        /// </summary>
        /// <returns><c>true</c> when the connection is available afterwards; otherwise <c>false</c>.</returns>
        bool TryConnect();

        /// <summary>
        /// Creates a new channel on the connection.
        /// </summary>
        /// <returns>The created channel.</returns>
        IModel CreateModel();
    }
}
5.RabbitMQ的连接实现:
using Microsoft.Extensions.Logging;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System;
using System.IO;
using System.Net.Sockets;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// Holds a single RabbitMQ connection created from an <see cref="IConnectionFactory"/>
    /// and tries to re-establish it whenever the client raises a shutdown, callback-exception
    /// or blocked event.
    /// </summary>
    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        // Serialises connection attempts so only one caller creates the connection at a time.
        private readonly object lock_obj = new object();
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<RabbitMQPersistentConnection> _logger;
        private readonly int _retryCount;
        private IConnection _connection;
        private bool _dispose;

        /// <summary>
        /// Creates the wrapper; no connection is opened until <see cref="TryConnect"/> is called.
        /// </summary>
        /// <param name="connectionFactory">Factory used to create the underlying connection.</param>
        /// <param name="logger">Logger for connection diagnostics.</param>
        /// <param name="RetryCount">Number of retries performed by <see cref="TryConnect"/>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connectionFactory"/> or <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger, int RetryCount)
        {
            _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
            // Report the parameter name, not the ILogger type name.
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _retryCount = RetryCount;
        }

        /// <summary>
        /// True when a connection exists, is open and this instance has not been disposed.
        /// </summary>
        public bool IsConnectned => _connection != null && _connection.IsOpen && !_dispose;

        /// <summary>
        /// Creates a channel on the current connection.
        /// </summary>
        /// <returns>The new channel.</returns>
        /// <exception cref="InvalidOperationException">No open connection is available.</exception>
        public IModel CreateModel()
        {
            if (!IsConnectned)
            {
                throw new InvalidOperationException("No RabbitMq Connection are available to perform this action");
            }

            return _connection.CreateModel();
        }

        /// <summary>
        /// Disposes the underlying connection once; later calls are no-ops.
        /// </summary>
        public void Dispose()
        {
            if (_dispose)
            {
                return;
            }

            _dispose = true;

            try
            {
                // The connection is null when TryConnect was never called or never succeeded.
                _connection?.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }

        /// <summary>
        /// Creates the connection, retrying with exponential back-off (2^attempt seconds) on
        /// <see cref="SocketException"/> and <see cref="BrokerUnreachableException"/>, and
        /// subscribes to the connection's failure events on success.
        /// </summary>
        /// <returns><c>true</c> when the connection is open afterwards; otherwise <c>false</c>.</returns>
        /// <remarks>
        /// An exception that is still thrown after the last retry is not caught here and
        /// propagates to the caller.
        /// </remarks>
        public bool TryConnect()
        {
            _logger.LogInformation("RabbitMq Client is trying to connect");

            lock (lock_obj)
            {
                var policy = Policy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(
                        _retryCount,
                        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                        (ex, time) =>
                        {
                            // Supply a value for every placeholder in the template.
                            _logger.LogWarning(ex, "RabbitMq Client cannot connect after {TimeOut}s ({ExceptionMessage})", time.TotalSeconds, ex.Message);
                        });

                policy.Execute(() =>
                {
                    _connection = _connectionFactory.CreateConnection();
                });

                if (!IsConnectned)
                {
                    _logger.LogCritical("RabbitMq Client connect error");
                    return false;
                }

                _connection.ConnectionShutdown += _connection_ConnectionShutdown;
                _connection.CallbackException += _connection_CallbackException;
                _connection.ConnectionBlocked += _connection_ConnectionBlocked;

                _logger.LogInformation("RabbitMq Client acquired a persistent connection to '{HostName}'", _connection.Endpoint.HostName);
                return true;
            }
        }

        // Reconnects when the broker blocks the connection, unless this instance is disposed.
        private void _connection_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            if (_dispose)
            {
                return;
            }

            _logger.LogInformation("A rabbitMq Client Connection is blocked,trying to re-connect...");
            TryConnect();
        }

        // Reconnects after a callback on the connection threw, unless this instance is disposed.
        private void _connection_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
        {
            if (_dispose)
            {
                return;
            }

            _logger.LogInformation("A rabbitMq Client Connection threw an exception,trying to re-connect...");
            TryConnect();
        }

        // Reconnects after the connection was shut down, unless this instance is disposed.
        private void _connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            if (_dispose)
            {
                return;
            }

            _logger.LogInformation("A rabbit Client Connection is ShutDown,trying to re-connect....");
            TryConnect();
        }
    }
}
6.上面第3步的事件总线的实现
using Autofac;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMqEventBus.EventBus;
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// RabbitMQ implementation of <see cref="IEventBus"/>. Events are published to a direct
    /// exchange using the event type name as routing key; one durable queue is consumed and
    /// each message is dispatched to the handlers registered in the subscription manager.
    /// </summary>
    public class EventBusRabbitMq : IEventBus
    {
        const string BROKER_NAME = "blogcore_event_bus";

        private readonly IRabbitMQPersistentConnection _rabbitMqPersistentConnection;
        private readonly ILogger<EventBusRabbitMq> _logger;
        private readonly ILifetimeScope _scope;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly string AUTOFAC_SCOPE_NAME = "blogcore_event_bus";
        private readonly int _retryCount;
        private string _queueName;
        private IModel _consumerChannel;

        /// <summary>
        /// Creates the bus, opens the consumer channel and hooks the subscription manager's
        /// removal event.
        /// </summary>
        /// <param name="rabbitMQPersistentConnection">Connection used to create channels.</param>
        /// <param name="logger">Logger for bus diagnostics.</param>
        /// <param name="scope">Autofac scope from which handlers are resolved.</param>
        /// <param name="subsManager">Registry of event-to-handler subscriptions.</param>
        /// <param name="queueName">Name of the queue this bus consumes from.</param>
        /// <param name="tryCount">Number of retries when publishing fails.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="rabbitMQPersistentConnection"/>, <paramref name="logger"/> or
        /// <paramref name="subsManager"/> is <c>null</c>.
        /// </exception>
        public EventBusRabbitMq(IRabbitMQPersistentConnection rabbitMQPersistentConnection,
            ILogger<EventBusRabbitMq> logger,
            ILifetimeScope scope,
            IEventBusSubscriptionsManager subsManager,
            string queueName = null,
            int tryCount = 5)
        {
            // All three are dereferenced below; fail with a clear message rather than a NullReferenceException.
            _rabbitMqPersistentConnection = rabbitMQPersistentConnection ?? throw new ArgumentNullException(nameof(rabbitMQPersistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager));
            _scope = scope;
            _queueName = queueName;
            _retryCount = tryCount;
            _consumerChannel = CreateConsumerChannel();
            _subsManager.OnEventRemoveHandler += _subsManager_OnEventRemoveHandler;
        }

        // Unbinds the queue from the removed event; closes the consumer channel when no subscriptions remain.
        private void _subsManager_OnEventRemoveHandler(object sender, string eventName)
        {
            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }

            using (var channel = _rabbitMqPersistentConnection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);

                if (_subsManager.IsEmpty)
                {
                    _queueName = string.Empty;
                    _consumerChannel.Close();
                }
            }
        }

        /// <summary>
        /// Serialises the event as JSON and publishes it to the exchange, using the event's
        /// runtime type name as routing key. Publishing is retried with exponential back-off
        /// on <see cref="RabbitMQ.Client.Exceptions.BrokerUnreachableException"/> and
        /// <see cref="SocketException"/>.
        /// </summary>
        /// <param name="event">The event to publish.</param>
        public void Publish(IntegrationEvent @event)
        {
            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }

            var policy = Policy.Handle<RabbitMQ.Client.Exceptions.BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(
                    _retryCount,
                    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    (ex, time) =>
                    {
                        _logger.LogWarning(ex, "Could not publish event {EventId} after {TimeOut}s ({ExceptionMessage})", @event.Id, time.TotalSeconds, ex.Message);
                    });

            var eventName = @event.GetType().Name;

            using var channel = _rabbitMqPersistentConnection.CreateModel();
            channel.ExchangeDeclare(exchange: BROKER_NAME, ExchangeType.Direct);

            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);

            policy.Execute(() =>
            {
                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2; // 2 = persistent delivery

                channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
            });
        }

        /// <summary>
        /// Binds the queue to the event's routing key (if not already subscribed), records the
        /// subscription and starts consuming.
        /// </summary>
        /// <typeparam name="T">Event type to subscribe to.</typeparam>
        /// <typeparam name="TH">Handler type that processes <typeparamref name="T"/>.</typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
            StartBasicConsume();
        }

        // Binds the queue to the exchange for this event name unless a subscription already exists.
        private void DoInternalSubscription(string eventName)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                return;
            }

            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }

            using (var channel = _rabbitMqPersistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
            }
        }

        // Declares the exchange and the durable queue and returns the channel used for consuming.
        private IModel CreateConsumerChannel()
        {
            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }

            _logger.LogTrace("Createing RabbitMq Consumer channel");

            var channel = _rabbitMqPersistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // prefetchCount = 1: do not hand a worker more than one message at a time.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, false);

            // When a callback on the channel throws, replace the channel and resume consuming.
            channel.CallbackException += (sender, e) =>
            {
                _logger.LogWarning(e.Exception, "Recreating RabbitMQ Consumer channel");

                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
                StartBasicConsume();
            };

            return channel;
        }

        // Attaches an asynchronous consumer to the queue with manual acknowledgements.
        private void StartBasicConsume()
        {
            _logger.LogTrace("Starting RabbitMQ basic Consume...");

            if (_consumerChannel == null)
            {
                _logger.LogError("StartBasicConsume cannot call on_consumeChannel == null");
                return;
            }

            var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
            consumer.Received += Consumer_Received;

            _consumerChannel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
        }

        // Decodes a delivery, dispatches it to the handlers and acknowledges it.
        private async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
            var eventName = @event.RoutingKey;
            var message = Encoding.UTF8.GetString(@event.Body.Span);

            try
            {
                // Lets a sender force the failure path by embedding this marker in the payload.
                if (message.ToLowerInvariant().Contains("throw-fake-exception"))
                {
                    throw new InvalidOperationException($"Fake exception requested:\"{message}\"");
                }

                await ProcessEvent(eventName, message);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "--- Error Processing message \"{Message}\"", message);
            }

            // The message is acknowledged even when processing failed, so a failed message is not redelivered.
            _consumerChannel.BasicAck(@event.DeliveryTag, multiple: false);
        }

        // Resolves every handler subscribed to the event from a child Autofac scope and invokes it.
        private async Task ProcessEvent(string eventName, string message)
        {
            _logger.LogTrace("Processing RabbitMq event:{EventName}", eventName);

            if (!_subsManager.HasSubscriptionsForEvent(eventName))
            {
                _logger.LogWarning("No Subscription for RabbitMQ event: {EventName}", eventName);
                return;
            }

            using var scope = _scope.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);

            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    if (handler == null)
                    {
                        continue; // handler type is not registered in the container
                    }

                    dynamic obj = JObject.Parse(message);
                    await handler.Handle(obj);
                }
                else
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    if (handler == null)
                    {
                        continue; // handler type is not registered in the container
                    }

                    var eventType = _subsManager.GetEventTypeName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

                    // The closed handler type is only known at run time, so Handle is invoked through reflection.
                    await (Task)concreteType
                        .GetMethod(nameof(IIntegrationEventHandler<IntegrationEvent>.Handle))
                        .Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}
这步中有个核心的接口类 IEventBusSubscriptionsManager
6.1 IEventBusSubscriptionsManager接口
using RabbitMqEventBus.EventBusSubscriptions;
using System;
using System.Collections.Generic;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Registry that tracks which handler types are subscribed to which events.
    /// </summary>
    public interface IEventBusSubscriptionsManager
    {
        /// <summary>Raised with the event name when a handler is removed.</summary>
        event EventHandler<string> OnEventRemoveHandler;

        /// <summary>Indicates whether the registry holds no subscriptions.</summary>
        bool IsEmpty { get; }

        /// <summary>
        /// Registers a typed handler for an event type.
        /// </summary>
        /// <typeparam name="T">Event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;

        /// <summary>
        /// Registers a dynamic handler for an event identified only by name.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event.</param>
        void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        /// <summary>
        /// Tells whether any handler is registered for the event name.
        /// </summary>
        /// <param name="eventName">Name of the event.</param>
        bool HasSubscriptionsForEvent(string eventName);

        /// <summary>
        /// Returns the key under which events of type <typeparamref name="T"/> are registered.
        /// </summary>
        /// <typeparam name="T">Event type.</typeparam>
        string GetEventKey<T>();

        /// <summary>
        /// Returns the subscriptions registered for the event name.
        /// </summary>
        /// <param name="eventName">Name of the event.</param>
        IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);

        /// <summary>
        /// Returns the event <see cref="Type"/> registered under the event name.
        /// </summary>
        /// <param name="eventName">Name of the event.</param>
        Type GetEventTypeName(string eventName);
    }
}
6.2 IEventBusSubscriptionsManager接口的实现
using RabbitMqEventBus.EventBus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMqEventBus.EventBusSubscriptions
{
    /// <summary>
    /// Subscription registry kept in process memory: a dictionary from event name to its
    /// handler subscriptions plus a list of the known event types.
    /// </summary>
    public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
    {
        private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
        private readonly List<Type> _eventType;

        /// <summary>Creates an empty registry.</summary>
        public InMemoryEventBusSubscriptionsManager()
        {
            _eventType = new List<Type>();
            _handlers = new Dictionary<string, List<SubscriptionInfo>>();
        }

        /// <summary>Raised with the event name when a handler is removed.</summary>
        public event EventHandler<string> OnEventRemoveHandler;

        /// <summary>True when no event name has been registered.</summary>
        public bool IsEmpty => !(_handlers != null && _handlers.Keys.Any());

        /// <summary>
        /// Registers a typed handler under the key of <typeparamref name="T"/> and remembers
        /// the event type so it can be looked up by name later.
        /// </summary>
        /// <typeparam name="T">Event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        /// <exception cref="ArgumentException">The handler is already registered for the event.</exception>
        public void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            DoAddSubscription(typeof(TH), GetEventKey<T>(), false);

            var eventClrType = typeof(T);
            if (_eventType.Contains(eventClrType))
            {
                return;
            }

            _eventType.Add(eventClrType);
        }

        /// <summary>
        /// Registers a dynamic handler under the given event name.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event.</param>
        /// <exception cref="ArgumentException">The handler is already registered for the event.</exception>
        public void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoAddSubscription(typeof(TH), eventName, isDynamic: true);
        }

        /// <summary>Tells whether the event name has an entry in the registry.</summary>
        /// <param name="eventName">Name of the event.</param>
        public bool HasSubscriptionsForEvent(string eventName)
        {
            return _handlers.ContainsKey(eventName);
        }

        /// <summary>Returns the simple type name of <typeparamref name="T"/>.</summary>
        /// <typeparam name="T">Event type.</typeparam>
        public string GetEventKey<T>() => typeof(T).Name;

        /// <summary>Returns the subscriptions stored for the event name.</summary>
        /// <param name="eventName">Name of the event; must already be registered.</param>
        public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName)
        {
            return _handlers[eventName];
        }

        /// <summary>
        /// Returns the registered event type whose simple name equals
        /// <paramref name="eventName"/>, or <c>null</c> when there is none.
        /// </summary>
        /// <param name="eventName">Name of the event.</param>
        public Type GetEventTypeName(string eventName)
        {
            return _eventType.SingleOrDefault(candidate => candidate.Name == eventName);
        }

        // Adds one subscription, creating the event's list on first use and rejecting duplicate handler types.
        private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
        {
            if (!_handlers.ContainsKey(eventName))
            {
                _handlers.Add(eventName, new List<SubscriptionInfo>());
            }

            var registered = _handlers[eventName];

            foreach (var existing in registered)
            {
                if (existing.HandlerType == handlerType)
                {
                    throw new ArgumentException($"Handler Type {handlerType.Name} already registered for {eventName}", nameof(handlerType));
                }
            }

            var subscription = isDynamic
                ? SubscriptionInfo.Dynamic(handlerType)
                : SubscriptionInfo.Typed(handlerType);

            registered.Add(subscription);
        }
    }
}
6.3 订阅信息对象(SubscriptionInfo)
using System;

namespace RabbitMqEventBus.EventBusSubscriptions
{
    /// <summary>
    /// Describes one subscription: the handler type and whether it is a dynamic handler.
    /// </summary>
    public class SubscriptionInfo
    {
        /// <summary>
        /// Creates a subscription description.
        /// </summary>
        /// <param name="isDynamic">Whether the handler is a dynamic handler.</param>
        /// <param name="handlerType">Type of the handler.</param>
        public SubscriptionInfo(bool isDynamic, Type handlerType)
        {
            HandlerType = handlerType;
            IsDynamic = isDynamic;
        }

        /// <summary>Whether the handler is a dynamic handler.</summary>
        public bool IsDynamic { get; set; }

        /// <summary>Type of the handler.</summary>
        public Type HandlerType { get; set; }

        /// <summary>Builds a subscription flagged as dynamic.</summary>
        /// <param name="handlerType">Type of the handler.</param>
        public static SubscriptionInfo Dynamic(Type handlerType) =>
            new SubscriptionInfo(isDynamic: true, handlerType: handlerType);

        /// <summary>Builds a subscription flagged as typed (not dynamic).</summary>
        /// <param name="handlerType">Type of the handler.</param>
        public static SubscriptionInfo Typed(Type handlerType) =>
            new SubscriptionInfo(isDynamic: false, handlerType: handlerType);
    }
}
7.然后就是服务的注入和使用了
using Autofac;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMqEventBus.EventBus;
using RabbitMqEventBus.EventBusSubscriptions;
using RabbitMqEventBus.EventHandling;
using RabbitMqEventBus.RabbitMqPersistent;
using System;

namespace RabbitMqEventBus.ServiceExtention
{
    /// <summary>
    /// Extension methods that register the RabbitMQ event bus and wire up its subscriptions.
    /// </summary>
    public static class EventBusSetup
    {
        /// <summary>
        /// Registers the subscription manager, the persistent RabbitMQ connection and the
        /// event bus as singletons.
        /// </summary>
        /// <param name="service">Service collection to add the registrations to.</param>
        /// <returns>The same <paramref name="service"/> instance, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="service"/> is <c>null</c>.</exception>
        public static IServiceCollection AddEventBusSetup(this IServiceCollection service)
        {
            if (service == null)
            {
                // Report the parameter name, not the type name.
                throw new ArgumentNullException(nameof(service));
            }

            service.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();

            // NOTE(review): BlogDeleteIntegrationEventHander is not registered here; confirm it is
            // registered with the container elsewhere.

            service.AddSingleton<IRabbitMQPersistentConnection>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();

                // NOTE(review): host and guest credentials are hard-coded; consider reading them from configuration.
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    DispatchConsumersAsync = true,
                    Password = "guest",
                    UserName = "guest"
                };

                return new RabbitMQPersistentConnection(factory, logger, 5);
            });

            service.AddSingleton<IEventBus, EventBusRabbitMq>(sp =>
            {
                var rabbitMqConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
                var lifeTimeScope = sp.GetRequiredService<ILifetimeScope>();
                var logger = sp.GetRequiredService<ILogger<EventBusRabbitMq>>();
                var eventBusSubscriotion = sp.GetRequiredService<IEventBusSubscriptionsManager>();
                var retryCount = 5;

                return new EventBusRabbitMq(rabbitMqConnection, logger, lifeTimeScope, eventBusSubscriotion, "Blog_Core_Api", retryCount);
            });

            return service;
        }

        /// <summary>
        /// Resolves the event bus and subscribes the application's event handlers.
        /// </summary>
        /// <param name="app">Application builder whose service provider holds the event bus.</param>
        /// <returns>The same <paramref name="app"/> instance, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="app"/> is <c>null</c>.</exception>
        public static IApplicationBuilder ConfigureEventBus(this IApplicationBuilder app)
        {
            if (app == null)
            {
                throw new ArgumentNullException(nameof(app));
            }

            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
            eventBus.Subscribe<BlogDeleteIntegrationModelEvent, BlogDeleteIntegrationEventHander>();

            return app;
        }
    }
}
demo地址: