RabbitMQ集群参数配置

文章目录

RabbitMQ可实施方案(镜像队列模式)

假设已经搭建好镜像队列集群

配置镜像队列

在任意节点的sbin目录下输入以下命令(因为该命令会自动在集群中同步)

rabbitmqctl set_policy ha-all-name "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

ha-all-name: 为策略名称;

^my:为匹配符,只有一个代表匹配所有,my为匹配名称以my开头的queue或exchange;

ha-mode:为同步模式,一共3种模式:

all-所有(所有的节点都同步消息),
exctly-指定节点的数目(需配置ha-params参数,此参数为int类型比如2,在集群中随机抽取2个节点同步消息)如:"ha-params":2
nodes-指定具体节点(需配置ha-params参数,此参数为数组类型比如:"ha-params":["rabbit@rabbitmq1","rabbit@rabbitmq2"],明确指定在这两个节点上同步消息)。

ha-sync-mode:节点默认自动同步

web管理界面也能配置:
RabbitMQ集群参数配置

SpringBoot整合RabbitMQ集群

生产者:

application.xml配置

spring:
  rabbitmq:
    username: harry
    password: harry123
    virtual-host: harry
    connection-timeout: 15000
    publisher-confirms: true
    publisher-returns: false
    template:
      mandatory: true
    addresses: 00.00.00.00:1111,00.00.00.00:2222,00.00.00.00:3333

virtual-host:主机名

publisher-confirms:一旦消息被投递到所有匹配的队列之后,RabbiMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达了。

publisher-returns:启动消息失败返回,比如路由不到队列时触发回调。

mandatory: 当mandatory标志位设置为true时,无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

package com.xyl.springboot.producer;

import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  
    
    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };
    
    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
      
}

pom.xml:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

消费者:

application.xml配置

spring:
  rabbitmq:
    username: harry
    password: harry123
    virtual-host: harry
    connection-timeout: 15000
    addresses: 00.00.00.00:1111,00.00.00.00:2222,00.00.00.00:3333
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        concurrency: 5
        max-concurrency: 10

acknowledge-mode:手动开启通知 Rabbit 消息消费成功

simple:指定最小最大消费者数量

消费者测试代码:

package com.xyl.springboot.conusmer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}

RabbitTemplate <<<<<<------------------------------

上一篇:rabbitmqweb管理端口http://localhost:15672/ 无法访问可能出现的问题


下一篇:部署Rabbitmq集群