java-AMQP Spring Integration错误处理

我有一个集成流程,如下所示:

@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
                                 @Qualifier("writeChannel") MessageChannel messageChannel,
                                 @Qualifier("parseErrorChannel") MessageChannel errorChannel) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory, auditQueue)
                    .errorChannel(errorChannel)
                    .concurrentConsumers(numConsumers)
                    .messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
            .enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
                    o -> getNamespace(o.getPayload())))
            .channel(messageChannel)
            .get();
}

如果引入了格式错误的消息,则消息转换器当然会出错,从而引发MessageConversionException.在那种情况下,我当然不希望消息重新排队-但我也不想将默认值设置为“不重新排队被拒绝的消息”,就像我可以对AmqpInboundAdapterSpec那样.对我来说,不重新排队以这种方式出错的消息的正确方法是什么(或者出于调试目的而重新发布它们)?

更一般而言,同一流程中的下游进程可能会错误处理语义上更不正确的数据-再一次,我不想重新排队它们.在那个时候我可以抛出一个AmqpRejectAndDontRequeueException,但是那样我就失去了关注点的分离,这仅是问题的一半.处理这些异常的正确方法是什么?也许有一种方法可以转换为AmqpRejectAndDontRequeueException?

解决方法:

入站适配器的SimpleMessageListenerContainer中的默认错误处理程序(一个ConditionalRejectingErrorHandler)就是这样做的(它检测到MessageConversionException并抛出AmqpRejectAndDontRequeueException).

您可以通过注入自己的FatalExceptionStrategy来自定义ConditionalRejectingErrorHandler,以查找其他异常类型并以相同方式处理它们.

DefaultExceptionStrategy看起来像这样…

@Override
public boolean isFatal(Throwable t) {
    if (t instanceof ListenerExecutionFailedException
            && t.getCause() instanceof MessageConversionException) {
        if (logger.isWarnEnabled()) {
            logger.warn("Fatal message conversion error; message rejected; "
                    + "it will be dropped or routed to a dead letter exchange, if so configured: "
                    + ((ListenerExecutionFailedException) t).getFailedMessage(), t);
        }
        return true;
    }
    return false;
}

仅当原因链中还没有AmqpRejectAndDontRequeueException时才调用它.

上一篇:php操作rabbitmq


下一篇:简单的Java AMQP服务器