导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
发送消息
消息发送步骤:
创建消息消费者, 指定消费者所属的组名
指定Nameserver地址
指定消费者订阅的主题和标签
设置回调函数,编写处理消息的方法
启动消息消费者
代码示例:
package com.wxit.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* @Author wj
**/
//发送消息
public class RocketMQSendMessageTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者,并且设置生产组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("192.168.91.4:9876");
//3.启动生产者
producer.start();
//4.创建消息对象,指定主题、标签和消息体
Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());
//5.发送消息
SendResult sendResult = producer.send(message, 10000);
System.out.println(sendResult);
//6.关闭生产者
producer.shutdown();
}
}
接收消息
消息接收步骤:
\1. 创建消息消费者, 指定消费者所属的组名
\2. 指定Nameserver地址
\3. 指定消费者订阅的主题和标签
\4. 设置回调函数,编写处理消息的方法
\5. 启动消息消费者
代码示例:
package com.wxit.test;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author wj
**/
//接收消息
public class RocketMQReceiveMessageTest {
//接收消息
public static void main(String[] args) throws Exception {
//1 创建消费者,并且为其指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");
//2 为消费者设置NameServer的地址
consumer.setNamesrvAddr("192.168.91.4:9876");
//3 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
//处理获取到的消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//消费逻辑
System.out.println("Message===>" + list);
//返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5 启动消费者
consumer.start();
System.out.println("启动消费者成功了");
}
}
案例
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
订单微服务发送消息
1 在 shop-order 中添加rocketmq的依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2 添加配置
#rocketmq
rocketmq:
name-server: 192.168.91.4:9876 #rocketMQ服务的地址
producer:
group: shop-order # 生产者组
3 编写测试代码
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderService orderService;
@Autowired
private ProductService productService;
@Autowired
private DiscoveryClient discoveryClient;
@Autowired
private RocketMQTemplate rocketMQTemplate;
//下单 --ribbon自定义负载均衡
//fegin
@RequestMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid){
log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid);
/**
* 调用商品微服务,查询商品信息
* 问题:
* 1.代码可读性不好
* 2.编程风格不统一
*/
Product product = productService.findByPid(pid);
if (product.getPid() == -100){
Order order = new Order();
order.setOid(-100L);
order.setPname("下单失败");
return order;
}
log.info("查询到{}号商品的信息",pid);
//下单,创建订单
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.createOrder(order);
log.info("创建订单成功,订单信息为{}",order);
//向mq中投递一个下单成功的信息
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
用户微服务订阅消息
1 修改 shop-user 模块配置,导入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2 修改主类
package com.wxit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* @Author wj
**/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class);
}
}
3 修改配置文件
cloud: nacos: discovery: server-addr: 127.0.0.1:8848#rocrocketmq: name-server: 192.168.91.4:9876
4 编写消息接收服务
package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order message) { log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message); }}
5 启动服务,执行下单操作,观看后台输出