一、rocketmq(个人理解)
是阿里开源的一款分布式消息中间件,主要分为以下几个部分
1、生产者Producer,消费者(Consumer),nameServer,Broker
2、生产者主要用来发送消息,,消费者用来接收消息
3、nameServer就像一个邮局,四个Broker相当于快递小哥,nameServer用来管理四个Broker。
4、发送消息有三种方式,分别是同步,异步,单项,异步发送会返回一个回调函数,同步则没有;单项发送是发送消息后不会管结果的
5、每个生产者都会有一个Topic和tags,我把它理解为用来区分消息的标识
二、安装rocketmq后,模拟一个消息发送接收的简单过程
三、生产者
1、首先配置rocketmq(端口号,服务器地址看自己的环境)
server:
port: 8002
tomcat:
uri-encoding: utf-8
rocketMq:
#MQ服务器地址
addr: 127.0.0.1:9876
#生产者分组
producerGroup: topicProducer
1
2
3
4
5
6
7
8
9
10
2、编写生产者发送消息,发送的消息设置在请求路径的参数上
@Service
public class ProducerServiceImpl {
@Value("${rocketMq.producerGroup}")
private String producerGroup;
@Value("${rocketMq.addr}")
private String addr;
private DefaultMQProducer producer;
@PostConstruct
public void initProducer() {
producer = new DefaultMQProducer("topicProducer");
producer.setNamesrvAddr(addr);
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
System.out.println("[rocketMq生产者已启动]");
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean sendTopic(String topic, String tags, String msg) {
SendResult result = new SendResult();
try {
Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
result = producer.send(message);
System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
if(result.getSendStatus().equals(SendStatus.SEND_OK)){
return true;
}else{
return false;
}
}
@PreDestroy
public void shutDownProducer() {
if (producer != null) {
producer.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@RestController
@RequestMapping("/cmcc")
public class CMCCController {
@Autowired
private ProducerServiceImpl producerService;
@GetMapping("/receive/{message}")
public ResponseEntity receive(@PathVariable String message){
boolean isSuccess = producerService.sendTopic("TestTopic", "NBIOT", message);
if(isSuccess){
return new ResponseEntity<>(HttpStatus.OK);
}else{
return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
四、消费者
1、消费者的配置
数据源可无视~~
server:
port: 8080
#配置数据源
spring:
datasource:
driver-class-name: com.taosdata.jdbc.TSDBDriver
#url: jdbc:TAOS://127.0.0.1:6020/db?timezone=Asia/Shanghai&characterEncoding=utf8
username: root
password: taosdata
rocketMq:
#MQ服务器地址
addr: 127.0.0.1:9876
#消费者分组
consumerGroup: topicConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ConsumerService {
@Value("${rocketMq.consumerGroup}")
private String consumerGroup;
@Value("${rocketMq.addr}")
private String addr;
private DefaultMQPushConsumer consumer;
@Autowired
private JdbcTemplate jdbcTemplate;
@PostConstruct
public void initConsumer() {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(addr);
try {
consumer.subscribe("TestTopic", "*");
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
Message msg = msgs.get(0);
String message = new String(msg.getBody());
System.out.println("消息接收:" + message)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.start();
System.out.println("[rocketMq消费者已启动]");
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutDownConsumer() {
if (consumer != null) {
consumer.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
5、同时启动生产者和消费者,成功接收
原文链接:https://blog.csdn.net/weixin_42516484/article/details/107471412