目录
配套资料,免费下载
链接:https://pan.baidu.com/s/1gsHGUjRI8nPe_Gv1bZ7BMg
提取码:5la2
复制这段内容后打开百度网盘手机App,操作更方便哦
第九章 WebFlux
9.1、WebFlux的概述
WebFlux
Spring框架中包含的原始Web框架Spring WebMVC是专门为Servlet API和Servlet容器而构建的。响应式Web框架Spring WebFlux是在Spring 5.0以后添加的新的模块。WebFlux是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关API实现的。WebFlux能够在有限资源下,提高系统吞吐量和伸缩性,并以 Reactor 为基础实现响应式编程,可在Netty,Undertow和Servlet 3.1+容器等服务器上运行。这两个Web框架都反映了其源模块的名称(spring-webmvc和 spring-webflux),并在Spring Framework中并存。每个模块都是可选的,应用程序可以使用一个模块,也可以使用两个模块。
异步非阻塞
要想解释清楚异步非阻塞,我们就得明白,异步和同步、阻塞和非阻塞之间的关系。
- 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步
- 阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后再去做事情就是非阻塞
了解了异步非阻塞,我们来说一说Spring WebMVC 和 Spring WebFlux分别属于哪种?
【spring-webmvc + Servlet + Tomcat】命令式的、同步阻塞的
【spring-webflux + Reactor + Netty】响应式的、异步非阻塞的
响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
Reactive 和 Reactor
Reactor 是基于Reactive Streams 规范的第四代响应库,用于在JVM上构建非阻塞的应用程序。Reactor是Spring WebFlux的首选响应库。它提供了 Mono和 Flux API类型,并通过丰富运算符集来处理0…1(Mono)和0…N(Flux)数据序列。因此,我们了解到 Reactive 是一种响应式编程的规范,而 Reactor 是此规范的一种具体实现,他是支撑 WebFlux 实现响应式编程的基础。
到底用Spring WebMVC还是Spring WebFlux?
一个自然的问题要问,但建立了不合理的二分法。实际上,两者共同努力扩大了可用选项的范围。两者的设计旨在实现彼此的连续性和一致性,它们可以并行使用,并且来自双方的反馈对双方都有利。下图显示了两者之间的关系,它们的共同点以及各自的独特支持:
我们建议您考虑以下几点:
-
如果您有运行正常的Spring MVC应用程序,则无需更改。命令式编程是编写,理解和调试代码的最简单方法。您有最大的库选择空间,因为从历史上看,大多数库都是阻塞的。
-
如果您已经在购买无阻塞的Web堆栈,Spring WebFlux可以提供与该领域其他服务器相同的执行模型优势,还可以选择服务器(Netty,Tomcat,Jetty,Undertow和Servlet 3.1+容器),选择编程模型(带注释的控制器和功能性Web端点),以及选择反应式库(Reactor,RxJava或其他)。
-
如果您对与Java 8 lambda或Kotlin一起使用的轻量级功能性Web框架感兴趣,则可以使用Spring WebFlux功能性Web端点。对于要求较低复杂性的较小应用程序或微服务(可以受益于更高的透明度和控制)而言,这也是一个不错的选择。
-
在微服务架构中,您可以混合使用带有Spring MVC或Spring WebFlux控制器或带有Spring WebFlux功能端点的应用程序。在两个框架中都支持相同的基于注释的编程模型,这使得重用知识变得更加容易,同时还为正确的工作选择了正确的工具。
-
评估应用程序的一种简单方法是检查其依赖关系。如果您要使用阻塞性持久性API(JPA,JDBC)或网络API,则Spring MVC至少是通用体系结构的最佳选择。使用Reactor和RxJava在单独的线程上执行阻塞调用在技术上是可行的,但您不会充分利用非阻塞Web堆栈。
-
如果您的Spring MVC应用程序具有对远程服务的调用,请尝试使用active WebClient。您可以直接从Spring MVC控制器方法返回反应类型(Reactor,RxJava或其他)。每个呼叫的等待时间或呼叫之间的相互依赖性越大,好处就越明显。Spring MVC控制器也可以调用其他反应式组件。
-
如果您有庞大的团队,请牢记向无阻塞,功能性和声明性编程的过渡过程中的学习曲线很陡。在没有完全切换的情况下启动的一种实用方法是使用WebClient。除此之外,从小处着手并衡量收益。我们希望,对于广泛的应用程序,这种转变是不必要的。如果不确定要寻找什么好处,请先了解无阻塞I / O的工作原理(例如,单线程Node.js上的并发性)及其影响。
9.2、WebFlux的基础
9.2.1、两个核心类
响应式编程操作中,Reactor 是满足 Reactive 规范的框架,Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素,Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号: 元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
三种信号特点:
- 错误信号和完成信号都是终止信号,不能共存
- 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
- 如果没有错误信号,没有完成信号,表示是无限数据流
一个入门案例:
项目名称: reactor-demo
引入依赖:
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.1</version>
</dependency>
声明数据流:
//第一种形式
Flux.just(1, 2, 3, 4);
Mono.just(1);
//第二种形式
Integer[] array = {1, 2, 3, 4};
Flux.fromArray(array);
//第三种形式
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
//第四种形式
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
订阅数据流: 调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的
//第一种形式
Flux.just(1, 2, 3, 4).subscribe(System.out::println);
Mono.just(1).subscribe(System.out::println);
9.2.2、四种操作符
操作符就是对数据进行一道又一道的操作,就好比生产车间的流水线,常见的操作符有四种:map、flatMap、filter、zipWith
map:将元素映射为一个新元素
flatMap:把每个元素转换成流,把转换之后多个流合并成大的流
filter:可以对元素进行筛选
zip:可以对元素进行合并
9.3、WebFlux的注解式编程模型
项目名称: webflux-demo-01
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
server.port=8080
com.caochenlei.webfluxdemo01.entity.User
public class User {
private Integer id;
private String name;
private String gender;
private Integer age;
public User() {
}
public User(Integer id, String name, String gender, Integer age) {
this.id = id;
this.name = name;
this.gender = gender;
this.age = age;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
com.caochenlei.webfluxdemo01.dao.UserDao
public interface UserDao {
//查找一个用户
public User findOne(Integer id);
//查找全部用户
public Collection<User> findAll();
//新增一个用户
public void save(User user);
//删除一个用户
public void delete(User user);
//更新一个用户
public void update(User user);
}
com.caochenlei.webfluxdemo01.dao.impl.UserDaoImpl
Repository
public class UserDaoImpl implements UserDao {
//模拟数据库中的数据
private final Map<Integer, User> users = new ConcurrentHashMap<>();
//调用无参构造初始化
public UserDaoImpl() {
users.put(1, new User(1, "张三", "男", 18));
users.put(2, new User(2, "李四", "女", 19));
users.put(3, new User(3, "王五", "男", 20));
}
@Override
public User findOne(Integer id) {
return users.get(id);
}
@Override
public Collection<User> findAll() {
return users.values();
}
@Override
public void save(User user) {
int id = users.size() + 1;
user.setId(id);
users.put(id, user);
}
@Override
public void delete(User user) {
users.remove(user.getId());
}
@Override
public void update(User user) {
int id = user.getId();
users.remove(id);
users.put(id, user);
}
}
com.caochenlei.webfluxdemo01.service.UserService
public interface UserService {
//查找一个用户
public Mono<User> findOne(Integer id);
//查找全部用户
public Flux<User> findAll();
//新增一个用户
public Mono<Void> save(Mono<User> userMono);
//删除一个用户
public Mono<Void> delete(Mono<User> userMono);
//更新一个用户
public Mono<Void> update(Mono<User> userMono);
}
com.caochenlei.webfluxdemo01.service.UserServiceImpl
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Override
public Mono<User> findOne(Integer id) {
return Mono.justOrEmpty(userDao.findOne(id));
}
@Override
public Flux<User> findAll() {
return Flux.fromIterable(userDao.findAll());
}
@Override
public Mono<Void> save(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//保存一个用户
userDao.save(user);
}).thenEmpty(Mono.empty());
}
@Override
public Mono<Void> delete(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//删除一个用户
userDao.delete(user);
}).thenEmpty(Mono.empty());
}
@Override
public Mono<Void> update(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//更新一个用户
userDao.update(user);
}).thenEmpty(Mono.empty());
}
}
com.caochenlei.webfluxdemo01.controller.UserController
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private UserService userService;
//查找一个用户
@GetMapping("/findOne/{id}")
public Mono<User> findOne(@PathVariable Integer id) {
return userService.findOne(id);
}
//查找全部用户
@GetMapping("/findAll")
public Flux<User> findAll() {
return userService.findAll();
}
//新增一个用户
@PostMapping("/save")
public Mono<Void> save(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.save(userMono);
}
//删除一个用户
@PostMapping("/delete")
public Mono<Void> delete(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.delete(userMono);
}
//更新一个用户
@PostMapping("/update")
public Mono<Void> update(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.update(userMono);
}
}
com.caochenlei.webfluxdemo01.WebfluxDemo01Application,启动主函数
@SpringBootApplication
public class WebfluxDemo01Application {
public static void main(String[] args) {
SpringApplication.run(WebfluxDemo01Application.class, args);
}
}
查询一个:http://localhost:8080/user/findOne/1
查询全部:http://localhost:8080/user/findAll
新增一个:http://localhost:8080/user/save
删除一个:http://localhost:8080/user/delete
修改一个:http://localhost:8080/user/update
9.4、WebFlux的函数式编程模型
项目名称: webflux-demo-02
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
server.port=8080
com.caochenlei.webfluxdemo01.entity.User
public class User {
private Integer id;
private String name;
private String gender;
private Integer age;
public User() {
}
public User(Integer id, String name, String gender, Integer age) {
this.id = id;
this.name = name;
this.gender = gender;
this.age = age;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
com.caochenlei.webfluxdemo01.dao.UserDao
public interface UserDao {
//查找一个用户
public User findOne(Integer id);
//查找全部用户
public Collection<User> findAll();
//新增一个用户
public void save(User user);
//删除一个用户
public void delete(User user);
//更新一个用户
public void update(User user);
}
com.caochenlei.webfluxdemo01.dao.impl.UserDaoImpl
Repository
public class UserDaoImpl implements UserDao {
//模拟数据库中的数据
private final Map<Integer, User> users = new ConcurrentHashMap<>();
//调用无参构造初始化
public UserDaoImpl() {
users.put(1, new User(1, "张三", "男", 18));
users.put(2, new User(2, "李四", "女", 19));
users.put(3, new User(3, "王五", "男", 20));
}
@Override
public User findOne(Integer id) {
return users.get(id);
}
@Override
public Collection<User> findAll() {
return users.values();
}
@Override
public void save(User user) {
int id = users.size() + 1;
user.setId(id);
users.put(id, user);
}
@Override
public void delete(User user) {
users.remove(user.getId());
}
@Override
public void update(User user) {
int id = user.getId();
users.remove(id);
users.put(id, user);
}
}
com.caochenlei.webfluxdemo01.service.UserService
public interface UserService {
//查找一个用户
public Mono<User> findOne(Integer id);
//查找全部用户
public Flux<User> findAll();
//新增一个用户
public Mono<Void> save(Mono<User> userMono);
//删除一个用户
public Mono<Void> delete(Mono<User> userMono);
//更新一个用户
public Mono<Void> update(Mono<User> userMono);
}
com.caochenlei.webfluxdemo01.service.UserServiceImpl
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Override
public Mono<User> findOne(Integer id) {
return Mono.justOrEmpty(userDao.findOne(id));
}
@Override
public Flux<User> findAll() {
return Flux.fromIterable(userDao.findAll());
}
@Override
public Mono<Void> save(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//保存一个用户
userDao.save(user);
}).thenEmpty(Mono.empty());
}
@Override
public Mono<Void> delete(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//删除一个用户
userDao.delete(user);
}).thenEmpty(Mono.empty());
}
@Override
public Mono<Void> update(Mono<User> userMono) {
return userMono.doOnNext(user -> {
//更新一个用户
userDao.update(user);
}).thenEmpty(Mono.empty());
}
}
com.caochenlei.webfluxdemo01.handler.UserHandler
@Component
public class UserHandler {
@Autowired
private UserService userService;
//查找一个用户
public Mono<ServerResponse> findOne(ServerRequest request) {
//获取id值
int id = Integer.valueOf(request.pathVariable("id"));
Mono<User> userMono = userService.findOne(id);
return ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userMono, User.class);
}
//查找全部用户
public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<User> userFlux = userService.findAll();
return ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userFlux, User.class);
}
//新增一个用户
public Mono<ServerResponse> save(ServerRequest request) {
//获取user值
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(userService.save(userMono));
}
//删除一个用户
public Mono<ServerResponse> delete(ServerRequest request) {
//获取user值
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(userService.delete(userMono));
}
//更新一个用户
public Mono<ServerResponse> update(ServerRequest request) {
//获取user值
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(userService.update(userMono));
}
}
com.caochenlei.webfluxdemo01.routers.RouterConfig
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> userRouter(UserHandler userHandler) {
return RouterFunctions
.route(GET("/user/findOne/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::findOne)
.andRoute(GET("/user/findAll").and(accept(MediaType.APPLICATION_JSON)), userHandler::findAll)
.andRoute(POST("/user/save").and(accept(MediaType.APPLICATION_JSON)), userHandler::save)
.andRoute(POST("/user/delete").and(accept(MediaType.APPLICATION_JSON)), userHandler::delete)
.andRoute(POST("/user/update").and(accept(MediaType.APPLICATION_JSON)), userHandler::update);
}
}
com.caochenlei.webfluxdemo01.WebfluxDemo02Application,启动主函数
@SpringBootApplication
public class WebfluxDemo02Application {
public static void main(String[] args) {
SpringApplication.run(WebfluxDemo02Application.class, args);
}
}
查询一个:http://localhost:8080/user/findOne/1
查询全部:http://localhost:8080/user/findAll
新增一个:http://localhost:8080/user/save
删除一个:http://localhost:8080/user/delete
修改一个:http://localhost:8080/user/update
9.5、WebFlux的WebClient编程
本章前提: 需要 webflux-demo-02 处于启动状态,我们需要手动编程实现访问路由
项目名称: webflux-demo-03
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
com.caochenlei.webfluxdemo03.entity.User
public class User {
private Integer id;
private String name;
private String gender;
private Integer age;
public User() {
}
public User(Integer id, String name, String gender, Integer age) {
this.id = id;
this.name = name;
this.gender = gender;
this.age = age;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", gender='" + gender + '\'' +
", age=" + age +
'}';
}
}
com.caochenlei.webfluxdemo03.WebClientDemo
public class WebClientDemo {
public static void main(String[] args) {
//创建Web客户端
WebClient webClient = WebClient.create("http://localhost:8080");
//查询指定用户
User user = webClient
.get()
.uri("/user/findOne/{id}", 2)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(User.class)
.block();
System.out.println(user);
//查询所有用户
Flux<User> userFlux = webClient
.get()
.uri("/user/findAll")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(User.class);
userFlux.buffer().doOnNext(System.out::println).blockFirst();
//添加一个用户
User user1 = new User(null, "赵六", "男", 16);
String returnValue1 = webClient
.post()
.uri("/user/save")
.body(Mono.just(user1), User.class)
.retrieve()
.bodyToMono(String.class)
.block();
System.out.println(returnValue1);
//删除一个用户
User user2 = new User();
user2.setId(4);
String returnValue2 = webClient
.post()
.uri("/user/delete")
.body(Mono.just(user2), User.class)
.retrieve()
.bodyToMono(String.class)
.block();
System.out.println(returnValue2);
//修改一个用户
User user3 = user;
user3.setName("李思思");
String returnValue3 = webClient
.post()
.uri("/user/update")
.body(Mono.just(user3), User.class)
.retrieve()
.bodyToMono(String.class)
.block();
System.out.println(returnValue3);
}
}