1. 简介:
RabbitMQ是一个在AMQP基础上可复用的企业消息系统。
AMQP(Advanced(预先) Message Queuing Protocol(协议)) 是一个提供统一消息服务的应用层标准协议,基于此协议的 客户端与消息中间件可传递消息,
并不受客户端中间件不同产品,不同开发语言的限制。
RabbitMQ 遵循AMQP协议,用erlang语言开发,用来通过协议在完全不同应用之间共享数据,
或者将作业排队以便让分布式服务器进行处理。
2. 组件说明
1. Broker: 消息队列服务器实体
2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
3. Queue: 消息队列的载体,每个消息都会被投入到一个,或多个队列。
4. Binding: 绑定,它的作用就是把exchange和队列按照一定的规则绑定起来。
5. Routing Key : 路由关键字,exchange 根据这个关键字进行消息投递,
6. channel: 消息通道,在客户端的每个链接里,可建立多个channel,每个channel
代表一个绘画任务。
7. vhost: 虚拟主机,一个broker中可开设多个vhost,用作不同用户的权限分离。
8. producer: 消息生产者。
9. consumer: 消息消费者。
10. 消息队列的使用过程:
(1)生产者链接到服务器,声明一个exchange,并设置交换机类型等属性。
(2)消费者 连接到消息队列服务器,打开一个channel
(3)消费者 声明一个queue,并设置相关属性。
(4)消费者 使用routing key, 绑定 exchange 和 queue。
(5)生产者 投递消息到exchange
(6)exchange 接收到消息后,根据消息的key和已经设置的binding,进行消息路由,将消息
投递到一个或多个队列。
总结:
使用rabbitmq 可以解耦应用程序,使用消息在应用程序中进行传递,
将一些无需及时返回且耗时的操作提取出来,进行异步处理,可节省服务器相应时间,
提高系统吞吐量。
2. 安装
安装rabbitmq前,先要安装erlang 并配置好环境变量(erlang_home, path 都要设)。
2.1. 监测erlang 是否安装好的方法是:
cmd到erlang安装目录的bin文件夹下:
erl -version
2.2 监测rabbitmq是否安装好的方法是:
cmd到安装目录的sbin目录下,运行
rabbitmqctl status
3. springboot 使用 rabbitmq 示例:
Rabbit MQ 服务器 会根据路由键将消息从交换路由到消息队列中,如何处理投递多个队列的情况?
这里不同类型的交换机起到了重要作用。分别是 fanout,direct,topic, 每种类型实现了不同的路由算法。
3.1 使用 Fanout Exchange 交换机
当使用Fanout Exchange 时,无需处理路由键,很像子广播,
每一台与该交换机绑定的队列都会获得一份消息。
Fanout 交换机处理消息是最快的。
示例:
1 所需依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 加载交换机,队列,到容器,并绑定。
/**
* 初始化Fanout交换机和队列,并将其绑定
* */
@Configuration
public class FanoutConfig {
// 初始化创建交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitConstants.FANOUT_EXCHANGE);
}
// 初始化创建队列
@Bean
public Queue getQueue1(){
return new Queue(RabbitConstants.F_QUEUE1,true);
}
@Bean
public Queue getQueue2(){
return new Queue(RabbitConstants.F_QUEUE2,true);
}
@Bean
public Queue getQueue3(){
return new Queue(RabbitConstants.F_QUEUE3,true);
}
// 将交换机和 和队列绑定
@Bean
public Binding binding1(){
return BindingBuilder.bind(getQueue1()).to(fanoutExchange());
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(getQueue2()).to(fanoutExchange());
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(getQueue3()).to(fanoutExchange());
}
}
3. 生产者
package com.*.web;
import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FanoutPublisherController implements RabbitTemplate.ConfirmCallback {
private String exchange_name = "test_fanout_exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/fanout/test")
public String send(String msg){
System.out.println("发送消息:【"+ msg+"】");
// 设置回调函数
rabbitTemplate.setConfirmCallback(this);
// 发送消息,
rabbitTemplate.convertAndSend(RabbitConstants.FANOUT_EXCHANGE,"",msg);
return msg;
}
// 回调确认方法
@Override
public void confirm(CorrelationData correlationData, boolean ask, String cause) {
if(ask) {
System.out.println("消费成功!");
}else {
System.out.println("消费失败!:" + cause);
}
}
}
4 消费者
package com.*.web;
import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutCustomer {
// 定义监听字符串队列
@RabbitListener(queues = {RabbitConstants.F_QUEUE1})
public void receiveMsg1(String msg){
System.out.println("F_QUEUE1 收到消息:"+msg);
}
// 定义监听字符串队列
@RabbitListener(queues = {RabbitConstants.F_QUEUE2})
public void receiveMsg2(String msg){
System.out.println("F_QUEUE2 收到消息:"+msg);
}
// 定义监听字符串队列
@RabbitListener(queues = {RabbitConstants.F_QUEUE3})
public void receiveMsg3(String msg){
System.out.println("F_QUEUE3 收到消息:"+msg);
}
}
源码地址: https://github.com/sss996/springboot-rabbitmq
3.2 使用Topic Exchange
topic exchange 将路由键 和 某个模式进行匹配。
此时队列需要绑定在一个模式上。
通过“#” 匹配一个或多个词,
通过“*”匹配一个词。因此 “fin.#” 能匹配到“fin.aa.bb",
但是fin.* 只能匹配到fin.aa
代码示例:
1. 初始化交换机,队列,并绑定
package com.*.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 初始化Fanout交换机和队列,并将其绑定
* */
@Configuration
public class TopicConfig {
// 初始化创建交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
}
// 初始化创建队列
@Bean
public Queue getQueue1(){
return new Queue(RabbitConstants.T_QUEUE1,true);
}
@Bean
public Queue getQueue2(){
return new Queue(RabbitConstants.T_QUEUE2,true);
}
// 将交换机和 和队列绑定
@Bean
public Binding binding1(){
return BindingBuilder.bind(getQueue1()).to(topicExchange()).with("insert.#");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(getQueue2()).to(topicExchange()).with("insert.user");
}
}
3.2.2 生产者
package com.*.web;
import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TopicPublisherController implements RabbitTemplate.ConfirmCallback {
private String exchange_name = "test_fanout_exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/topic/test")
public String send(String msg,String routingKey){
System.out.println("发送消息:【"+ msg+"】");
// 设置回调函数
rabbitTemplate.setConfirmCallback(this);
// 发送消息,
rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"insert.user",msg);
return msg;
}
// 回调确认方法
@Override
public void confirm(CorrelationData correlationData, boolean ask, String cause) {
if(ask) {
System.out.println("消费成功!");
}else {
System.out.println("消费失败!:" + cause);
}
}
}
3.2.3 消费者
package com.*.web;
import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicCustomer {
// 定义监听字符串队列T
@RabbitListener(queues = {RabbitConstants.T_QUEUE1})
public void receiveMsg1(String msg){
System.out.println("T_QUEUE1 收到消息:"+msg);
}
// 定义监听字符串队列
@RabbitListener(queues = {RabbitConstants.T_QUEUE2})
public void receiveMsg2(String msg){
System.out.println("T_QUEUE2 收到消息:"+msg);
}
}
源码地址: https://github.com/sss996/springboot-rabbitmq
3.3 使用 Direct Exchange
Direct Exchange .需要将一个队列绑定到交换机上,要求该消息与路由键完全匹配
用法和topic 基本相同不在重复。
源码地址: https://github.com/sss996/springboot-rabbitmq
4. 借鉴的博客 https://blog.csdn.net/u013045552/column/info/17329
5. 踩过的坑
在浏览器中输入localhost:15672 可以打开http客户端。
注意:server端口号是 5672 ,sprnigboot项目里注意不能绑定15672,应该是5672 否则会报如下错误。
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_151]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_151]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_151]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_151]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_151]
at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_151]
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_151]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580) ~[amqp-client-5.1.2.jar:5.1.2]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
2019-07-30 20:06:02.943 ERROR 13320 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_151]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_151]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_151]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_151]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_151]
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar:5.1.2]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:457) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
... 9 common frames omitted