RocketMQTemplate的使用
概念介绍
https://www.cnblogs.com/weifeng1463/p/12889300.html
生产者YML配置
# The name server and the producer group must both be configured
rocketmq:
  producer:
    group: my-producer
  name-server: 127.0.0.1:9876
RocketMQAutoConfiguration.java
/**
 * Builds the default {@link DefaultMQProducer} bean from {@link RocketMQProperties}.
 *
 * <p>The bean is only registered when no other {@code DefaultMQProducer} bean exists and
 * both {@code rocketmq.name-server} and {@code rocketmq.producer.group} are present.
 *
 * @param rocketMQProperties the bound RocketMQ configuration
 * @return a producer configured with the name server address and the producer settings
 */
@Bean(PRODUCER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    RocketMQProperties.Producer settings = rocketMQProperties.getProducer();
    String namesrvAddr = rocketMQProperties.getNameServer();
    String producerGroup = settings.getGroup();

    // The name server and the producer group are mandatory.
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(producerGroup, "[rocketmq.producer.group] must not be null");

    String channel = rocketMQProperties.getAccessChannel();

    DefaultMQProducer mqProducer = RocketMQUtil.createDefaultMQProducer(
        producerGroup,
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.isEnableMsgTrace(),
        settings.getCustomizedTraceTopic());
    mqProducer.setNamesrvAddr(namesrvAddr);

    // The access channel is optional; only apply it when one was configured.
    if (!StringUtils.isEmpty(channel)) {
        mqProducer.setAccessChannel(AccessChannel.valueOf(channel));
    }

    // Copy the remaining tuning knobs straight from the producer configuration.
    mqProducer.setSendMsgTimeout(settings.getSendMessageTimeout());
    mqProducer.setRetryTimesWhenSendFailed(settings.getRetryTimesWhenSendFailed());
    mqProducer.setRetryTimesWhenSendAsyncFailed(settings.getRetryTimesWhenSendAsyncFailed());
    mqProducer.setMaxMessageSize(settings.getMaxMessageSize());
    mqProducer.setCompressMsgBodyOverHowmuch(settings.getCompressMessageBodyThreshold());
    mqProducer.setRetryAnotherBrokerWhenNotStoreOK(settings.isRetryNextServer());
    return mqProducer;
}
消费者监听信息(Push方式的信息)
– 被动的收取发送过来的信息
使用@RocketMQMessageListener
/**
 * Marks a type as a push-style RocketMQ message listener and carries the consumer
 * settings used for it.
 *
 * <p>{@link #consumerGroup()} and {@link #topic()} have no default and must always be given;
 * every other attribute falls back to the default declared here.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
    /** Placeholder for the {@code rocketmq.name-server} property (empty when unset). */
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    /** Placeholder for the {@code rocketmq.consumer.access-key} property (empty when unset). */
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    /** Placeholder for the {@code rocketmq.consumer.secret-key} property (empty when unset). */
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    /** Placeholder for the {@code rocketmq.consumer.customized-trace-topic} property (empty when unset). */
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    /** Placeholder for the {@code rocketmq.access-channel} property (empty when unset). */
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumer group. Consumers in the same group must have exactly the same
     * subscriptions for load balancing to work correctly.
     */
    String consumerGroup();

    /**
     * Topic to subscribe to.
     */
    String topic();

    /**
     * How messages are filtered.
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Filter expression (the tag name when filtering by tag); {@code "*"} by default.
     */
    String selectorExpression() default "*";

    /**
     * Consume mode.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Message model.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Maximum number of consumer threads.
     */
    int consumeThreadMax() default 64;

    /**
     * Maximum time, in minutes, that a single message may block a consuming thread.
     */
    long consumeTimeout() default 15L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name of the message trace topic. If it is not configured, the default
     * trace topic name is used.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * Name server address.
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
被该注解标注的类需要实现RocketMQListener接口,并重写onMessage方法来接收消息(详见下方例子)
ListenerContainerConfiguration.java
广播模式下消息不能顺序消费
/**
 * Rejects listener definitions that combine orderly consumption with broadcasting.
 *
 * @param annotation the listener annotation to check
 * @throws BeanDefinitionValidationException if the consume mode is ORDERLY and the
 *         message model is BROADCASTING
 */
private void validate(RocketMQMessageListener annotation) {
    // Any consume mode other than ORDERLY is accepted as-is.
    if (annotation.consumeMode() != ConsumeMode.ORDERLY) {
        return;
    }
    // ORDERLY is accepted unless the message model is BROADCASTING.
    if (annotation.messageModel() != MessageModel.BROADCASTING) {
        return;
    }
    throw new BeanDefinitionValidationException(
        "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
}
消费者组和主题必须要配置,否则将会初始化失败;另外,若在配置中将该监听器(rocketmq.consumer.listeners.<消费者组>.<主题>)设为false,则会跳过该监听器的初始化
// Look up the per-listener switch: the listeners setting is read as a map keyed by
// consumer group and then by topic. A missing group or topic entry falls back to
// true, i.e. the listener is enabled unless explicitly turned off.
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
// Disabled by configuration: log at debug level and skip the rest of the setup.
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
// Check the annotation's settings before continuing.
validate(annotation);
DefaultRocketMQListenerContainer.java
监听器类(RocketMQListener或RocketMQReplyListener的实现类)可通过同时实现RocketMQPushConsumerLifecycleListener接口并重写prepareStart方法来配置消费者属性
// Let the listener customise the consumer: if the message listener also implements
// RocketMQPushConsumerLifecycleListener, its prepareStart(consumer) is called;
// otherwise the reply listener is checked the same way. At most one of them is invoked.
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
例子:(请不要在onMessage方法中捕捉任何异常,因为没有异常代表消费成功)
/**
 * Example push-style listener for topic {@code my-topic} in consumer group {@code my-consumer}.
 *
 * <p>Do not swallow exceptions inside {@link #onMessage(String)}: returning without an
 * exception is what signals that the message was consumed successfully.
 */
@Service
@RocketMQMessageListener(consumerGroup = "my-consumer", topic = "my-topic")
public class Producer1Service implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    /**
     * Number of the next delivery, starting at 1. The listener uses the default
     * CONCURRENTLY consume mode, so onMessage may run on several threads at once;
     * an atomic counter keeps the numbering free of lost updates.
     */
    private final java.util.concurrent.atomic.AtomicInteger receiveCount =
        new java.util.concurrent.atomic.AtomicInteger(1);

    /**
     * Prints the delivery number followed by the message payload.
     *
     * @param message the message body
     */
    @Override
    public void onMessage(String message) {
        // Read and advance the counter in one atomic step.
        int current = receiveCount.getAndIncrement();
        System.out.println("======================================");
        System.out.println("第" + current + "次接受");
        System.out.println("======================================");
        System.out.println("consumer1-1:" + message);
    }

    /**
     * Adjusts the consumer before it is started.
     *
     * @param consumer the push consumer backing this listener
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Maximum number of times a message is re-consumed
        consumer.setMaxReconsumeTimes(1);
        // Client instance name
        consumer.setInstanceName("Producer1");
    }
}
消费者监听信息(Pull方式的信息)
– 主动拉取信息
YML配置
# The name server, the consumer group and the topic must all be configured
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: my-consumer
    topic: my-topic
RocketMQAutoConfiguration.java
/**
 * Builds the default {@link DefaultLitePullConsumer} bean from {@link RocketMQProperties}.
 *
 * <p>The bean is only registered when no other {@code DefaultLitePullConsumer} bean exists
 * and {@code rocketmq.name-server}, {@code rocketmq.consumer.group} and
 * {@code rocketmq.consumer.topic} are all present.
 *
 * @param rocketMQProperties the bound RocketMQ configuration
 * @return the lite pull consumer created by {@code RocketMQUtil}
 * @throws MQClientException declared by the consumer factory
 */
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
        throws MQClientException {
    RocketMQProperties.Consumer settings = rocketMQProperties.getConsumer();
    String namesrvAddr = rocketMQProperties.getNameServer();
    String consumerGroup = settings.getGroup();
    String topic = settings.getTopic();

    // The name server, the consumer group and the topic are mandatory.
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(consumerGroup, "[rocketmq.consumer.group] must not be null");
    Assert.hasText(topic, "[rocketmq.consumer.topic] must not be null");

    // Convert the textual settings to their enum constants before building the consumer.
    MessageModel model = MessageModel.valueOf(settings.getMessageModel());
    SelectorType selector = SelectorType.valueOf(settings.getSelectorType());

    return RocketMQUtil.createDefaultLitePullConsumer(
        namesrvAddr,
        rocketMQProperties.getAccessChannel(),
        consumerGroup,
        topic,
        model,
        selector,
        settings.getSelectorExpression(),
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.getPullBatchSize());
}
例子:
/**
 * Example of pull-style consumption with {@link DefaultLitePullConsumer}.
 *
 * <p>The consumer is started in the constructor and then polled in an endless loop on a
 * dedicated thread, so that creating this bean does not block the caller forever.
 */
@Service
public class Test {
    private final DefaultLitePullConsumer litePullConsumer;

    /**
     * Starts the injected consumer and launches the polling loop in the background.
     *
     * @param litePullConsumer the consumer to poll
     * @throws MQClientException if the consumer cannot be started
     */
    @Autowired
    public Test(DefaultLitePullConsumer litePullConsumer) throws MQClientException {
        this.litePullConsumer = litePullConsumer;
        System.out.println(litePullConsumer.getPullBatchSize());
        // Start the consumer
        litePullConsumer.start();
        // test() never returns normally, so it must not run on the constructing thread.
        // A non-daemon thread keeps the process alive while polling, as the blocking call did.
        Thread poller = new Thread(this::test, "lite-pull-consumer-poller");
        poller.start();
    }

    /**
     * Polls the consumer forever and prints each message body decoded as UTF-8.
     * The consumer is shut down if the loop is left because of an exception.
     */
    public void test() {
        try {
            while (true) {
                // Pull the next batch of messages
                List<MessageExt> messageExts = litePullConsumer.poll();
                for (MessageExt message : messageExts) {
                    // The Charset overload needs no checked-exception handling.
                    System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        } finally {
            // Shut the consumer down
            litePullConsumer.shutdown();
        }
    }
}