rabbitMq举例

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

/**
	 * 发送消息
	 * @param exchangeName	exchangeName
	 * @param routingKey	routingKey
	 * @param msg	mq message
	 * @return msgId
	 */
	public String sendMsg(final String exchangeName,final String routingKey,final String msg) {
		final CorrelationData correlationDataInfo = new CorrelationData();
		final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,
				String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));
		correlationDataInfo.setId(msgId);

		//send over callback log
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
			if(correlationData!=null) {
				final String id = correlationData.getId();
				if (ack) {
					log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));
				} else {
					log.error("消息投递失败,消息Id[{}] [{}]", id, cause);
				}
			}else {
				log.error("消息投递失败,correlationData为null!");
			}
		});
		rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
		

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例


    @RabbitHandler
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),
                    exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),
                    key = "${rabbitmq.yl.x.routing.key}"))
    public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("MQ-收到【x】状态变更消息:{} ", payload);

        final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));

        if (NumberUtil.isNumber(objDeliveryTag)) {
            final long deliveryTag= Long.parseLong(objDeliveryTag);
            log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);
            channel.basicAck(deliveryTag,true);
        }

        if(CharSequenceUtil.isNotBlank(payload)) {}

        }

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.io.IOException;
import java.util.Map;

@Component
public class LeaseStateChangeListener {

    // 监听指定队列的消息
    @RabbitListener(queues = "leaseStateQueue")
    public void leaseStateChange(@Payload String payload, 
                                 @Headers Map<String, Object> headers, 
                                 Channel channel) throws IOException {
        try {
            // 处理消息体
            System.out.println("Received message: " + payload);

            // 获取消息头部信息
            String correlationId = (String) headers.get("correlationId");
            String messageType = (String) headers.get("messageType");
            System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);

            // 模拟处理业务逻辑
            processLeaseStateChange(payload);

            // 确认消息已成功消费
            channel.basicAck(headers.hashCode(), false);  // 手动确认消息
        } catch (Exception e) {
            // 异常处理,拒绝消息并重新入队
            System.err.println("Error processing message: " + e.getMessage());
            channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队
        }
    }

    private void processLeaseStateChange(String payload) {
        // 假设这里是处理租赁状态更新的业务逻辑
        // 比如将消息解析为对象,进行租赁状态更新
        System.out.println("Processing lease state change for payload: " + payload);
    }
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书

上一篇:css 布局方式


下一篇:第三十二章 Spring之源码阅读——事务篇