RocketMQ

RocketMQTemplate的使用

概念介绍

https://www.cnblogs.com/weifeng1463/p/12889300.html

生产者YML配置

# The name server address and the producer group must both be configured
rocketmq:
  producer:
    group: my-producer
  name-server: 127.0.0.1:9876

RocketMQAutoConfiguration.java

/**
 * Builds the default {@link DefaultMQProducer} bean from the {@code rocketmq.*} properties.
 *
 * <p>The bean is only registered when no other {@code DefaultMQProducer} bean exists and both
 * {@code rocketmq.name-server} and {@code rocketmq.producer.group} are present.
 *
 * @param rocketMQProperties the bound RocketMQ configuration properties
 * @return a producer configured with the name server address and the producer settings
 */
@Bean(PRODUCER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    RocketMQProperties.Producer settings = rocketMQProperties.getProducer();

    // The name server address and the producer group are both mandatory.
    String namesrvAddr = rocketMQProperties.getNameServer();
    String producerGroup = settings.getGroup();
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(producerGroup, "[rocketmq.producer.group] must not be null");

    DefaultMQProducer mqProducer = RocketMQUtil.createDefaultMQProducer(
        producerGroup,
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.isEnableMsgTrace(),
        settings.getCustomizedTraceTopic());
    mqProducer.setNamesrvAddr(namesrvAddr);

    // The access channel is optional; it is applied only when one is configured.
    String channelName = rocketMQProperties.getAccessChannel();
    if (!StringUtils.isEmpty(channelName)) {
        mqProducer.setAccessChannel(AccessChannel.valueOf(channelName));
    }

    // Copy the remaining send/retry/size settings from the producer configuration.
    mqProducer.setSendMsgTimeout(settings.getSendMessageTimeout());
    mqProducer.setRetryTimesWhenSendFailed(settings.getRetryTimesWhenSendFailed());
    mqProducer.setRetryTimesWhenSendAsyncFailed(settings.getRetryTimesWhenSendAsyncFailed());
    mqProducer.setMaxMessageSize(settings.getMaxMessageSize());
    mqProducer.setCompressMsgBodyOverHowmuch(settings.getCompressMessageBodyThreshold());
    mqProducer.setRetryAnotherBrokerWhenNotStoreOK(settings.isRetryNextServer());

    return mqProducer;
}

消费者监听信息(Push方式的信息)

被动的收取发送过来的信息

使用@RocketMQMessageListener
/**
 * Marks a class as a push-style RocketMQ message listener and carries its consumer settings.
 *
 * <p>{@link #consumerGroup()} and {@link #topic()} have no default and must always be given;
 * every other attribute has a default, several of which are property placeholders.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    // Name must match the reference in customizedTraceTopic() below.
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumer group. Consumers in the same group must have exactly the same subscriptions
     * for load balancing to work correctly.
     */
    String consumerGroup();

    /**
     * Topic to subscribe to.
     */
    String topic();

    /**
     * How messages are filtered.
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Selector expression (the tag name when filtering by tag); {@code "*"} by default.
     */
    String selectorExpression() default "*";

    /**
     * Consume mode.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Message model.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Maximum number of consumer threads.
     */
    int consumeThreadMax() default 64;

    /**
     * Maximum time, in minutes, that a single message may block a consuming thread.
     */
    long consumeTimeout() default 15L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name of the message trace topic. If not configured, the default trace topic name is used.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * Name server address.
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

被该注解标注的类通过实现RocketMQListener接口并重写onMessage方法来接收消息(详情见下方例子)

ListenerContainerConfiguration.java

广播模式下消息不能顺序消费

/**
 * Rejects listener definitions that combine orderly consumption with broadcasting.
 *
 * @param annotation the listener annotation to check
 * @throws BeanDefinitionValidationException if the consume mode is ORDERLY and the message
 *         model is BROADCASTING
 */
private void validate(RocketMQMessageListener annotation) {
    if (annotation.consumeMode() != ConsumeMode.ORDERLY) {
        return;
    }
    if (annotation.messageModel() != MessageModel.BROADCASTING) {
        return;
    }
    throw new BeanDefinitionValidationException(
        "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
}

消费者组和主题必须要配置,否则将会初始化失败

// Look up the per-group, per-topic switch in the consumer "listeners" configuration.
// A group or topic with no entry falls back to true, i.e. the listener is enabled.
boolean listenerEnabled =
            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);

// A listener disabled by configuration is skipped: log at debug level and stop here.
if (!listenerEnabled) {
    log.debug(
        "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
        consumerGroup, topic);
    return;
}
// Only enabled listeners have their annotation validated.
validate(annotation);

DefaultRocketMQListenerContainer.java

RocketMQListener或RocketMQReplyListener的实现类可同时实现RocketMQPushConsumerLifecycleListener接口,通过重写prepareStart方法配置消费者属性

// Let the listener customize the consumer through prepareStart(consumer), when it
// implements RocketMQPushConsumerLifecycleListener.
// rocketMQListener is checked first; rocketMQReplyListener is consulted only otherwise.
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}

例子:(请不要在onMessage方法中捕捉任何异常,因为没有异常代表消费成功)

/**
 * Example push consumer: listens on topic {@code my-topic} as consumer group
 * {@code my-consumer} and prints every message it receives.
 *
 * <p>It also implements {@link RocketMQPushConsumerLifecycleListener} so that it can adjust
 * the underlying consumer in {@link #prepareStart(DefaultMQPushConsumer)}.
 */
@Service
@RocketMQMessageListener(consumerGroup = "my-consumer", topic = "my-topic")
public class Producer1Service implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    /**
     * Number of messages received so far. The listener uses the default consume mode
     * (CONCURRENTLY), so onMessage may run on several consumer threads at once; an atomic
     * counter keeps the count correct where a plain {@code int++} would lose updates.
     */
    private final java.util.concurrent.atomic.AtomicInteger received =
        new java.util.concurrent.atomic.AtomicInteger();

    /**
     * Prints the running receive count and the message payload.
     *
     * <p>Exceptions are deliberately not caught here: returning without an exception is
     * what signals that the message was consumed successfully.
     *
     * @param message the message payload
     */
    @Override
    public void onMessage(String message) {
        // Take the sequence number atomically so concurrent deliveries never share one.
        int sequence = received.incrementAndGet();
        System.out.println("======================================");
        System.out.println("第" + sequence + "次接受");
        System.out.println("======================================");
        System.out.println("consumer1-1:" + message);
    }

    /**
     * Customizes the push consumer.
     *
     * @param consumer the consumer to configure
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Maximum number of times a message is re-consumed.
        consumer.setMaxReconsumeTimes(1);
        // Client instance name.
        consumer.setInstanceName("Producer1");
    }
}

消费者监听信息(Pull方式的信息)

– 主动拉取信息

YML配置

# The name server address, the consumer group and the topic must all be configured
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: my-consumer
    topic: my-topic

RocketMQAutoConfiguration.java

/**
 * Builds the default {@link DefaultLitePullConsumer} bean from the {@code rocketmq.*} properties.
 *
 * <p>The bean is only registered when no other {@code DefaultLitePullConsumer} bean exists and
 * {@code rocketmq.name-server}, {@code rocketmq.consumer.group} and
 * {@code rocketmq.consumer.topic} are all present.
 *
 * @param rocketMQProperties the bound RocketMQ configuration properties
 * @return the pull consumer created by {@code RocketMQUtil.createDefaultLitePullConsumer}
 * @throws MQClientException declared by this method; presumably propagated from consumer creation
 */
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
    throws MQClientException {
    RocketMQProperties.Consumer settings = rocketMQProperties.getConsumer();

    // The name server address, the consumer group and the topic are all mandatory.
    String namesrvAddr = rocketMQProperties.getNameServer();
    String consumerGroup = settings.getGroup();
    String topic = settings.getTopic();
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(consumerGroup, "[rocketmq.consumer.group] must not be null");
    Assert.hasText(topic, "[rocketmq.consumer.topic] must not be null");

    // Resolve the configured names to their enum constants before creating the consumer.
    MessageModel model = MessageModel.valueOf(settings.getMessageModel());
    SelectorType selector = SelectorType.valueOf(settings.getSelectorType());

    return RocketMQUtil.createDefaultLitePullConsumer(
        namesrvAddr,
        rocketMQProperties.getAccessChannel(),
        consumerGroup,
        topic,
        model,
        selector,
        settings.getSelectorExpression(),
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.getPullBatchSize());
}

例子:

/**
 * Example of pull-style consumption using the injected {@link DefaultLitePullConsumer}.
 *
 * <p>The consumer is started in the constructor and polled on a dedicated thread, so that
 * creating this bean does not block the thread that constructs it.
 */
@Service
public class Test {

    private final DefaultLitePullConsumer litePullConsumer;

    /**
     * Starts the consumer and begins polling in the background.
     *
     * @param litePullConsumer the pull consumer to read messages from
     * @throws MQClientException if the consumer fails to start
     */
    @Autowired
    public Test(DefaultLitePullConsumer litePullConsumer) throws MQClientException {
        this.litePullConsumer = litePullConsumer;
        System.out.println(litePullConsumer.getPullBatchSize());
        // Start the consumer.
        litePullConsumer.start();
        // test() loops indefinitely; calling it directly here would keep the constructor
        // from ever returning, so the loop runs on its own thread instead.
        Thread poller = new Thread(this::test, "lite-pull-consumer-poller");
        poller.start();
    }

    /**
     * Polls for messages and prints each body as UTF-8 text until the calling thread is
     * interrupted or polling fails; the consumer is shut down on the way out.
     */
    public void test() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Pull the next batch of messages.
                for (MessageExt message : litePullConsumer.poll()) {
                    // A Charset constant avoids the checked UnsupportedEncodingException
                    // that the charset-name overload declares.
                    System.out.println(
                        new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        } finally {
            // Shut the consumer down.
            litePullConsumer.shutdown();
        }
    }
}
上一篇:Oracle的FRA(Flash Recovery Area)的好处


下一篇:SQL Server 查询分析器键盘快捷方式