进程内EventBus

  项目里需要用到异步事件进行解耦, 试用了MediatR, 唯一不爽的是 publish使用同步方式调用各个Subscribe, 这会阻塞主线程尽快返回结果. 我想要的是:

  即使是进程内发布消息, Subscribe也是在新进程执行, Subscribe出现异常也不影响主线程返回结果

  当某一个Subscribe执行出现异常时, 可以针对那个Subscribe重发消息

 

  设计总体上参考了MediatR, asp.net core 启动时自动扫描 Subscribe 并注入DI, 扫描时需要指定 assembly, 因为像ABP那样全自动扫描真的有点慢

namespace InProcessEventBus
{
    using System;
    public abstract class EventData
    {
        public DateTime EventTime { get; set; }

        public string TargetClassName { get; set; }
    }
}


namespace InProcessEventBus
{
    /// <summary>
    /// 实现类需要什么东西(包括 IServiceProvider), 就在构造函数中添加
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class EventHandle<T>
        where T : EventData
    {
        public abstract void Handle(T eventData);
    }
}

  EventBus实现

namespace InProcessEventBus
{
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;

    public class EventBus
    {
        public static readonly JsonSerializerSettings SerializeSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
        public static readonly JsonSerializerSettings DeserializeSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto };

        private readonly Dictionary<Type, List<HandleTypeInfo>> eventDataHandles = new Dictionary<Type, List<HandleTypeInfo>>();

        protected IServiceProvider serviceProvider;
        protected ILogger logger;

        public void ScanEventHandles(IServiceCollection services, params string[] assemblyNames)
        {
            foreach (var assemblyName in assemblyNames)
            {
                var assembly = Assembly.Load(assemblyName);
                var handleTypes = assembly.GetTypes().Where(t => t.BaseType.IsGenericType && t.BaseType.GetGenericTypeDefinition() == typeof(EventHandle<>)).ToList();
                foreach (var handleType in handleTypes)
                {
                    var eventDataType = handleType.BaseType.GetGenericArguments()[0];
                    if (!eventDataHandles.TryGetValue(eventDataType, out var handleList))
                    {
                        handleList = new List<HandleTypeInfo>();
                        eventDataHandles[eventDataType] = handleList;
                    }
                    if (handleList.Any(o => o.HandleType == handleType))
                    {
                        continue;
                    }
                    var handleMethodInfo = handleType.GetMethod("Handle");
                    var handleTypeInfo = new HandleTypeInfo()
                    {
                        ClassName = $"{handleType.FullName}@{handleType.Assembly.FullName}",
                        HandleType = handleType,
                        HandleMethodInfo = handleMethodInfo
                    };
                    handleList.Add(handleTypeInfo);

                    services.AddTransient(handleType);
                }
            }
        }


        public virtual void Start(IServiceProvider serviceProvider, string loggerName = nameof(EventBus))
        {
            this.serviceProvider = serviceProvider;
            logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(loggerName);
        }

        public virtual void Stop()
        {

        }

        public virtual void Publish(EventData eventData)
        {
            var json = JsonConvert.SerializeObject(eventData, SerializeSettings);
            ManualPublish(json);
        }

        /// <summary>
        /// 手动补发消息
        /// </summary>
        /// <param name="json">必须是由 EventData的实现类 序列化成的带 "$type" 的json(使用new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All })</param>
        public virtual void ManualPublish(string json)
        {
            EventData eventData;
            Type eventDataType;
            try
            {
                eventData = JsonConvert.DeserializeObject<EventData>(json, DeserializeSettings);
                eventDataType = eventData.GetType();
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "无法解析的json: " + json);
                return;
            }

            if (eventDataHandles.TryGetValue(eventDataType, out var handleList))
            {
                foreach (var handle in handleList)
                {
                    if (eventData.TargetClassName == null || eventData.TargetClassName == handle.ClassName)
                    {
                        System.Threading.Tasks.Task.Run(() =>
                        {
                            using (var scope = serviceProvider.CreateScope())
                            {
                                var handleTypeInstance = scope.ServiceProvider.GetRequiredService(handle.HandleType);
                                try
                                {
                                    handle.HandleMethodInfo.Invoke(handleTypeInstance, new object[] { eventData });
                                }
                                catch (Exception ex)
                                {
                                    var retryJson = CreateRetryJson(json, handle.ClassName, ex.InnerException.Message);
                                    logger.LogError(ex, retryJson);
                                }
                            }
                        });
                    }
                }
            }
        }


        private string CreateRetryJson(string json, string targetClassName, string exceptionMessage)
        {
            var jObject = Newtonsoft.Json.Linq.JObject.Parse(json);
            jObject.Property("TargetClassName").Value = targetClassName;
            jObject.Add("ExceptionMessage", exceptionMessage);
            return jObject.ToString(Formatting.None);
        }

        private class HandleTypeInfo
        {
            public string ClassName { get; set; }

            public Type HandleType { get; set; }

            public MethodInfo HandleMethodInfo { get; set; }
        }
    }
}

   用于注册的扩展

namespace Microsoft.Extensions.DependencyInjection
{
    using InProcessEventBus;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.Extensions.Hosting;
    using System.Linq;

    public static class EventBusExtensions
    {
        /// <summary>
        /// 
        /// </summary>
        /// <param name="services"></param>
        /// <param name="assemblyNames"> 调用方所在的 Assembly 会自动添加的</param>
        public static void AddInProcessEventBus(this IServiceCollection services, params string[] assemblyNames)
        {
            var assemblyNameOfCaller = new System.Diagnostics.StackTrace().GetFrame(1).GetMethod().DeclaringType.Assembly.FullName;
            assemblyNames = assemblyNames.ToArray().Union(new[] { assemblyNameOfCaller }).ToArray();

            var eventBus = new EventBus();
            eventBus.ScanEventHandles(services, assemblyNames);

            services.AddSingleton(eventBus);
        }

        public static void EventBusRegisterToApplicationLifetime(this IApplicationBuilder app, IHostApplicationLifetime hostApplicationLifetime)
        {
            hostApplicationLifetime.ApplicationStarted.Register(() =>
            {
                var eventBus = app.ApplicationServices.GetRequiredService<EventBus>();
                eventBus.Start(app.ApplicationServices);
            });

            hostApplicationLifetime.ApplicationStopping.Register(() =>
            {
                var eventBus = app.ApplicationServices.GetRequiredService<EventBus>();
                eventBus.Stop();
            });
        }
    }
}

 

 

  项目地址: https://github.com/zhouandke/InProcessEventBus

  代码写完了, 发现还需要服务程序之间通知消息, 正准备手撸 RabbitMQ EventBus 时发现了 EasyNetQ, 把我想到的问题都完美实现了, 这段代码就留作纪念吧

 

  原文链接: https://www.cnblogs.com/zhouandke/p/12670333.html

 

进程内EventBus

上一篇:新手必学linux文本文件编辑命令 vi vim.....


下一篇:kali linux命令