官方网站
中文说明网页
https://github.com/apache/rocketmq/tree/master/docs/cn
官方代码实践说明
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
自己项目实践代码
pom坐标
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
生产者代码
/**
* NameServer地址
*/
@Value("${space.config.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Value("${space.config.rocketmq.topic}")
private String topic;
private String producerGroup = "space_producer_group";
private DefaultMQProducer producer;
public DefaultMQProducer getProducer() {
return producer;
}
@PostConstruct
public void defaultMQProducer() {
//生产者的组名
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
for (int i = 0; i < 10; i++) {
String messageBody = "我是消息内容:" + i;
String message = new String(messageBody.getBytes(), "utf-8");
//构建消息
Message msg = new Message(topic /* PushTopic */, "push"/* Tag */, "key_" + i /* Keys */, message.getBytes());
//发送消息
SendResult result = producer.send(msg);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//producer.shutdown();
}
}
public void sendMsg(String msg){
Message message = new Message(topic, "push", "key_" + new Date(), msg.getBytes());
try {
producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
消费者代码
/**
* 消费者的组名
*/
private String consumerGroup = "hmigroup";
/**
* NameServer地址
*/
@Value("${space.config.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Value("${space.config.rocketmq.topic}")
private String topic;
private String tagExpression = "push";
private DefaultMQPushConsumer consumer;
@Autowired
private SessionManager sessionManager;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName("RocketConsumer11");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe(topic, tagExpression);
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
try {
System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(), "utf-8");
System.out.println("一号:消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}
});
consumer.start();
this.consumer = consumer;
} catch (Exception e) {
e.printStackTrace();
}
}