RocketMQ
概念
RocketMQ 是一个消息队列中间件,具有高性能、高可靠、高实时、分布式特点。
能够保证严格的消息顺序
顺序消息,消息入队列,生产方可以选择将消息入到哪一个队列
提供丰富的消息拉取模式
PUSH consumer 发送请求,保持长连接,broker每五秒察看是否有消息,有就回复给consumer
PULL 定时向broker拉取消息
保证消息不丢失
同步发送:阻塞当前进程,等到bocker返回发送结果
异步发送:首先构建任务,把任务交给线程池,等发送完后,执行用户自定义的回调函数
Oneway:只发送,不等结果
发送超时,失败,会重试
消费者会先拉取消息,消费完成后,才向服务器返回结果
消费失败也会重试
架构图
name server :控制中心,记录,做路由,需要首先启动
brocker:存储消息、事物回查
producer:先向name server查询broker信息,然后与其建立连接,发送消息
consumer:同样现象name server 查询broker,建立连接,拉取消息
消息
发送消息(生产者)
-
创建DefaultMQProducer
- produceGroup 区别生产者集群
-
设置name server地址
-
开启DefaultMQProducer
-
创建消息Message
- topic 主题
- tags 标签,消费者可以根据tags过滤消息
- keys 消息唯一值(主键)
- body 消息具体内容
-
发送消息
- 会返回一个result
-
关闭DefaultMQProducer
拉取消息(消费者)
-
创建DefaultMQConsumer
- 消费组
-
设置name server
-
设置subscript,要读取的主题信息
- 消费主体、过滤规则(* 或者 tags1 || tags2)
-
创建消息监听 MessageListener
-
获取消息消息
-
返回消息读取状态
- 读取成功返回success
- 失败则重试
普通消息
顺序消息
每次发送只发送到指定队列,接收也从一个队列拉取消息
分布式事务消息
生产者:我要去干一些事情,过一会你再问我做完没
MQ Server:好的
生产方执行事务之前先向MQ Server发送Half消息,等到服务器回复后开始执行任务,此时,消费方并不能获取到消息,服务器会询问生产方事务执行状态,当生产方执行完毕后,再次向服务器发送消息,服务器 接收到后才commit,否则,进行回滚操作
消息批量发送/广播
发送方-->List
接收方 设置广播模式 consumer.setMessageModel(MessageMode.BOARDCASTING)
安装
进入官网下载 http://rocketmq.apache.org/release_notes/release-notes-4.3.0/
解压下载的压缩包
配置环境变量
修改启动参数 runserver.sh
runbocker.sh
修改 broker-a.properties 要新建日志文件夹
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker 自动创建Topic, 建议线下开启, 线上关闭
autoCreateTopicEnable=true
# 是否允许Broker 自动创建订阅组, 建议线下开启, 线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=D:\download\rocketmq-all-4.3.0-bin-release\log\store
# commitLog存储路径
storePathCommitLog=D:\download\rocketmq-all-4.3.0-bin-release\log\store\commitlog
# 消息队列储存路径
storePathConsumeQueue=D:\download\rocketmq-all-4.3.0-bin-release\log\store\consumequeue
# 消息索引粗存路径
storePathIndex=D:\download\rocketmq-all-4.3.0-bin-release\log\store\index
# checkpoint 文件储存路径
storeCheckpoint=D:\download\rocketmq-all-4.3.0-bin-release\log\store\checkpoint
# abort 文件存储路径
abortFile=D:\download\rocketmq-all-4.3.0-bin-release\log\store\abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH
将conf文件夹下的所有xml文件中的${user.home} 修改为D:/download/rocketmq-all-4.3.0-bin-release/log
进入bin目录,先 start mqnamesrv.cmd
提示出错
但是配置了java_home还是报错
直接修改cmd文件
启动成功
后 start mqbroker.cmd -n 172.16.33.2:9876 autoCreateTopicEnable=true
同样修改cmd文件,直接设置jdk路径,启动成功
demo实现
新建maven项目
导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
<version>4.3.0</version>
</dependency>
创建生产者
package orderMessage;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author: chen
* @date: 2021/1/13 17:23
* @Description
*/
public class producer {
int count = 0;
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("order_message_producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 同步发送
for (int i = 0; i < 5; i++) {
Message message = new Message("order",
"test",
"id_" + i,
("say_hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
System.out.println("produce开始发送消息--" + i);
producer.send(message,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
return list.get((Integer)o);
}
},
0);
System.out.println("produce消息发送完毕--" + i);
}
Thread.sleep(5000);
// 异步发送
for (int i = 6; i < 11; i++) {
Message message = new Message("order",
"async",
"async_id_" + i,
("say_async" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
System.out.println("produce开始异步发送消息--" + i);
producer.send(message,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
return list.get((Integer) o);
}
},
0, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("---异步发送成功----");
}
public void onException(Throwable throwable) {
System.out.println("---异步发送失败----"+ throwable);
}
});
System.out.println("produce异步消息发送完毕--" + i);
}
producer.shutdown();
}
}
创建消费者
package orderMessage;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author: chen
* @date: 2021/1/13 17:34
* @Description
*/
public class consumer {
int count = 0;
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_message_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order", "*");
//consumer.setConsumeMessageBatchMaxSize(3);
consumer.setMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
ConsumeConcurrentlyStatus result = null;
for (int i = 0 ; i < list.size(); i ++) {
System.out.println("consumer----接受消息----" + i);
Message message = list.get(i);
try {
System.out.println("message:" + message.getTopic()
+ "---" + message.getTags()
+ "---" + message.getKeys()
+ "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
break;
}
}
if (result == null) {
result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return result;
}
});
consumer.start();
}
}
报错org.apache.rocketmq.client.exception.MQClientException: No route info for this topic, order
原因是maven 引入的jar包的版本不一致,修改版本后能正常运行
分布式事务消息代码
public class transProducer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer txPoducer=new TransactionMQProducer("transaction_group");
txPoducer.setNamesrvAddr("127.0.0.1:9876");
//本地事务执行和broker检查的回调都是由producer端来实现的
txPoducer.setTransactionListener(new TransactionListener() {
ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<String, Integer>();
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
map.put(transactionId,0);
System.out.println("setTransactionListener txID:"+transactionId+",body "+new String(msg.getBody()));
try {
System.out.println("开始执行");
Thread.sleep(65000);
map.put(transactionId,1);
System.out.println("执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
map.put(transactionId,2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
Integer status = map.get(transactionId);
System.out.println("消息会查---"+ status);
switch (status){
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
txPoducer.start();
TransactionSendResult temp = txPoducer.sendMessageInTransaction(new Message("order", "测试分布式消息事务流程".getBytes()),null);
System.out.println("消息回执:"+temp);
}
}
通过pull方式拉取消息
public class pullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order");
for (MessageQueue mq : mqs) {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq, true);
System.out.println("queue:" + mq + ":" + offset);
try {
while (true) {
PullResult pullResult = consumer.pullBlockIfNotFound(mq,
null, consumer.fetchConsumeOffset(mq, false), 1);
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt message : messageExtList) {
System.out.println("message:" + message.getTopic()
+ "---" + message.getTags()
+ "---" + message.getKeys()
+ "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}