这几天在折腾消息队列,在.Net环境下有基于RabbitMQ有很多有API的选择,最后选择了比较简单的EasyNetQ(http://easynetq.com/)
在测试使用的时候发现一个问题,对于处理错误的消息,EasyNetQ默认会放到一个错误队列中并提供了一个工具可以对错误队列的消息进行重新分发。RabiitMQ是有一个事务功能的,但是貌似在EasyNetQ的实现中,没有发现相关的操作方式
现在的需求是,在某种特定的情况下,需要将处理不成功的消息,保留在消息原队列
做了一个变通处理,出现错误消息的时候,将其再送回原队列。从客户端上可以直接send回队列,但这样不是一个很好的方式
研究了一下EasyNetQ的相关代码,发现其定义了一个IConsumerErrorStrategy接口,我们只要自己自行实现,并注册一个自定义方法就可以
ComponentRegistration.cs原注册方法相关,默认错误消息处理方式在DefaultConsumerErrorStrategy.cs中
1 public class ComponentRegistration 2 { 3 public static void RegisterServices(IContainer container) 4 { 5 Preconditions.CheckNotNull(container, "container"); 6 7 // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run. 8 #region DefaultConsumerErrorStrategy 9 container 10 .Register(_ => container) 11 .Register<IMessageQueueQLogger, ConsoleLogger>() 12 .Register<ISerializer, JsonSerializerN>() 13 .Register<IConventions, Conventions>() 14 .Register<IEventBus, EventBus>() 15 .Register<ITypeNameSerializer, TypeNameSerializer>() 16 .Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId) 17 .Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>() 18 .Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>() 19 .Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>() 20 .Register<IPublisherConfirms, PublisherConfirms>() 21 .Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>() 22 .Register<IHandlerRunner, HandlerRunner>() 23 .Register<IInternalConsumerFactory, InternalConsumerFactory>() 24 .Register<IConsumerFactory, ConsumerFactory>() 25 .Register<IConnectionFactory, ConnectionFactoryWrapper>() 26 .Register<IPersistentChannelFactory, PersistentChannelFactory>() 27 .Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>() 28 .Register<IHandlerCollectionFactory, HandlerCollectionFactory>() 29 .Register<IAdvancedBus, RabbitAdvancedBus>() 30 .Register<IRpc, Rpc>() 31 .Register<ISendReceive, SendReceive>() 32 .Register<IBus, RabbitBus>(); 33 #endregion 34 } 35 }
由于最终消息队列中保存的消息都是序列化后的结果,借鉴EasyNetQ自带工具的错误消息处理方式,只需要将消息直接调用BasicPublish方法便可
自定义错误消息处理HandleConsumerError方法
1 public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 2 { 3 Preconditions.CheckNotNull(context, "context"); 4 Preconditions.CheckNotNull(exception, "exception"); 5 6 try 7 { 8 Connect(); 9 10 using (var model = connection.CreateModel()) 11 { 12 var messageBody = context.Body; 13 var properties = model.CreateBasicProperties(); 14 context.Properties.CopyTo(properties); 15 16 model.BasicPublish(String.Empty, context.Info.RoutingKey, properties, messageBody); 17 } 18 } 19 catch (Exception unexpectedException) 20 { 21 // Something else unexpected has gone wrong :( 22 logger.ErrorWrite("EasyNetQ Consumer Error Handler: Failed to publish error message\nException is:\n" 23 + unexpectedException); 24 } 25 return Consumer.PostExceptionAckStrategy.ShouldAck; 26 }
关于原队列的名字保存在RoutingKey属性中
IConsumerErrorStrategy实现的完整代码
1 using System; 2 using System.Collections.Concurrent; 3 using System.Text; 4 using EasyNetQ.SystemMessages; 5 using RabbitMQ.Client; 6 using RabbitMQ.Client.Exceptions; 7 8 namespace EasyNetQ.Consumer 9 { 10 public class DefaultConsumerErrorStrategy : IConsumerErrorStrategy 11 { 12 private readonly IConnectionFactory connectionFactory; 13 private readonly ISerializer serializer; 14 private readonly IMessageQueueQLogger logger; 15 private readonly IConventions conventions; 16 private readonly ITypeNameSerializer typeNameSerializer; 17 18 private IConnection connection; 19 private readonly ConcurrentDictionary<string, string> errorExchanges = new ConcurrentDictionary<string, string>(); 20 21 public DefaultConsumerErrorStrategy( 22 IConnectionFactory connectionFactory, 23 ISerializer serializer, 24 IMessageQueueQLogger logger, 25 IConventions conventions, 26 ITypeNameSerializer typeNameSerializer) 27 { 28 Preconditions.CheckNotNull(connectionFactory, "connectionFactory"); 29 Preconditions.CheckNotNull(serializer, "serializer"); 30 Preconditions.CheckNotNull(logger, "logger"); 31 Preconditions.CheckNotNull(conventions, "conventions"); 32 Preconditions.CheckNotNull(typeNameSerializer, "typeNameSerializer"); 33 34 this.connectionFactory = connectionFactory; 35 this.serializer = serializer; 36 this.logger = logger; 37 this.conventions = conventions; 38 this.typeNameSerializer = typeNameSerializer; 39 } 40 41 private void Connect() 42 { 43 if(connection == null || !connection.IsOpen) 44 { 45 connection = connectionFactory.CreateConnection(); 46 } 47 } 48 49 public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 50 { 51 Preconditions.CheckNotNull(context, "context"); 52 Preconditions.CheckNotNull(exception, "exception"); 53 54 try 55 { 56 Connect(); 57 58 using (var model = connection.CreateModel()) 59 { 60 var messageBody = context.Body; 61 var properties = model.CreateBasicProperties(); 62 context.Properties.CopyTo(properties); 63 64 model.BasicPublish(String.Empty, context.Info.RoutingKey, properties, messageBody); 65 } 66 } 67 catch (Exception unexpectedException) 68 { 69 // Something else unexpected has gone wrong :( 70 logger.ErrorWrite("EasyNetQ Consumer Error Handler: Failed to publish error message\nException is:\n" 71 + unexpectedException); 72 } 73 return Consumer.PostExceptionAckStrategy.ShouldAck; 74 } 75 76 private bool disposed = false; 77 78 public virtual void Dispose() 79 { 80 if (disposed) return; 81 82 if(connection != null) connection.Dispose(); 83 84 disposed = true; 85 } 86 } 87 }
因为EasyNetQ在CreateBus的时候就会将相关事件注册,所以我们只需最后自行在config中加入有关配置,在注册事件的位置加入判断即可让异常抛回
由于刚接触RabbitMQ,对于一些概念的理解可能还不是非常到位,就目前的状况来看,只需要队列,不需要做特别复杂的路由选择的情况下,这是我所能找到的解决方案
当然可能以后的版本中,EasyNetQ会加入对事务的操作,更或许现在已经有了但是我没有发现