一文搞定rabbitmq
1. 入门
1.1 什么是rabbitmq
RabbitMQ 是部署最广泛的开源消息代理。(官网是这么说的,我不清楚)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。所有主要的编程语言均有与代理接口通讯的客户端库。
1.2 安装rabbitmq
- 环境规划
centos7.4
erlang-18.3-1
rabbitmq-server-3.6.5
- 安装包
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
- 安装过程
1 安装erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2 安装rabbitmq
# 安装
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
- 启动&停止
# 启动服务
service rabbitmq-server start
# 停止服务
service rabbitmq-server stop
# 重启服务
service rabbitmq-server restart
1.3 安装rabbitmq图形化管理界面
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
1.4 rabbitmq管理平台简单使用
- 访问
http://ip:15672
- 登录
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
用户名 guest
密码 guest
- 管理平台简单介绍
2. rabbitmq高级特性
2.1 架构模型
-
Broker : 标识消息队列服务器实体rabbitmq-server
-
v-host : Virtual Host 虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
-
Exchange: 交换器用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
-
Queue : 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
-
Banding : 绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
-
==Channel ==: 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
-
Connection : 网络连接,比如一个TCP连接。
2.2 工作模式
RabbitMQ 提供了 6 种工作模式:
- 简单模式
- work queues
- Publish/Subscribe 发布与订阅模式
- Routing 路由模式
- Topics 主题模式
- RPC 远程调用模式(暂不作介绍)
官网对应模式介绍
2.3 消息确认机制(消息零丢失)
简单看一个图
- 生产者消息确认
- confirm
是指生产者投递消息后,如果Broker收到消息,则会给我们生产这一个应答。
ack 已收
nack 因为一些原因拒收
- return
exchange无法投递到queue
- 消费者消息确认
- commit 机制
手动提交确认收到消息
2.4 TTL&死信队列
TTL队列 : 当消息达到存活时间还没有被消费,会被自动清除
死信队列 : DLX 当消息成为dead message 会被发送到死信队列
消息成为dead message 的条件
- 消息长度到达限制
- 消费者拒收消息,并且不把消息放入原队列中
- TTL 过期未消费
2.5 队列最大长度限制
使用x-arguments定义队列的最大长度
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);
2.6 消息持久化
通过参数配置开启持久化,消费端commit之后会删除,否则不会删除,直到达到配置的队列最大长度限制,或者TTL队列的过期时间,会进入到死信队列,死信队列满了会直接丢失。
2.7 消费端限流
RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
-
prefetchSize:0,单条消息大小限制,0代表不限制
-
prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
-
global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。
- 实现
-
既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
-
设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
-
在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
3. Java使用rabbitmq–springboot版本
3.1 简单模式
- Producer
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 192.168.213.40
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
@Configuration
public class RabbitmqConfig {
private static final String SIMPLE_QUEUE = "haha";
private static final String SIMPLE_QUEUE_POJO = "haha_pojo";
@Bean("haha")
public Queue haha() {
return new Queue(SIMPLE_QUEUE, true);
}
@Bean("haha_pojo")
public Queue hahaPojo() {
return new Queue(SIMPLE_QUEUE_POJO, true);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class SimpleQueueTest {
private static final String SIMPLE_QUEUE = "haha";
private static final String SIMPLE_QUEUE_POJO = "haha_pojo";
private static final int loop_index = 10;
//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendString() {
for (int i = 0; i < loop_index; i++) {
rabbitTemplate.convertAndSend(SIMPLE_QUEUE, "SimpleQueueTest boot mq..." + i);
}
}
@Test
public void sendObject() {
User user = null;
for (int i = 0; i < loop_index; i++) {
user = new User();
user.setName("张三" + i);
user.setSex("男");
user.setAge(i);
rabbitTemplate.convertAndSend(SIMPLE_QUEUE_POJO, user);
}
}
}
- consumer
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 192.168.213.40
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
@Component
public class SimpleConsumer {
private static final String SIMPLE_QUEUE = "haha";
private static final String SIMPLE_QUEUE_POJO = "haha_pojo";
enum Action {
// 处理成功
ACCEPT,
// 可以重试的错误
RETRY,
// 无需重试的错误
REJECT;
}
/**
* 定义方法进行信息的监听
* RabbitListener中的参数用于表示监听的是哪一个队列
*
* @param body
* @param headers
*/
@RabbitListener(queues = SIMPLE_QUEUE)
public void ListenerQueueString(@Payload String body, @Headers Map<String, Object> headers, Message message, Channel channel) {
Action action = Action.ACCEPT;
long tag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("string message header: " + headers);
System.out.println("string message: " + body);
} catch (Exception e) {
//根据异常种类决定是ACCEPT、RETRY还是 REJECT
action = Action.RETRY;
e.printStackTrace();
} finally {
try {
if (action == Action.ACCEPT) {
// 确认收到消息,消息将被队列移除;false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (action == Action.RETRY) {
//确认否定消息,第一个boolean表示一个consumer还是所有,第二个boolean表示requeue是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
//拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。
channel.basicNack(tag, false, false);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RabbitListener(queues = SIMPLE_QUEUE_POJO)
public void ListenerQueuePojo(@Payload User body, @Headers Map<String, Object> headers, Message message, Channel channel) {
Action action = Action.ACCEPT;
long tag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("pojo message header: " + headers);
System.out.println("pojo message: " + body);
} catch (Exception e) {
//根据异常种类决定是ACCEPT、RETRY还是 REJECT
action = Action.RETRY;
e.printStackTrace();
} finally {
try {
if (action == Action.ACCEPT) {
// 确认收到消息,消息将被队列移除;false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (action == Action.RETRY) {
//确认否定消息,第一个boolean表示一个consumer还是所有,第二个boolean表示requeue是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
//拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。
channel.basicNack(tag, false, false);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3.2 work queues
简单模式基础上,新增一个消费者,可以并发消费。
application-8082.yml
spring:
application:
name: haha-rabbitmq-consumer-8082
server:
port: 8082
======================================================
application-8083.yml
spring:
application:
name: haha-rabbitmq-consumer-8083
server:
port: 8083
3.3 Publish/Subscribe 广播
- 广播模式Fanout
/**
* 广播模式
*/
private static final String FANOUT_EXCHANGE = "FANOUT_EXCHANGE";
/**
* fanout测试队列1
*/
private static final String FANOUT_QUEUE1 = "FANOUT_QUEUE1";
/**
* fanout测试队列2
*/
private static final String FANOUT_QUEUE2 = "FANOUT_QUEUE2";
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* 广播模式交换机
*
* @return
*/
@Bean
FanoutExchange contractFanoutExchange() {
FanoutExchange contractFanoutExchange = new FanoutExchange(FANOUT_EXCHANGE);
rabbitAdmin.declareExchange(contractFanoutExchange);
log.info("fanout交换机默认实例创建成功");
return contractFanoutExchange;
}
@Bean
Queue fanoutQueue1() {
Queue queue = new Queue(FANOUT_QUEUE1);
rabbitAdmin.declareQueue(queue);
log.debug("fanout测试队列-1实例化成功");
return queue;
}
@Bean
Queue fanoutQueue2() {
Queue queue = new Queue(FANOUT_QUEUE2);
rabbitAdmin.declareQueue(queue);
log.debug("fanout测试队列-2实例化成功");
return queue;
}
/**
*广播模式绑定queu
*
*/
@Bean
Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) {
Binding binding = BindingBuilder.bind(fanoutQueue1).to(exchange);
rabbitAdmin.declareBinding(binding);
log.debug("fanout队列-1/交换机绑定成功");
return binding;
}
@Bean
Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) {
Binding binding = BindingBuilder.bind(fanoutQueue2).to(exchange);
rabbitAdmin.declareBinding(binding);
log.debug("fanout队列-2/交换机绑定成功");
return binding;
}
3.4 Routing 路由模式 全匹配路由
/**
* 定向模式
*/
private static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
/**
* direct测试队列1
*/
private static final String DIRECT_QUEUE1 = "DIRECT_QUEUE1";
/**
* direct测试队列2
*/
private static final String DIRECT_QUEUE2 = "DIRECT_QUEUE2";
/**
* direct测试routingkey1
*/
private static final String DIRECT_KEY1 = "DIRECT_KEY1";
/**
* 定向型交换机
*/
@Bean
DirectExchange contractDirectExchange() {
DirectExchange contractDirectExchange = new DirectExchange(DIRECT_EXCHANGE);
rabbitAdmin.declareExchange(contractDirectExchange);
log.info("direct交换机默认实例创建成功");
return contractDirectExchange;
}
@Bean
Queue directQueue1() {
Queue queue = new Queue(DIRECT_QUEUE1);
rabbitAdmin.declareQueue(queue);
log.debug("direct测试队列-1实例化成功");
return queue;
}
@Bean
Queue directQueue2() {
Queue queue = new Queue(DIRECT_QUEUE2);
rabbitAdmin.declareQueue(queue);
log.debug("direct测试队列-2实例化成功");
return queue;
}
@Bean
Binding bindingDirectQueue1(Queue directQueue1, DirectExchange exchange) {
//绑定结构:队列-交换机-路由key
Binding binding = BindingBuilder.bind(directQueue1).to(exchange).with(DIRECT_KEY1);
rabbitAdmin.declareBinding(binding);
log.debug("direct队列1/交换机绑定成功");
return binding;
}
@Bean
Binding bindingDirectQueue2(Queue directQueue2, DirectExchange exchange) {
//绑定结构:队列-交换机-路由key
Binding binding = BindingBuilder.bind(directQueue2).to(exchange).with(DIRECT_KEY1);
rabbitAdmin.declareBinding(binding);
log.debug("direct队列2/交换机绑定成功");
return binding;
}
3.5 Topics 主题模式 规则匹配路由
/**
* 主题模式
*/
private static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";
/**
* topic测试队列1
*/
private static final String TOPIC_QUEUE1 = "TOPIC_QUEUE1";
/**
* topic测试队列2
*/
private static final String TOPIC_QUEUE2 = "TOPIC_QUEUE2";
/**
* topic测试routingkey1
*/
private static final String TOPIC_KEY1 = "男.#";
/**
* topic测试routingkey2
*/
private static final String TOPIC_KEY2 = "女.#";
/**
* 主题模式交换机
*
* @return
*/
@Bean
TopicExchange contractTopicExchange() {
TopicExchange contractTopicExchange = new TopicExchange(TOPIC_EXCHANGE);
rabbitAdmin.declareExchange(contractTopicExchange);
log.info("topic交换机默认实例创建成功");
return contractTopicExchange;
}
@Bean
Queue topicQueue1() {
Queue queue = new Queue(TOPIC_QUEUE1);
rabbitAdmin.declareQueue(queue);
log.debug("topic测试队列-1实例化成功");
return queue;
}
@Bean
Queue topicQueue2() {
Queue queue = new Queue(TOPIC_QUEUE2);
rabbitAdmin.declareQueue(queue);
log.debug("topic测试队列-2实例化成功");
return queue;
}
@Bean
Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange exchange) {
Binding binding = BindingBuilder.bind(topicQueue1).to(exchange).with(TOPIC_KEY1);
rabbitAdmin.declareBinding(binding);
log.debug("topic队列-1/交换机绑定成功");
return binding;
}
@Bean
Binding bindingTopicQueue2(Queue topicQueue2, TopicExchange exchange) {
Binding binding = BindingBuilder.bind(topicQueue2).to(exchange).with(TOPIC_KEY2);
rabbitAdmin.declareBinding(binding);
log.debug("topic队列-2/交换机绑定成功");
return binding;
}
5. 小结
rabbitmq 很牛,coding everyday ~!