响应式介绍

文章目录

1 何为响应式

1.1 为什么需要响应性

假设我们的小型业务是开一家网店,销售一些价格颇具吸引力的尖端产品。与该领域的大多数项目一样,我们将聘请软件工程师来解决遇到的一切问题。我们选择了传统的开发方法,通过一系列开发活动创建了我们的商店。
平时,每小时约有1000名用户访问我们的服务。为了满足日常需求,我们购买了一台现代化的计算机并在上面运行Tomcat Web服务器,同时为Tomcat的线程池配置了500个线程。大多数用户请求的平均响应时间约为250毫秒。通过对该配置的响应能力进行简单的计算,可以确定系统每秒可以处理大约2000个用户请求。据统计,前面提到的用户数平均每秒产生约1000个请求。因此,当前系统的能力足以应对平均负载。

双十一对于客户和零售商来说是宝贵的一天。对客户来说,这是一个以折扣价购买商品的机会;对零售商来说,这是一种赚钱和推广产品的方式。然而,这一天涌入客户的数量超乎寻常,而这可能是导致生产事故的重要原因。
当然,我们的系统出现了故障!在某个时间点,系统负载超出了最高预期。线程池中没有空闲线程来处理用户请求。备份服务器也无法处理这种意料之外的访问量,最终导致响应时间延长和周期性的服务中断。此时,我们开始丢失部分用户请求。最后,客户因为不满转而选择了我们的竞争对手。
最终,许多潜在客户和大量资金流失了,商店的评级也下降了。这完全是因为我们无法在负载增加时保持即时响应性。

应用程序应该对变化做出响应,这种变化应该包括需求(负载)的变化以及外部服务可用性的变化。换句话说,它应该对可能影响系统响应用户请求能力的任何变化做出响应。

1.2 如何具备即时响应性

实现这一核心目标的首要方法之一是依靠弹性(elasticity)。弹性描述了系统在不同负载下保持即时响应的能力,这意味着当更多用户开始使用它时,系统的吞吐量应该自动增加;而当需求下降时,吞吐量应该自动减少。从应用程序的角度来看,这个特性可以确保系统的响应能力,因为系统在任何时间点都可以得到扩展而不会影响平均延迟。

  • 提供额外的计算资源或更多实例可以增加系统的吞吐量,响应性也将随之增强;
  • 如果需求量低,系统应该降低资源消耗,从而减少业务费用。

实现分布式系统的可伸缩性有一定难度,该任务通常受限于系统内的瓶颈或同步点。

合格的系统在发生故障的情况下能够保持即时响应,即具有回弹性(resilience)。这可以通过在系统的功能组件之间应用隔离机制,隔离所有内部故障并实现独立性来实现。让我们回头看看亚马逊网上商店。亚马逊有许多不同的功能组件,如订单列表、支付服务、广告服务、评论服务以及很多其他服务。举个例子,在支付服务中断的情况下,亚马逊可以接受用户订单,然后通过调度自动重新提交请求,从而避免用户遭遇故障。另一个例子可能是实现评论服务的隔离。如果评论服务中断,商品购买和订单列表服务应该不受任何影响,正常工作。

弹性和回弹性是紧密耦合的,只有两者同时启用才能实现真正的即时响应系统。通过可伸缩性,我们可以拥有组件的多个副本。这样,如果一个组件出现故障,我们就可以检测到这一点,并切换到另一个副本(集群的作用),从而使它对系统其余部分的影响最小。

1.3 非阻塞消息通信

通常,在分布式系统中,为了服务之间的通信实现有效的资源利用,我们必须采用消息驱动的通信原则。服务之间的整体交互可以描述为:每个元素在消息到达时会对它们做出响应,否则就处于休眠状态;反之,组件应该能够以非阻塞方式发送消息。

实现消息驱动通信的方法之一是使用消息代理服务器。在这种情况下,通过监控消息队列,系统能够控制负载管理和弹性。此外,消息通信提供了清晰的流量控制并简化了整体设计。

1.4 响应式系统的基本原则

  • 即时响应性
  • 回弹性
  • 弹性
  • 消息驱动

响应式介绍
用分布式系统实现的业务的主要价值在于即时响应性,实现一个即时响应性系统意味着遵循弹性和回弹性等基本原则。最后,获得具有即时响应性、弹性和回弹性的系统的基本方法之一是采用消息驱动的通信。此外,遵循这些原则构建的系统具有高度的可维护性和可扩展性,因为系统中的所有组件都是相互独立且适当隔离的。

2 服务级别的响应性

“大型系统由多个小系统组成,因此也依赖于这些组成部分的响应式特性。也就是说,响应式系统的设计原则适用于各个级别、各种规模的系统,有助于它们很好地组合在一起。”

因此,在组件级别上提供响应式设计和实现也很重要。

比如:下单的时候需要调用促销服务计算商品的优惠后价格

public class OrderService {


    private final PromotionService promotionService;

    public OrderService(PromotionService promotionService) {
        this.promotionService = promotionService;
    }

    /**
     * 下单
     * 返回订单id
     */
    public String order(Long goodId) {
        // 计算优惠价格
        Long calculate = promotionService.calculate(goodId);
        System.out.println("商品优惠计算结束");
        return UUID.randomUUID().toString();
    }
}

public class PromotionService {

  /**
   * 计算商品优惠价格
   * @param goodId
   * @return
   */
  public Long calculate(Long goodId){
      try {
          // 模拟阻塞耗时
          TimeUnit.SECONDS.sleep(10);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return 1000L;
  }
}

当用户下单的时候,这种实现不能立即返回而是要同步等待促销服务的执行耗时,我们的服务会实时紧密耦合在一起,或者简单地说就是OrdersService的执行过程与promotionService的执行过程紧密耦合。遗憾的是,使用这种技术,当promotionService处于处理阶段时,我们无法继续执行任何其他操作。

2.1 回调技术

在Java中,我们可以通过应用回调(callback)技术来解决该问题,以实现跨组件通信。

/**
  * 计算商品优惠价格,增加回调
  * @param goodId
  * @return
  */
 public void calculate(Long goodId, Consumer<Long> callback){
     try {
         // 模拟阻塞耗时
         TimeUnit.SECONDS.sleep(10);
         // 假设优惠之后的价格是1000
         callback.accept(1000L);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }
/**
 * 下单
 */
public String order(Long goodId) {
    promotionService.calculate(goodId, promotionPrice -> {
        System.out.println("商品优惠计算结束,优惠之后的价格是:" + promotionPrice);
    });
    return UUID.randomUUID().toString();
}

这种虽然可以在调用促销服务时候,干其他的事情,当促销服务计算结束后,被动的调用回调函数即可

但是这个还是还是存在一个问题:不能立即把订单id响应给客户,这个时候就需要异步回调,让调用促销服务的时候再另一个线程中

2.2 异步回调

/**
  * 计算商品优惠价格,异步回调
  * @param goodId
  * @return
  */
 public void calculate(Long goodId, Consumer<Long> callback){
     // 开启一个线程去处理
     new Thread(()->{
         try {
             // 模拟阻塞耗时
             TimeUnit.SECONDS.sleep(10);
             callback.accept(1000L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }).start();

 }

这样就可以及时的把订单id响应回去

2.2 juc包

2.2.1 java.util.concurrent.Future

回调技术不是唯一的选择。另一个选择是java.util.concurrent.Future,它在某种程度上隐藏了执行行为并解耦了组件。

/**
 * 计算商品优惠价格,
 * @param goodId
 * @return
 */
public Future<Long> calculate(Long goodId){
    Future<Long> future = Executors.newFixedThreadPool(1).submit(() -> {
        try {
            // 模拟阻塞耗时
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 假设优惠之后的价格是1000
        return 1000L;
    });
    return future;
}
/**
 * 下单
 */
public String order(Long goodId) throws ExecutionException, InterruptedException {
    Future<Long> future = promotionService.calculate(goodId);
    // get方法是阻塞的
    Long promotionPrice = future.get();
    System.out.println("商品优惠计算结束,优惠之后的价格是:" + promotionPrice);
    return UUID.randomUUID().toString();
}

在这里,calculate方法接受一个参数并返回Future。Future是一个类包装器,它使我们能检查是否有可用的结果,以及能否以阻塞的方式获取它。

但是future的 get方法是阻塞的,所以这样又不能立即返回了。在Future类的支持下,我们避免了回调地狱,并将实现多线程的复杂性隐藏在了特定Future实现的背后。无论如何,为了获得需要的结果,我们必须阻塞当前的线程并与外部执行进行同步,这显著降低了可伸缩性。

2.2.2 CompletableFuture

Java 8提供了CompletionStage以及它的直接实现CompletableFuture。同样,这些类提供了类似promise的API

/**
 * 计算商品优惠价格,
 * @param goodId
 * @return
 */
public CompletableFuture<Long> calculate(Long goodId){
    CompletableFuture<Long> future = new CompletableFuture<>();
    Executors.newFixedThreadPool(1).submit(() -> {
        try {
            // 模拟阻塞耗时
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 假设优惠之后的价格是1000
        future.complete(1000L);
    });
    return future;
}
/**
 * 下单
 */
public String order(Long goodId) {
    CompletableFuture<Long> future = promotionService.calculate(goodId);
    future.thenAccept(price -> {
        System.out.println("商品优惠计算结束,优惠之后的价格是:" + price);
    });
    return UUID.randomUUID().toString();

}

关于future可参考:juc并发包专栏

上一篇:1689. Partitioning Into Minimum Number Of Deci-Binary Numbers


下一篇:策略模式