RabbitMQ - 备份交换器

参考:<<RabbitMQ实战指南>>

备份交换器(Alternate Exchange,简称AE),实际上和普通交换器没有多大区别,同样可以用来处理未被路由的消息。上一篇文章使用mandatory参数来解决,但是生产者代码逻辑变得复杂,所以我们可以使用备份交换器将这些未被路由的消息存储起来,之后有需要的时候再去处理。

实现方法:在声明交换器的时候添加alternate-exchange参数。

1. 客户端api

exchangeDeclare有多个重载方法,这些重载方法是由下面这个方法中缺省的某些参数构成:

Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;

方法返回Exchange.DeclareOK,用来标识成功声明了一个交换器。各个参数详细说明如下:

  • exchange:交换器名称
  • type:交换器类型。BuiltinExchangeType枚举类,有以下4中类型交换器:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)
  • durable:设置是否持久化。true:持久化,false:非持久化。持久化可以将交换器存盘,在服务器重启时不会丢失相关消息。
  • autoDelete:设置是否自动删除。true:自动删除,false:不自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此交换器解绑。
  • internal:设置是否内置的。true:内置交换器,false:非内置交换器。内置交换器,客户端无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • arguments:其他一些结构化参数。如备份交换器:alternate-exchange

2. 示例

2.1 原生api

声明交换器时,通过alternate-exchange指定备份交换器。备份交换器建议设置成fanout类型,也可以设置成direct或topic的类型。

不过需要注意:消息被重新路由到备份交换器时的路由键和从生产者发出的路由键是一样的。

// ...
// 声明交换器
String exchangeName = "direct.exchange.test.ae-normal";
String queueName = "direct.queue.test.ae-normal";
String routingKey = "direct.routing-key.test.ae-normal";

String aeExchangeName = "fanout.exchange.test.ae";
String aeQueueName = "fanout.queue.test.ae";

// 声明普通交换器,指定备份交换器,绑定队列
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", aeExchangeName);
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(exchangeName, "direct", true, false, args);
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
channel.queueBind(queue, exchangeName, routingKey);

// 声明备份交换器、绑定队列
AMQP.Exchange.DeclareOk fanoutDeclareOK = channel.exchangeDeclare(aeExchangeName, "fanout", true, false, null);
String aeQueue = channel.queueDeclare(aeQueueName, true, false, false, null).getQueue();
channel.queueBind(aeQueue, aeExchangeName, "");

// 正常路由,进入队列
byte[] content = ("Test Msg " + System.currentTimeMillis()).getBytes("UTF-8");
channel.basicPublish(exchangeName, routingKey, false, null, content);

// 不可路由,进入备份交换器
byte[] content2 = ("Test Msg2 " + System.currentTimeMillis()).getBytes("UTF-8");
channel.basicPublish(exchangeName, routingKey + "2", false, null, content2);

2.2 springboot

(1)添加rabbitmq的starter

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)application.yml

spring:
  rabbitmq:
    host: dev.tss.com
    port: 5672
    username: admin
    password: njittss

rabbitmq:
  test:
    ae:
      normalExchange: direct.exchange.test.ae-normal
      normalQueue: direct.queue.test.ae-normal
      normalRoutingKey: direct.routing-key.test.ae-normal
      aeExchange: fanout.exchange.test.ae
      aeQueue: fanout.queue.test.ae

(3)声明交换器、队列,设置备份交换器
和原生api一样,在声明普通交换器时,通过alternate-exchange设置备份交换器。

@Configuration
public class RabbitAlternateExchangeConfig {

    @Value("${rabbitmq.test.ae.normalExchange}")
    private String normalExchangeName;
    @Value("${rabbitmq.test.ae.normalQueue}")
    private String normalQueueName;
    @Value("${rabbitmq.test.ae.normalRoutingKey}")
    private String normalRoutingKey;

    @Value("${rabbitmq.test.ae.aeExchange}")
    private String aeExchangeName;
    @Value("${rabbitmq.test.ae.aeQueue}")
    private String aeQueueName;

    @Bean
    public DirectExchange aeNormalExchange() {
        // 配置备份交换器,交换器类型为Fanout
        Map<String, Object> args = new HashMap<>(4);
        args.put("alternate-exchange", aeExchangeName);
        return new DirectExchange(normalExchangeName, true, false, args);
    }
    @Bean
    public Queue aeNormalQueue() {
        return new Queue(normalQueueName, true);
    }
    @Bean
    Binding bindingAeNormalQueue(Queue aeNormalQueue, DirectExchange aeNormalExchange) {
        return BindingBuilder.bind(aeNormalQueue).to(aeNormalExchange).with(normalRoutingKey);
    }


    @Bean
    public FanoutExchange aeExchange() {
        return new FanoutExchange(aeExchangeName);
    }
    @Bean
    public Queue aeQueue() {
        return new Queue(aeQueueName, true);
    }
    @Bean
    Binding bindingAeQueue(Queue aeQueue, FanoutExchange aeExchange) {
        return BindingBuilder.bind(aeQueue).to(aeExchange);
    }
}

(4)测试发送可以正常路由、不可路由消息

@Value("${rabbitmq.test.ae.normalExchange}")
private String normalExchangeName;
@Value("${rabbitmq.test.ae.normalRoutingKey}")
private String normalRoutingKey;

@Autowired
private RabbitTemplate rabbitTemplate;
// 测试发送可以正常路由信息
public boolean sendNormalMessage() {
    String message = "test normal message";
    this.rabbitTemplate.convertAndSend(normalExchangeName, normalRoutingKey, message);
    return true;
}
// 测试发送不可路由信息
public boolean sendAbnormalMessage() {
    String message = "test abnormal message";
    this.rabbitTemplate.convertAndSend(normalExchangeName, normalRoutingKey + "2", message);
    return true;
}

观察rabbitmq web控制台,可以正常路由的消息已经进入队列(direct.queue.test.ae-normal),不可路由的消息已经转发到备份交换器,路由到与之绑定的队列上(fanout.queue.test.ae
RabbitMQ - 备份交换器
RabbitMQ - 备份交换器

3.备注

(1)假如备份交换器的类型被设置为direct,与其绑定的队列的路由键是key1,当某条路由键为key2的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失;如果消息的路由键是key1则可以存储到队列中。

建议将备份交换器设置为fanout类型。

(2)对于备份交换器,有以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ服务器都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
上一篇:AE三维合成控制插件Pixel Cloud mac版


下一篇:redis的线程模型是什么?