RocketMQ

RocketMQ

概念

RocketMQ 是一个消息队列中间件,具有高性能、高可靠、高实时、分布式特点。

能够保证严格的消息顺序

​ 顺序消息,消息入队列,生产方可以选择将消息入到哪一个队列

提供丰富的消息拉取模式

PUSH consumer 发送请求,保持长连接,broker每五秒察看是否有消息,有就回复给consumer

PULL 定时向broker拉取消息

保证消息不丢失

同步发送:阻塞当前进程,等到bocker返回发送结果

异步发送:首先构建任务,把任务交给线程池,等发送完后,执行用户自定义的回调函数

Oneway:只发送,不等结果

​ 发送超时,失败,会重试

​ 消费者会先拉取消息,消费完成后,才向服务器返回结果

​ 消费失败也会重试

架构图

RocketMQ

name server :控制中心,记录,做路由,需要首先启动

brocker:存储消息、事物回查

producer:先向name server查询broker信息,然后与其建立连接,发送消息

consumer:同样现象name server 查询broker,建立连接,拉取消息

消息

发送消息(生产者)

  • 创建DefaultMQProducer

    • produceGroup 区别生产者集群
  • 设置name server地址

  • 开启DefaultMQProducer

  • 创建消息Message

    • topic 主题
    • tags 标签,消费者可以根据tags过滤消息
    • keys 消息唯一值(主键)
    • body 消息具体内容
  • 发送消息

    • 会返回一个result
  • 关闭DefaultMQProducer

拉取消息(消费者)

  • 创建DefaultMQConsumer

    • 消费组
  • 设置name server

  • 设置subscript,要读取的主题信息

    • 消费主体、过滤规则(* 或者 tags1 || tags2)
  • 创建消息监听 MessageListener

  • 获取消息消息

  • 返回消息读取状态

    • 读取成功返回success
    • 失败则重试

普通消息

顺序消息

​ 每次发送只发送到指定队列,接收也从一个队列拉取消息

分布式事务消息

生产者:我要去干一些事情,过一会你再问我做完没

MQ Server:好的

生产方执行事务之前先向MQ Server发送Half消息,等到服务器回复后开始执行任务,此时,消费方并不能获取到消息,服务器会询问生产方事务执行状态,当生产方执行完毕后,再次向服务器发送消息,服务器 接收到后才commit,否则,进行回滚操作

消息批量发送/广播

​ 发送方-->List

​ 接收方 设置广播模式 consumer.setMessageModel(MessageMode.BOARDCASTING)

安装

进入官网下载 http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

RocketMQ

解压下载的压缩包

配置环境变量

RocketMQ

修改启动参数 runserver.sh

RocketMQ

runbocker.sh

RocketMQ

修改 broker-a.properties 要新建日志文件夹

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker 自动创建Topic, 建议线下开启, 线上关闭
autoCreateTopicEnable=true
# 是否允许Broker 自动创建订阅组, 建议线下开启, 线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=D:\download\rocketmq-all-4.3.0-bin-release\log\store
# commitLog存储路径
storePathCommitLog=D:\download\rocketmq-all-4.3.0-bin-release\log\store\commitlog
# 消息队列储存路径
storePathConsumeQueue=D:\download\rocketmq-all-4.3.0-bin-release\log\store\consumequeue
# 消息索引粗存路径
storePathIndex=D:\download\rocketmq-all-4.3.0-bin-release\log\store\index
# checkpoint 文件储存路径
storeCheckpoint=D:\download\rocketmq-all-4.3.0-bin-release\log\store\checkpoint
# abort 文件存储路径
abortFile=D:\download\rocketmq-all-4.3.0-bin-release\log\store\abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH

将conf文件夹下的所有xml文件中的${user.home} 修改为D:/download/rocketmq-all-4.3.0-bin-release/log

进入bin目录,先 start mqnamesrv.cmd

提示出错

RocketMQ

但是配置了java_home还是报错

直接修改cmd文件

RocketMQ

启动成功

RocketMQ

start mqbroker.cmd -n 172.16.33.2:9876 autoCreateTopicEnable=true

同样修改cmd文件,直接设置jdk路径,启动成功

RocketMQ

demo实现

新建maven项目

导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
            <version>4.3.0</version>
        </dependency>

创建生产者

package orderMessage;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author: chen
 * @date: 2021/1/13 17:23
 * @Description
 */
public class producer {

    int count = 0;

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("order_message_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //  同步发送
        for (int i = 0; i < 5; i++) {
            Message message = new Message("order",
                    "test",
                    "id_" + i,
                    ("say_hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            System.out.println("produce开始发送消息--" + i);
            producer.send(message,
                    new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            return list.get((Integer)o);
                        }
                    },
                    0);
            System.out.println("produce消息发送完毕--" + i);
        }


        Thread.sleep(5000);

        //  异步发送
        for (int i = 6; i < 11; i++) {
            Message message = new Message("order",
                    "async",
                    "async_id_" + i,
                    ("say_async" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            System.out.println("produce开始异步发送消息--" + i);
            producer.send(message,
                    new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            return list.get((Integer) o);
                        }
                    },
                    0, new SendCallback() {
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("---异步发送成功----");
                        }

                        public void onException(Throwable throwable) {
                            System.out.println("---异步发送失败----"+ throwable);
                        }
                    });
            System.out.println("produce异步消息发送完毕--" + i);
        }
        producer.shutdown();
    }
}

创建消费者

package orderMessage;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author: chen
 * @date: 2021/1/13 17:34
 * @Description
 */
public class consumer {

    int count = 0;
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_message_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order", "*");
        //consumer.setConsumeMessageBatchMaxSize(3);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                ConsumeConcurrentlyStatus result = null;
                for (int i = 0 ; i < list.size(); i ++) {
                    System.out.println("consumer----接受消息----" + i);
                    Message message = list.get(i);
                    try {
                        System.out.println("message:" + message.getTopic()
                                + "---" + message.getTags()
                                + "---" + message.getKeys()
                                + "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        break;
                    }
                }
                if (result == null) {
                    result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return result;
            }
        });
        consumer.start();
    }
}

报错org.apache.rocketmq.client.exception.MQClientException: No route info for this topic, order

原因是maven 引入的jar包的版本不一致,修改版本后能正常运行

RocketMQ

RocketMQ

分布式事务消息代码

public class transProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer txPoducer=new TransactionMQProducer("transaction_group");
        txPoducer.setNamesrvAddr("127.0.0.1:9876");

        //本地事务执行和broker检查的回调都是由producer端来实现的
        txPoducer.setTransactionListener(new TransactionListener() {
            ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<String, Integer>();
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String transactionId = msg.getTransactionId();
                map.put(transactionId,0);
                System.out.println("setTransactionListener   txID:"+transactionId+",body "+new String(msg.getBody()));
                try {
                    System.out.println("开始执行");
                    Thread.sleep(65000);
                    map.put(transactionId,1);
                    System.out.println("执行完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    map.put(transactionId,2);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String transactionId = msg.getTransactionId();
                Integer status = map.get(transactionId);
                System.out.println("消息会查---"+ status);
                switch (status){
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;

            }
        });

        txPoducer.start();

        TransactionSendResult temp = txPoducer.sendMessageInTransaction(new Message("order", "测试分布式消息事务流程".getBytes()),null);

        System.out.println("消息回执:"+temp);

    }
}

RocketMQ

RocketMQ

通过pull方式拉取消息

public class pullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.start();

        // 从指定topic中拉取所有消息队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order");

        for (MessageQueue mq : mqs) {
            // 获取消息的offset,指定从store中获取
            long offset = consumer.fetchConsumeOffset(mq, true);
            System.out.println("queue:" + mq + ":" + offset);

            try {
                while (true) {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq,
                            null, consumer.fetchConsumeOffset(mq, false), 1);

                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt message : messageExtList) {
                                System.out.println("message:" + message.getTopic()
                                        + "---" + message.getTags()
                                        + "---" + message.getKeys()
                                        + "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.shutdown();

    }

RocketMQ

上一篇:rest接口的函数需要是public


下一篇:根据不同的分辨率调用不同的css