1.前言
Spring AMQP项目将核心Spring概念应用基于AMQP的消息传递解决方案的开发。 Spring提供“模板”作为发送和接收消息的高级抽象。 Spring还为消息驱动的POJO提供支持。 这些库有助于管理AMQP资源,同时促进依赖注入和声明性配置的使用。 在所有这些情况下,您将看到Spring Framework中与JMS支持的相似之处。 有关其他项目相关信息,请访问Spring AMQP项目主页。
2.介绍
第一部分是Spring AMQP及其底层的高级概述概念和一些代码片段,可以让您尽快启动和运行。
2.1开始
介绍
先决条件:安装并运行RabbitMQ代理(http://www.rabbitmq.com/download.html)。 然后添加spring-rabbit JAR及其所有依赖项 - 最简单的方法是在构建工具中声明依赖性,例如: 对于Maven:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
兼容性
Spring Framework的最小版本依赖性是5.0.x.
最小的amqp-client java客户端库版本是5.0.0。
非常非常快
使用简单的命令式Java来发送和接收消息:
ConnectionFactory connectionFactory = new CachingConnectionFactory()
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
请注意,本机Java Rabbit客户端中也有一个ConnectionFactory。 在上面的代码中使用了Spring抽象。 依赖于代理中的默认交换(因为在发送中没有指定),并且所有队列的默认绑定都是按名称进行的默认交换(因此我们可以使用队列名作为发送中的路由键)。 这些行为在AMQP规范中定义。
使用XML配置
与上面相同的示例,但将资源配置外部化为XML:
ApplicationContext context =
new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue")
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
默认情况下,<rabbit:admin />声明会自动查找Queue,Exchange和Binding类型的bean,并代表用户向代理声明它们,因此不需要在简单的Java驱动程序中显式使用该bean。 有很多选项可以配置XML模式中组件的属性 - 您可以使用XML编辑器的自动完成功能来浏览它们并查看其文档。
使用Java配置
使用Java中的外部配置再次使用相同的示例:
//引用配置进行消息发送
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
//java配置
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
2.2什么是新的
3.参考
参考文档的这一部分详细介绍了组成Spring AMQP的各个组件。主要章节涵盖了开发AMQP应用程序的核心类。
3.1使用Spring AMQP
在本章中,我们将探讨接口和类,它们是使用Spring AMQP开发应用程序的基本组件。
AMQP抽象
介绍
Spring AMQP由一些模块组成,每个模块由分布中的JAR表示。 这些模块是:spring-amqp和spring-rabbit。 spring-amqp模块包含org.springframework.amqp.core包。 在该包中,您将找到代表核心AMQP“模型”的类。 我们的目的是提供不依赖于任何特定AMQP代理实现或客户端库的通用抽象。 最终用户代码在供应商实现中更具可移植性,因为它只能针对抽象层进行开发。 然后使用这些抽象由特定于代理的模块实现,例如spring-rabbit。 目前只有RabbitMQ实现; 但是,除了RabbitMQ之外,还使用Apache Qpid在.NET中验证了抽象。 由于AMQP原则上在协议级别运行,因此RabbitMQ客户端可以与支持相同协议版本的任何代理一起使用,但我们目前不测试任何其他代理。
Message
0-9-1 AMQP规范没有定义消息类或接口。相反,当执行basicPublish()这样的操作时,内容作为字节数组参数传递,另外的属性作为单独的参数传递。Spring AMQP将消息类定义为更通用的AMQP域模型表示的一部分。Message类的目的是简单地封装单个实例中的主体和属性,这样API就可以变得更简单。消息类定义非常简单。
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
MessageProperties接口定义了几个常见属性,例如messageId,timestamp,contentType等等。 通过调用setHeader(String key,Object value)方法,还可以使用用户定义的标头扩展这些属性。
重要
从版本1.5.7,1.6.11,1.7.4,2.0.0开始,如果消息体是序列化的Serializable java对象,则在执行toString()操作时(例如在日志中)不再反序列化(默认情况下)消息)。 这是为了防止不安全的反序列化。 默认情况下,只反序列化java.util和java.lang类。 要恢复到先前的行为,可以通过调用Message.addWhiteListPatterns(...)来添加允许的类/包模式。 支持简单的*通配符,例如com.foo.*,*.MyClass。 无法反序列化的实体将在日志消息中由字节[<size>]表示。
Exchange
Exchange接口表示AMQP Exchange,它是Message Producer发送的内容。 代理虚拟主机中的每个Exchange都具有唯一的名称以及一些其他属性:
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
如您所见,Exchange还具有由ExchangeTypes中定义的常量表示的类型。 基本类型包括:Direct,Topic,Fanout和Headers。 在核心包中,您将找到每种类型的Exchange接口的实现。 在这些Exchange类型处理绑定到队列的方式方面,行为会有所不同。 例如,Direct交换允许队列由固定路由密钥(通常是队列的名称)绑定。 Topic交换支持具有路由模式的绑定,路由模式可以分别包括*和#通配符,分别用于一个和零或多个。 Fanout交换发布到绑定到它的所有队列,而不考虑任何路由密钥。
注意:
AMQP规范还要求任何代理提供没有名称的“默认”Direct Exchange。 声明的所有队列将绑定到该默认Exchange,其名称为路由键。
Queue
Queue类表示消息使用者从中接收消息的组件。 与各种Exchange类一样,spring的实现旨在成为此核心AMQP类型的抽象表示。
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
请注意,构造函数采用队列名称。 根据实现,管理模板可以提供用于生成唯一命名的队列的方法。 此类队列可用作“回复”地址或其他临时情况。 因此,自动生成的Queue的exclusive和autoDelete属性都将设置为true。
Binding
鉴于生产者发送到Exchange并且消费者从队列接收,将队列连接到Exchange的绑定对于通过消息传递连接这些生产者和消费者是至关重要的。 在Spring AMQP中,我们定义了一个Binding类来表示这些连接。 让我们回顾一下将队列绑定到Exchange的基本选项。
您可以使用固定的路由键将队列绑定到DirectExchange。
new Binding(someQueue, someDirectExchange, "foo.bar")
您可以使用路由模式将Queue绑定到TopicExchange。
new Binding(someQueue, someTopicExchange, "foo.*")
您可以将队列绑定到没有路由键的FanoutExchange。
new Binding(someQueue, someFanoutExchange)
Spring还提供了一个BindingBuilder来促进“流畅的API”风格。
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
为了清楚起见,上面显示了BindingBuilder类,但是当使用bind()方法的静态导入时,这种样式很有效。
AmqpTemplate
介绍
与Spring Framework和相关项目提供的许多其他高级抽象一样,Spring AMQP提供了一个起着核心作用的“模板”。定义主要操作的接口称为AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们并不是任何实现的唯一,因此名称中的“AMQP”。另一方面,该接口的实现与AMQP协议的实现相关联。与JMS不同,JMS是一种接口级API本身,AMQP是一种线级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都将依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate。在下面的示例中,您将经常看到“AmqpTemplate”的用法,但是当您查看配置示例或任何实例化模板和/或调用setter的代码摘录时,您将看到实现类型(例如“RabbitTemplate”)。
如上所述,AmqpTemplate接口定义了发送和接收消息的所有基本操作。 我们将分别在以下两个部分中探讨消息发送和接收。
添加重试功能
从版本1.3开始,您现在可以将RabbitTemplate配置为使用RetryTemplate来帮助处理代理连接问题。 有关完整信息,请参阅spring-retry项目; 以下只是一个使用指数退避策略和默认SimpleRetryPolicy的示例,它会在将异常抛出给调用者之前进行三次尝试。
使用XML命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
使用 @Configuration
:
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
发布是异步的 - 如何检测成功和失败
发布消息是一种异步机制,默认情况下,无法路由的消息会被RabbitMQ删除。对于成功的发布,您可以接收一个异步确认,如下面“发布者确认和返回”一节所述。让我们考虑两个失败场景:
- 发布到一个交换,但是没有匹配的目标队列
- 发布到一个不存在的交换
第一个案例涉及发布者返回,如下面的“发布者确认和返回”一节所述。
对于第二种情况,消息被删除,不会生成返回;除异常外,关闭底层通道。默认情况下,会记录此异常,但您可以向CachingConnectionFactory注册一个ChannelListener,以获取此类事件的通知:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的reason属性以确定发生的问题。
要检测发送线程上的异常,可以在RabbitTemplate上设置setChannelTransacted(true),并在txCommit()上检测异常。 但是,事务会严重影响性能,因此在为这一个用例启用事务之前请仔细考虑这一点。
发布者确认并返回
AmqpTemplate的RabbitTemplate实现支持Publisher确认和返回。
对于返回的消息,必须将模板的mandatory属性设置为true,或者对于特定消息,mandatory-expression必须求值为true。 此功能需要将CachingConnectionFactory的publisherReturns属性设置为true的。 返回通过调用setReturnCallback(ReturnCallback回调)注册RabbitTemplate.ReturnCallback发送给客户端。 回调必须实现此方法:
void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey);
每个RabbitTemplate只支持一个ReturnCallback。默认情况下,send和receive方法将在5秒后超时并返回null。 这可以通过设置replyTimeout属性来修改。 从版本1.5开始,如果将mandatory属性设置为true(或者对于特定消息,mandatory-expression的计算结果为true),如果无法将消息传递到队列,则将抛出AmqpMessageReturnedException。 这个异常有returnMessage,replyCode,replyText属性,以及用于发送的exchange和routingKey。
对于Publisher Confirms(也称为Publisher Acknowledgements),模板需要一个将CachingConnectionFactory的publisherConfirms属性设置为true的。 通过调用setConfirmCallback(ConfirmCallback回调)注册RabbitTemplate.ConfirmCallback,将确认发送给客户端。 回调必须实现此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData 是客户端在发送原始消息时提供的对象。ack对于ack是正确的,而对于nack是错误的。对于nacks,如果生成nack时nack是可用的,则原因可能包含nack的原因。一个例子是向不存在的交换发送消息。在这种情况下,代理关闭通道;关闭的原因包含在原因中。cause在版本1.4中添加。
RabbitTemplate仅支持一个ConfirmCallback。
注意:当Rabbit模板发送操作完成时,通道关闭;这将阻止在连接工厂缓存已满的情况下接收确认或返回(当缓存中有空间时,通道未物理关闭且返回/确认将正常进行)。当缓存已满时,框架将关闭延迟最多5秒,以便有时间接收确认/返回。使用确认时,收到上次确认时将关闭频道。仅使用返回时,通道将保持打开整整5秒。通常建议将连接工厂的channelCacheSize设置为足够大的值,以便将发布消息的通道返回到缓存而不是关闭。您可以使用RabbitMQ管理插件监控频道使用情况;如果您看到频道被快速打开/关闭,您应该考虑增加缓存大小以减少服务器上的开销。
重要
在版本2.1之前,在收到确认之前,已为发布者确认启用的通道返回到缓存。 其他一些进程可以检出通道并执行一些导致通道关闭的操作 - 例如将消息发布到不存在的交换。 这可能导致确认丢失; 在确认未完成时,版本2.1及更高版本不再将通道返回到缓存。 由于RabbitTemplate在每次操作后在通道上执行逻辑close(); 一般来说,这意味着一次只能在一个频道上进行一次确认。
Java配置
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); /** * * @Title: createConnection * @Description: 创建连接 * @return * @return Connection * @throws */ @Bean public ConnectionFactory createConnection() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("springcloud"); connectionFactory.setPassword("123456"); connectionFactory.setPublisherReturns(true); connectionFactory.setPublisherConfirms(true); return connectionFactory; } /** * * @Title: rabbitTemplate * @Description: rabbitmq 模板 * @return * @return AmqpTemplate * @throws */ @Bean public AmqpTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(createConnection()); template.setMandatory(true); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; }
发送者
package org.niugang.mq; import java.util.Date; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发送者 * @author Administrator * */ @Component public class Sender { private static final Logger logger = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send() { String content = " hello " + new Date(); System.out.println("Sender:" + content); // 执行保存 String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.setConfirmCallback(new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("correlationData" + correlationData.getId() + ";ack:" + ack + ";cause:" + cause); } }); /** * 默认情况下,send和receive方法将在5秒后超时并返回null。 这可以通过设置replyTimeout属性来修改。 * 从版本1.5开始,如果将mandatory属性设置为true(或者对于特定消息,mandatory-expression的计算结果为true), * 如果无法将消息传递到队列,则将抛出AmqpMessageReturnedException。 * 这个异常有returnMessage,replyCode,replyText属性,以及用于发送的exchange和routingKey。 */ /** * replyTimeout:指定在使用其中一个sendAndReceive方法时等待回复消息时要使用的超时(以毫秒为单位)。 默认值定义为DEFAULT_REPLY_TIMEOUT。 负值表示无限期超时。 未在普通接收方法中使用,因为协议中没有定义阻塞接收操作。 */ rabbitTemplate.setReturnCallback(new ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("message" + message.getBody().toString() + ";replyCode:" + replyCode + ";replyText:" + replyText + ";exchange:" + exchange + ";routingKey:" + routingKey); } }); rabbitTemplate.convertAndSend("logs", "bootsis", content, correlationId); } }
Springboot版
application.properties
#rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=springcloud spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true spring.rabbitmq.template.reply-timeout=10000
实现确认和回退接口
package org.niugang.mq; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发送者 * @author Administrator * */ @Component public class Sender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback { private static final Logger logger = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send() { String content = " hello " + new Date(); System.out.println("Sender:" + content); //设置确认回调函数,否则无法成功 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("logs", "bootsis", content); } /** * 实现ConfirmCallback * ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中 *ACK=false表示消息由于Broker处理错误,消息并未处理成功 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("correlationData" + correlationData.getId() + ";ack:" + ack + ";cause:" + cause); } /** * 实现ReturnCallback当消息发送出去找不到对应路由队列时,将会把消息退回, 如果有任何一个路由队列接收投递消息成功,则不会退回消息 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("message" + message.getBody().toString() + ";replyCode:" + replyCode + ";replyText:" + replyText + ";exchange:" + exchange + ";routingKey:" + routingKey); } }
从版本2.1开始,CorrelationData对象具有可用于获取结果的ListenableFuture,而不是在模板上使用ConfirmCallback。
CorrelationData cd1 = new CorrelationData(); this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1); assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
作用域操作
通常,在使用模板时,会从缓存(或已创建)中检出Channel,用于操作,并返回缓存以供重用。 在多线程环境中,无法保证下一个操作将使用相同的通道。 但是,有时您可能希望更多地控制通道的使用,并确保在同一通道上执行大量操作。
从2.0版开始,提供了一个带有OperationsCallback的新方法调用。 在回调范围内以及在提供的RabbitOperations参数上执行的任何操作都将使用相同的专用通道,该通道将在最后关闭(不返回到缓存)。
消息集成
从版本1.4开始,RabbitMessagingTemplate构建在RabbitTemplate之上,提供与Spring Framework消息抽象的集成,即org.springframework.messaging.Message。 这允许您使用spring-messaging Message <?>抽象发送和接收消息。 其他Spring项目使用此抽象,例如Spring Integration和Spring的STOMP支持。 涉及两个消息转换器; 一个用于在spring-messaging Message <?>和Spring AMQP的Message抽象之间进行转换,另一个用于在Spring AMQP的Message抽象和底层RabbitMQ客户端库所需的格式之间进行转换。 默认情况下,消息有效负载由提供的RabbitTemplate的消息转换器转换。 或者,您可以使用其他一些有效负载转换器注入自定义MessagingMessageConverter:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
发送消息
介绍
在发送消息时,可以使用以下任何一种方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以用上面列出的最后一种方法开始我们的讨论,因为它实际上是最明确的。 它允许在运行时提供AMQP Exchange名称以及路由密键。 最后一个参数是负责实际创建Message实例的回调。 使用此方法发送消息的示例可能如下所示:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
new Message("12.34".getBytes(), someProperties));
如果您计划使用该模板实例在大多数或所有时间发送到同一个交换,则可以在模板上设置“exchange”属性。 在这种情况下,可以使用上面列出的第二种方法。 以下示例在功能上等同于前一个示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模板上设置了“exchange”和“routingKey”属性,则可以使用仅接受Message的方法:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
考虑交换和路由键属性的更好方法是显式方法参数将始终覆盖模板的默认值。实际上,即使您没有在模板上显式设置这些属性,也始终存在默认值。在这两种情况下,默认值都是空字符串,但这实际上是一个合理的默认值。就路由键而言,首先并不总是必要的(例如Fanout交换)。此外,可以使用空String将Queue绑定到Exchange。这些都是依赖于模板的路由键属性的默认空String值的合法方案。就Exchange名称而言,空字符串是非常常用的,因为AMQP规范将“默认Exchange”定义为没有名称。由于所有队列都使用其名称作为绑定值自动绑定到该默认Exchange(即Direct Exchange)[即默认的路由键为队列名称,Exchange为Direct Exachange],因此上述第二种方法可用于通过默认Exchange对任何队列进行简单的点对点消息传递。只需通过在运行时提供method参数,将队列名称作为“routingKey”提供:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,如果您希望创建一个主要用于主要或专门发布到单个队列的模板,则以下内容非常合理:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
Message构建API
从版本1.3开始,消息构建器API由MessageBuilder和MessagePropertiesBuilder提供; 它们提供了一种方便的“流畅”方式来创建消息或消息属性:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
或
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
可以设置MessageProperties上定义的每个属性。 其他方法包括setHeader(String key,String value),removeHeader(String key),removeHeaders()和copyProperties(MessageProperties属性)。 每个属性设置方法都有一个set * IfAbsent()变体。 在存在默认初始值的情况下,该方法名为set * IfAbsentOrDefault()。
提供了五种静态方法来创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) 1
public static MessageBuilder withClonedBody(byte[] body) 2
public static MessageBuilder withBody(byte[] body, int from, int to) 3
public static MessageBuilder fromMessage(Message message) 4
public static MessageBuilder fromClonedMessage(Message message) 5
1构建器创建的消息将具有一个直接引用该参数的主体。
2构建器创建的消息将具有一个主体,该主体是包含参数中字节副本的新数组。
3构建器创建的消息将具有一个主体,该主体是包含参数中字节范围的新数组。 有关更多详细信息,请参阅Arrays.copyOfRange()。
4构建器创建的消息将具有一个主体,该主体直接引用参数主体。 参数的属性将复制到新的MessageProperties对象。
5构建器创建的消息将具有一个主体,该主体是包含参数主体副本的新数组。 参数的属性将复制到新的MessageProperties对象。
发布者回退
当模板的mandatory属性为true时,返回的消息由“AmqpTemplate”中描述的回调提供。
从版本1.4开始,RabbitTemplate支持SpEL mandatoryExpression属性,该属性针对每个请求消息进行评估,作为根评估对象,解析为布尔值。 可以在表达式中使用Bean引用,例如“@ myBean.isMandatory(#root)”。
PublisherTemplate也可以在发送和接收操作中内部使用Publisher返回。 有关详细信息,请参阅“回复超时”一节。
回复超时:
默认情况下,send和receive方法将在5秒后超时并返回null。 这可以通过设置replyTimeout属性来修改。 从版本1.5开始,如果将mandatory属性设置为true(或者对于特定消息,mandatory-expression的计算结果为true),如果无法将消息传递到队列,则将抛出AmqpMessageReturnedException。 这个异常有returnMessage,replyCode,replyText属性,以及用于发送的exchange和routingKey。
批量发送消息
从版本1.4.2开始,引入了BatchingRabbitTemplate。 这是RabbitTemplate的子类,带有重写的send方法,根据BatchingStrategy批量处理消息; 只有当批处理完成时才会将消息发送到RabbitMQ。
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
警告
批处理数据保存在内存中; 系统发生故障时,未发送的消息可能会丢失。
提供了SimpleBatchingStrategy。 它支持将消息发送到单个交换/路由密钥。 它有属性:
batchSize - 批处理之前发送的消息数
bufferLimit - 批处理消息的最大大小; 如果超出,将抢占batchSize,并导致发送部分批处理timeout
- 在没有新活动向批处理添加消息时将发送部分批处理的时间
SimpleBatchingStrategy通过在每个嵌入消息之前使用4字节二进制长度来格式化批处理。 通过将springBatchFormat消息属性设置为lengthHeader4,将其传递给接收系统。
接收消息
介绍
消息接收总是比发送复杂一点。 有两种方法可以接收消息。 更简单的选项是使用轮询方法调用一次轮询单个Message。 更复杂但更常见的方法是注册一个将按需异步接收消息的侦听器。 我们将在接下来的两个小节中查看每种方法的示例。
轮训消费者
AmqpTemplate本身可用于轮询消息接收。 默认情况下,如果没有可用消息,则立即返回null; 没有阻挡。 从版本1.5开始,您现在可以设置一个receiveTimeout(以毫秒为单位),接收方法将阻塞长达这么长时间,等待消息。 小于零的值意味着无限期地阻塞(或者至少在与代理的连接丢失之前)。 版本1.6引入了接收方法的变体,允许在每次调用时传递超时。
警告
由于接收操作为每条消息创建一个新的QueueingConsumer,因此该技术并不适用于大容量环境; 考虑使用异步使用者,或者对于这些用例使用零的receiveTimeout。
有四种简单的接收方法可供使用。 与发送方的Exchange一样,有一种方法需要在模板本身上直接设置默认队列属性,并且有一种方法在运行时接受队列参数。 版本1.6引入了变体以接受timeoutMillis以基于每个请求覆盖receiveTimeout。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
就像发送消息一样,AmqpTemplate有一些方便的方法来接收POJO而不是Message实例,实现将提供一种方法来定制用于创建返回的Object的MessageConverter:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Message receiveAndConvert(long timeoutMillis) throws AmqpException;
Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
与sendAndReceive方法类似,从版本1.3开始,AmqpTemplate有几个方便的receiveAndReply方法,用于同步接收,处理和回复消息:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate实现负责接收和回复阶段。 在大多数情况下,您应该仅提供ReceiveAndReplyCallback的实现,以便为接收的消息执行某些业务逻辑,并在需要时构建回复对象或消息。 注意,ReceiveAndReplyCallback可能返回null。 在这种情况下,没有发送回复,receiveAndReply像receive方法一样工作。 这允许将相同的队列用于混合消息,其中一些可能不需要回复。
仅当提供的回调不是ReceiveAndReplyMessageCallback的实例(其提供原始消息交换合同)时,才应用自动消息(请求和回复)转换。
ReplyToAddressCallback对于需要自定义逻辑在运行时根据收到的消息确定replyTo地址并从ReceiveAndReplyCallback进行回复的情况非常有用。 默认情况下,请求消息中的replyTo信息用于路由回复。
以下是基于POJO的接收和回复示例...
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}
消息监听器
对于异步消息接收,涉及专用组件(不是AmqpTemplate)。 该组件是消息回调的容器。 稍后讨论容器及其属性,但首先我们应该查看回调,因为这是您的应用程序代码将与消息传递系统集成的地方。 从MessageListener接口的实现开始,回调有几个选项:
public interface MessageListener {
void onMessage(Message message);
}
如果您的回调逻辑出于任何原因依赖于AMQP Channel实例,您可以改为使用ChannelAwareMessageListener。 它看起来很相似,但有一个额外的参数:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
MessageListenerAdapter
如果您希望在应用程序逻辑和消息传递API之间保持更严格的分离,则可以依赖框架提供的适配器实现。 这通常被称为“消息驱动的POJO”支持。
注意:版本1.5引入了更灵活的POJO消息传递机制,即@RabbitListener注解 ,稍后详细介绍。
使用适配器时,您只需要提供对适配器本身应该调用的实例的引用。
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以对适配器进行子类化,并提供getListenerMethodName()的实现,以根据消息动态选择不同的方法。 这个方法有两个参数,originalMessage和extractedMessage,后者是任何转换的结果。 默认情况下,配置SimpleMessageConverter; 有关其他可用转换器的更多信息和信息,请参阅“SimpleMessageConverter”一节。
从版本1.4.2开始,原始消息具有属性consumerQueue和consumerTag,可用于确定从哪个队列接收消息。
从版本1.5开始,您可以将使用者队列/标记的映射配置为方法名称,以动态选择要调用的方法。 如果映射中没有条目,我们将回退到默认侦听器方法。 默认侦听器方法(如果未设置)是handleMessage。
从2.0版开始,提供了一个方便的FunctionalInterface:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
这有助于使用Java 8 lambdas方便地配置适配器:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
Container
现在您已经看到了监听消息回调的各种选项,我们可以将注意力转向容器。基本上,容器处理“活动”职责,以便侦听器回调可以保持被动。容器是“生命周期”组件的一个实例。它提供了启动和停止的方法。在配置容器时,实际上是在弥补AMQP队列和MessageListener实例之间的差距。您必须提供对ConnectionFactory的引用,以及该侦听器应该从中使用消息的队列名称或队列实例。
在2.0版本之前,只有一个监听器容器——SimpleMessageListenerContainer;现在有了第二个容器——DirectMessageListenerContainer。在选择使用哪种容器时,容器和标准之间的差异将在“选择容器”一节中描述。
以下是使用SimpleMessageListenerContainer的最基本示例:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“活动”组件,最常见的是使用bean定义创建侦听器容器,以便它可以在后台运行。 这可以通过XML完成
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
或
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
上面xml是创建一个DirectMessageListenerContainer(注意type属性 - 它默认为simple)。
或者,您可能更喜欢使用@Configuration样式,它看起来与上面的实际代码片段非常相似:
@Configuration public class ExampleAmqpConfiguration { @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public MessageListener exampleListener() { return new MessageListener() { public void onMessage(Message message) { System.out.println("received: " + message); } }; } }
从RabbitMQ版本3.2开始,代理现在支持消费者优先级(请参阅使用RabbitMQ使用消费者优先级)。 通过在使用者上设置x-priority参数来启用此功能。 SimpleMessageListenerContainer现在支持设置使用者参数:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为方便起见,命名空间在listener元素上提供了priority属性:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" /> </rabbit:listener-container>
auto-delete Queues
当容器配置为侦听自动删除队列,或者队列具有x-expires选项或在Broker上配置生存时策略时,如果容器是,则代理将删除队列 停止(最后一个消费者被取消)。 在1.3版之前,由于队列丢失,无法重新启动容器; 当连接关闭/打开时,RabbitAdmin仅自动重新声明队列等,这在容器停止/启动时不会发生。
从版本1.3开始,容器现在将使用RabbitAdmin在启动期间重新声明任何丢失的队列。
您还可以使用条件声明(称为“条件声明”一节)和auto-startup =“false”管理员来推迟队列声明,直到容器启动。
<rabbit:queue id="otherAnon" declared-by="containerAdmin" /> <rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin"> <rabbit:bindings> <rabbit:binding queue="otherAnon" key="otherAnon" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:listener-container id="container2" auto-startup="false"> <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" /> </rabbit:listener-container> <rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
在这种情况下,队列和交换由containerAdmin声明,它具有auto-startup =“false”,因此在上下文初始化期间不会声明元素。 此外,出于同样的原因,容器未启动。 稍后启动容器时,它会使用它对containerAdmin的引用来声明元素。
Batched Messages
批处理消息由侦听器容器自动取消批处理(使用springBatchFormat消息头)。 拒绝批处理中的任何消息将导致拒绝整个批处理
演示纯java配置
RabbitConfig.java
package org.niugang.config; import java.util.Map; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.support.RetryTemplate; @Configuration public class RabbitConfig { // 用springboot配置取代java配置 /** * * @Title: createConnection * @Description: 创建连接 * @return * @return Connection * @throws */ @Bean public ConnectionFactory createConnection() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("springcloud"); connectionFactory.setPassword("123456"); connectionFactory.setPublisherReturns(true); connectionFactory.setPublisherConfirms(true); return connectionFactory; } // 用springboot配置取代java配置 /** * * @Title: rabbitTemplate * @Description: rabbitmq 模板 * @return * @return AmqpTemplate * @throws */ @Bean public AmqpTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(createConnection()); template.setMandatory(true); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; } /** * * @Title: messageListenerContainer * @Description: 监听 * @return * @return SimpleMessageListenerContainer * @throws */ @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(createConnection()); container.setQueueNames("queue-demo"); container.setMessageListener(exampleListener()); return container; } /** * * @Title: exampleListener * @Description:接收消息 * @return * @return MessageListener * @throws */ @Bean public MessageListener exampleListener() { return new MessageListener() { @Override public void onMessage(Message message) { byte[] body = message.getBody(); System.out.println(body); MessageProperties messageProperties = message.getMessageProperties(); // Map<String, Object> headers = messageProperties.getHeaders(); String messageId = messageProperties.getMessageId(); System.out.println("received: " + new String(body) + ":messageId;" + messageId); } }; } // 创建交换器 // 如果我们声明持久交换(交换将在服务器重启后继续存在),则为true // durable // 默认exchangeName为logs // exchangeType:为Topic /** * * @Title: queue * @Description: 创建队列 * @return * @return Queue * @throws */ @Bean public Queue queue() { return new Queue("queue-demo"); } /** * * @Title: topicExchange * @Description: 创建exchange * @return * @return TopicExchange * @throws */ @Bean public TopicExchange topicExchange() { return new TopicExchange("logs", true, false); } /** * * @Title: bindingExchange * @Description: 绑定交换器和路由键 * @return * @return Binding * @throws */ @Bean public Binding bindingExchange() { return BindingBuilder.bind(queue()).to(topicExchange()).with("bootsis"); } }
Sender.java
package org.niugang.mq; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.MessagePropertiesBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 消息发送者 * @author Administrator * */ @Component public class Sender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback { private static final Logger logger = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send() { String content = " hello spring amqp"; System.out.println("Sender:" + content); // 执行保存 String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); // 设置确认回调函数,否则无法成功 rabbitTemplate.setConfirmCallback(this); // 定义消息内容 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setMessageId("123").setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody(content.getBytes()).andProperties(props).build(); //发送消息 rabbitTemplate.send("logs", "bootsis", message, correlationId); } // 每3秒执行一次 @Scheduled(cron = "*/3 * * * * ?") private void process() { send(); } /** * 实现ConfirmCallback * ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中 * ACK=false表示消息由于Broker处理错误,消息并未处理成功 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("correlationData:" + correlationData.getId() + ";ack:" + ack + ";cause:" + cause); } /** * 实现ReturnCallback当消息发送出去找不到对应路由队列时,将会把消息退回, 如果有任何一个路由队列接收投递消息成功,则不会退回消息 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("message" + message.getBody().toString() + ";replyCode:" + replyCode + ";replyText:" + replyText + ";exchange:" + exchange + ";routingKey:" + routingKey); } }
*注解驱动的侦听器端点*
介绍
异步接收消息的最简单方法是使用带注解的侦听器端点基础结构。 简而言之,它允许您将托管bean的方法公开为Rabbit侦听器端点。
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
上述示例的思想是,只要名为myQueue的队列上有消息可用,就会相应地调用processOrder方法(在这种情况下,使用消息的有效负载)。
带注解的端点基础结构使用RabbitListenerContainerFactory为每个带注解的方法在后台创建消息侦听器容器。
在上面的示例中,myQueue必须已经存在并绑定到某个交换。 只要应用程序上下文中存在RabbitAdmin,就可以自动声明和绑定队列。
可以为注释属性(队列等)指定属性占位符($ {some.property})或SpEL表达式(#{someExpression})。 有关可能使用SpEL而不是属性占位符的示例,请参阅“侦听多个队列”一节。
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}
@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}
}
在第一个示例中,如果需要,队列myQueue将与交换一起自动声明(持久),并使用路由键绑定到交换机。 在第二个示例中,将声明和绑定匿名(独占,自动删除)队列。 可以提供多个QueueBinding条目,允许侦听器侦听多个队列。 在第三个示例中,如果需要,将声明具有从属性my.queue中检索到的名称的队列,并使用队列名称作为路由键,使用默认绑定到默认交换。
从版本2.0开始,@ Exchange批注支持任何交换类型,包括自定义。 请参阅AMQP Concepts文档中的更多信息。需要更高级配置时,请使用常规@Bean定义。
请注意第一个示例中的exchange上的ignoreDeclarationExceptions。 例如,这允许绑定到可能具有不同设置(例如内部)的现有交换。 默认情况下,现有交换的属性必须匹配。
从2.0版开始,您现在可以将队列绑定到具有多个路由键的交换:
...
key = { "red", "yellow" }
...
您还可以在@QueueBinding注解中为队列,交换和绑定指定参数。 例如:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "auto.headers", autoDelete = "true", arguments = @Argument(name = "x-message-ttl", value = "10000", type = "java.lang.Integer")), exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"), arguments = { @Argument(name = "x-match", value = "all"), @Argument(name = "foo", value = "bar"), @Argument(name = "baz") }) ) public String handleWithHeadersExchange(String foo) { ... }
请注意,队列的x-message-ttl参数设置为10秒。 由于参数类型不是String,我们必须指定其类型; 在这种情况下整数。 与所有此类声明一样,如果队列已存在,则参数必须与队列上的参数匹配。 对于标头交换,我们设置绑定参数以匹配标头foo设置为bar的消息,标头baz必须与任何值一起出现。 x-match参数意味着必须满足两个条件。
参数名称,值和类型可以是属性占位符($ {...})或SpEL表达式(#{...})。 名称必须解析为String; type的表达式必须解析为Class或类的完全限定名称。 该值必须解析为可以由DefaultConversionService转换为类型的内容(例如上例中的x-message-ttl)。
如果名称解析为null或空字符串,则忽略@Argument。
**************************完整版示例***********************************:
application.properties
#rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=springcloud spring.rabbitmq.password=123456 #发送确认 spring.rabbitmq.publisher-confirms=true #失败回退 spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true spring.rabbitmq.template.reply-timeout=10000 #外部化配置队列名 test.queue.name=myQueue test.exchange.name=logs test.route.key.name=bootsis
sender.java
package org.niugang.mq; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.MessagePropertiesBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 消息发送者 * @author Administrator * */ @Component public class Sender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback { private static final Logger logger = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send() { String content = " hello spring amqp"; System.out.println("Sender:" + content); // 执行保存 String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); // 设置确认回调函数,否则无法成功 rabbitTemplate.setConfirmCallback(this); // 定义消息内容 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setMessageId("123").setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody(content.getBytes()).andProperties(props).build(); //发送消息 rabbitTemplate.send("logs", "bootsis", message, correlationId); } // 每3秒执行一次 @Scheduled(cron = "*/3 * * * * ?") private void process() { send(); } /** * 实现ConfirmCallback * ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中 * ACK=false表示消息由于Broker处理错误,消息并未处理成功 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // logger.info("correlationData:" + correlationData.getId() + ";ack:" + ack + ";cause:" + cause); } /** * 实现ReturnCallback当消息发送出去找不到对应路由队列时,将会把消息退回, 如果有任何一个路由队列接收投递消息成功,则不会退回消息 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("message" + message.getBody().toString() + ";replyCode:" + replyCode + ";replyText:" + replyText + ";exchange:" + exchange + ";routingKey:" + routingKey); } }
myReceiver.java
package org.niugang.config; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; @Component public class MyReceiver { /** * 定义队列,exchange,路由键 */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "logs", ignoreDeclarationExceptions = "true"), key = "bootsis")) public void processStr(Message message) { byte[] body = message.getBody(); System.out.println("注解接收者:" + new String(body)); } /** * 外部化配置队名,exchangeName,routeKey */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${test.queue.name}", durable = "true"), exchange = @Exchange(value = "${test.exchange.name}", ignoreDeclarationExceptions = "true"), key = "${test.route.key.name}")) public void processStr2(Message message) { byte[] body = message.getBody(); System.out.println("注解接收者2:" + new String(body)); } }
元注解(自定义注解)
有时您可能希望对多个侦听器使用相同的配置。 要减少样板配置,可以使用元注释创建自己的侦听器注释:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT))) public @interface MyAnonFanoutListener { } public class MetaListener { @MyAnonFanoutListener public void handle1(String foo) { ... } @MyAnonFanoutListener public void handle2(String foo) { ... } }
在本例中,@MyAnonFanoutListener注解创建的每个侦听器都将一个匿名的自动删除队列绑定到fanout exchange metaFanout。元注释机制很简单,因为不检查用户定义注释上的属性——所以不能从元注释中重写设置。当需要更高级的配置时,使用普通的@Bean定义。
启用侦听器端点注解
要启用对@RabbitListener注释的支持,请将@EnableRabbit添加到您的@Configuration类之一。
@Configuration @EnableRabbit public class AppConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); return factory; } }
从版本2.0开始,DirectMessageListenerContainerFactory也可用,它创建DirectMessageListenerContainer。
默认情况下,基础结构会查找名为rabbitListenerContainerFactory的bean作为工厂用于创建消息侦听器容器的源。 在这种情况下,忽略RabbitMQ基础结构设置,可以调用processOrder方法,核心轮询大小为3个线程,最大池大小为10个线程。
可以自定义侦听器容器工厂以使用每个注解,或者可以通过实现RabbitListenerConfigurer接口来配置显式默认值。 只有在没有特定容器工厂的情况下注册了至少一个端点时,才需要默认值。
如果您更喜欢XML配置,请使用<rabbit:annotation-driven>元素; 将检测到使用@RabbitListener注释的任何bean。
对于SimpleRabbitListenerContainer:
<rabbit:annotation-driven/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="concurrentConsumers" value="3"/> <property name="maxConcurrentConsumers" value="10"/> </bean>
对于DirectMessageListenerContainer:
<rabbit:annotation-driven/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="consumersPerQueue" value="3"/> </bean>
从2.0版本开始,@RabbitListener注释具有并发属性;它支持SpEL表达式(#{…})和属性占位符(${…})。它的含义和允许的值取决于容器类型。
对于DirectMessageListenerContainer,值必须是单个整数值,它在容器上设置consumersPerQueue属性。
对于SimpleRabbitListenerContainer,该值可以是单个整数值,它在容器上设置concurrentconsumer属性,也可以是m-n形式,其中m是concurrentconsumer属性,n是maxconcurrentconsumer属性。
在任何一种情况下,这个设置都会覆盖工厂上的设置。以前,如果侦听器需要不同的并发性,则必须定义不同的容器工厂。
注解方法的消息转换
在调用侦听器之前,管道中有两个转换步骤。 第一个使用MessageConverter将传入的Spring AMQP消息转换为spring-messaging消息。 调用目标方法时,如有必要,将消息有效内容转换为方法参数类型。
第一步的默认MessageConverter是Spring AMQP SimpleMessageConverter,它处理转换为String和java.io.Serializable对象; 所有其他的仍然是一个byte[]。 在下面的讨论中,我们称之为消息转换器。
第二步的默认转换器是GenericMessageConverter,它委托转换服务(DefaultFormattingConversionService的一个实例)。 在下面的讨论中,我们称之为方法参数转换器。
要更改消息转换器,只需将其作为属性添加到容器工厂bean:
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ... factory.setMessageConverter(new Jackson2JsonMessageConverter()); ... return factory; }
这将配置一个Jackson2转换器,该转换器需要存在标题信息以指导转换。
您还可以考虑一个ContentTypeDelegatingMessageConverter,它可以处理不同内容类型的转换。
在大多数情况下,没有必要自定义方法参数转换器,除非您想要使用自定义ConversionService。
在1.6之前的版本中,转换JSON的类型信息必须在消息头中提供,或者需要自定义ClassMapper。 从1.6版开始,如果没有类型信息头,则可以从目标方法参数推断出类型。
此类型推断仅适用于方法级别的@RabbitListener。
有关更多信息,请参阅“Jackson2JsonMessageConverter”一节。
如果要自定义方法参数转换器,可以按如下方式进行:
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { ... @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new GenericMessageConverter(myConversionService())); return factory; } @Bean public ConversionService myConversionService() { DefaultConversionService conv = new DefaultConversionService(); conv.addConverter(mySpecialConverter()); return conv; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } ... }
重要
对于多方法侦听器(请参阅“多方法侦听器”一节),方法选择基于消息转换后消息的有效负载; 方法参数转换器仅在选择方法后调用.
程序化端点注册
RabbitListenerEndpoint提供Rabbit端点的模型,负责为该模型配置容器。 除了RabbitListener注释检测到的端点之外,基础结构还允许您以编程方式配置端点。
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setQueueNames("anotherQueue"); endpoint.setMessageListener(message -> { // processing }); registrar.registerEndpoint(endpoint); } }
在上面的示例中,我们使用了SimpleRabbitListenerEndpoint,它提供了实际的MessageListener来调用,但您也可以构建自己的端点变量来描述自定义调用机制。
应该注意的是,你也可以完全跳过@RabbitListener的使用,只能通过RabbitListenerConfigurer以编程方式注册你的端点。
带注解的端点方法签名
到目前为止,我们已经在端点中注入了一个简单的字符串,但是它实际上可以有一个非常灵活的方法签名。让我们重写它,以注入优先级与自定义头:
@Component public class MyService { @RabbitListener(queues = "myQueue") public void processOrder(Order order, @Header("order_type") String orderType) { ... } }
这些是您可以在侦听器端点中注入的主要元素:
原始的org.springframework.amqp.core.Message。
收到消息的com.rabbitmq.client.Channel
org.springframework.messaging.Message表示传入的AMQP消息。 请注意,此消息包含自定义标头和标准标头(由AmqpHeaders定义)。
从版本1.6开始,入站deliveryMode标头现在在名称为AmqpHeaders.RECEIVED_DELIVERY_MODE而不是AmqpHeaders.DELIVERY_MODE的标头中可用。
@Header-注解方法参数,用于提取特定标头值,包括标准AMQP标头。
@ Headers-注解参数,必须也可以赋予java.util.Map以获取对所有头的访问权限。
不是受支持类型之一(即消息和信道)的非注释元素被认为是有效载荷。 您可以通过使用@Payload注释参数来使其明确。 您还可以通过添加额外的@Valid来启用验证。
注入Spring的Message抽象的能力对于从特定于传输的消息中存储的所有信息中受益特别有用,而不依赖于特定于传输的API。
@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}
方法参数的处理由DefaultMessageHandlerMethodFactory提供,它可以进一步定制以支持其他方法参数。转换和验证支持也可以在那里进行定制。
例如,如果我们想在处理订单之前确保订单有效,我们可以使用@Valid注释有效负载并按如下方式配置必要的验证器:
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setValidator(myValidator()); return factory; } }
监听多个队列
@Component public class MyService { @RabbitListener(queues = { "queue1", "queue2" } ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... } }
从1.5版开始,您可以使用属性占位符和SpEL外部化队列名称:
@Component public class MyService { @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... } }
在1.5版之前,只能以这种方式指定单个队列; 每个队列都需要一个单独的属性
回复管理
MessageListenerAdapter中的现有支持已经允许您的方法具有非void返回类型。 在这种情况下,调用的结果将封装在原始消息的ReplyToAddress头中指定的地址中或在侦听器上配置的默认地址中发送的消息中。 现在可以使用消息传递抽象的@SendTo注释来设置该默认地址。
假设我们的processOrder方法现在应该返回OrderStatus,可以按如下方式编写它以自动发送回复:
@RabbitListener(destination = "myQueue") @SendTo("status") public OrderStatus processOrder(Order order) { // order processing return status; }
如果您需要以与传输无关的方式设置其他标头,则可以返回Message,例如:
@RabbitListener(destination = "myQueue") @SendTo("status") public Message<OrderStatus> processOrder(Order order) { // order processing return MessageBuilder .withPayload(status) .setHeader("code", 1234) .build(); }
@SendTo值被假定为模式exchange/routingKey之后的reply exchange和routingKey对,其中可以省略其中一个部分。 有效值为:
foo/bar - replyTo exchange和routingKey。
foo/ - replyTo交换和默认(空)routingKey。
bar或/ bar - replyTo routingKey和默认(空)交换。
/或为空 - replyTo默认交换和默认routingKey。
另外@SendTo可以在没有value属性的情况下使用。 这种情况等于空的sendTo模式。 @SendTo仅在入站消息没有replyToAddress属性时使用。
从版本1.5开始,@SendTo值可以是bean初始化SpEL表达式,例如......
@RabbitListener(queues = "test.sendTo.spel") @SendTo("#{spelReplyTo}") public String capitalizeWithSendToSpel(String foo) { return foo.toUpperCase(); } ... @Bean public String spelReplyTo() { return "test.sendTo.reply.spel"; }
表达式必须求值为一个String,它可以是一个简单的队列名称(发送到默认交换机),也可以是表格exchange / routingKey,如上所述。
在初始化期间,#{...}表达式被计算一次。
@RabbitListener(queues = "test.sendTo.spel") @SendTo("!{'some.reply.queue.with.' + result.queueName}") public Bar capitalizeWithSendToSpel(Foo foo) { return processTheFooAndReturnABar(foo); }
总之,#{...}在初始化期间被评估一次,#root对象是应用程序上下文; bean由其名称引用。 !{...}在运行时针对每个消息进行评估,其中根对象具有上述属性,bean以其名称引用,前缀为@。
SpEL表达式的运行时性质用!{...}分隔符表示。 表达式的评估上下文#root对象有三个属性:
request - o.s.amqp.core.Message请求对象。
source - 转换后的o.s.messaging.Message <?>。
result - 方法结果。
上下文有一个map属性访问器,一个标准类型转换器和一个bean解析器,允许引用上下文中的其他bean(例如@ someBeanName.determineReplyQ(request,result))。
***************示例************************
/** * 发送队列 */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "boot-sis-mq", durable = "true"), exchange = @Exchange(value = "logs", ignoreDeclarationExceptions = "true"), key = "bootsis")) @SendTo("reply-queue") //将返回值"success" 发送给reply-queue队列 public String processStr3(Message message) { byte[] body = message.getBody(); System.out.println("注解接收者1:" + new String(body)); return "success"; } /** * 接收回复的队列 */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "reply-queue", durable = "true"), exchange = @Exchange(value = ""),key="")) public void reply(String re) { System.out.println("回复1:" + re); }
*************多方法监听***************
从版本1.5.0开始,现在可以在类级别指定@RabbitListener注释。 与新的@RabbitHandler注释一起,这允许单个侦听器根据传入消息的有效负载类型调用不同的方法(根据发送消息的参数,决定执行那个方法)。 最好用一个例子来描述:
@RabbitListener(id="multi", queues = "someQueue") public class MultiListenerBean { @RabbitHandler @SendTo("my.reply.queue") public String bar(Bar bar) { ... } @RabbitHandler public String baz(Baz baz) { ... } @RabbitHandler public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) { ... } }
在这种情况下,如果转换的有效负载是Bar,Baz或Qux,则调用各个@RabbitHandler方法。 重要的是要理解系统必须能够基于有效载荷类型识别唯一的方法。 检查类型是否可分配给没有注释的单个参数,或者使用@Payload注释进行注释。 请注意,相同的方法签名适用于上述方法级@RabbitListener中所讨论的。
请注意,必须在每个方法上指定@SendTo(如果需要); 它在类级别不受支持。
@Repeatable @RabbitListener
从1.6版开始,@ RabbitListener注释标记为@Repeatable。 这意味着注释可以多次出现在同一个带注释的元素(方法或类)上。 在这种情况下,为每个注释创建一个单独的侦听器容器,每个注释都调用相同的侦听器@Bean。 可重复的注释可以与Java 8或更高版本一起使用; 使用Java 7或更早版本时,使用@RabbitListeners“容器”注释和@RabbitListener注释数组可以实现相同的效果。
代理@RabbitListener和泛型
如果您的服务要代理(例如,在@Transactional的情况下),则在接口具有通用参数时需要考虑一些因素。 使用通用接口和特定实现,例如:
interface TxService<P> { String handle(P payload, String header); } static class TxServiceImpl implements TxService<Foo> { @Override @RabbitListener(...) public String handle(Foo foo, String rk) { ... } }
您*切换到CGLIB目标类代理,因为接口句柄方法的实际实现是一种桥接方法。 在事务管理的情况下,使用注释选项配置CGLIB的使用 - @EnableTransactionManagement(proxyTargetClass = true)。 在这种情况下,必须在实现中的目标方法上声明所有注释:
static class TxServiceImpl implements TxService<Foo> { @Override @Transactional @RabbitListener(...) public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) { ... } }
处理异常
默认情况下,如果带注释的侦听器方法抛出异常,则会将其抛出到容器中,并且消息将被重新排队并重新传递,丢弃或路由到死信交换,具体取决于容器和代理配置。 没有任何内容返回给发件人。
从2.0版开始,@ RubyBistener注释有两个新属性:errorHandler和returnExceptions。
默认情况下不配置这些。
使用errorHandler提供RabbitListenerErrorHandler实现的bean名称。 这个功能界面有一个方法:
@FunctionalInterface public interface RabbitListenerErrorHandler { Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception; }
如您所见,您可以访问从容器接收的原始消息,消息转换器生成的Spring消息传递Message <?>对象以及侦听器抛出的异常,包含在ListenerExecutionFailedException中。 错误处理程序可以返回一些将作为回复发送的结果,或者抛出将被抛出到容器的原始或新异常,或者返回给发送方,具体取决于returnExceptions设置。
returnExceptions属性,当“true”将导致异常返回给发送者。 该异常包含在RemoteInvocationResult对象中。 在发送方,有一个可用的RemoteInvocationAwareMessageConverterAdapter,如果配置到RabbitTemplate中,它将重新抛出服务器端异常,包含在AmqpRemoteException中。 将通过合并服务器和客户端堆栈跟踪来合成服务器异常的堆栈跟踪。
重要
这种机制通常只能使用默认的SimpleMessageConverter,后者使用Java序列化; 例外通常不是“Jackson-friendly”,因此无法序列化为JSON。 如果您使用的是JSON,请考虑使用errorHandler在抛出异常时返回其他一些Jackson友好的Error对象。
微信公众号:
JAVA程序猿成长之路
分享资源,记录程序猿成长点滴。专注于Java,Spring,SpringBoot,SpringCloud,分布式,微服务。