RocketMQ
consumer消费消息的两种模式:
1、并发消费
2、顺序消费
consumer如何消费:
1、broker推送消息到consumer
2、consumer拉取broker中的消息
一、Windows环境下载及安装
1、下载地址:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
2、安装:
解压到任意文件夹下,我的:D:\rocketMQ\rocketmq-all-4.9.0-source-release
3、配置环境变量
ROCKETMQ_HOME=D:\rocketMQ\rocketmq-all-4.9.0-bin-release
NAMESRV_ADDR=localhost:9876
4、启动nameServer:
.\bin\mqnamesrv.cmd
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LNxmQHdt-1633695784222)(/1625489325760.png)]
5、启动broker:
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YCGlwY8S-1633695784235)(/1625489342147.png)]
6、测试:
发送消息:
.\tools.cmd org.apache.rocketmq.example.quickstart.Producer
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hKD4A5sc-1633695784238)(/1625489666961.png)]
接收消息:
.\tools.cmd org.apache.rocketmq.example.quickstart.Consumer
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qRAumKev-1633695784240)(/1625489684782.png)]
7、安装可视化界面:
下载:https://github.com/apache/rocketmq-externals
进入项目的:D:\rocketMQ\插件\rocketmq-externals\rocketmq-console\src\main\resources下,修改application.properties文件。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aZiSic77-1633695784241)(/1625490397542.png)]
进入D:\rocketMQ\插件\rocketmq-externals\rocketmq-console目录,cmd之后:
对项目打包编译:
mvn clean package -Dmaven.test.skip=true
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ea852CQr-1633695784243)(/1625490830927.png)]
打包成功,在target目录下生成jar包:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sjE0CRQQ-1633695784244)(/1625490861047.png)]
运行:
java -jar rocketmq-console-ng-2.0.0.jar &
输入:
127.0.0.1:端口
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Peei1e7K-1633695784245)(/1625491439440.png)]
二、简单例子快速入门
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
三种方式发送消息:同步、异步、单向传输
同步:
可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等
public class SyncProducer {
/**
* 同步发送消息
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//实例化生产组名称
DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
// 指定nameserver器地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
for (int i = 0; i < 10; i++) {
//创建一个消息实例,指定主题、标记和消息主体。
// new Message(主题、标记、消息体)
Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
//发送消息到一个 broker.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//关闭生产者
producer.shutdown();
}
}
异步:
异步传输一般用于响应时间敏感的业务场景。
public class AsyncProducer {
/**
* 发送异步消息
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//实例化
DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
// 指定nameserver地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
//消息发送失败,重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
//等待10个线程执行完后再执行
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("asyncProducer-Topic", "asyncProducer-Tag", "asyncProducer-key", "asyncProducer-Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息发送的回调执行SendCallback
producer.send(msg, new SendCallback() {
/**
* 消息发送成功的回调
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
/**
* 消息发送出现异常的回调
* @param e
*/
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
// 10s后主线程被唤醒,不管countDown()被执行多少次
countDownLatch.await(10, TimeUnit.SECONDS);
// 关闭
producer.shutdown();
}
}
单向传输:
单向传输用于需要中等可靠性的情况,例如日志收集。
public class OneWayProducer {
/**
* 单向发送消息
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
//实例化生产组
DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
// 指定nameServer地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
for (int i = 0; i < 10; i++) {
//创建一个消息实例、指定主题、标记、消息体
Message msg = new Message("oneWayProducer-Topic", "oneWayProducer-Tag", ("oneWayProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息到 broker
producer.sendOneway(msg);
}
//等待发送完成
Thread.sleep(5000);
producer.shutdown();
}
}
消费消息:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");
// 指定nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来使用
consumer.subscribe("oneWayProducer-Topic", "*");
// 注册回调,以便从broker中及时拉取消息执行,并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "线程======>");
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody(),0,msg.getBody().length));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
三、广播消息
广播正在向主题的所有订阅者发送消息。如果您希望所有订阅者都收到有关某个主题的消息,广播是一个不错的选择。默认集群模式。
生产者:
public class RadioProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("radioProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++){
Message msg = new Message("radioProducer-TopicTest", "radioProducer-Tag", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者订阅:
public class RadioConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("radioProducer");
consumer.setNamesrvAddr("localhost:9876");
// 设置从队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅一个主题来使用
consumer.subscribe("radioProducer-TopicTest", "radioProducer-Tag");
//注册监听,并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("11111111111111111111111");
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
四、顺序消息
RocketMQ 使用 FIFO 顺序提供有序消息,即消息发送时,将消息发送至同一个MessageQueue中,实现消息的局部顺序消费。
生产者:
public class OrderProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("orderGroupName");
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("order-Topic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
消费者:
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroupName");
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅
consumer.subscribe("order-Topic", "TagA || TagC || TagD");
//注册消息监听,有序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
//正确消费
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
//回滚
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
//提交
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
//稍后消费
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
五、延迟消费
消息发送至broker后,等待一段时间之后,再推送至消费者那消费。
生产者:
public class SyncProducer {
/**
* 同步发送消息-----延迟消息
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//实例化生产组名称
DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
// 指定nameserver器地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
for (int i = 0; i < 10; i++) {
//创建一个消息实例,指定主题、标记和消息主体。
// new Message(主题、标记、消息体)
Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
//延迟消息设置延迟等级,开源版的RocketMQ不支持自定义的消息等级,只能修改18个等级中的时间
//delayTimeLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//上述等级可以在拓展的集群配置中修改
msg.setDelayTimeLevel(3);
//发送消息到一个 broker.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//关闭生产者
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");
// 指定nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来使用
consumer.subscribe("oneWayProducer-Topic", "*");
// 注册回调,以便从broker中及时拉取消息执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "线程======>");
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody(),0,msg.getBody().length));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
六、批量消息
将多条消息合成一个批量消息,一次发送出去。好处:减少网络IO、提高吞吐量
使用限制:
同一批次的消息应该具有:相同的主题,相同的 waitStoreMsgOK 并且没有调度支持.
一批消息的总大小不应超过 1MiB
生产者:
1、如果一次发送的数据不大于1M时,可以采用如下的方式发送:
public class BatchProducer {
public static void main(String[] args) throws Exception {
//实例化生产组名称
DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
// 指定nameserver器地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
String topic = "Batch-Topic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));
//发送数据
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);
//关闭生产者
producer.shutdown();
}
}
2、如果发送的消息一次超过1M,可以拆分为列表:
public class BatchSplitProducer {
public static void main(String[] args) throws Exception {
//实例化生产组名称
DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
// 指定nameserver器地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
String topic = "Batch-Topic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));
//然后你可以将大列表拆分为小列表
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
//关闭生产者
producer.shutdown();
}
}
/**
* 消息切割
*/
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000; //最大数据限制
private final List<Message> messages; //数据
private int currIndex; //当前消息下标
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
//计算单个消息的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//单个消息大于最大限制,则丢弃,否则会阻塞在此
if (nextIndex - currIndex == 0) {
//如果是最后一个元素,如果下一个列表没有元素,则加入这个消息之后就退出,否则退出
nextIndex++;
}
break;
}
//如果当前单个消息 + 已经计算的消息大小 > 最大限制,则退出,否则累加
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batchProducer");
// 指定nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来使用
consumer.subscribe("Batch-Topic", "*");
// 注册回调,以便从broker中及时拉取消息执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "线程======>");
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody(),0,msg.getBody().length));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
七、过滤消费
rocketMQ有sql进行消息过滤,消息过滤是在broker端进行的,consumer将sql过滤表达式推送至broker,broker过滤数据之后推送消息到consumer,减少网络数据传输。
消息过滤的两种方式:
1、TAG
2、SQL92
前提:需要在/conf/broker.conf配置文件加入支持过滤的属性:
enablePropertyFilter=true
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DmhTCbNC-1633695784246)(/1625883963295.png)]
生产者:
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_group");
producer.start();
String[] tags = {"tagA","tagB","tagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("filterTopic",
tags[i% tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//设置property:a 就是下标
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
producer.shutdown();
}
}
消费者:
public class FilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_group");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("filterTopic", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
sql表达式:
基本语法:
1、数值比较,如>, >=, <, <=, BETWEEN, =;
2、字符比较,如=, <>, IN;
3、IS NULL或IS NOT NULL;
4、逻辑AND, OR, NOT;
常量类型:
1、数字,如 123、3.1415;
2、字符,如'abc',必须用单引号引起;
3、NULL,特殊常数;
4、布尔值,TRUE或FALSE;
八、事务消息
什么是事务性消息?
它可以被认为是两阶段提交消息的实现,以确保分布式系统中的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。
使用限制
(1) 事务性消息没有调度和批处理支持。
(2) 为了避免单条消息被检查次数过多导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但用户可以通过更改“transactionCheckMax ”参数在broker的配置中,如果一条消息被检查了“transactionCheckMax”次,broker默认会丢弃这条消息并同时打印错误日志。用户可以通过覆盖“AbstractTransactionCheckListener”类来更改此行为。
(3) 交易消息在一定时间后将被检查,该时间由代理配置中的参数“transactionTimeout”确定。并且用户也可以在发送事务消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制,这个参数优先于“transactionMsgTimeout”参数。
(4) 一条交易消息可能被检查或消费不止一次。
(5) 向用户目标主题提交的消息回复可能会失败。目前,这取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,建议使用同步双写。机制。
(6) 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ Server 通过生产者 ID 查询客户端。
Transaction状态
事务消息有三种状态:
(1) TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费这条消息。
(2) TransactionStatus.RollbackTransaction:回滚事务,表示消息将被删除,不允许消费。
(3) TransactionStatus.Unknown:中间状态,表示需要MQ回检来确定状态。
生产者:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//事务生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
//线程池,作用?
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置线程池
producer.setExecutorService(executorService);
//设置事务监听器
producer.setTransactionListener(transactionListener);
producer.start();
//发送消息,分别由5个tag,每个tag2条消息
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//null,所有消息都是事务消息,也可以指定某一条消息是事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//等待
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
/**
* 事务监听器
*/
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Integer status = transactionIndex.getAndIncrement() % 3;
//map保存<key,value>
localTrans.put(msg.getTransactionId(), status);
System.out.println("transactionId:" + msg.getTransactionId() + "----status:" + status);
//消息未确定,需要消息回查确定
return LocalTransactionState.UNKNOW;
}
/**
* 消息回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来使用
consumer.subscribe("TopicTest1234", "*");
// 注册回调,以便从broker中及时拉取消息执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("=====================消息======================");
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody(),0,msg.getBody().length));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}