文章目录
1 何为响应式
1.1 为什么需要响应性
假设我们的小型业务是开一家网店,销售一些价格颇具吸引力的尖端产品。与该领域的大多数项目一样,我们将聘请软件工程师来解决遇到的一切问题。我们选择了传统的开发方法,通过一系列开发活动创建了我们的商店。
平时,每小时约有1000名用户访问我们的服务。为了满足日常需求,我们购买了一台现代化的计算机并在上面运行Tomcat Web服务器,同时为Tomcat的线程池配置了500个线程。大多数用户请求的平均响应时间约为250毫秒。通过对该配置的响应能力进行简单的计算,可以确定系统每秒可以处理大约2000个用户请求。据统计,前面提到的用户数平均每秒产生约1000个请求。因此,当前系统的能力足以应对平均负载。
双十一对于客户和零售商来说是宝贵的一天。对客户来说,这是一个以折扣价购买商品的机会;对零售商来说,这是一种赚钱和推广产品的方式。然而,这一天涌入客户的数量超乎寻常,而这可能是导致生产事故的重要原因。
当然,我们的系统出现了故障!在某个时间点,系统负载超出了最高预期。线程池中没有空闲线程来处理用户请求。备份服务器也无法处理这种意料之外的访问量,最终导致响应时间延长和周期性的服务中断。此时,我们开始丢失部分用户请求。最后,客户因为不满转而选择了我们的竞争对手。
最终,许多潜在客户和大量资金流失了,商店的评级也下降了。这完全是因为我们无法在负载增加时保持即时响应性。
应用程序应该对变化做出响应,这种变化应该包括需求(负载)的变化以及外部服务可用性的变化。换句话说,它应该对可能影响系统响应用户请求能力的任何变化做出响应。
1.2 如何具备即时响应性
实现这一核心目标的首要方法之一是依靠弹性(elasticity)。弹性描述了系统在不同负载下保持即时响应的能力,这意味着当更多用户开始使用它时,系统的吞吐量应该自动增加;而当需求下降时,吞吐量应该自动减少。从应用程序的角度来看,这个特性可以确保系统的响应能力,因为系统在任何时间点都可以得到扩展而不会影响平均延迟。
- 提供额外的计算资源或更多实例可以增加系统的吞吐量,响应性也将随之增强;
- 如果需求量低,系统应该降低资源消耗,从而减少业务费用。
实现分布式系统的可伸缩性有一定难度,该任务通常受限于系统内的瓶颈或同步点。
合格的系统在发生故障的情况下能够保持即时响应,即具有回弹性(resilience)。这可以通过在系统的功能组件之间应用隔离机制,隔离所有内部故障并实现独立性来实现。让我们回头看看亚马逊网上商店。亚马逊有许多不同的功能组件,如订单列表、支付服务、广告服务、评论服务以及很多其他服务。举个例子,在支付服务中断的情况下,亚马逊可以接受用户订单,然后通过调度自动重新提交请求,从而避免用户遭遇故障。另一个例子可能是实现评论服务的隔离。如果评论服务中断,商品购买和订单列表服务应该不受任何影响,正常工作。
弹性和回弹性是紧密耦合的,只有两者同时启用才能实现真正的即时响应系统。通过可伸缩性,我们可以拥有组件的多个副本。这样,如果一个组件出现故障,我们就可以检测到这一点,并切换到另一个副本(集群的作用),从而使它对系统其余部分的影响最小。
1.3 非阻塞消息通信
通常,在分布式系统中,为了服务之间的通信实现有效的资源利用,我们必须采用消息驱动的通信原则。服务之间的整体交互可以描述为:每个元素在消息到达时会对它们做出响应,否则就处于休眠状态;反之,组件应该能够以非阻塞方式发送消息。
实现消息驱动通信的方法之一是使用消息代理服务器。在这种情况下,通过监控消息队列,系统能够控制负载管理和弹性。此外,消息通信提供了清晰的流量控制并简化了整体设计。
1.4 响应式系统的基本原则
- 即时响应性
- 回弹性
- 弹性
- 消息驱动
用分布式系统实现的业务的主要价值在于即时响应性,实现一个即时响应性系统意味着遵循弹性和回弹性等基本原则。最后,获得具有即时响应性、弹性和回弹性的系统的基本方法之一是采用消息驱动的通信。此外,遵循这些原则构建的系统具有高度的可维护性和可扩展性,因为系统中的所有组件都是相互独立且适当隔离的。
2 服务级别的响应性
“大型系统由多个小系统组成,因此也依赖于这些组成部分的响应式特性。也就是说,响应式系统的设计原则适用于各个级别、各种规模的系统,有助于它们很好地组合在一起。”
因此,在组件级别上提供响应式设计和实现也很重要。
比如:下单的时候需要调用促销服务计算商品的优惠后价格
public class OrderService {
private final PromotionService promotionService;
public OrderService(PromotionService promotionService) {
this.promotionService = promotionService;
}
/**
* 下单
* 返回订单id
*/
public String order(Long goodId) {
// 计算优惠价格
Long calculate = promotionService.calculate(goodId);
System.out.println("商品优惠计算结束");
return UUID.randomUUID().toString();
}
}
public class PromotionService {
/**
* 计算商品优惠价格
* @param goodId
* @return
*/
public Long calculate(Long goodId){
try {
// 模拟阻塞耗时
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1000L;
}
}
当用户下单的时候,这种实现不能立即返回而是要同步等待促销服务的执行耗时,我们的服务会实时紧密耦合在一起,或者简单地说就是OrdersService的执行过程与promotionService的执行过程紧密耦合。遗憾的是,使用这种技术,当promotionService处于处理阶段时,我们无法继续执行任何其他操作。
2.1 回调技术
在Java中,我们可以通过应用回调(callback)技术来解决该问题,以实现跨组件通信。
/**
* 计算商品优惠价格,增加回调
* @param goodId
* @return
*/
public void calculate(Long goodId, Consumer<Long> callback){
try {
// 模拟阻塞耗时
TimeUnit.SECONDS.sleep(10);
// 假设优惠之后的价格是1000
callback.accept(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 下单
*/
public String order(Long goodId) {
promotionService.calculate(goodId, promotionPrice -> {
System.out.println("商品优惠计算结束,优惠之后的价格是:" + promotionPrice);
});
return UUID.randomUUID().toString();
}
这种虽然可以在调用促销服务时候,干其他的事情,当促销服务计算结束后,被动的调用回调函数即可
但是这个还是还是存在一个问题:不能立即把订单id响应给客户,这个时候就需要异步回调,让调用促销服务的时候再另一个线程中
2.2 异步回调
/**
* 计算商品优惠价格,异步回调
* @param goodId
* @return
*/
public void calculate(Long goodId, Consumer<Long> callback){
// 开启一个线程去处理
new Thread(()->{
try {
// 模拟阻塞耗时
TimeUnit.SECONDS.sleep(10);
callback.accept(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
这样就可以及时的把订单id响应回去
2.2 juc包
2.2.1 java.util.concurrent.Future
回调技术不是唯一的选择。另一个选择是java.util.concurrent.Future,它在某种程度上隐藏了执行行为并解耦了组件。
/**
* 计算商品优惠价格,
* @param goodId
* @return
*/
public Future<Long> calculate(Long goodId){
Future<Long> future = Executors.newFixedThreadPool(1).submit(() -> {
try {
// 模拟阻塞耗时
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假设优惠之后的价格是1000
return 1000L;
});
return future;
}
/**
* 下单
*/
public String order(Long goodId) throws ExecutionException, InterruptedException {
Future<Long> future = promotionService.calculate(goodId);
// get方法是阻塞的
Long promotionPrice = future.get();
System.out.println("商品优惠计算结束,优惠之后的价格是:" + promotionPrice);
return UUID.randomUUID().toString();
}
在这里,calculate方法接受一个参数并返回Future。Future是一个类包装器,它使我们能检查是否有可用的结果,以及能否以阻塞的方式获取它。
但是future的 get方法是阻塞的,所以这样又不能立即返回了。在Future类的支持下,我们避免了回调地狱,并将实现多线程的复杂性隐藏在了特定Future实现的背后。无论如何,为了获得需要的结果,我们必须阻塞当前的线程并与外部执行进行同步,这显著降低了可伸缩性。
2.2.2 CompletableFuture
Java 8提供了CompletionStage以及它的直接实现CompletableFuture。同样,这些类提供了类似promise的API
/**
* 计算商品优惠价格,
* @param goodId
* @return
*/
public CompletableFuture<Long> calculate(Long goodId){
CompletableFuture<Long> future = new CompletableFuture<>();
Executors.newFixedThreadPool(1).submit(() -> {
try {
// 模拟阻塞耗时
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假设优惠之后的价格是1000
future.complete(1000L);
});
return future;
}
/**
* 下单
*/
public String order(Long goodId) {
CompletableFuture<Long> future = promotionService.calculate(goodId);
future.thenAccept(price -> {
System.out.println("商品优惠计算结束,优惠之后的价格是:" + price);
});
return UUID.randomUUID().toString();
}
关于future可参考:juc并发包专栏