Spring Cloud Alibaba 05:整合RocketMQ

1、安装 RocketMQ

下载:rocketmq-all-4.8.0-bin-release.zip

1、传入 Linux 服务器

2、解压缩

unzip rocketmq-all-4.8.0-bin-release.zip

3、调整启动参数

cd rocketmq-all-4.8.0-bin-release/bin

修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败

调整namesrv

vim runserver.sh

调整如下

Spring Cloud Alibaba 05:整合RocketMQ

调整broker

vim runbroker.sh 

调整如下

Spring Cloud Alibaba 05:整合RocketMQ

4、启动namesrv和启动broker

启动namesrv

nohup sh mqnamesrv &

启动broker,注意ip为公网ip,端口为namesrv的默认端口9876

nohup ./mqbroker -n localhost:9876 &

5、检查是否启动成功

jps -l

Spring Cloud Alibaba 05:整合RocketMQ

也可以查看日志

tail -f ~/logs/rocketmqlogs/broker.log

启动成功

6、测试 RocketMQ

消息发送

# Point the sample tools at the NameServer; 9876 is the NameServer port used when the broker was started.
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer

消息接收

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

7、关闭 RocketMQ

./mqshutdown broker
./mqshutdown namesrv

2、安装 RocketMQ 控制台

git clone  https://github.com/apache/rocketmq-externals.git

1、进入到rocketmq-console的配置文件,修改如下:

Spring Cloud Alibaba 05:整合RocketMQ

2、打包

mvn clean package -Dmaven.test.skip=true

3、进入 target 启动 jar

java -jar rocketmq-console-ng-2.0.0.jar 

打开浏览器访问 localhost:9877,如果报错

Spring Cloud Alibaba 05:整合RocketMQ

这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909、10911 和 9876 端口

firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

重新启动控制台项目

3、Java 实现消息发送

1、pom.xml 中引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

2、生产消息

package com.godfrey;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Test class that sends a single message to RocketMQ using the native client API.
 */
@SpringBootTest
class ProviderApplicationTests {

    /**
     * Starts a producer in group "myproducer-group", sends one message to topic
     * "myTopic" with tag "myTag", prints the send result and shuts the producer down.
     *
     * @throws Exception if the producer cannot be started or the send fails
     */
    @Test
    @DisplayName("测试RocketMQ消息发送")
    void test3() throws Exception {
        // Create the message producer
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("39.106.41.184:9876");
        // Start the producer
        producer.start();
        try {
            // Build the message; encode the body explicitly as UTF-8 so the bytes
            // do not depend on the platform default charset
            Message message = new Message("myTopic", "myTag",
                    "Test MQ".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Send the message (second argument is the send timeout)
            SendResult result = producer.send(message, 1000);
            System.out.println(result);
        } finally {
            // Always release the producer, even when the send throws
            producer.shutdown();
        }
    }
}

3、直接运行,如果报错 sendDefaultImpl call timeout,可以开放 10911 端口

firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

打开 RocketMQ 控制台,可查看消息。

4、Java 实现消息消费

package com.godfrey;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Test class that subscribes to RocketMQ using the native push-consumer API.
 */
@SpringBootTest
class ProviderApplicationTests {

    /**
     * Registers a push consumer in group "myconsumer-group" on topic "myTopic"
     * (all tags) whose listener acknowledges every delivered batch, then starts it.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Test
    @DisplayName("测试RocketMQ消息接收")
    void test4() throws MQClientException {
        // Callback: report every delivered batch as successfully consumed
        MessageListenerConcurrently acknowledgeAll =
                (List<MessageExt> messages, ConsumeConcurrentlyContext context) ->
                        ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        // Create the consumer and point it at the NameServer
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("myconsumer-group");
        pushConsumer.setNamesrvAddr("39.106.41.184:9876");

        // Subscribe to the topic with every tag, attach the callback and start
        pushConsumer.subscribe("myTopic", "*");
        pushConsumer.registerMessageListener(acknowledgeAll);
        pushConsumer.start();
    }
}

5、Spring Boot 整合 RocketMQ

provider

1、pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

2、application.yml

rocketmq:
  name-server: 39.106.41.184:9876
  producer:
    group: myprovider

3、Order

package com.godfrey.entity;

import java.io.Serializable;
import java.util.Date;

/**
 * Serializable order entity carrying the buyer's contact details, the delivery
 * address and the creation date.
 *
 * @author godfrey
 * @since 2020-12-27
 */
public class Order implements Serializable {
    private static final long serialVersionUID = -5397628182599822017L;

    /** Order identifier. */
    private Integer id;
    /** Name of the buyer. */
    private String buyerName;
    /** Telephone number of the buyer. */
    private String buyerTel;
    /** Delivery address. */
    private String address;
    /** Date the order was created. */
    private Date createDate;

    /** Creates an order with every field left {@code null}. */
    public Order() {
    }

    /**
     * Creates a fully populated order.
     *
     * @param id         order identifier
     * @param buyerName  name of the buyer
     * @param buyerTel   telephone number of the buyer
     * @param address    delivery address
     * @param createDate date the order was created
     */
    public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
        this.id = id;
        this.buyerName = buyerName;
        this.buyerTel = buyerTel;
        this.address = address;
        this.createDate = createDate;
    }

    /** @return the order identifier */
    public Integer getId() {
        return this.id;
    }

    /** @param id the order identifier to set */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the buyer's name */
    public String getBuyerName() {
        return this.buyerName;
    }

    /** @param buyerName the buyer's name to set */
    public void setBuyerName(String buyerName) {
        this.buyerName = buyerName;
    }

    /** @return the buyer's telephone number */
    public String getBuyerTel() {
        return this.buyerTel;
    }

    /** @param buyerTel the buyer's telephone number to set */
    public void setBuyerTel(String buyerTel) {
        this.buyerTel = buyerTel;
    }

    /** @return the delivery address */
    public String getAddress() {
        return this.address;
    }

    /** @param address the delivery address to set */
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return the creation date */
    public Date getCreateDate() {
        return this.createDate;
    }

    /** @param createDate the creation date to set */
    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    /**
     * Renders all fields as {@code Order{id=..., buyerName='...', ...}}; the three
     * string fields are wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order{");
        text.append("id=").append(id);
        text.append(", buyerName='").append(buyerName).append('\'');
        text.append(", buyerTel='").append(buyerTel).append('\'');
        text.append(", address='").append(address).append('\'');
        text.append(", createDate=").append(createDate);
        text.append('}');
        return text.toString();
    }
}

4、Controller

// Template used to publish messages; assigned once by the constructor, hence final.
private final RocketMQTemplate rocketMQTemplate;

/**
 * Creates the controller with its messaging template supplied through
 * constructor injection.
 *
 * @param rocketMQTemplate template used to publish messages
 */
@Autowired
public ProviderController(RocketMQTemplate rocketMQTemplate) {
    this.rocketMQTemplate = rocketMQTemplate;
}

/**
 * Handles GET /create: builds a fixed sample order stamped with the current
 * date, publishes it to the "myTopic" destination and returns it.
 *
 * @return the order that was published
 */
@GetMapping("/create")
public Order create(){
    Date createdAt = new Date();
    Order sampleOrder = new Order(1, "张三", "123123", "软件园", createdAt);
    rocketMQTemplate.convertAndSend("myTopic", sampleOrder);
    return sampleOrder;
}

consumer

1、pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

2、application.yml

rocketmq:
  name-server: 39.106.41.184:9876

3、Order

package com.godfrey.entity;

import java.io.Serializable;
import java.util.Date;

/**
 * Serializable order entity carrying the buyer's contact details, the delivery
 * address and the creation date.
 *
 * @author godfrey
 * @since 2020-12-27
 */
public class Order implements Serializable {
    private static final long serialVersionUID = -5397628182599822017L;

    /** Order identifier. */
    private Integer id;
    /** Name of the buyer. */
    private String buyerName;
    /** Telephone number of the buyer. */
    private String buyerTel;
    /** Delivery address. */
    private String address;
    /** Date the order was created. */
    private Date createDate;

    /** Creates an order with every field left {@code null}. */
    public Order() {
    }

    /**
     * Creates a fully populated order.
     *
     * @param id         order identifier
     * @param buyerName  name of the buyer
     * @param buyerTel   telephone number of the buyer
     * @param address    delivery address
     * @param createDate date the order was created
     */
    public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
        this.id = id;
        this.buyerName = buyerName;
        this.buyerTel = buyerTel;
        this.address = address;
        this.createDate = createDate;
    }

    /** @return the order identifier */
    public Integer getId() {
        return this.id;
    }

    /** @param id the order identifier to set */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the buyer's name */
    public String getBuyerName() {
        return this.buyerName;
    }

    /** @param buyerName the buyer's name to set */
    public void setBuyerName(String buyerName) {
        this.buyerName = buyerName;
    }

    /** @return the buyer's telephone number */
    public String getBuyerTel() {
        return this.buyerTel;
    }

    /** @param buyerTel the buyer's telephone number to set */
    public void setBuyerTel(String buyerTel) {
        this.buyerTel = buyerTel;
    }

    /** @return the delivery address */
    public String getAddress() {
        return this.address;
    }

    /** @param address the delivery address to set */
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return the creation date */
    public Date getCreateDate() {
        return this.createDate;
    }

    /** @param createDate the creation date to set */
    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    /**
     * Renders all fields as {@code Order{id=..., buyerName='...', ...}}; the three
     * string fields are wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order{");
        text.append("id=").append(id);
        text.append(", buyerName='").append(buyerName).append('\'');
        text.append(", buyerTel='").append(buyerTel).append('\'');
        text.append(", address='").append(address).append('\'');
        text.append(", createDate=").append(createDate);
        text.append('}');
        return text.toString();
    }
}

4、Service

package com.godfrey.service;

import com.godfrey.entity.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * RocketMQ listener bound to topic "myTopic" under consumer group "myConsumer";
 * each received {@link Order} is written to the log.
 *
 * @author godfrey
 * @since 2020-12-27
 */
@Service
@RocketMQMessageListener(topic = "myTopic", consumerGroup = "myConsumer")
public class SmsService implements RocketMQListener<Order> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SmsService.class);

    /**
     * Logs the received order at INFO level.
     *
     * @param order the order delivered by the listener container
     */
    @Override
    public void onMessage(Order order) {
        LOGGER.info("新订单{},发短信通知用户", order);
    }
}
上一篇:LeetCode Sudoku Solver


下一篇:Python爬虫 如何利用浏览器如何JSON数据,如获取淘宝天猫的评论链接?