简易搭建RocketMQ集群

简易搭建RocketMQ集群

学习搭建rocketmq的各类集群部署搭建方式,部分内容参考官网

准备

首先,来到rocketmq官网下载安装包。RocketMQ。下载完成后,解压安装包。

### 解压rocketmq包
$ unzip rocketmq-all-4.7.1-bin-release.zip
### 查看,得到如下文件夹
$ ls
rocketmq-all-4.7.1-bin-release
rocketmq-all-4.7.1-bin-release.zip

部署集群的时候,需要多台服务器。没有多台云服务器的同学可以考虑本机安装虚拟机运行。这里我用的VMware Workstation Pro,另外单独安装的镜像文件。

单master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

启动nameServer

### 进入rocketmq、bin目录。
$ cd rocketmq-all-4.7.1-bin-release/bin
### 启动mqnamesrv
$ sh mqnamesrv & 
### 查看是否启动
$ ps -ef | grep namesrv

启动mqnamesrv前可以调整一下jvm的分配内存参数(内存超大机器可以无视)。

$ vim runserver.sh
### 修改JAVA_OPT参数设置成机器可以接收的合理内存范围
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"

启动broker

### 启动mqbroker -n 地址和端口
$ sh mqbroker -n localhost:9876  & 

查看连接状态

通过管理平台查看rocketmq的启动连接情况。

简易搭建RocketMQ集群

多master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

启动nameServer

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer。

分别启动三台机器的namesrv服务。

### 进入rocketmq、bin目录。
$ cd rocketmq-all-4.7.1-bin-release/bin
### 启动mqnamesrv
$ sh mqnamesrv & 
### 查看是否启动
$ ps -ef | grep namesrv

启动broker

### 启动mqbroker -n 地址和端口 多台服务按照分号区分
$ sh mqbroker -n 192.168.80.128:9876;192.168.80.129:9876;192.168.80.130:9876 -c ../conf/2m-noslave/broker-a.properties  & 

分别启动三台机器上的broker服务,启动时指定配置文件及地址和端口。

查看连接状态

管理平台查看一下连接状态,如果没有展示可以尝试多刷新一下。

简易搭建RocketMQ集群

多Master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

准备

### 首先准备一下环境的搭建
### 在各台虚拟机上复制安装好的rocketmq
$ cd /usr/local/share
$ cp -r rocketmq-all-4.7.1-bin-release rocketmq-all-s-4.7.1-bin-release

注意:如果slave部署在master相同的服务器上,还需要修改文件存放路径和监听的端口,避免和master冲突。broker的其他配置可以参考:最佳实践

storePathRootDir=/root/rocketmqstore-s
storePathCommitLog=/root/rocketmqstore-s/commitlog
storePathConsumeQueue=/root/rocketmqstore-s/consumequeue
storePathIndex=/root/rocketmqstore-s/index
storeCheckpoint=/root/rocketmqstore-s/checkpoint
abortFile=/root/rocketmqstore-s/abort
listenPort=10010

启动nameServer

同上面启动相同过程。

启动broker

### 首先停掉刚才在多master模式下启动的broker。
$ cd /usr/local/share/rocketmq-all-4.7.1-bin-release/bin
$ sh mqshutdown broker
### 接着以多master多slave模式启动
$ sh mqbroker -c ../conf/2m-2s-async/broker-a.properties -n 192.168.80.128:9876;192.168.80.129:9876;192.168.80.130:9876 &
### master启动完成后,启动slave的服务
$ cd /usr/local/share/rocketmq-all-s-4.7.1-bin-release/bin
$ sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties -n 192.168.80.128:9876;192.168.80.129:9876;192.168.80.130:9876 &
### 其他几台服务器同样操作

查看连接状态

管理平台查看一下连接状态,如果没有展示可以尝试多刷新一下。

简易搭建RocketMQ集群

客户端测试

新建spring-boot项目,引入rocketmq客户端jar包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

springboot配置文件中添加rocketmq配置项

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test-rocket-mq-group

测试发送消息

//引入rocketMQTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
 * 同步发送消息
 * @param topic
 * @param content
 * @return
 */
@GetMapping("/sync/sendMessage")
public Object sendMessage(String topic, String content) {
    SendResult result = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(content).build());
    System.out.println(result);
    return result;
}

/**
 * 异步发送消息
 * @param topic
 * @param content
 * @return
 */
@GetMapping("/async/sendMessage")
public Object sendMessageAsync(String topic, String content) {
    rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(content).build(), new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            throwable.printStackTrace();
            System.out.println("onException");
        }
    });
    return null;
}

/**
 * 发送事务消息
 * @param topic
 * @param content
 * @return
 */
@GetMapping("/transaction/sendMessage")
public Object sendMessageTransaction(String topic, String content) {
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(content).build(), 666);
    System.out.println(result);
    System.out.println("state:" + result.getLocalTransactionState());
    return result;

}

测试接收消息

/**
 * 消费普通消息
 */
@Service
@RocketMQMessageListener(topic = "testTopic", consumerGroup = "test-rocket-mq-group")
public class TestTopicConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println(s);
    }
}

/**
 * 消费事务消息
 */
@RocketMQTransactionListener
@Service
public class TestTopicTransactionConsumer implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("message:" + message);
        System.out.println("o:" + (null != o ? o : "null"));
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("checkLocalTransaction: " + message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

其他可以参考官网wiki

上一篇:SpringBoot Log4j 安全漏洞分析及解决方案


下一篇:ROS error: robot_voice/iat_publish/usr/bin/ld: 找不到 -lmsc