Spring Cloud 之 服务容错组件 Hystrix
1、Spring Cloud Hystrix 简介
Spring Cloud Hystrix是基于Netflix的开源框架Hystrix实现的,用作服务容错的组件。Hystrix组件就像日常生活中的保险丝一样,能对高并发的Web应用系统起到熔断保护的作用。Hystrix具备了服务降级、服务熔断、线程隔离、请求缓存、请求合并以及服务监控等强大功能。
2、基于Hystrix的服务降级
为了实现服务降级的效果,我们需要在服务提供者中提供一个可用服务和一个不可用服务(通过设置休眠时间实现)。然后,在消费者应用中,添加Hystrix相关依赖,实现服务降级的实现。具体步骤如下:
2.1、服务提供者(eureka-provider)修改
服务提供者只需要修改测试Controller类即可,增加一个不可用接口,具体如下:
@Controller
public class ProviderController {
Logger logger = Logger.getLogger(ProviderController.class);
@RequestMapping("/provider")
@ResponseBody
public String provider(){
logger.info("调用服务提供者ProviderController的provider()方法!");
return "Hello World!";
}
@RequestMapping("/unavailable")
@ResponseBody
public String unavailable() throws InterruptedException {
logger.info("调用服务提供者ProviderController的unavailable()方法!");
Thread.sleep(1000 * 10);
return "Hello World!";
}
}
在上面代码中,我们增加了一个unavailable()方法,该方法通过休眠10秒钟实现服务的不可用状态(Hystrix默认超过3秒就会认为服务不可用,超时时间可以通过配置文件进行配置)。
2.2、在消费者端(eureka-consumer)引入Hystrix依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
2.3、 启用Hystrix功能
在启动类中添加注解@EnableCircuitBreaker即可启动Hystrix相关功能。
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients(defaultConfiguration = FeignLoggingConfig.class)
@EnableCircuitBreaker
public class EurekaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaConsumerApplication.class, args);
}
}
2.4、定义了一个HystrixService类,实现Hystrix功能的集成
实现了一个HystrixService类,然后通过@HystrixCommand注解一个方法,然后通过fallbackMethod属性定义当该方法执行失败或超时后的回调方法,如下所示:
@Service
public class HystrixService {
@Autowired
private FeignApiClient feignApiClient;
@HystrixCommand(fallbackMethod = "fallback")
public String provider(){
return feignApiClient.provider();
}
@HystrixCommand(fallbackMethod = "fallback")
public String unavailable(){
return feignApiClient.unavailable();
}
public String fallback(){
return "error";
}
}
2.5、定义测试类HystrixController
定义了一个测试类,其中注入了HystrixService实例,并定义了两个测试接口,一个调用可用服务,一个调用不可用服务。
@RestController
public class HystrixController {
Logger logger = Logger.getLogger(HystrixController.class);
@Autowired
private HystrixService hystrixService;
@RequestMapping("/provider")
public String provider(){
logger.info("执行消费者FeignController的provider()方法!");
return hystrixService.provider();
}
@RequestMapping("/unavailable")
public String unavailable(){
logger.info("执行消费者FeignController的unavailable()方法!");
return hystrixService.unavailable();
}
}
2.6、启动并验证
启动成功后,分别访问http://localhost:8010/provider、http://localhost:8010/unavailable两个接口,其中可用接口会返回正确的结果,而不可用接口直接访问了“error”,即回调方法中的返回值。至此,Hystrix的简单用法就实现了,我们后续再详细学习Hystrix相关的详细配置和其他用法。
3、依赖隔离
在Hystrix中,使用舱壁模式(Bulkhead)实现线程池的隔离,它会为每一个Hystrix命令创建一个独立的线程池,这样当某个在Hystrix命令包装下的依赖服务出现延迟过高的情况,也只是对该依赖服务的调用产生影响,而不会蔓延到其他的服务。
Hystrix中除了使用线程池之外,还可以使用信号量来控制单个依赖服务的并发度,信号量的开销要远比线程池的开销小得多,但是它不能设置超时和实现异步访问。所以,只有在依赖服务是足够可靠的情况下才使用信号量。
如何使用Hystrix来实现依赖隔离呢?其实,在前面的实例当中,当我们使用
@HystrixCommand注解的时候,其实Hystrix组件就会自动的为这个服务调用实现隔离。因此,依赖隔离和服务降级基本上都是同时存在和出现的。
舱壁模式(Bulkhead)隔离了每个工作负载或服务的关键资源,如连接池、内存和CPU。使用舱壁避免了单个工作负载(或服务)消耗掉所有资源,从而导致其他服务出现故障的场景。这种模式主要是通过防止由一个服务引起的级联故障来增加系统的弹性。
4、请求合并
Hystrix支持将多个请求自动合并为一个请求,通过合并可以减少HystrixCommand并发执行所需的线程和网络连接数量,极大地节省了开销,提高了系统效率。
4.1、通过分别继承HystrixCollapser和HystrixCommand类实现
首先,实现一个基础的处理业务逻辑的Service方法,其中必须有一个可以批处理数据的业务方法findAll()。代码如下:
@Service
public class HystrixService {
Logger logger = Logger.getLogger(HystrixService.class);
//@HystrixCommand
public List<String> findAll(List<Integer> ids) {
logger.info("进入了findAll方法,开始执行批处理!");
for(int i=0;i<ids.size();i++){
logger.info("ID:" + ids.get(i));
}
//相当于调用了Provider返回的数据
List<String> list = Arrays.asList("张三","李四","王五");
return list;
}
}
然后需要创建一个HystrixBatchCommon类,该类继承了HystrixCommand类,主要用来实现批处理方法的调用,同时可以增加一些关于Hystrix相关特性,比如服务降级、依赖隔离等。
public class HystrixBatchCommon extends HystrixCommand<List<String>> {
Logger logger = Logger.getLogger(HystrixBatchCommon.class);
private HystrixService hystrixService;
private List<Integer> ids;
public HystrixBatchCommon(HystrixService hystrixService, List<Integer> ids){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
this.hystrixService =hystrixService ;
this.ids = ids;
}
@Override
protected List<String> run() throws Exception {
List<String> list = hystrixService.findAll(ids);
return list;
}
@Override
protected List<String> getFallback() {
logger.info("HystrixBatchCommon 出现报错信息!");
return null;
}
}
再实现一个HystrixCollapseCommand类,该类继承了HystrixCollapser类,主要实现请求的合并,然后通过HystrixBatchCommon实现批处理方法的调用。其中HystrixCollapser抽象类定义中,指定了三个不同的类型:
- 第一个是合并后批量请求的返回类型
- 第二个是单个请求返回的类型
- 第三个是请求参数类型
然后其中重写了三个方法:
-
第一个:getRequestArgument()方法用来定义获取请求参数。
-
第二个:createCommand()方法用来合并请求产生批量命令的具体实现方法
-
第三个:mapResponseToRequests()方法用来处理批量命令返回的结果,需要实现将批量结果拆分并传递给合并前的各个请求。
public class HystrixCollapseCommand extends HystrixCollapser<List,String,Integer> {
Logger logger = Logger.getLogger(HystrixCollapseCommand.class); private HystrixService hystrixService; private Integer id; public HystrixCollapseCommand(HystrixService hystrixService, Integer id){ //设置合并请求窗口为1000毫秒,方便测试 super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("testCollapseCommand")). andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(1000))); this.hystrixService = hystrixService; this.id = id; } @Override public Integer getRequestArgument() { return this.id; } @Override protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collapsedRequests) { logger.info("HystrixCollapseCommand========>"); //按请求数声名UserId的集合 List<Integer> ids = new ArrayList<>(collapsedRequests.size()); //通过请求将100毫秒中的请求参数取出来装进集合中 ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); //返回UserBatchCommand对象,自动执行UserBatchCommand的run方法 HystrixCommand<List<String>> re =new HystrixBatchCommon(hystrixService, ids); return re; } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> collapsedRequests) { int count = 0 ; for(CollapsedRequest<String,Integer> collapsedRequest : collapsedRequests){ //从批响应集合中按顺序取出结果 String user = batchResponse.get(count++); //将结果放回原Request的响应体内 collapsedRequest.setResponse(user); } }
}
最后,编写一个测试方法,验证请求的合并效果,如下所示:
@RequestMapping("/mergeRequest2")
public String mergeRequest2(Integer id) throws ExecutionException, InterruptedException {
logger.info("执行消费者FeignController的mergeRequest2()方法!");
//HystrixRequestContext context = HystrixRequestContext.initializeContext();
// Future<String> f1 = new HystrixCollapseCommand(hystrixService, id).queue();
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new HystrixCollapseCommand(hystrixService, 1).queue();
Future<String> f2 = new HystrixCollapseCommand(hystrixService, 2).queue();
Future<String> f3 = new HystrixCollapseCommand(hystrixService, 3).queue();
Thread.sleep(3000);
Future<String> f4 = new HystrixCollapseCommand(hystrixService, 5).queue();
Future<String> f5 = new HystrixCollapseCommand(hystrixService, 6).queue();
String u1 = f1.get();
String u2 = f2.get();
String u3 = f3.get();
String u4 = f4.get();
String u5 = f5.get();
System.out.println(u1);
System.out.println(u2);
System.out.println(u3);
System.out.println(u4);
System.out.println(u5);
} catch (Exception e) {
e.printStackTrace();
}finally {
context.close();
}
return null;//f1.get().toString();
}
启动项目后,访问http://localhost:8020/mergeRequest2地址,在控制台出现内容,其中findAll方法只执行了两次,说明合并生效了。
疑问:在实际工作过程中,请求合适是如何使用的呢?这里无法直接返回请求结果,否则合并将失效。猜想:应该通过异步消息之类的方式实现,等待验证。
4.2、通过注解实现请求合并
实现注解的方式比较简单,只需要修改业务处理的Service方法即可,需要一个单个请求处理的方法用来添加@HystrixCollapser注解,该方法中的逻辑实际不会被执行,主要用来请求合并,然后在实际批处理的方法上,通过@HystrixCommand注解,用来进行合并后的处理。
@Service
public class HystrixService {
Logger logger = Logger.getLogger(HystrixService.class);
@HystrixCollapser(batchMethod = "findAll", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
//请求时间间隔在 5000ms 之内的请求会被合并为一个请求,默认为 10ms
@HystrixProperty(name = "timerDelayInMilliseconds", value = "500"),
//设置触发批处理执行之前,在批处理中允许的最大请求数。
@HystrixProperty(name = "maxRequestsInBatch", value = "200")
})
public Future<String> getById(Integer id) {
logger.info("不执行该代码");
return null;
}
@HystrixCommand
public List<String> findAll(List<Integer> ids) {
logger.info("进入了findAll方法,开始执行批处理!");
for(int i=0;i<ids.size();i++){
logger.info("ID:" + ids.get(i));
}
//相当于调用了Provider返回的数据
List<String> list = Arrays.asList("张三","李四","王五");
return list;
}
}
注解方式的验证,不能通过上述提供的方式进行,我们在HystrixController类中单独提供了一个如下的方法:
@RequestMapping("/mergeRequest")
public String mergeRequest(Integer id) throws ExecutionException, InterruptedException {
logger.info("执行消费者FeignController的mergeRequest()方法!");
Future<String> future = hystrixService.getById(id);
return null;
}
然后通过postman的批请求方式进行验证,首先保存几个请求,然后执行,如下:
请求后,在控制台可以看见如下内容,说明两次请求合并成了一次,进行执行。
5、监控面板
我们在前面提到被@HystrixCommand注解的方法,会被Hystrix记录调用情况,那记录的这些数据怎么查看呢?
5.1、监控节点实例
为了监控服务中接口被调用情况,需要添加相关的依赖,即在消费者端(eureka-consumer)引入actuator依赖,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
然后,在application.properties增加相关配置,即暴露端点
#暴露全部端点
management.endpoints.web.exposure.include=*
#也可以设置不暴露的端点
management.endpoints.web.exposure.exclude=beans
端点信息如下:
配置完成后,重新启动服务。访问http://192.168.1.87:8020/actuator/health地址,页面会出现如下内容,说明服务正常运行:
{"status":"UP"}
访问http://192.168.1.87:8020/actuator/hystrix.stream地址,这个时候,会出现如下内容,这些内容就是服务使用情况的数据。
如果出现如下404页面,一般都是management.endpoints.web.exposure.include配置问题,检查其配置即可。
5.2、监控面板
在前面我们可以通过访问http://192.168.1.87:8020/actuator/hystrix.stream地址,查看应用监控的数据,但是这些数据都是json格式的,很不方便直观的了解服务运行情况。其实,Hystrix已经提供了一套可视化的监控客户端,即Hystrix Dashboard。下面我们来搭建Hystrix Dashboard服务,过程也非常的简单。
为了保证Hystrix Dashboard的独立性,我们单独创建了一个子模块来搭建Hystrix Dashboard,项目名称为:hystrix-dashboard。
首先,需要添加相关的依赖信息,内容如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
然后,修改application.properties配置文件
spring.application.name=hystrix-dashboard
server.port=8050
#设置与Eureka Server交互的地址。
eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8000/eureka/
最后,创建启动类,并添加相关注解即可,如下所示:
@SpringBootApplication
@EnableHystrixDashboard
public class HystrixDashboardApplication {
public static void main(String[] args) {
SpringApplication.run(HystrixDashboardApplication.class, args);
}
}
完成上述配置后,重新启动项目。访问http://localhost:8050/hystrix地址,可以看到Hystrix Dashboard的监控首页,如下所示:
但是在Hystrix Dashboard的监控页面并没有相关的监控信息和内容,通过页面提示我们可以知道,如果想监控那些数据,通过填入对应的url,然后点击“Monitor Stream”按钮进行查看。这里我们选择使用hystrix.stream监控单个应用的情况,即输入url为http://192.168.1.87:8020/actuator/hystrix.stream,然后点击按钮查看即可。
这个时候,如果出现“Unable to connect to Command Metric Stream.”报错,如下页面:
然后,查看hystrix-dashboard控制台,发现打印了如下日志,说明缺少了hystrix.dashboard.proxyStreamAllowList配置。
2020-11-11 14:20:34.909 WARN 70100 --- [nio-8050-exec-9] ashboardConfiguration$ProxyStreamServlet : Origin parameter: http://192.168.1.87:8020/actuator/hystrix.stream is not in the allowed list of proxy host names. If it should be allowed add it to hystrix.dashboard.proxyStreamAllowList.
这时候,在application.properties配置文件中增加如下配置即可,需要注意的是:这个配置不支持“*”,如果需要配置多个IP,使用“,”分割即可。
hystrix.dashboard.proxyStreamAllowList=192.168.1.87
然后重新启动,再刷新页面会发现出现如下界面,说明成功了:
可视化内容说明:一个实心圆、一条曲线、指标描述。
- 实心圆:共有两种含义。它通过颜色的变化代表了实例的健康程度,它的健康度从绿色、黄色、橙色、红色递减。该实心圆除了颜色的变化之外,它的大小也会根据实例的请求流量发生变化,流量越大该实心圆就越大。所以通过该实心圆的展示,我们就可以在大量的实例中快速的发现故障实例和高压力实例。
- 曲线:用来记录流量的相对变化(近期内的),我们可以通过它来观察到流量的上升和下降趋势。
- 指标描述如下: