C#利用RabbitMQ实现消息总线

消息队列:消息队列是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,也就是说:消息的发送者和接收者不需要同时与消息队列交互,消息会保存在队列中,直到接收者取回它。队列中的每一条记录都包含详细说明的数据,例如发生的时间、输入设备的种类以及特定的输入参数。借助消息队列,最终可以实现系统之间解耦的目的。

废话不多说,上代码:

1.事件模型的基类对象:

using System;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Base type for the events exchanged over the event bus. Every event carries
    /// a unique identifier and the time at which it was created.
    /// </summary>
    public class IntegrationEvent
    {
        /// <summary>Unique identifier of this event instance.</summary>
        public Guid Id { get; set; }

        /// <summary>Creation time of the event.</summary>
        public DateTime CreateTime { get; set; }

        /// <summary>
        /// Creates an event with a newly generated <see cref="Id"/> and
        /// <see cref="CreateTime"/> set to the current local time.
        /// </summary>
        public IntegrationEvent()
            : this(Guid.NewGuid(), DateTime.Now)
        {
        }

        /// <summary>
        /// Creates an event from an existing identifier and creation time.
        /// </summary>
        /// <param name="id">Identifier to assign to the event.</param>
        /// <param name="createTime">Creation time to assign to the event.</param>
        public IntegrationEvent(Guid id, DateTime createTime)
        {
            Id = id;
            CreateTime = createTime;
        }
    }
}

2.事件处理器(Handler)的接口

using System.Threading.Tasks;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Non-generic marker interface implemented by every integration event handler.
    /// It declares no members.
    /// </summary>
    public interface IIntegrationEventHandler
    {
    }

    /// <summary>
    /// Handler for integration events of type <typeparamref name="TIntegrationEvent"/>.
    /// </summary>
    /// <typeparam name="TIntegrationEvent">
    /// Event type handled; must derive from <see cref="IntegrationEvent"/>.
    /// </typeparam>
    public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
        where TIntegrationEvent : IntegrationEvent
    {
        /// <summary>Handles one event instance.</summary>
        /// <param name="event">The event to handle.</param>
        /// <returns>A task that represents the handling operation.</returns>
        Task Handle(TIntegrationEvent @event);
    }
}

 

3.消息总线的接口

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Event bus contract: publish integration events and subscribe handlers to them.
    /// </summary>
    public interface IEventBus
    {
        /// <summary>
        /// Publishes an event.
        /// </summary>
        /// <param name="event">The event to publish.</param>
        void Publish(IntegrationEvent @event);

        /// <summary>
        /// Subscribes a handler to an event.
        /// </summary>
        /// <typeparam name="T">Event type to subscribe to.</typeparam>
        /// <typeparam name="TH">Handler type that processes events of type <typeparamref name="T"/>.</typeparam>
        void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;
    }
}
注意订阅和发布的基类对象都是IntegrationEvent,就是我们第一步定义的对象,后续的业务实现对象都是继承此父类对象

4.RabbitMq的连接接口类(需要引入RabbitMQ.Client nuget包)

using RabbitMQ.Client;
using System;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// Disposable abstraction over a RabbitMQ connection that can be established
    /// on demand and used to create channels.
    /// </summary>
    public interface IRabbitMQPersistentConnection : IDisposable
    {
        /// <summary>
        /// Attempts to establish the connection.
        /// </summary>
        /// <returns><c>true</c> when a connection is available afterwards; otherwise <c>false</c>.</returns>
        bool TryConnect();

        /// <summary>
        /// Creates a new channel on the current connection.
        /// </summary>
        /// <returns>The created channel.</returns>
        IModel CreateModel();

        /// <summary>
        /// Whether a usable connection is currently available.
        /// </summary>
        // NOTE(review): "IsConnectned" is a misspelling of "IsConnected"; the name is kept
        // because implementers and callers depend on it.
        bool IsConnectned { get; }
    }
}

5.RabbitMQ的连接实现:

using Microsoft.Extensions.Logging;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System;
using System.IO;
using System.Net.Sockets;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// <see cref="IRabbitMQPersistentConnection"/> implementation that opens a RabbitMQ
    /// connection through an <see cref="IConnectionFactory"/>, retrying with exponential
    /// back-off, and tries to re-connect when the connection reports a shutdown, a
    /// callback exception or a blocked state.
    /// </summary>
    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        // Serializes connection attempts.
        private readonly object lock_obj = new object();
        private readonly IConnectionFactory _connectionFactory;
        private readonly int _retryCount;
        private readonly ILogger<RabbitMQPersistentConnection> _logger;
        private IConnection _connection;
        private bool _dispose;

        /// <summary>
        /// Creates the wrapper; no connection is opened until <see cref="TryConnect"/> is called.
        /// </summary>
        /// <param name="connectionFactory">Factory used to create the RabbitMQ connection.</param>
        /// <param name="logger">Logger used for connection diagnostics.</param>
        /// <param name="RetryCount">Number of retries performed when connecting fails.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connectionFactory"/> or <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger, int RetryCount)
        {
            _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
            // The parameter name (not the type name) is what ArgumentNullException expects.
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _retryCount = RetryCount;
        }

        /// <summary>
        /// <c>true</c> when a connection exists, is open, and this instance has not been disposed.
        /// </summary>
        public bool IsConnectned => _connection != null && _connection.IsOpen && !_dispose;

        /// <summary>
        /// Creates a channel on the current connection.
        /// </summary>
        /// <returns>The new channel.</returns>
        /// <exception cref="InvalidOperationException">No open connection is available.</exception>
        public IModel CreateModel()
        {
            if (!IsConnectned)
            {
                throw new InvalidOperationException("No RabbitMq Connection are available to perform this action");
            }
            return _connection.CreateModel();
        }

        /// <summary>
        /// Disposes the underlying connection, if any. Subsequent calls are no-ops.
        /// </summary>
        public void Dispose()
        {
            if (_dispose) return;
            _dispose = true;
            try
            {
                // The connection is null when TryConnect was never called or never succeeded.
                _connection?.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }

        /// <summary>
        /// Opens a connection, retrying on <see cref="SocketException"/> and
        /// <see cref="BrokerUnreachableException"/> with a delay of 2^attempt seconds,
        /// and subscribes to the connection's failure events.
        /// </summary>
        /// <returns><c>true</c> when the connection is open afterwards; otherwise <c>false</c>.</returns>
        public bool TryConnect()
        {
            _logger.LogInformation("RabbitMq Client is trying to connect");
            lock (lock_obj)
            {
                var policy = Policy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        // Supply the values for the template placeholders; without them the raw template is logged.
                        _logger.LogWarning(ex, "RabbitMq Client cannot connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
                    });
                policy.Execute(() =>
                {
                    _connection = _connectionFactory.CreateConnection();
                });
                if (IsConnectned)
                {
                    _connection.ConnectionShutdown += _connection_ConnectionShutdown;
                    _connection.CallbackException += _connection_CallbackException;
                    _connection.ConnectionBlocked += _connection_ConnectionBlocked;

                    _logger.LogInformation("RabbitMq Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
                    return true;
                }
                else
                {
                    _logger.LogCritical("RabbitMq Client connect error");
                    return false;
                }
            }
        }

        // Re-connects when the broker reports the connection as blocked.
        private void _connection_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            if (_dispose) return;
            _logger.LogInformation("A rabbitMq Client Connection is blocked,trying to re-connect...");
            TryConnect();
        }

        // Re-connects when a connection callback threw; the exception is attached to the log entry.
        private void _connection_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
        {
            if (_dispose) return;
            _logger.LogInformation(e.Exception, "A rabbitMq Client Connection is throw exception");
            TryConnect();
        }

        // Re-connects when the connection was shut down.
        private void _connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            if (_dispose) return;
            _logger.LogInformation("A rabbit Client Connection is ShutDown,trying to re-connect....");
            TryConnect();
        }
    }
}

6.上面第3步的事件总线的实现

using Autofac;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMqEventBus.EventBus;
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMqEventBus.RabbitMqPersistent
{
    /// <summary>
    /// RabbitMQ-backed <see cref="IEventBus"/>. Events are published to the direct exchange
    /// <c>blogcore_event_bus</c> with the event type name as routing key, and are consumed
    /// from one durable queue that is bound to the exchange once per subscribed event name.
    /// </summary>
    public class EventBusRabbitMq : IEventBus
    {
        const string BROKER_NAME = "blogcore_event_bus";
        private readonly IRabbitMQPersistentConnection _rabbitMqPersistentConnection;
        private readonly ILogger<EventBusRabbitMq> _logger;
        private readonly ILifetimeScope _scope;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly string AUTOFAC_SCOPE_NAME = "blogcore_event_bus";
        private readonly int _retryCount;
        private string _queueName;
        private IModel _consumerChannel;

        /// <summary>
        /// Creates the bus, declares the exchange and queue on a consumer channel and
        /// hooks the subscription manager's removal event.
        /// </summary>
        /// <param name="rabbitMQPersistentConnection">Connection used to create channels.</param>
        /// <param name="logger">Logger for bus diagnostics.</param>
        /// <param name="scope">Autofac scope from which handlers are resolved.</param>
        /// <param name="subsManager">Registry of event subscriptions.</param>
        /// <param name="queueName">Name of the queue to consume from.</param>
        /// <param name="tryCount">Number of retries when publishing fails.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="rabbitMQPersistentConnection"/>, <paramref name="logger"/> or
        /// <paramref name="subsManager"/> is <c>null</c>.
        /// </exception>
        public EventBusRabbitMq(IRabbitMQPersistentConnection rabbitMQPersistentConnection,
            ILogger<EventBusRabbitMq> logger,
            ILifetimeScope scope,
            IEventBusSubscriptionsManager subsManager,
            string queueName = null,
            int tryCount = 5
            )
        {
            // These three are dereferenced below, so fail with a descriptive exception instead of an NRE.
            _rabbitMqPersistentConnection = rabbitMQPersistentConnection ?? throw new ArgumentNullException(nameof(rabbitMQPersistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager));
            _scope = scope;
            _queueName = queueName;
            _retryCount = tryCount;
            _consumerChannel = CreateConsumerChannel();
            _subsManager.OnEventRemoveHandler += _subsManager_OnEventRemoveHandler;
        }

        // Unbinds the queue from the removed event's routing key; when no subscriptions
        // remain, clears the queue name and closes the consumer channel.
        private void _subsManager_OnEventRemoveHandler(object sender, string eventName)
        {
            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }
            using (var channel = _rabbitMqPersistentConnection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                if (_subsManager.IsEmpty)
                {
                    _queueName = string.Empty;
                    _consumerChannel.Close();
                }
            }
        }

        /// <summary>
        /// Serializes the event as JSON and publishes it to the exchange, using the event's
        /// runtime type name as routing key. Publishing is retried with a delay of
        /// 2^attempt seconds on broker-unreachable and socket errors.
        /// </summary>
        /// <param name="event">The event to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
        public void Publish(IntegrationEvent @event)
        {
            if (@event == null) throw new ArgumentNullException(nameof(@event));

            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }
            var polly = Policy.Handle<RabbitMQ.Client.Exceptions.BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
                });
            var eventName = @event.GetType().Name;
            using var channel = _rabbitMqPersistentConnection.CreateModel();
            channel.ExchangeDeclare(exchange: BROKER_NAME, ExchangeType.Direct);
            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);
            polly.Execute(() =>
            {
                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2; // 2 = persistent delivery mode
                channel.BasicPublish(exchange: BROKER_NAME,
                                    routingKey: eventName,
                                    mandatory: true,
                                    basicProperties: properties,
                                    body: body);
            });
        }

        /// <summary>
        /// Binds the queue to the event's routing key (first subscription only), records the
        /// handler in the subscription manager and starts consuming.
        /// </summary>
        /// <typeparam name="T">Event type to subscribe to.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);

            _subsManager.AddSubscription<T, TH>();
            StartBasicConsume();
        }

        // Binds the queue to the routing key unless the event already has subscriptions.
        private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if (!containsKey)
            {
                if (!_rabbitMqPersistentConnection.IsConnectned)
                {
                    _rabbitMqPersistentConnection.TryConnect();
                }
                using (var channel = _rabbitMqPersistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                }
            }
        }

        // Creates the channel used for consuming: declares the exchange and the durable queue,
        // limits prefetch to one message, and recreates itself when a callback throws.
        private IModel CreateConsumerChannel()
        {
            if (!_rabbitMqPersistentConnection.IsConnectned)
            {
                _rabbitMqPersistentConnection.TryConnect();
            }
            _logger.LogTrace("Createing RabbitMq Consumer channel");

            var channel = _rabbitMqPersistentConnection.CreateModel();
            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            // prefetchCount = 1: do not send a worker more than one message at a time.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, false);
            channel.CallbackException += (sender, e) =>
            {
                _logger.LogWarning(e.Exception, "Recreating RabbitMQ Consumer channel");
                // The field is still null if the callback fires before the constructor assigned it.
                _consumerChannel?.Dispose();
                _consumerChannel = CreateConsumerChannel();
                StartBasicConsume();
            };
            return channel;
        }

        // Attaches an async consumer to the consumer channel with manual acknowledgements.
        private void StartBasicConsume()
        {
            _logger.LogTrace("Starting RabbitMQ basic Consume...");
            if (_consumerChannel != null)
            {
                var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
                consumer.Received += Consumer_Received;
                _consumerChannel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
            }
            else
            {
                _logger.LogError("StartBasicConsume cannot call on_consumeChannel == null");
            }
        }

        // Decodes the delivery as UTF-8, dispatches it to the handlers and acknowledges it.
        private async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
            var eventName = @event.RoutingKey;
            var message = Encoding.UTF8.GetString(@event.Body.Span);
            try
            {
                // Diagnostic hook: a payload containing this marker forces a failure.
                if (message.ToLowerInvariant().Contains("throw-fake-exception"))
                {
                    throw new InvalidOperationException($"Fake exception requested:\"{message}\"");
                }
                await ProcessEvent(eventName, message);
            }
            catch (Exception ex)
            {
                // The placeholder is required; without it the message argument is dropped from the log.
                _logger.LogWarning(ex, "--- Error Processing message \"{Message}\"", message);
            }
            // The delivery is acknowledged even when processing failed, so it is not redelivered.
            _consumerChannel.BasicAck(@event.DeliveryTag, multiple: false);
        }

        // Resolves each handler subscribed to the event from a child Autofac scope and invokes it.
        // Handlers that cannot be resolved are skipped.
        private async Task ProcessEvent(string eventName, string message)
        {
            _logger.LogTrace("Processing RabbitMq event:{EventName}", eventName);
            if (!_subsManager.HasSubscriptionsForEvent(eventName))
            {
                _logger.LogWarning("No Subscription for RabbitMQ event:{EventName}", eventName);
                return;
            }

            using var scope = _scope.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    if (handler == null) continue;
                    dynamic obj = JObject.Parse(message);
                    await handler.Handle(obj);
                }
                else
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    if (handler == null) continue;
                    var eventType = _subsManager.GetEventTypeName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    // Call IIntegrationEventHandler<TEvent>.Handle via reflection because TEvent is only known at run time.
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}

这步中有个核心的接口类 IEventBusSubscriptionsManager
6.1 IEventBusSubscriptionsManager接口
using RabbitMqEventBus.EventBusSubscriptions;
using System;
using System.Collections.Generic;

namespace RabbitMqEventBus.EventBus
{
    /// <summary>
    /// Registry of event subscriptions: which handler types are registered for which event name.
    /// </summary>
    public interface IEventBusSubscriptionsManager
    {
        /// <summary>Whether no subscriptions are registered.</summary>
        bool IsEmpty { get; }

        /// <summary>
        /// Event whose argument is an event name.
        /// </summary>
        // NOTE(review): presumably raised when the last handler of an event is removed;
        // no member of this interface removes subscriptions, so confirm with the implementation.
        event EventHandler<string> OnEventRemoveHandler;

        /// <summary>Registers handler type <typeparamref name="TH"/> for event type <typeparamref name="T"/>.</summary>
        /// <typeparam name="T">Event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;

        /// <summary>Registers dynamic handler type <typeparamref name="TH"/> under an event name.</summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event to subscribe to.</param>
        void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        /// <summary>Returns whether any subscription exists for the event name.</summary>
        /// <param name="eventName">Name of the event.</param>
        bool HasSubscriptionsForEvent(string eventName);

        /// <summary>Returns the handlers registered for the event name.</summary>
        /// <param name="eventName">Name of the event.</param>
        IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);

        /// <summary>Returns the key (event name) under which event type <typeparamref name="T"/> is registered.</summary>
        /// <typeparam name="T">Event type.</typeparam>
        string GetEventKey<T>();

        /// <summary>Returns the event <see cref="Type"/> associated with the event name.</summary>
        /// <param name="eventName">Name of the event.</param>
        Type GetEventTypeName(string eventName);
    }
}

6.2 IEventBusSubscriptionsManager接口的实现

using RabbitMqEventBus.EventBus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMqEventBus.EventBusSubscriptions
{
    /// <summary>
    /// <see cref="IEventBusSubscriptionsManager"/> that keeps all subscriptions in memory:
    /// a dictionary from event name to its handlers, plus the list of typed event types.
    /// </summary>
    public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
    {
        // Event name -> handlers registered for it.
        private readonly Dictionary<string, List<SubscriptionInfo>> _handlers =
            new Dictionary<string, List<SubscriptionInfo>>();

        // Event types that were registered through AddSubscription<T, TH>.
        private readonly List<Type> _eventType = new List<Type>();

        /// <summary>Declared for the interface; this class never raises it.</summary>
        public event EventHandler<string> OnEventRemoveHandler;

        /// <summary><c>true</c> when no event name has been registered.</summary>
        public bool IsEmpty => _handlers.Count == 0;

        /// <summary>Returns the simple type name of <typeparamref name="T"/>, used as the event name.</summary>
        public string GetEventKey<T>() => typeof(T).Name;

        /// <summary>Returns whether the event name has an entry in the registry.</summary>
        public bool HasSubscriptionsForEvent(string eventName)
        {
            return _handlers.ContainsKey(eventName);
        }

        /// <summary>Returns the handlers registered for the event name.</summary>
        /// <exception cref="KeyNotFoundException">The event name has no entry.</exception>
        public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName)
        {
            return _handlers[eventName];
        }

        /// <summary>
        /// Returns the registered event type whose simple name equals <paramref name="eventName"/>,
        /// or <c>null</c> when there is none.
        /// </summary>
        public Type GetEventTypeName(string eventName)
        {
            return _eventType.SingleOrDefault(candidate => candidate.Name == eventName);
        }

        /// <summary>Registers typed handler <typeparamref name="TH"/> for event <typeparamref name="T"/>.</summary>
        /// <exception cref="ArgumentException">The handler type is already registered for the event.</exception>
        public void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            DoAddSubscription(typeof(TH), GetEventKey<T>(), isDynamic: false);

            // Remember the event type so the payload can later be deserialized by name.
            var eventType = typeof(T);
            if (!_eventType.Contains(eventType))
            {
                _eventType.Add(eventType);
            }
        }

        /// <summary>Registers dynamic handler <typeparamref name="TH"/> under <paramref name="eventName"/>.</summary>
        /// <exception cref="ArgumentException">The handler type is already registered for the event.</exception>
        public void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
        {
            DoAddSubscription(typeof(TH), eventName, true);
        }

        // Adds the handler to the event's list (creating the list on first use) and rejects duplicates.
        private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
        {
            if (!_handlers.TryGetValue(eventName, out var registered))
            {
                registered = new List<SubscriptionInfo>();
                _handlers.Add(eventName, registered);
            }

            if (registered.Exists(info => info.HandlerType == handlerType))
            {
                throw new ArgumentException($"Handler Type {handlerType.Name} already registered for {eventName}", nameof(handlerType));
            }

            registered.Add(isDynamic
                ? SubscriptionInfo.Dynamic(handlerType)
                : SubscriptionInfo.Typed(handlerType));
        }
    }
}

6.3 订阅信息对象(SubscriptionInfo)

using System;

namespace RabbitMqEventBus.EventBusSubscriptions
{
    /// <summary>
    /// Describes one registered handler: its type and whether it is a dynamic handler.
    /// </summary>
    public class SubscriptionInfo
    {
        /// <summary>Creates a subscription entry.</summary>
        /// <param name="isDynamic">Whether the handler is a dynamic handler.</param>
        /// <param name="handlerType">Type of the handler.</param>
        public SubscriptionInfo(bool isDynamic, Type handlerType)
        {
            HandlerType = handlerType;
            IsDynamic = isDynamic;
        }

        /// <summary>Type of the handler.</summary>
        public Type HandlerType { get; set; }

        /// <summary>Whether the handler is a dynamic handler.</summary>
        public bool IsDynamic { get; set; }

        /// <summary>Creates an entry for a typed handler (<see cref="IsDynamic"/> is <c>false</c>).</summary>
        public static SubscriptionInfo Typed(Type handlerType) =>
            new SubscriptionInfo(isDynamic: false, handlerType: handlerType);

        /// <summary>Creates an entry for a dynamic handler (<see cref="IsDynamic"/> is <c>true</c>).</summary>
        public static SubscriptionInfo Dynamic(Type handlerType) =>
            new SubscriptionInfo(isDynamic: true, handlerType: handlerType);
    }
}

 

7.然后就是服务的注入和使用了

using Autofac;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMqEventBus.EventBus;
using RabbitMqEventBus.EventBusSubscriptions;
using RabbitMqEventBus.EventHandling;
using RabbitMqEventBus.RabbitMqPersistent;
using System;

namespace RabbitMqEventBus.ServiceExtention
{
    /// <summary>
    /// Dependency-injection and start-up helpers for the RabbitMQ event bus.
    /// </summary>
    public static class EventBusSetup
    {
        /// <summary>
        /// Registers the in-memory subscription manager, the persistent RabbitMQ connection
        /// and the event bus as singletons.
        /// </summary>
        /// <param name="service">Service collection to add the registrations to.</param>
        /// <param name="hostName">RabbitMQ host name.</param>
        /// <param name="userName">RabbitMQ user name.</param>
        /// <param name="password">RabbitMQ password.</param>
        /// <param name="queueName">Name of the queue the bus consumes from.</param>
        /// <param name="retryCount">Retry count used for both connecting and publishing.</param>
        /// <returns>The same <paramref name="service"/> instance, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="service"/> is <c>null</c>.</exception>
        // NOTE(review): the defaults reproduce the values that used to be hard-coded here;
        // real deployments should pass broker settings and credentials from configuration.
        public static IServiceCollection AddEventBusSetup(
            this IServiceCollection service,
            string hostName = "127.0.0.1",
            string userName = "guest",
            string password = "guest",
            string queueName = "Blog_Core_Api",
            int retryCount = 5)
        {
            // Report the offending parameter name, not the type name.
            if (service == null) throw new ArgumentNullException(nameof(service));
            service.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
            // NOTE(review): EventBusRabbitMq resolves handlers with ResolveOptional and silently
            // skips the ones it cannot resolve; confirm the handler is registered elsewhere.
            //service.AddTransient<BlogDeleteIntegrationEventHander>();
            service.AddSingleton<IRabbitMQPersistentConnection>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();
                var factory = new ConnectionFactory()
                {
                    HostName = hostName,
                    DispatchConsumersAsync = true,
                    Password = password,
                    UserName = userName
                };
                return new RabbitMQPersistentConnection(factory, logger, retryCount);
            });

            service.AddSingleton<IEventBus, EventBusRabbitMq>(sp =>
            {
                var rabbitMqConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
                var lifeTimeScope = sp.GetRequiredService<ILifetimeScope>();
                var logger = sp.GetRequiredService<ILogger<EventBusRabbitMq>>();
                var eventBusSubscriotion = sp.GetRequiredService<IEventBusSubscriptionsManager>();

                return new EventBusRabbitMq(rabbitMqConnection, logger, lifeTimeScope, eventBusSubscriotion, queueName, retryCount);
            });
            return service;
        }

        /// <summary>
        /// Resolves the event bus and subscribes the blog-delete handler to its event.
        /// </summary>
        /// <param name="app">Application builder whose services provide the event bus.</param>
        /// <returns>The same <paramref name="app"/> instance, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="app"/> is <c>null</c>.</exception>
        public static IApplicationBuilder ConfigureEventBus(this IApplicationBuilder app)
        {
            if (app == null) throw new ArgumentNullException(nameof(app));
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
            eventBus.Subscribe<BlogDeleteIntegrationModelEvent, BlogDeleteIntegrationEventHander>();
            return app;
        }
    }
}

demo地址:

C#利用RabbitMQ实现消息总线

上一篇:Oracle数据库中的时间格式和java中时间格式的转换


下一篇:AcWing 1237. 螺旋折线