文章目录
RabbitMQ可实施方案(镜像队列模式)
假设已经搭建好镜像队列集群
配置镜像队列
在任意节点的sbin目录下输入以下命令(因为该命令会自动在集群中同步
)
rabbitmqctl set_policy ha-all-name "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
ha-all-name
: 为策略名称;
^my
:为匹配符,只有一个代表匹配所有,my为匹配名称以my开头的queue或exchange;
ha-mode
:为同步模式,一共3种模式:
all-所有(所有的节点都同步消息),
exctly-指定节点的数目(需配置ha-params参数,此参数为int类型比如2,在集群中随机抽取2个节点同步消息)如:"ha-params":2
nodes-指定具体节点(需配置ha-params参数,此参数为数组类型比如:"ha-params":["rabbit@rabbitmq1","rabbit@rabbitmq2"],明确指定在这两个节点上同步消息)。
ha-sync-mode
:节点默认自动同步
web管理界面也能配置:
SpringBoot整合RabbitMQ集群
生产者:
application.xml
配置
spring:
rabbitmq:
username: harry
password: harry123
virtual-host: harry
connection-timeout: 15000
publisher-confirms: true
publisher-returns: false
template:
mandatory: true
addresses: 00.00.00.00:1111,00.00.00.00:2222,00.00.00.00:3333
virtual-host
:主机名
publisher-confirms
:一旦消息被投递到所有匹配的队列之后,RabbiMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达了。
publisher-returns
:启动消息失败返回,比如路由不到队列时触发回调。
mandatory
: 当mandatory标志位设置为true时,无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
package com.xyl.springboot.producer;
import java.util.Map;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("异常处理....");
}
}
};
//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
}
pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消费者:
application.xml
配置
spring:
rabbitmq:
username: harry
password: harry123
virtual-host: harry
connection-timeout: 15000
addresses: 00.00.00.00:1111,00.00.00.00:2222,00.00.00.00:3333
listener:
direct:
acknowledge-mode: manual
simple:
concurrency: 5
max-concurrency: 10
acknowledge-mode
:手动开启通知 Rabbit 消息消费成功
simple
:指定最小最大消费者数量
消费者测试代码:
package com.xyl.springboot.conusmer;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
RabbitTemplate <<<<<<------------------------------