RabbitMQ-AMQP模型详解二

RabbitMQ-AMQP模型详解_踩踩踩从踩的博客-CSDN博客

前言

上篇文章介绍了AMQP得流程,以及介绍Vhost Host、连接  、通道 、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布 事务机制,发布确认机制,消费者独占等机制

publisher

路由不可达

当消息发送给交换器或队列,在发送中,出现没有队列。

  • 交换没有绑定队列
  • 交换没法根据消息的路由key把消息路由到队列。

可以处理的情况 但是别抛异常 只是为找到交换器之类的

  • 退回
  • 死信队列(备用交换)

退回

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)
mandatory : true 强制退回, false 不需退回,直接丢弃。 在发送数据时设置  RabbitMQ-AMQP模型详解二 设置返回消息的回调处理
channel.addReturnListener(returnMessage -> {
				try {
					System.out.println("收到退回消息:" + new String(returnMessage.getBody(), "UTF-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			});

在spring中使用

  • 设置消息不可以路由退回,设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMandatory(true); // 设置消息不可以路由退回
		// 设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
		template.setReturnCallback(myReturnCallback());
		return template;
	}
  • replyCode broker的回应码 replyText 回应描述
private ReturnCallback myReturnCallback() {
		return new ReturnCallback() {
			@Override
			// replyCode broker的回应码 replyText 回应描述
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
					String routingKey) {

				// 在这里写退回处理逻辑
				System.out.println("收到回退消息 replyCode=" + replyCode + " replyText=" + replyText + " exchange=" + exchange
						+ " routingKey=" + routingKey);

				System.out.println(" 消息:" + message);
			}
		};
	}

在spring中,要重写 ReturnCallback,如果在spring中 配置文件中添加属性配置,这个是没有用的。在返回的数据可以知道

备用交换

  • policy  设置好策略,
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
#对那些交换器进行匹配  指定备用交换器
  • 代码中声明交换时通过参数指定备用交换
//声明参数 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("alternate-exchange", "my-ae"); 
//备用交换参数指定 
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout"); 
channel.queueDeclare("routed"); 
channel.queueBind("routed", "my-direct", "key1"); 
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");

加上备用参数进行指定上 myae上通道上去。

事务机制

怎么确认可靠发布,这就是事务机制要做的事情,保证网络传输的可靠发布。保证一个收,要么都收,无论是数据库,还是mq都是一样的,都是通过保证的。

RabbitMQ-AMQP模型详解二

当方法里面发布消息,并且需要做其他事情时,所以开启事务

spring 事务管理需要的组件 

事务管理器 TransactionManager  
@Configuration public class TxConfiguration { 
        @Bean 
        // 配置事务管理器 
   public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { 
        return new RabbitTransactionManager(connectionFactory); 
    }
 }
在 spring 该怎么玩事务就怎么玩 .   RabbitTransactionManager 只能做 Rabbitmq 的消息事务管理 只能是单连接的连接工厂 如果方法中,即要做数据库又要做rabbitmq,它没办法实现的。 没有分布式事务管理器实现。 rabbitmq 中事务机制来保证消息的可靠发布,性能是比较差  这是相对的发布确认机制。 调用 开启
@Transactional
	public void send(int i) {
		// 一定要设置ChannelTransacted(true) 表示开启通道事务
		this.template.setChannelTransacted(true);
		String message = "Hello World!-" + i;
		this.template.convertAndSend(queue.getName(), message);
		System.out.println(" [x] Sent '" + message + "'");
		if (i % 2 == 0)
			throw new RuntimeException();
	}

发布确认机制

性能是事务机制的 250 倍。 发布者发布消息,一般是走异步。 channel 有三种确认模式
  • 异步流式确认 事件驱动  优点 :开销低,吞吐量大  
  • 批量发布确认 批次等待,确认不ok 一批重发  
  • 单条确认 发一条就等待确认 
broker 给出确认会有三种结果
  • ack 接收成功
  • nack 接收失败
  • 发布者收不到Broker的确认(超时)

这都是确认会出现的情况。

异步流式确认

  • 开启发布确认模式 就不能再做事务管理了
  • 待确认消息的Map
  • 指定流式确认事件回调处理
  • 从Map中移除对应的消息
  • 重发,或做其他处理
// 1 开启发布确认模式 就不能再做事务管理了
			channel.confirmSelect();
			// 2 待确认消息的Map
			Map<Long, String> messagesMap = new ConcurrentHashMap<>();

			// 3 指定流式确认事件回调处理
			channel.addConfirmListener((deliveryTag, multiple) -> { // multiple表示是否是多条的确认
				System.out.println("收到OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + ",从Map中移除消息");
				// 从Map中移除对应的消息
				messagesMap.remove(deliveryTag);
			}, (deliveryTag, multiple) -> {
				System.out.println("收到 NON OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + " 从Map中移除消息,重发或做其他处理");
				// 从Map中移除对应的消息
				String message = messagesMap.remove(deliveryTag);
				// 重发,或做其他处理
				System.out.println("失败消息:" + message);
			});

for (int i = 1; i < 100; i++) {
				// 消息内容
				String message = "消息" + i;
				// 4 将消息放入到map中
				messagesMap.put(channel.getNextPublishSeqNo(), message);
				// 5、发送消息
				channel.basicPublish("mandatory-ex", "", true, null, message.getBytes());
				System.out.println("发布消息:" + message);

				Thread.sleep(2000L);
			}

在spring中添加 publisher-confirms 开启消息确认 

这里做流式确认 设置回调 发送

// 配置RabbitTemplate Bean
	@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		// 设置发布确认回调,一个RabbitTemplate只可设置一个回调。
		template.setConfirmCallback(confirmCallback());

		return template;
	}

Consumer

消费者的两种消息模式、消费者 注册 取消 、独占消费者 消费者 优先级  消息确认  pull 拉模式消费。这几种模式。

两种消费模式

  • push 推模式
  • pull 拉模式

在这两种模式下面的问题出现

push 模式

broker client 消费者 client 向 broker 注册对某个队列的消费者
// 对感兴趣的队列注册消费者,返回Server生成的consumerTag(消费者标识) String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});
取消消费者注册
channel.basicCancel(consumerTag);

独占消费者

独占队列:被创建它的连接独占 这个连接上的 channel 可以共享。连接关闭,独占队列没有了。 独占消费者:消费者独占一个队列进行消息消费,适用场景: 消息一定要严格按序消费处理。 一旦独占消费者挂了的话,消息队列里面的数据就会一直存在着,因此需要备用的 在spring中 如何添加 只需要添加 exclusive设置为true就可以了

 

RabbitMQ-AMQP模型详解二

采用不断的重试去抢独占,也是防止被挂了。

上一篇:Linux系统man查询命令等级及意义


下一篇:RabbitMQ学习04--基本工作模式(SpringBoot方式)