Spring Cloud Alibaba技术栈【下】

Spring Cloud Alibaba Dubbo

一、项目简介

Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。

Dubbo Spring Cloud 首个 Preview Release,随同Spring Cloud Alibaba 0.2.2.RELEASE0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)

二、功能的完成度

由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟的实现,特性比对如下表所示:
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】

三、框架的搭建

我们将搭建如图所示的项目框架
Spring Cloud Alibaba技术栈【下】

3.1 搭建 spring-cloud-dubbo-examples

spring-cloud-dubbo-exmaples 是一个父项目,用来给子项目控制版本和去除公共的依赖。

3.1.1 创建项目

使用 IDEA 创建一个模块:
Spring Cloud Alibaba技术栈【下】
选择 Maven:
Spring Cloud Alibaba技术栈【下】
点击 Next,进行下一步操作:
Spring Cloud Alibaba技术栈【下】
Parent:必须选择之前我们创建的 spring-cloud-alibaba-examples。
Name:spring-cloud-dubbo-examples 项目的名称
点击 Finish,完成项目的创建。
Spring Cloud Alibaba技术栈【下】
至此,spring-cloud-dubbo-examples 项目已经完成创建了。

3.1.2 添加依赖

打开该项目的 pom.xml,添加以下内容:

<dependencies>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-dubbo</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
	</dependency>
</dependencies>

3.1.3 修改项目的打包方式

<packaging>pom</packaging>

3.1.4 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-alibaba-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<packaging>pom</packaging>
	<artifactId>spring-cloud-dubbo-examples</artifactId>
	<dependencies>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-dubbo</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
		</dependency>
	</dependencies>
</project>

3.2 搭建 dubbo-api

dubbo-api 里面将存放用于发表服务的接口。

3.2.1 创建 dubbo-api 项目

使用 IDEA 创建一个子模块。
Spring Cloud Alibaba技术栈【下】
选择 Maven 项目:
Spring Cloud Alibaba技术栈【下】
点击 Next 进行下一步操作:
Spring Cloud Alibaba技术栈【下】
Parent:选择 spring-cloud-dubbo-examples
Name:名称为 dubbo-api
点击 Finish 完成项目的创建:
Spring Cloud Alibaba技术栈【下】

3.2.2 完整的 pom.xml 文件如下

dubbo-api 的 pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-api</artifactId>
</project>

3.3 搭建 dubbo-provider

3.3.1 创建 dubbo-provider 项目

搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:
Spring Cloud Alibaba技术栈【下】
选择 Maven 项目:
Spring Cloud Alibaba技术栈【下】
点击 Next,进行下一步操作:
Spring Cloud Alibaba技术栈【下】
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-provider
点击 Finish,完成项目的创建。

3.3.2 修改 Maven 的打包方式

Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar来运行

3.3.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-provider</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.4 搭建 dubbo-consumer

3.4.1 创建 dubbo-provider-consumer 项目

搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:

Spring Cloud Alibaba技术栈【下】
选择 Maven 项目:
Spring Cloud Alibaba技术栈【下】
点击 Next,进行下一步操作:
Spring Cloud Alibaba技术栈【下】
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-consumer
点击 Finish,完成项目的创建。

3.4.2 修改 Maven 的打包方式

Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar 来运行

3.4.3 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-consumer</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.4.4 完整的项目结构

Spring Cloud Alibaba技术栈【下】

四、代码的完善

4.1 dubbo-api 代码的完善

4.1.1 定义 Dubbo 服务接口

Dubbo 服务接口是服务提供方与消费方的远程通讯契约,通常由普通的 Java 接口(interface)来声明。

Spring Cloud Alibaba技术栈【下】
代码如下:

public interface EchoService {
	String echo(String message);
}

4.1.2 项目的打包

Api 项目主要是为了把 rpc 中定义的接口发布出去。
我们可以使用 Maven 的普通打包方式把编译后的 class 文件打包为jar。
Spring Cloud Alibaba技术栈【下】
打包成功后,项目的 jar 位于:
Spring Cloud Alibaba技术栈【下】

4.2 dubbo-provider 代码的完善

4.2.1 添加依赖

在 dubbo-provider 的 pom.xml 的 dependencies 添加以下的依赖。

<dependencies>
	<dependency>
		<groupId>com.bjsxt</groupId>
		<artifactId>dubbo-api</artifactId>
		<version>1.0</version>
	</dependency>
</dependencies>

4.2.2 实现 dubbo-api 里面定义的接口

Spring Cloud Alibaba技术栈【下】
代码内容如下:

@Service
public class EchoServiceImpl implements EchoService {

	@Override
	public String echo(String message) {
		return "[echo] Hello, " + message;
	}
}

4.2.3 添加配置文件

Spring Cloud Alibaba技术栈【下】
内容如下:

dubbo:
	scan:
		# dubbo 服务扫描基准包
		base-packages: com.bjsxt.service.impl
	cloud:
		subscribed-services: dubbo-provider
	protocol:
		# dubbo 协议
		name: dubbo
		# dubbo 协议端口( -1 表示自增端口,从 20880 开始)
		port: -1
	registry:
		# 挂载到 Spring Cloud 注册中心
		address: spring-cloud://localhost
spring:
	application:
		# Dubbo 应用名称
		name: dubbo-provider
	main:
		# Spring Boot 2.1 需要设定
		allow-bean-definition-overriding: true
	cloud:
		nacos:
			# Nacos 服务发现与注册配置
			discovery:
				server-addr: localhost:8848

4.2.4 启动类

Spring Cloud Alibaba技术栈【下】
代码如下:

@SpringBootApplication
@EnableDiscoveryClient
public class ProviderServiceApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProviderServiceApplication.class, args) ;
	}
}

4.3 dubbo-consumer 代码的完善

4.3.1 添加依赖

在 dubbo-consumer 的 pom.xml 的 dependencies 添加以下依赖:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>com.bjsxt</groupId>
		<artifactId>dubbo-api</artifactId>
		<version>1.0</version>
	</dependency>
	<!-- Dubbo Spring Cloud Starter -->
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-dubbo</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
	</dependency>
</dependencies>

4.3.2 添加配置文件

Spring Cloud Alibaba技术栈【下】
内容如下:

dubbo:
	registry:
		# 挂载到 Spring Cloud 注册中心
		address: nacos://127.0.0.1:8848
	cloud:
		subscribed-services: dubbo-provider
server:
	port: 8080
spring:
	application:
	# Dubbo 应用名称
	name: dubbo-consumer
	main:
		# Spring Boot 2.1 需要设定
		allow-bean-definition-overriding: true
	cloud:
		nacos:
			# Nacos 服务发现与注册配置
			discovery:
				server-addr: 127.0.0.1:8848

4.3.3 启动类

Spring Cloud Alibaba技术栈【下】
代码如下:

@EnableDiscoveryClient
@SpringBootApplication
@RestController
public class ConsumerServiceApplication {

	@Reference
	private EchoService echoService ;
	
	public static void main(String[] args) {
		SpringApplication.run(ConsumerServiceApplication.class,args) ;
	}
	
	@GetMapping("/rpc")
	public ResponseEntity<String> rpc(){
		return ResponseEntity.ok(String.format("调用结果
	为%s",echoService.echo("info")));
	}
}

4.4 远程调用测试

  • 启动 Nacos-Server
  • 启动 dubbo-provider
  • 启动 dubbo-consumer
    查看 Nacos 控制台:
    http://localhost:8848/nacos/
    Spring Cloud Alibaba技术栈【下】
    浏览器访问:
    Spring Cloud Alibaba技术栈【下】
    调用已经成功;

五、负载均衡调用测试

5.1 启动多个服务的提供者

修改服务提供者里面实现类的代码:
Spring Cloud Alibaba技术栈【下】
启动多个:
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】
再次使用 Clt+D 复制一个:
Spring Cloud Alibaba技术栈【下】
启动这 2 个:
Spring Cloud Alibaba技术栈【下】
现在,共有 3 台同时运行:
Spring Cloud Alibaba技术栈【下】
查看 Nacos:
Spring Cloud Alibaba技术栈【下】

5.2 使用消费者负载均衡调用测试

访问:
http://localhost:8080/rpc
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】
负载均衡测试成功。

Spring Cloud Alibaba RocketMQ

一、RocketMQ 介绍

Spring Cloud Alibaba技术栈【下】
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能

二、RocketMQ 基本使用

2.1 下载 RocketMQ

使用浏览器打开:
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
Spring Cloud Alibaba技术栈【下】

2.2 RocketMQ 目录分析

将该压缩包复制到软件目录里面,使用压缩软件进行解压。
Spring Cloud Alibaba技术栈【下】
Benchmark:包含一些性能测试的脚本;
Bin:可执行文件目录;
Conf:配置文件目录;
Lib:第三方依赖;
LICENSE:授权信息;
NOTICE:版本公告;

2.3 配置环境变量

找到配置环境变量的对话框:
Spring Cloud Alibaba技术栈【下】
点击新建创建一个环境变量:
Spring Cloud Alibaba技术栈【下】

  • 变量名:ROCKETMQ_HOME
  • 变量值:D:\devtools\rocketMQ\rocketmq-all-4.4.0-bin-release

2.4 RocketMQ 的启动

我们进入到${rocketMQ}/bin,在此目录里面启动和停止命令。

2.4.1 启动 NameServe

Spring Cloud Alibaba技术栈【下】
注意:弹出的黑窗口不要关闭。

2.4.2 启动 Broker

./mqbroker.cmd -n localhost:9876

其中:
-n localhost:9876是为了指定 nameserver 的地址
Spring Cloud Alibaba技术栈【下】

2.5 RocketMQ 的停止

直接把弹出的黑框关闭,即可停止 RocketMQ 的 namesrv 和 broker。

2.6 RocketMQ 控制台的安装

Rocketmq 控制台可以可视化 MQ 的消息发送!

2.6.1 下载 RocketMQ 控制台

2.6.2 复制到软件目录里面

Spring Cloud Alibaba技术栈【下】

2.6.3 运行该 jar

java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876

Spring Cloud Alibaba技术栈【下】
其中:
Spring Cloud Alibaba技术栈【下】
运行成功后:
访问:
http://localhost:8080/#/
Spring Cloud Alibaba技术栈【下】

三、Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。Spring Cloud Stream 内部有两个概念:Binder 和 Binding:

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的Binder 实现。

    举例说明:
    Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

  • Binding: 包括 Input Binding 和Output Binding。
    Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

四、测试框架搭建

我们将创建 spring-cloud-bus-rocketmq-example 项目,用来测试它的所有功能。
Spring Cloud Alibaba技术栈【下】

4.1 搭建 spring-cloud-bus-rocketmq-example

spring-cloud-bus-rocketmq-example 将去除子模块的公共依赖部分。

4.1.1 使用 IDEA 创建一个 Maven 项目

Spring Cloud Alibaba技术栈【下】
选择 Maven 项目:
Spring Cloud Alibaba技术栈【下】
点击 Next ,填写以下的内容:
Spring Cloud Alibaba技术栈【下】
Parent:我们选择 spring-cloud-alibaba-examples
Name:spring-cloud-bus-rocketmq-example
其他的项保持不变。
点击 Finish 完成创建。

4.1.2 添加依赖

打开项目的 pom.xml 文件,我们添加以下的内容:

<dependencies>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
</dependencies>

4.1.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-alibaba-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
	<packaging>pom</packaging>
	<modules>
		<module>rocketmq-produce-example</module>
	</modules>
	<dependencies>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.2 搭建 rocketmq-produce-example

produce 代表服务的生产者,用来发送消息。

4.2.1 使用 IDEA 创建一个 Maven 项目

Spring Cloud Alibaba技术栈【下】
选择 Maven:
Spring Cloud Alibaba技术栈【下】
点击 Next 添加以下的内容:
Spring Cloud Alibaba技术栈【下】
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-produce-example
点击 Finish 完成项目的创建

4.2.2 修改 Maven 的打包方式

此项目我们以后可能需要使用 jar 发布,在此,我们添加 spring-boot 的打包插件:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

4.2.3 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>rocketmq-produce-example</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.3 搭建 rocketmq-produce-example

4.3.1 使用 IDEA 创建一个 Maven 项

Spring Cloud Alibaba技术栈【下】
选择 Maven:
Spring Cloud Alibaba技术栈【下】
点击 Next 添加以下的内容:
Spring Cloud Alibaba技术栈【下】
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-consumer-example
点击 Finish 完成项目的创建

4.3.2 修改 Maven 的打包方式

为了以后打包为一个 jar 发布,我们添加一个打包插件:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

4.3.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>rocketmq-consume-example</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.4 项目的完整结构如下

Spring Cloud Alibaba技术栈【下】

五、完善 rocketmq-produce-example 项目

5.1 添加一个配置文件

Spring Cloud Alibaba技术栈【下】
配置信息如下:

logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
# rocketmq 服务器nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

# stream->binding->output(input)
# output1
# 发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
# 消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
# 生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
# 消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true

# output2 主要样式事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
# 发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup

# output3 用它样式消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group

spring.application.name=rocketmq-produce-example

server.port=28081

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

5.2 添加一个启类

Spring Cloud Alibaba技术栈【下】

@SpringBootApplication
public class RocketMQProduceApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQProduceApplication.class, args);
	}
}

5.3 添加 MQSource

Spring Cloud Alibaba技术栈【下】
在 Source 里面定义输出:

/**
* 读取我们配置文件里的output
*/
public interface MQSource {
	@Output("output1")
	MessageChannel output1() ;
	
	@Output("output2")
	MessageChannel output2() ;//演示事务消息的发送
	
	@Output("output1")
	MessageChannel output3() ;
}

4.5 添加配置类

Spring Cloud Alibaba技术栈【下】
代码如下:

@Configuration
@EnableBinding({MQSource.class})
public class MQConfig {
}

5.4 添加发送消息的类

Spring Cloud Alibaba技术栈【下】

@Service
public class SendService {

	@Autowired
	private MQSource source;
	
	/**
	* 发送简单的测试消息
	* @param msg
	* @throws Exception
	*/
	public void send(String msg) throws Exception {
	source.output1().send(MessageBuilder.withPayload(msg).build());
	}
	
	/**
	* 发消息时添加标签
	* @param msg
	* @param tag
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendWithTags(T msg, String tag) throws Exception {
		Message message = MessageBuilder.createMessage(msg,
			new MessageHeaders(Stream.of(tag).collect(Collectors
				.toMap(str -> MessageConst.PROPERTY_TAGS,String::toString))));
		source.output1().send(message);
	}
	/**
	* 发送一个对象消息
	* @param msg
	* @param tag
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendObject(T msg, String tag) throws Exception {
	Message message = MessageBuilder.withPayload(msg)
		.setHeader(MessageConst.PROPERTY_TAGS, tag)
		.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON)
		.build();
	source.output1().send(message);
	}
	/**
	* 发送事务的消息
	* @param msg
	* @param num
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
		MessageBuilder builder = MessageBuilder.withPayload(msg)
			.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON);
		builder.setHeader("test", String.valueOf(num));
		Message message = builder.build();
		source.output2().send(message);
	}
		public void sendMassiveMessage(String msg) {
			source.output3().send(MessageBuilder.withPayload(msg).build());
		}
}

5.6 事务消息往往需要我们监听回查

新建一个类:
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】
代码如下:

/**
* TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
* TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费。
* TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
*/

@RocketMQTransactionListener(
	txProducerGroup = "myTxProducerGroup",
	corePoolSize = 5,
	maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
	/**
	*消息生产者需要在 executeLocalTransaction 中执行本地事务,当事务半消息提交成功,执行完
	毕后需要返回事务状态码。
	* @param msg
	* @param o
	* @return
	*/
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o)
	{
		Object num = msg.getHeaders().get("test");
		if ("1".equals(num)) {
			System.out.println(
				"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
			return RocketMQLocalTransactionState.UNKNOWN; // 将会导致再次查询本地事务
		}
		else if ("2".equals(num)) {
			System.out.println(
				"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
			return RocketMQLocalTransactionState.ROLLBACK; // 半消息将会被 mq 服务器删除,并且消费者不会消费到该消息
		}
		System.out.println(
			"executer: " + new String((byte[]) msg.getPayload()) + " commit");
		return RocketMQLocalTransactionState.COMMIT; // 半消息提交,消费者会消费到该消息。
	}

	/**
	* 实现 checkLocalTransaction 方法,该方法用于进行本地事务执行情况回查,并回应事务状态给
	MQ 的 broker,
	* 执行完成之后需要返回对应的事务状态码
	* @param message
	* @return
	*/
	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
		System.out.println("check: " + new String((byte[]) message.getPayload()));
		return RocketMQLocalTransactionState.COMMIT;
	}
}

5.7 构建一个简单的模型

Spring Cloud Alibaba技术栈【下】
代码如下:
Spring Cloud Alibaba技术栈【下】

5.8 测试消息的发送

Spring Cloud Alibaba技术栈【下】

@RestController
public class SendMessageController {
	
	@Autowired
	private SendService sendService ;
	
	/**
	* 发送一个简单的消息
	* @param msg
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/simple")
	private ResponseEntity<String> sendSimpleMessage( @RequestParam(required = true)
	String msg) throws Exception {
		sendService.send(msg);
		return ResponseEntity.ok("发送成功") ;
	}
	/**
	* 发送消息并且带上标签
	* @param msg 消息
	* @param tags 消息的标签
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/tags")
		private ResponseEntity<String> sendMessageWithTag( @RequestParam(required = true)
	String msg,@RequestParam(required = true)String tags) throws Exception {
		sendService.sendWithTags(msg,tags);
		return ResponseEntity.ok("发送成功") ;
	}
	/**
	* 发送对象消息
	* @param user
	* @param tags
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/object")
	public ResponseEntity<String> sendObjectMessage(User user,String tags) throws
	Exception {
		sendService.sendObject(user,tags);
		return ResponseEntity.ok("发送成功") ;
	}
	
	/**
	* 发送一个事务消息,也就是 half 消息
	* @param msg
	* @param num 类型
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/transaction")
	public ResponseEntity<String> sendTransactionMessage(String msg ,int num) throws
	Exception {
		sendService.sendTransactionalMsg(msg,num);
		return ResponseEntity.ok("发送成功") ;
	}
/**
* 发送很多消息
* @param msg
* @return
* @throws Exception
*/
@GetMapping("/send/poll")
public ResponseEntity<String> sendMassiveMessage(String msg) throws Exception {
	sendService.sendMassiveMessage(msg);
	return ResponseEntity.ok("发送成功") ;
	}
}

5.9 启动类

Spring Cloud Alibaba技术栈【下】
代码如下:

@SpringBootApplication
public class RocketMQProduceApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQProduceApplication.class, args);
	}
}

六、完善 rocketmq-consumer-example 项目

6.1 添加配置文件

Spring Cloud Alibaba技术栈【下】
内容如下:

# rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

# stream->bindings->input
# input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true

# input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1

# input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20

# input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5

# input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group

spring.application.name=rocketmq-consume-example

server.port=28082

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

6.2 添加一个 Sink

Spring Cloud Alibaba技术栈【下】
在 Sink 里面添加输入:

public interface Sink {
	@Input("input1")
	SubscribableChannel input1();
	
	@Input("input2")
	SubscribableChannel input2();
	
	@Input("input3")
	SubscribableChannel input3();
	
	@Input("input4")
	SubscribableChannel input4();
	
	@Input("input5")
	PollableMessageSource input5();
}

6.3 创建消息的监听器

Spring Cloud Alibaba技术栈【下】

/**
* receive
*/
@Service
public class ReceiveService {

	@StreamListener("input1")
	public void receiveInput1(String receiveMsg) {
		System.out.println("input1 receive: " + receiveMsg);
	}
	
	@StreamListener("input2")
	public void receiveInput2(String receiveMsg) {
		System.out.println("input2 receive: " + receiveMsg);
	}
	
	@StreamListener("input3")
	public void receiveInput3(@Payload User user) {
		System.out.println("input3 receive: " + user);
	}
	
	@StreamListener("input4")
	public void receiveTransactionalMsg(String transactionMsg) {
		System.out.println("input4 receive transaction msg: " + transactionMsg);
	}
}

6.4 主动去 mq 服务器拉取消息

Spring Cloud Alibaba技术栈【下】
使用定时任务,主动去服务器拉取消息:

@Service
public class PullMessageTask {
	
	@Autowired
	private Sink sink ;
	
	@Scheduled(fixedRate = 5*1000)
	public void pullMessage(){
		sink.input5().poll((message) -> {
			String payload = (String) message.getPayload();
			System.out.println("pull msg: " + payload);
		}, new ParameterizedTypeReference<String>() {
		});
	}
}

6.5 模型类

直接从 produce 里面复制过来:
Spring Cloud Alibaba技术栈【下】

6.6 配置类

新建 MQConfig:
Spring Cloud Alibaba技术栈【下】
代码如下:

@Configuration
@EnableBinding({Sink.class})
public class MQConfig {
}

6.7 启动类

@SpringBootApplication
@EnableScheduling
public class RocketMQConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQConsumerApplication.class ,args);
	}
}

七、测试案例测试

7.1 启动服务

启动 2 个服务:

  • rocketmq-produce-example
  • rocketmq-consumer-example
    Spring Cloud Alibaba技术栈【下】

7.2 发送消息测试

7.2.1 发送简单的字符串

http://localhost:28081/send/simple?msg=RocketMQ
Spring Cloud Alibaba技术栈【下】
Spring Cloud Alibaba技术栈【下】

7.2.2 发送带标签的消息

7.2.3 发送对象消息

http://localhost:28081/send/object?id=1&userName=bjsxt&password=123456&tags=xxx

Spring Cloud Alibaba技术栈【下】

7.2.4 发送事务消息

http://localhost:28081/send/transaction?msg=order&num=1

Spring Cloud Alibaba技术栈【下】

http://localhost:28081/send/transaction?msg=order&num=2

Spring Cloud Alibaba技术栈【下】

http://localhost:28081/send/transaction?msg=order&num=3

Spring Cloud Alibaba技术栈【下】

7.2.4 手动拉取消息

http://localhost:28081/send/poll?msg=order

Spring Cloud Alibaba技术栈【下】

上一篇:整合Nacos完成服务注册、调用、容错(哨兵)


下一篇:17-Spring Cloud Alibaba