Spring Cloud Alibaba Dubbo
一、项目简介
Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。
Dubbo Spring Cloud 首个 Preview Release,随同Spring Cloud Alibaba 0.2.2.RELEASE
和0.9.0.RELEASE
一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)
二、功能的完成度
由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟的实现,特性比对如下表所示:
三、框架的搭建
我们将搭建如图所示的项目框架
3.1 搭建 spring-cloud-dubbo-examples
spring-cloud-dubbo-exmaples 是一个父项目,用来给子项目控制版本和去除公共的依赖。
3.1.1 创建项目
使用 IDEA 创建一个模块:
选择 Maven:
点击 Next,进行下一步操作:
Parent:必须选择之前我们创建的 spring-cloud-alibaba-examples。
Name:spring-cloud-dubbo-examples 项目的名称
点击 Finish,完成项目的创建。
至此,spring-cloud-dubbo-examples 项目已经完成创建了。
3.1.2 添加依赖
打开该项目的 pom.xml,添加以下内容:
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
3.1.3 修改项目的打包方式
<packaging>pom</packaging>
3.1.4 完整的 pom.xml 文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>spring-cloud-dubbo-examples</artifactId>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
</project>
3.2 搭建 dubbo-api
dubbo-api 里面将存放用于发表服务的接口。
3.2.1 创建 dubbo-api 项目
使用 IDEA 创建一个子模块。
选择 Maven 项目:
点击 Next 进行下一步操作:
Parent:选择 spring-cloud-dubbo-examples
Name:名称为 dubbo-api
点击 Finish 完成项目的创建:
3.2.2 完整的 pom.xml 文件如下
dubbo-api 的 pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-dubbo-examples</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-api</artifactId>
</project>
3.3 搭建 dubbo-provider
3.3.1 创建 dubbo-provider 项目
搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:
选择 Maven 项目:
点击 Next,进行下一步操作:
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-provider
点击 Finish,完成项目的创建。
3.3.2 修改 Maven 的打包方式
Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar
来运行
3.3.3 完整的 pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-dubbo-examples</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-provider</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.4 搭建 dubbo-consumer
3.4.1 创建 dubbo-provider-consumer 项目
搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:
选择 Maven 项目:
点击 Next,进行下一步操作:
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-consumer
点击 Finish,完成项目的创建。
3.4.2 修改 Maven 的打包方式
Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar
来运行
3.4.3 完整的 pom.xml 文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-dubbo-examples</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-consumer</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.4.4 完整的项目结构
四、代码的完善
4.1 dubbo-api 代码的完善
4.1.1 定义 Dubbo 服务接口
Dubbo 服务接口是服务提供方与消费方的远程通讯契约,通常由普通的 Java 接口(interface)来声明。
代码如下:
public interface EchoService {
String echo(String message);
}
4.1.2 项目的打包
Api 项目主要是为了把 rpc 中定义的接口发布出去。
我们可以使用 Maven 的普通打包方式把编译后的 class 文件打包为jar。
打包成功后,项目的 jar 位于:
4.2 dubbo-provider 代码的完善
4.2.1 添加依赖
在 dubbo-provider 的 pom.xml 的 dependencies 添加以下的依赖。
<dependencies>
<dependency>
<groupId>com.bjsxt</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
4.2.2 实现 dubbo-api 里面定义的接口
代码内容如下:
@Service
public class EchoServiceImpl implements EchoService {
@Override
public String echo(String message) {
return "[echo] Hello, " + message;
}
}
4.2.3 添加配置文件
内容如下:
dubbo:
scan:
# dubbo 服务扫描基准包
base-packages: com.bjsxt.service.impl
cloud:
subscribed-services: dubbo-provider
protocol:
# dubbo 协议
name: dubbo
# dubbo 协议端口( -1 表示自增端口,从 20880 开始)
port: -1
registry:
# 挂载到 Spring Cloud 注册中心
address: spring-cloud://localhost
spring:
application:
# Dubbo 应用名称
name: dubbo-provider
main:
# Spring Boot 2.1 需要设定
allow-bean-definition-overriding: true
cloud:
nacos:
# Nacos 服务发现与注册配置
discovery:
server-addr: localhost:8848
4.2.4 启动类
代码如下:
@SpringBootApplication
@EnableDiscoveryClient
public class ProviderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderServiceApplication.class, args) ;
}
}
4.3 dubbo-consumer 代码的完善
4.3.1 添加依赖
在 dubbo-consumer 的 pom.xml 的 dependencies 添加以下依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.bjsxt</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0</version>
</dependency>
<!-- Dubbo Spring Cloud Starter -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
4.3.2 添加配置文件
内容如下:
dubbo:
registry:
# 挂载到 Spring Cloud 注册中心
address: nacos://127.0.0.1:8848
cloud:
subscribed-services: dubbo-provider
server:
port: 8080
spring:
application:
# Dubbo 应用名称
name: dubbo-consumer
main:
# Spring Boot 2.1 需要设定
allow-bean-definition-overriding: true
cloud:
nacos:
# Nacos 服务发现与注册配置
discovery:
server-addr: 127.0.0.1:8848
4.3.3 启动类
代码如下:
@EnableDiscoveryClient
@SpringBootApplication
@RestController
public class ConsumerServiceApplication {
@Reference
private EchoService echoService ;
public static void main(String[] args) {
SpringApplication.run(ConsumerServiceApplication.class,args) ;
}
@GetMapping("/rpc")
public ResponseEntity<String> rpc(){
return ResponseEntity.ok(String.format("调用结果
为%s",echoService.echo("info")));
}
}
4.4 远程调用测试
- 启动 Nacos-Server
- 启动 dubbo-provider
- 启动 dubbo-consumer
查看 Nacos 控制台:
http://localhost:8848/nacos/
浏览器访问:
调用已经成功;
五、负载均衡调用测试
5.1 启动多个服务的提供者
修改服务提供者里面实现类的代码:
启动多个:
再次使用 Clt+D 复制一个:
启动这 2 个:
现在,共有 3 台同时运行:
查看 Nacos:
5.2 使用消费者负载均衡调用测试
访问:
http://localhost:8080/rpc
负载均衡测试成功。
Spring Cloud Alibaba RocketMQ
一、RocketMQ 介绍
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能
二、RocketMQ 基本使用
2.1 下载 RocketMQ
使用浏览器打开:
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
2.2 RocketMQ 目录分析
将该压缩包复制到软件目录里面,使用压缩软件进行解压。Benchmark
:包含一些性能测试的脚本;Bin
:可执行文件目录;Conf
:配置文件目录;Lib
:第三方依赖;LICENSE
:授权信息;NOTICE
:版本公告;
2.3 配置环境变量
找到配置环境变量的对话框:
点击新建创建一个环境变量:
- 变量名:ROCKETMQ_HOME
- 变量值:D:\devtools\rocketMQ\rocketmq-all-4.4.0-bin-release
2.4 RocketMQ 的启动
我们进入到${rocketMQ}/bin
,在此目录里面启动和停止命令。
2.4.1 启动 NameServe
注意:弹出的黑窗口不要关闭。
2.4.2 启动 Broker
./mqbroker.cmd -n localhost:9876
其中:-n localhost:9876
是为了指定 nameserver
的地址
2.5 RocketMQ 的停止
直接把弹出的黑框关闭,即可停止 RocketMQ 的 namesrv 和 broker。
2.6 RocketMQ 控制台的安装
Rocketmq 控制台可以可视化 MQ 的消息发送!
2.6.1 下载 RocketMQ 控制台
2.6.2 复制到软件目录里面
2.6.3 运行该 jar
java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876
其中:
运行成功后:
访问:
http://localhost:8080/#/
三、Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。Spring Cloud Stream 内部有两个概念:Binder 和 Binding:
-
Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的Binder 实现。
举例说明:
Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。 -
Binding: 包括 Input Binding 和Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
四、测试框架搭建
我们将创建 spring-cloud-bus-rocketmq-example 项目,用来测试它的所有功能。
4.1 搭建 spring-cloud-bus-rocketmq-example
spring-cloud-bus-rocketmq-example 将去除子模块的公共依赖部分。
4.1.1 使用 IDEA 创建一个 Maven 项目
选择 Maven 项目:
点击 Next ,填写以下的内容:
Parent:我们选择 spring-cloud-alibaba-examples
Name:spring-cloud-bus-rocketmq-example
其他的项保持不变。
点击 Finish 完成创建。
4.1.2 添加依赖
打开项目的 pom.xml 文件,我们添加以下的内容:
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
4.1.3 完整的 pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
<packaging>pom</packaging>
<modules>
<module>rocketmq-produce-example</module>
</modules>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.2 搭建 rocketmq-produce-example
produce 代表服务的生产者,用来发送消息。
4.2.1 使用 IDEA 创建一个 Maven 项目
选择 Maven:
点击 Next 添加以下的内容:
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-produce-example
点击 Finish 完成项目的创建
4.2.2 修改 Maven 的打包方式
此项目我们以后可能需要使用 jar 发布,在此,我们添加 spring-boot 的打包插件:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
4.2.3 完整的 pom.xml 文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-produce-example</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.3 搭建 rocketmq-produce-example
4.3.1 使用 IDEA 创建一个 Maven 项
选择 Maven:
点击 Next 添加以下的内容:
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-consumer-example
点击 Finish 完成项目的创建
4.3.2 修改 Maven 的打包方式
为了以后打包为一个 jar 发布,我们添加一个打包插件:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
4.3.3 完整的 pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-consume-example</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.4 项目的完整结构如下
五、完善 rocketmq-produce-example 项目
5.1 添加一个配置文件
配置信息如下:
logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
# rocketmq 服务器nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# stream->binding->output(input)
# output1
# 发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
# 消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
# 生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
# 消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
# output2 主要样式事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
# 发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
# output3 用它样式消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
spring.application.name=rocketmq-produce-example
server.port=28081
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
5.2 添加一个启类
@SpringBootApplication
public class RocketMQProduceApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQProduceApplication.class, args);
}
}
5.3 添加 MQSource
在 Source 里面定义输出:
/**
* 读取我们配置文件里的output
*/
public interface MQSource {
@Output("output1")
MessageChannel output1() ;
@Output("output2")
MessageChannel output2() ;//演示事务消息的发送
@Output("output1")
MessageChannel output3() ;
}
4.5 添加配置类
代码如下:
@Configuration
@EnableBinding({MQSource.class})
public class MQConfig {
}
5.4 添加发送消息的类
@Service
public class SendService {
@Autowired
private MQSource source;
/**
* 发送简单的测试消息
* @param msg
* @throws Exception
*/
public void send(String msg) throws Exception {
source.output1().send(MessageBuilder.withPayload(msg).build());
}
/**
* 发消息时添加标签
* @param msg
* @param tag
* @param <T>
* @throws Exception
*/
public <T> void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg,
new MessageHeaders(Stream.of(tag).collect(Collectors
.toMap(str -> MessageConst.PROPERTY_TAGS,String::toString))));
source.output1().send(message);
}
/**
* 发送一个对象消息
* @param msg
* @param tag
* @param <T>
* @throws Exception
*/
public <T> void sendObject(T msg, String tag) throws Exception {
Message message = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON)
.build();
source.output1().send(message);
}
/**
* 发送事务的消息
* @param msg
* @param num
* @param <T>
* @throws Exception
*/
public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON);
builder.setHeader("test", String.valueOf(num));
Message message = builder.build();
source.output2().send(message);
}
public void sendMassiveMessage(String msg) {
source.output3().send(MessageBuilder.withPayload(msg).build());
}
}
5.6 事务消息往往需要我们监听回查
新建一个类:
代码如下:
/**
* TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
* TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费。
* TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
*/
@RocketMQTransactionListener(
txProducerGroup = "myTxProducerGroup",
corePoolSize = 5,
maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
*消息生产者需要在 executeLocalTransaction 中执行本地事务,当事务半消息提交成功,执行完
毕后需要返回事务状态码。
* @param msg
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o)
{
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN; // 将会导致再次查询本地事务
}
else if ("2".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK; // 半消息将会被 mq 服务器删除,并且消费者不会消费到该消息
}
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT; // 半消息提交,消费者会消费到该消息。
}
/**
* 实现 checkLocalTransaction 方法,该方法用于进行本地事务执行情况回查,并回应事务状态给
MQ 的 broker,
* 执行完成之后需要返回对应的事务状态码
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("check: " + new String((byte[]) message.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
}
5.7 构建一个简单的模型
代码如下:
5.8 测试消息的发送
@RestController
public class SendMessageController {
@Autowired
private SendService sendService ;
/**
* 发送一个简单的消息
* @param msg
* @return
* @throws Exception
*/
@GetMapping("/send/simple")
private ResponseEntity<String> sendSimpleMessage( @RequestParam(required = true)
String msg) throws Exception {
sendService.send(msg);
return ResponseEntity.ok("发送成功") ;
}
/**
* 发送消息并且带上标签
* @param msg 消息
* @param tags 消息的标签
* @return
* @throws Exception
*/
@GetMapping("/send/tags")
private ResponseEntity<String> sendMessageWithTag( @RequestParam(required = true)
String msg,@RequestParam(required = true)String tags) throws Exception {
sendService.sendWithTags(msg,tags);
return ResponseEntity.ok("发送成功") ;
}
/**
* 发送对象消息
* @param user
* @param tags
* @return
* @throws Exception
*/
@GetMapping("/send/object")
public ResponseEntity<String> sendObjectMessage(User user,String tags) throws
Exception {
sendService.sendObject(user,tags);
return ResponseEntity.ok("发送成功") ;
}
/**
* 发送一个事务消息,也就是 half 消息
* @param msg
* @param num 类型
* @return
* @throws Exception
*/
@GetMapping("/send/transaction")
public ResponseEntity<String> sendTransactionMessage(String msg ,int num) throws
Exception {
sendService.sendTransactionalMsg(msg,num);
return ResponseEntity.ok("发送成功") ;
}
/**
* 发送很多消息
* @param msg
* @return
* @throws Exception
*/
@GetMapping("/send/poll")
public ResponseEntity<String> sendMassiveMessage(String msg) throws Exception {
sendService.sendMassiveMessage(msg);
return ResponseEntity.ok("发送成功") ;
}
}
5.9 启动类
代码如下:
@SpringBootApplication
public class RocketMQProduceApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQProduceApplication.class, args);
}
}
六、完善 rocketmq-consumer-example 项目
6.1 添加配置文件
内容如下:
# rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# stream->bindings->input
# input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
# input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
# input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
# input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5
# input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group
spring.application.name=rocketmq-consume-example
server.port=28082
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
6.2 添加一个 Sink
在 Sink 里面添加输入:
public interface Sink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
@Input("input5")
PollableMessageSource input5();
}
6.3 创建消息的监听器
/**
* receive
*/
@Service
public class ReceiveService {
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
System.out.println("input1 receive: " + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
System.out.println("input2 receive: " + receiveMsg);
}
@StreamListener("input3")
public void receiveInput3(@Payload User user) {
System.out.println("input3 receive: " + user);
}
@StreamListener("input4")
public void receiveTransactionalMsg(String transactionMsg) {
System.out.println("input4 receive transaction msg: " + transactionMsg);
}
}
6.4 主动去 mq 服务器拉取消息
使用定时任务,主动去服务器拉取消息:
@Service
public class PullMessageTask {
@Autowired
private Sink sink ;
@Scheduled(fixedRate = 5*1000)
public void pullMessage(){
sink.input5().poll((message) -> {
String payload = (String) message.getPayload();
System.out.println("pull msg: " + payload);
}, new ParameterizedTypeReference<String>() {
});
}
}
6.5 模型类
直接从 produce 里面复制过来:
6.6 配置类
新建 MQConfig:
代码如下:
@Configuration
@EnableBinding({Sink.class})
public class MQConfig {
}
6.7 启动类
@SpringBootApplication
@EnableScheduling
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class ,args);
}
}
七、测试案例测试
7.1 启动服务
启动 2 个服务:
- rocketmq-produce-example
- rocketmq-consumer-example
7.2 发送消息测试
7.2.1 发送简单的字符串
http://localhost:28081/send/simple?msg=RocketMQ
7.2.2 发送带标签的消息
7.2.3 发送对象消息
http://localhost:28081/send/object?id=1&userName=bjsxt&password=123456&tags=xxx
7.2.4 发送事务消息
http://localhost:28081/send/transaction?msg=order&num=1
http://localhost:28081/send/transaction?msg=order&num=2
http://localhost:28081/send/transaction?msg=order&num=3
7.2.4 手动拉取消息
http://localhost:28081/send/poll?msg=order