什么是RabbitMQ?
RabbitMQ 是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ 就扮演着邮箱、邮局以及邮递员的角色。
RabbitMQ 和邮局的主要区别是,它不是用来处理纸张的,它是用来接收、存储和发送消息(message)这种二进制数据的。
本文主要演示是Springboot+RabbitMQ简单整合+实例说明
关于安装RabbitMQ,由于RabbitMQ是用Erlang语言写的,首先必须安装Erlang的环境。
RabbitMQ在Window下的安装可以参考该博文链接:https://blog.csdn.net/weixin_39735923/article/details/79288578
该博文十分清楚详细,我就不多说了。
下面进入示例:
一、maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.springframework</groupId> <artifactId>gs-messaging-rabbitmq</artifactId> <version>0.1.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
二、编写Receive
package hello; import java.util.concurrent.CountDownLatch; import org.springframework.stereotype.Component; @Component public class Receiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; } }
Receiver
是一个简单的POJO,它定义了一种接收消息的方法。当你注册它以接收消息时,你可以将其命名为任何你想要的名称。
为方便起见,这个POJO也有一个CountDownLatch
。这允许它发信号通知接收到消息。这是你不太可能在生产应用程序中实现的。
注册监听器并发送消息
Spring AMQP RabbitTemplate
提供了使用RabbitMQ发送和接收消息所需的一切。具体来说,您需要配置:
-
消息侦听器容器
-
声明队列,交换以及它们之间的绑定
-
用于发送一些消息以测试侦听器的组件
三、编写Runner
package hello; import java.util.concurrent.TimeUnit; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class Runner implements CommandLineRunner { private final RabbitTemplate rabbitTemplate; private final Receiver receiver; public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) { this.receiver = receiver; this.rabbitTemplate = rabbitTemplate; } @Override public void run(String... args) throws Exception { System.out.println("Sending message..."); rabbitTemplate.convertAndSend(Application.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!"); receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); } }
四、编写配置文件
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
五、编写启动类
package hello; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class Application { static final String topicExchangeName = "spring-boot-exchange"; static final String queueName = "spring-boot"; @Bean Queue queue() { return new Queue(queueName, false); } @Bean TopicExchange exchange() { return new TopicExchange(topicExchangeName); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#"); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } public static void main(String[] args) throws InterruptedException { SpringApplication.run(Application.class, args).close(); } }
Spring Boot会自动创建连接工厂和RabbitTemplate,从而减少您必须编写的代码量。
listenerAdapter()
方法中定义的bean在定义的容器中注册为消息侦听器container()
。它将侦听“spring-boot”队列中的消息。因为Receiver
该类是POJO,所以需要将其包装在MessageListenerAdapter
指定要调用的位置receiveMessage
。
该main()
方法通过创建Spring应用程序上下文来启动该过程。这将启动消息侦听器容器,该容器将开始侦听消息。Runner
然后会自动执行一个bean:它RabbitTemplate
从应用程序上下文中检索并发送“Hello from RabbitMQ!” “spring-boot”队列中的消息。最后,它关闭Spring应用程序上下文,应用程序结束。
补充说明:JMS队列和AMQP队列具有不同的语义。例如,JMS仅向一个使用者发送排队的消息。虽然AMQP队列执行相同的操作,但AMQP生成器不会将消息直接发送到队列。相反,消息被发送到交换机,交换机可以转到单个队列,或扇出到多个队列,模仿JMS主题的概念。