RocketMQ入门

RocketMQ

consumer消费消息的两种模式:

1、并发消费

2、顺序消费

consumer如何消费:

1、broker推送消息到consumer

2、consumer拉取broker中的消息

一、Windows环境下载及安装

1、下载地址:

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip

2、安装:

解压到任意文件夹下,我的:D:\rocketMQ\rocketmq-all-4.9.0-source-release

3、配置环境变量

ROCKETMQ_HOME=D:\rocketMQ\rocketmq-all-4.9.0-bin-release
NAMESRV_ADDR=localhost:9876

4、启动nameServer:

.\bin\mqnamesrv.cmd

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LNxmQHdt-1633695784222)(/1625489325760.png)]

5、启动broker:

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YCGlwY8S-1633695784235)(/1625489342147.png)]

6、测试:

发送消息:

.\tools.cmd org.apache.rocketmq.example.quickstart.Producer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hKD4A5sc-1633695784238)(/1625489666961.png)]

接收消息:

.\tools.cmd org.apache.rocketmq.example.quickstart.Consumer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qRAumKev-1633695784240)(/1625489684782.png)]

7、安装可视化界面:

下载:https://github.com/apache/rocketmq-externals

进入项目的:D:\rocketMQ\插件\rocketmq-externals\rocketmq-console\src\main\resources下,修改application.properties文件。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aZiSic77-1633695784241)(/1625490397542.png)]

进入D:\rocketMQ\插件\rocketmq-externals\rocketmq-console目录,cmd之后:

对项目打包编译:

mvn clean package -Dmaven.test.skip=true

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ea852CQr-1633695784243)(/1625490830927.png)]

打包成功,在target目录下生成jar包:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sjE0CRQQ-1633695784244)(/1625490861047.png)]

运行:

java -jar rocketmq-console-ng-2.0.0.jar &

输入:

127.0.0.1:端口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Peei1e7K-1633695784245)(/1625491439440.png)]

二、简单例子快速入门

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

三种方式发送消息:同步、异步、单向传输

同步:

可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等

public class SyncProducer {

    /**
     * 同步发送消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例,指定主题、标记和消息主体。
            // new Message(主题、标记、消息体)
            Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
            //发送消息到一个 broker.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //关闭生产者
        producer.shutdown();
    }

}

异步:

异步传输一般用于响应时间敏感的业务场景。

public class AsyncProducer {

    /**
     * 发送异步消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化
        DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
        // 指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        //消息发送失败,重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 10;
        //等待10个线程执行完后再执行
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("asyncProducer-Topic", "asyncProducer-Tag", "asyncProducer-key", "asyncProducer-Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //消息发送的回调执行SendCallback
                producer.send(msg, new SendCallback() {

                    /**
                     * 消息发送成功的回调
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    /**
                     * 消息发送出现异常的回调
                     * @param e
                     */
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 10s后主线程被唤醒,不管countDown()被执行多少次
        countDownLatch.await(10, TimeUnit.SECONDS);
        // 关闭
        producer.shutdown();
    }
}

单向传输:

单向传输用于需要中等可靠性的情况,例如日志收集。

public class OneWayProducer {
    /**
     * 单向发送消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //实例化生产组
        DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
        // 指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例、指定主题、标记、消息体
            Message msg = new Message("oneWayProducer-Topic", "oneWayProducer-Tag", ("oneWayProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息到 broker
            producer.sendOneway(msg);
        }
        //等待发送完成
        Thread.sleep(5000);
        producer.shutdown();
    }
}

消费消息:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("oneWayProducer-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行,并发消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

三、广播消息

广播正在向主题的所有订阅者发送消息。如果您希望所有订阅者都收到有关某个主题的消息,广播是一个不错的选择。默认集群模式。

生产者:

public class RadioProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("radioProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++){
            Message msg = new Message("radioProducer-TopicTest", "radioProducer-Tag", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消费者订阅:

public class RadioConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("radioProducer");
        consumer.setNamesrvAddr("localhost:9876");
        // 设置从队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //设置为广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //订阅一个主题来使用
        consumer.subscribe("radioProducer-TopicTest", "radioProducer-Tag");
        //注册监听,并发消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("11111111111111111111111");
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

四、顺序消息

RocketMQ 使用 FIFO 顺序提供有序消息,即消息发送时,将消息发送至同一个MessageQueue中,实现消息的局部顺序消费。

生产者:

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("orderGroupName");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("order-Topic", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

消费者:

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroupName");
        //从消息队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅
        consumer.subscribe("order-Topic", "TagA || TagC || TagD");
        //注册消息监听,有序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    //正确消费
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    //回滚
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    //提交
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    //稍后消费
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

五、延迟消费

消息发送至broker后,等待一段时间之后,再推送至消费者那消费。

生产者:

public class SyncProducer {

    /**
     * 同步发送消息-----延迟消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例,指定主题、标记和消息主体。
            // new Message(主题、标记、消息体)
            Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
            //延迟消息设置延迟等级,开源版的RocketMQ不支持自定义的消息等级,只能修改18个等级中的时间
            //delayTimeLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            //上述等级可以在拓展的集群配置中修改
            msg.setDelayTimeLevel(3);
            //发送消息到一个 broker.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //关闭生产者
        producer.shutdown();
    }

}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("oneWayProducer-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

六、批量消息

将多条消息合成一个批量消息,一次发送出去。好处:减少网络IO、提高吞吐量

使用限制:

同一批次的消息应该具有:相同的主题,相同的 waitStoreMsgOK 并且没有调度支持.

一批消息的总大小不应超过 1MiB

生产者:

1、如果一次发送的数据不大于1M时,可以采用如下的方式发送:

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();

        String topic = "Batch-Topic";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));
        //发送数据
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
        //关闭生产者
        producer.shutdown();
    }
}

2、如果发送的消息一次超过1M,可以拆分为列表:

public class BatchSplitProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();

        String topic = "Batch-Topic";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));

        //然后你可以将大列表拆分为小列表
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //关闭生产者
        producer.shutdown();
    }
}
/**
 * 消息切割
 */
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000; //最大数据限制
    private final List<Message> messages;  //数据
    private int currIndex;  //当前消息下标

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            //计算单个消息的大小
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead

            if (tmpSize > SIZE_LIMIT) {
                //单个消息大于最大限制,则丢弃,否则会阻塞在此
                if (nextIndex - currIndex == 0) {
                    //如果是最后一个元素,如果下一个列表没有元素,则加入这个消息之后就退出,否则退出
                   nextIndex++;  
                }
                break;
            }

            //如果当前单个消息 + 已经计算的消息大小 > 最大限制,则退出,否则累加
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }

        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batchProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("Batch-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

七、过滤消费

rocketMQ有sql进行消息过滤,消息过滤是在broker端进行的,consumer将sql过滤表达式推送至broker,broker过滤数据之后推送消息到consumer,减少网络数据传输。

消息过滤的两种方式:

1、TAG

2、SQL92

前提:需要在/conf/broker.conf配置文件加入支持过滤的属性:

enablePropertyFilter=true

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DmhTCbNC-1633695784246)(/1625883963295.png)]

生产者:

public class FilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_group");
        producer.start();

        String[] tags = {"tagA","tagB","tagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("filterTopic",
                    tags[i% tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //设置property:a 就是下标
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}

消费者:

public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_group");

        // only subsribe messages have property a, also a >=0 and a <= 3
        consumer.subscribe("filterTopic", MessageSelector.bySql("a between 0 and 3"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

sql表达式:

基本语法:
1、数值比较,如>, >=, <, <=, BETWEEN, =;
2、字符比较,如=, <>, IN;
3、IS NULL或IS NOT NULL;
4、逻辑AND, OR, NOT;
常量类型:
1、数字,如 123、3.1415;
2、字符,如'abc',必须用单引号引起;
3、NULL,特殊常数;
4、布尔值,TRUE或FALSE;

八、事务消息

什么是事务性消息?

它可以被认为是两阶段提交消息的实现,以确保分布式系统中的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

使用限制

(1) 事务性消息没有调度和批处理支持。
(2) 为了避免单条消息被检查次数过多导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但用户可以通过更改“transactionCheckMax ”参数在broker的配置中,如果一条消息被检查了“transactionCheckMax”次,broker默认会丢弃这条消息并同时打印错误日志。用户可以通过覆盖“AbstractTransactionCheckListener”类来更改此行为。
(3) 交易消息在一定时间后将被检查,该时间由代理配置中的参数“transactionTimeout”确定。并且用户也可以在发送事务消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制,这个参数优先于“transactionMsgTimeout”参数。
(4) 一条交易消息可能被检查或消费不止一次。
(5) 向用户目标主题提交的消息回复可能会失败。目前,这取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,建议使用同步双写。机制。
(6) 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ Server 通过生产者 ID 查询客户端。

Transaction状态

事务消息有三种状态:
(1) TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费这条消息。
(2) TransactionStatus.RollbackTransaction:回滚事务,表示消息将被删除,不允许消费。
(3) TransactionStatus.Unknown:中间状态,表示需要MQ回检来确定状态。

生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //线程池,作用?
        ExecutorService executorService = new ThreadPoolExecutor(
            2,
                5,
            100,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
        //设置线程池
        producer.setExecutorService(executorService);
        //设置事务监听器
        producer.setTransactionListener(transactionListener);
        producer.start();
        //发送消息,分别由5个tag,每个tag2条消息
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //null,所有消息都是事务消息,也可以指定某一条消息是事务消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //等待
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
/**
 * 事务监听器
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Integer status = transactionIndex.getAndIncrement() % 3;
        //map保存<key,value>
        localTrans.put(msg.getTransactionId(), status);
        System.out.println("transactionId:" + msg.getTransactionId() + "----status:" + status);
        //消息未确定,需要消息回查确定
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("TopicTest1234", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("=====================消息======================");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
上一篇:Centos7搭建基于GTID的MySQL的M-M-S-S架构


下一篇:Docker安装RocketMQ以及使用