消息发送和接收演示

导入依赖

         <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

发送消息

消息发送步骤:

​ 创建消息消费者, 指定消费者所属的组名

​ 指定Nameserver地址

​ 指定消费者订阅的主题和标签

​ 设置回调函数,编写处理消息的方法

​ 启动消息消费者

代码示例:

package com.wxit.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @Author wj
 **/
//发送消息
public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者,并且设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.91.4:9876");

        //3.启动生产者
        producer.start();

        //4.创建消息对象,指定主题、标签和消息体
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.发送消息
        SendResult sendResult = producer.send(message, 10000);
        System.out.println(sendResult);

        //6.关闭生产者
        producer.shutdown();
    }
}

接收消息

消息接收步骤:

​ \1. 创建消息消费者, 指定消费者所属的组名

​ \2. 指定Nameserver地址

​ \3. 指定消费者订阅的主题和标签

​ \4. 设置回调函数,编写处理消息的方法

​ \5. 启动消息消费者

代码示例:

package com.wxit.test;


import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author wj
 **/
//接收消息
public class RocketMQReceiveMessageTest {

    //接收消息
    public static void main(String[] args) throws Exception {

        //1 创建消费者,并且为其指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");

        //2 为消费者设置NameServer的地址
        consumer.setNamesrvAddr("192.168.91.4:9876");

        //3 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //处理获取到的消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //消费逻辑
                System.out.println("Message===>" + list);

                //返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5 启动消费者
        consumer.start();
        System.out.println("启动消费者成功了");
    }
}

案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 添加配置

#rocketmq
rocketmq:
  name-server: 192.168.91.4:9876 #rocketMQ服务的地址
  producer:
    group: shop-order  # 生产者组

3 编写测试代码

@RestController
@Slf4j
public class OrderController {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private ProductService productService;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //下单 --ribbon自定义负载均衡
    //fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid){
        log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid);

        /**
         * 调用商品微服务,查询商品信息
         * 问题:
         * 1.代码可读性不好
         * 2.编程风格不统一
         */
        Product product = productService.findByPid(pid);

        if (product.getPid() == -100){
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        log.info("查询到{}号商品的信息",pid);

        //下单,创建订单
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");

        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);

        orderService.createOrder(order);

        log.info("创建订单成功,订单信息为{}",order);

        //向mq中投递一个下单成功的信息
        rocketMQTemplate.convertAndSend("order-topic",order);

        return order;
    }

用户微服务订阅消息

1 修改 shop-user 模块配置,导入依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 修改主类

package com.wxit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @Author wj
 **/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class);
    }
}

3 修改配置文件

  cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848#rocrocketmq:  name-server: 192.168.91.4:9876

4 编写消息接收服务

package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> {    @Override    public void onMessage(Order message) {        log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message);    }}

5 启动服务,执行下单操作,观看后台输出

上一篇:阿里云学生机ESC入门指南


下一篇:阿里云开放搜索(OpenSearch) VS 阿里云ElasticSearch