MQ
MQ 架构:
Message 包含内容:
RocketMQ 客户端坐标:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
消息发送&消费模式
注意:当生产者出现找不到 topic 的报错信息时,要检查是否服务器的防火墙策略导致的。
One-To-One(单生产者单消费者)
生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1"); // 入参为自定义组名
// 2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.3.244:9876");
// 设置发送消息超时时间(默认为3000)
producer.setSendMsgTimeout(60000);
// 3.启动发送的服务
producer.start();
// 4.创建要发送的消息对象,指定topic和body
Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
// 5.发送单条消息
SendResult result = producer.send(msg);
// 打印返回消息
System.out.println("返回结果:"+result);
// 6.关闭连接
producer.shutdown();
}
}
运行结果:
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E658458644D464D61A1F80000, offsetMsgId=C0A803DE00002A9F0000000000000000, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=0]
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.244:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 遍历消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
}
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
}
}
运行结果:
接收消息服务已开启运行
消息:hello rocketmq
One-To-Many(单生产者多消费者)
生产者
package onetoone;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1"); // 入参为自定义组名
// 2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.3.222:9876");
// 设置发送消息超时时间(默认为3000)
producer.setSendMsgTimeout(60000);
// 3.启动发送的服务
producer.start();
// 创建要发送的消息对象,指定topic和body
// Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
// 发送单条消息
// SendResult result = producer.send(msg);
// 4. 发送多条消息
for (int i = 1; i <= 10; i++) {
Message msg = new Message("topic1", ("生产者:hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
}
// 5.关闭连接
producer.shutdown();
}
}
运行结果:
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF300000, offsetMsgId=C0A803DE00002A9F0000000000000E79, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=4]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF7A0001, offsetMsgId=C0A803DE00002A9F0000000000000F2A, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF980002, offsetMsgId=C0A803DE00002A9F0000000000000FDB, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFAB0003, offsetMsgId=C0A803DE00002A9F000000000000108C, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=5]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFB70004, offsetMsgId=C0A803DE00002A9F000000000000113D, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=5]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFCC0005, offsetMsgId=C0A803DE00002A9F00000000000011EE, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=7]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFDF0006, offsetMsgId=C0A803DE00002A9F000000000000129F, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=7]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFEB0007, offsetMsgId=C0A803DE00002A9F0000000000001350, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFF20008, offsetMsgId=C0A803DE00002A9F0000000000001401, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFFD0009, offsetMsgId=C0A803DE00002A9F00000000000014B2, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=8]
消费者
负载均衡模式
负载均衡模式:即生产的多个消息(数量)会均衡分配给各个消费者。
package onetoone;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.222:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 设置当前消费者的消费模式:负载均衡(也是默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 遍历消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
}
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
}
}
运行结果:
广播模式
广播模式:即生产出来的同一条消息,会被每个消费者所接收。
广播模式的现象:
- 如果先生产,后消费,则消息只能被消费一次。
- 如果多个消费者先启动(广播模式),后生产,此时才有广播的效果。
- 结论:必须先启动消费者,再启动生产者,才有广播的效果。
package onetoone;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.222:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 遍历消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
}
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
}
}
运行结果:
Many-To-Many(多生产者多消费者)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费。
(演示:将上述示例的生产者启动多个实例即可)。
消息类别
同步消息
特征:即时性较强,通常是重要的且必须有回执的消息,例如短信、通知(转账成功)。
代码实现:
SendResult result = producer.send(msg);
异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息。
代码实现:
// 注意:回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行
producer.send(msg, new SendCallback() {
// 表示成功返回结果
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
// 表示发送消息失败
public void onException(Throwable t) {
System.out.println(t);
}
});
// 注意后面不要马上shutdown(关闭)连接,确保留有时间接收到异步返回的消息
单向消息
特征:不需要有回执的消息,例如日志类的消息。
代码实现:
producer.sendOneway(msg);
延时消息
延时消息:指消息发送时并不直接发送到消息服务器,而是根据设定的等待时间才到达,起到延时到达的缓冲作用。
Message msg = new Message("topic3", ("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
// 设置当前消息的延时效果
msg.setDelayTimeLevel(3); // 入参表示下标3,代表30s(参考下表)
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
目前支持的消息时间:
- 秒级:1、5、10、30
- 分级:1~10、20、30
- 时级:1、2
- 入参下标含义:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
批量消息
// 创建消息集合创建要发送的消息对象,指定topic和body
ArrayList<Message> messageList = new ArrayList<>();
Message msg1 = new Message("topic1", "hello rocketmq 1".getBytes("UTF-8"));
Message msg2 = new Message("topic1", "hello rocketmq 2".getBytes("UTF-8"));
Message msg3 = new Message("topic1", "hello rocketmq 3".getBytes("UTF-8"));
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
// (一次)发送批量消息
SendResult result = producer.send(messageList);
System.out.println("发送结果:"+result);
运行结果:
发送结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E681458644D4650530BBF0000,A9FE135E681458644D4650530BBF0001,A9FE135E681458644D4650530BBF0002, offsetMsgId=C0A803DE00002A9F0000000000001E3E,C0A803DE00002A9F0000000000001EE3,C0A803DE00002A9F0000000000001F88, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=13]
消息过滤
分类过滤
- 生产者:发送时指定 tag
Message msg = new Message("topic6", "tag2", ("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
- 消费者:消费指定 tag 的消息
// 接收消息时,除了可以指定topic,还可以指定接收的tag(*代表任意tag,||代表或)
consumer.subscribe("topic6", "tag1 || tag2");
属性过滤(SQL 过滤)
生产者:
// 为消息添加属性
msg.putUserProperty("vip", "1");
msg.putUserProperty("age", "20");
消费者:
// 使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic1", MessageSelector.bySql("age >= 18"));
配置:
- 注意:SQL 过滤需要依赖服务器的功能支持,在 broker.conf 配置文件中添加对应的功能项,并开启对应功能。
enablePropertyFilter=true
- 启动服务器使启用对应配置文件:
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
消息顺序
消息乱序
-
默认情况下,MQ 开启了多个队列, 同时发送多个消息的的话,发送给哪个队列是不确定的。同时消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性。
-
想要保证消息的有序性,需要指定消息发送时的队列。同时消费者应该一个队列开启一个线程进行接收,而不是一个消息一个线程。
顺序消息
生产者:
// 设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic", order.toString().getBytes());
// 发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
// 设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 根据发送的信息不同,选择不同的消息队列
// 根据id来选择一个消息队列的对象(用id的哈希值来取模)
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
消费者:
// 使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
// 使用MessageListenerOrderly接口后,
// 对消息队列的处理由一个消息队列多个线程服务转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
事务消息
事务消息过程
事务消息状态
-
提交状态:允许进入队列,此消息与非事务消息无区别。
-
回滚状态:不允许进入队列,此消息等同于未发送过。
-
中间状态:完成了half消息的发送,未对 MQ 进行二次状态确认。
注意:事务消息仅与生产者有关,与消费者无关。
事务消息实现
// 事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
// 添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
// 正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 提交状态
return LocalTransactionState.COMMIT_MESSAGE;
// 回滚状态
return LocalTransactionState.ROLLBACK_MESSAGE;
// 中间状态
return LocalTransactionState.UNKNOW;
}
// 事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 事务补偿过程必须保证服务器正在运行,否则将无法进行正常的事务补偿
return LocalTransactionState.COMMIT_MESSAGE;
// return null;
}
});
producer.start();
Message msg = new Message("topic8", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();