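The avalanche effect (a cascading failure) occurs when one of several service nodes becomes unavailable and calls to it tie up all of the application's resources, which in turn brings the whole system down. In a distributed environment the risk of such cascading failures cannot be avoided. Hystrix is a Netflix library that implements the circuit breaker pattern. It helps you control the interactions between distributed services by limiting concurrency, tolerating latency and failures, and handling faults gracefully. Hystrix improves the overall resilience of a system in three ways: it isolates the points of access between services, it stops cascading failures across them, and it provides graceful degradation.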
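5.2.1 Why Hystrix is needed
What Hystrix does:
- Protects against, and controls, latency and failures from dependencies accessed (usually over the network) through third-party client libraries.
- Prevents cascading failures (the avalanche effect).
- Fails fast when problems occur and recovers gracefully.
- Enables effective monitoring and alerting.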
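5.2.2 Preventing cascading failures
Hystrix prevents cascading failures in the following ways:
- It wraps every call to an external system (or "dependency") in a HystrixCommand or HystrixObservableCommand object, which usually executes on a separate thread (an example of the command pattern).
- It maintains a small thread pool (or semaphore) for each dependency. If the pool fills up, requests for that dependency are rejected immediately instead of being queued.
- It measures successes, failures (exceptions thrown by the client), timeouts, and rejections.
- It trips a circuit breaker to stop all requests to a particular service for a period of time. This happens either manually, or automatically when the service's error percentage passes a threshold.
- It executes fallback logic when a request fails, is rejected, times out, or is short-circuited.
- It monitors all of the above comprehensively.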
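5.2.3 Workflow
1. Construct a HystrixCommand or HystrixObservableCommand object
The first step is to construct a HystrixCommand or HystrixObservableCommand object that represents the request you are making to the dependency. Use HystrixCommand when the dependency returns a single response. HystrixObservableCommand yields a reactive result: an RxJava Observable that emits the response(s).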
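2. Execute the command
A command can be executed in four ways, using one of the following methods of the Hystrix command object. The first two apply only to simple HystrixCommand objects and are not available on HystrixObservableCommand.
- execute(): blocks, then returns the single response from the dependency (or throws an exception in case of an error)
- queue(): returns a Future from which the single response can be obtained
- observe(): subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
- toObservable(): returns an Observable; the Hystrix command is executed only when you subscribe to it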
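The relationship between execute() and queue() can be illustrated with a small, self-contained sketch. MiniCommand is a hypothetical stand-in built only from JDK classes, not Hystrix's API. It wraps one call in a command object, runs that call on a pool, and falls back when run() throws. Hystrix's own execute() is essentially queue().get(), and the blocking method here works the same way: it simply waits on the Future returned by queue().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, heavily simplified command object (NOT com.netflix.hystrix.HystrixCommand).
abstract class MiniCommand<R> {
    // Shared pool for the sketch; daemon threads so they never keep the JVM alive.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "mini-command");
        t.setDaemon(true);
        return t;
    });

    // The actual call to the dependency.
    protected abstract R run() throws Exception;

    // Called when run() throws; the default has no fallback.
    protected R getFallback() {
        throw new UnsupportedOperationException("no fallback available");
    }

    // queue(): asynchronous, returns a Future right away.
    public Future<R> queue() {
        Callable<R> task = () -> {
            try {
                return run();
            } catch (Exception e) {
                return getFallback(); // failure is routed to the fallback
            }
        };
        return POOL.submit(task);
    }

    // execute(): blocking, implemented on top of queue().
    public R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Unlike Hystrix, this sketch does not stop an instance from being executed twice. It also has no timeout, no circuit breaker, and no observe()/toObservable().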
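3. Check whether the response is cached
If request caching is enabled and a cached response for this request exists, the cached value is returned immediately.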
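Here is a minimal sketch of the request-cache idea. It assumes that one RequestCache instance stands for one request. In Hystrix, the cache key comes from overriding getCacheKey() and the cache is scoped to a HystrixRequestContext. The class below is hypothetical and only shows that a second call with the same key never reaches the dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-request cache (not Hystrix's HystrixRequestCache).
final class RequestCache {
    private final Map<String, Object> results = new HashMap<>();
    private int misses = 0; // number of times the dependency was really called

    @SuppressWarnings("unchecked")
    <R> R get(String cacheKey, Supplier<R> command) {
        if (results.containsKey(cacheKey)) {
            return (R) results.get(cacheKey); // cache hit: the command is not executed
        }
        misses++;
        R value = command.get();
        results.put(cacheKey, value);
        return value;
    }

    int misses() {
        return misses;
    }
}
```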
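4. Is the circuit open?
When the command executes, Hystrix checks whether the circuit breaker is open. If the circuit is open (or "tripped"), Hystrix does not execute the command and routes the flow to step 8. If the circuit is closed, the flow continues with step 5.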
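5. Is the thread pool/queue/semaphore full?
If the thread pool and queue (or the semaphore) associated with the command are full, Hystrix does not execute the command. It immediately routes the flow to step 8 to obtain the fallback.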
六、 HystrixObservableCommand.construct() or HystrixCommand.run()
Hystrix makes the request to the dependency by calling one of these methods:
- HystrixCommand.run(): returns a single response
- HystrixObservableCommand.construct(): returns an Observable
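If run() or construct() exceeds the command's timeout, the thread throws a TimeoutException. If the command itself is not running on its own thread, a separate timer thread throws it instead. In that case Hystrix routes the response to step 8 to obtain the fallback. If run() or construct() does not cancel or interrupt, Hystrix discards its eventual return value.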
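The timeout handling of step 6 can be approximated with JDK futures. This is a hypothetical sketch, not Hystrix code. The caller waits at most timeoutMs, then cancels the task with interruption (comparable to Hystrix's execution.isolation.thread.interruptOnTimeout) and returns the fallback. Any value the worker might still produce is never read.

```java
import java.util.concurrent.*;

// Hypothetical timeout wrapper illustrating "timeout -> fallback, late result discarded".
final class TimeoutRunner {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    <T> T callWithTimeout(Callable<T> task, long timeoutMs, T fallback) {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // try to interrupt the worker; its result is ignored
            return fallback;
        } catch (ExecutionException e) {
            return fallback;     // the task threw: also routed to the fallback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```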
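7. Calculate circuit health
Hystrix reports successes, failures, rejections, and timeouts to the circuit breaker, which maintains a rolling set of counters to calculate statistics. It uses these statistics to decide when the circuit should "trip". Once tripped, the breaker short-circuits all subsequent requests until a recovery period elapses. After that period, it closes the circuit again once certain health checks pass.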
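The "rolling set of counters" can be pictured as a ring of time buckets, where stale buckets are reset as time moves on. The sketch below is a simplified, hypothetical version. Hystrix's defaults are a 10-second window split into 10 buckets, but its real implementation differs. Time is passed in explicitly so that the example is deterministic.

```java
import java.util.Arrays;

// Hypothetical rolling-window health counter (not Hystrix's HystrixRollingNumber).
final class RollingHealthCounter {
    private final long bucketMs;
    private final long[] bucketId; // which time bucket each slot currently holds
    private final long[] success;
    private final long[] failure;  // failures, timeouts and rejections

    RollingHealthCounter(long windowMs, int buckets) {
        this.bucketMs = windowMs / buckets;
        this.bucketId = new long[buckets];
        this.success = new long[buckets];
        this.failure = new long[buckets];
        Arrays.fill(bucketId, -1); // -1 marks a slot that was never used
    }

    // Slot for time nowMs; a slot still holding an older bucket is cleared first.
    private int slot(long nowMs) {
        long id = nowMs / bucketMs;
        int i = (int) (id % bucketId.length);
        if (bucketId[i] != id) {
            bucketId[i] = id;
            success[i] = 0;
            failure[i] = 0;
        }
        return i;
    }

    synchronized void recordSuccess(long nowMs) { success[slot(nowMs)]++; }

    synchronized void recordFailure(long nowMs) { failure[slot(nowMs)]++; }

    // {successes, failures} summed over buckets still inside the window ending at nowMs.
    synchronized long[] totals(long nowMs) {
        long current = nowMs / bucketMs;
        long ok = 0;
        long bad = 0;
        for (int i = 0; i < bucketId.length; i++) {
            if (bucketId[i] >= 0 && current - bucketId[i] < bucketId.length) {
                ok += success[i];
                bad += failure[i];
            }
        }
        return new long[] {ok, bad};
    }

    synchronized int errorPercentage(long nowMs) {
        long[] t = totals(nowMs);
        long total = t[0] + t[1];
        return total == 0 ? 0 : (int) (t[1] * 100 / total);
    }
}
```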
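8. Get the fallback
When a command fails to execute, Hystrix tries to fall back to the fallback method. Common causes of failure are:
- construct() or run() throws an exception
- the circuit breaker is open
- the thread pool, queue, or semaphore is full
- the command times out
9. Return the successful response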
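5.2.4 How the circuit breaker trips
The circuit opens and closes as follows:
1. The request volume across the circuit reaches a threshold:
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
2. The error percentage exceeds a threshold:
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
3. The circuit breaker then transitions from CLOSED to OPEN.
4. While the circuit breaker is open, all requests are short-circuits; most commonly, the fallback method is executed.
5. After a certain amount of time, set with HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds(), the next single request is let through (the circuit breaker is now HALF-OPEN). If that request fails, the circuit breaker returns to OPEN. If it succeeds, the circuit breaker transitions to CLOSED.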
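The rules above form a small state machine: CLOSED → OPEN → HALF_OPEN → CLOSED (or back to OPEN). The following sketch implements it with plain counters instead of a rolling window. The three constructor arguments mirror the three HystrixCommandProperties named above (Hystrix's defaults are 20 requests, 50%, and 5000 ms), but the class itself is hypothetical. In this sketch, the circuit trips once the error percentage reaches the threshold, and time is passed in explicitly.

```java
// Hypothetical circuit breaker following the rules above (not Hystrix's HystrixCircuitBreaker).
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;   // cf. circuitBreakerRequestVolumeThreshold
    private final int errorThresholdPercentage; // cf. circuitBreakerErrorThresholdPercentage
    private final long sleepWindowMs;           // cf. circuitBreakerSleepWindowInMilliseconds

    private State state = State.CLOSED;
    private long openedAtMs;
    private int total;
    private int errors;

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMs) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMs = sleepWindowMs;
    }

    // false means "short-circuit": go straight to the fallback
    synchronized boolean allowRequest(long nowMs) {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && nowMs - openedAtMs >= sleepWindowMs) {
            state = State.HALF_OPEN; // sleep window elapsed: let one trial request through
            return true;
        }
        return false; // still OPEN, or a HALF_OPEN trial is already in flight
    }

    synchronized void onSuccess() {
        if (state == State.HALF_OPEN) {
            state = State.CLOSED; // trial succeeded: close and start counting afresh
            total = 0;
            errors = 0;
        } else if (state == State.CLOSED) {
            total++;
        }
    }

    synchronized void onFailure(long nowMs) {
        if (state == State.HALF_OPEN) {
            trip(nowMs); // trial failed: open again for another sleep window
        } else if (state == State.CLOSED) {
            total++;
            errors++;
            boolean enoughVolume = total >= requestVolumeThreshold;
            boolean tooManyErrors = errors * 100 >= errorThresholdPercentage * total;
            if (enoughVolume && tooManyErrors) {
                trip(nowMs);
            }
        }
    }

    private void trip(long nowMs) {
        state = State.OPEN;
        openedAtMs = nowMs;
    }

    synchronized State state() {
        return state;
    }
}
```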
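5.2.5 Isolation strategies
5.2.5.1 Thread pool isolation
Client calls execute on separate threads. This isolates them from the calling thread (the Tomcat thread pool), so the caller can "walk away" from a dependency call that is taking too long. The official documentation recommends this approach for isolating a service from its dependencies and lists the following benefits: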
- The application is fully protected from runaway client libraries. The pool for a given dependency library can fill up without impacting the rest of the application.
- The application can accept new client libraries with far lower risk. If an issue occurs, it is isolated to the library and doesn’t affect everything else.
- When a failed client becomes healthy again, the thread pool will clear up and the application immediately resumes healthy performance, as opposed to a long recovery when the entire Tomcat container is overwhelmed.
- If a client library is misconfigured, the health of a thread pool will quickly demonstrate this (via increased errors, latency, timeouts, rejections, etc.) and you can handle it (typically in real-time via dynamic properties) without affecting application functionality.
- If a client service changes performance characteristics (which happens often enough to be an issue) which in turn cause a need to tune properties (increasing/decreasing timeouts, changing retries, etc.) this again becomes visible through thread pool metrics (errors, latency, timeouts, rejections) and can be handled without impacting other clients, requests, or users.
- Beyond the isolation benefits, having dedicated thread pools provides built-in concurrency which can be leveraged to build asynchronous facades on top of synchronous client libraries (similar to how the Netflix API built a reactive, fully-asynchronous Java API on top of Hystrix commands).
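In my view, the main advantage is the "independence" of each service module. Commands do not depend on the container's (Tomcat's) threads, so when a dependency misbehaves the risk is contained and recovery is quick. The main drawback is extra computational overhead: every command execution involves the queueing, scheduling, and context switching needed to run it on a separate thread. Netflix decided to accept this cost in exchange for the benefits, judging the overhead and performance impact to be small. Thread pool isolation of dependencies is illustrated in the accompanying diagram.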
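Thread pool isolation is essentially the bulkhead pattern. The sketch below models it with JDK executors under two assumptions. First, there is one small pool per dependency name. Second, each pool uses a SynchronousQueue, so a full pool rejects immediately, as Hystrix does with its default maxQueueSize of -1. The class and the dependency names in the usage are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.*;

// Hypothetical bulkhead: a slow dependency can exhaust only its own threads (not Hystrix code).
final class Bulkheads {
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int threadsPerDependency;

    Bulkheads(int threadsPerDependency) {
        this.threadsPerDependency = threadsPerDependency;
    }

    private ThreadPoolExecutor poolFor(String dependency) {
        return pools.computeIfAbsent(dependency, key -> new ThreadPoolExecutor(
                threadsPerDependency, threadsPerDependency, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),                // no waiting room
                new ThreadPoolExecutor.AbortPolicy()));  // full pool -> RejectedExecutionException
    }

    // Runs the task on the dependency's own pool; if that pool is full, an already
    // completed fallback future is returned instead of blocking the calling thread.
    <T> Future<T> submit(String dependency, Callable<T> task, T fallback) {
        try {
            return poolFor(dependency).submit(task);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.completedFuture(fallback);
        }
    }

    void shutdown() {
        pools.values().forEach(ThreadPoolExecutor::shutdownNow);
    }
}
```

With one thread per dependency, a hung call to "slow-service" makes the next "slow-service" call receive its fallback at once. Meanwhile, "fast-service" keeps running on its own pool.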
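5.2.5.2 Semaphore isolation
Semaphore isolation limits the number of concurrent requests to a dependency by setting a count. It reduces load without using thread pools; use it if you trust the client and only want to limit load. Once the limit is reached, the semaphore rejects requests from other threads. The thread that holds a permit cannot walk away, however: the call runs on the caller's own thread, so it cannot be abandoned on timeout the way a thread-pool call can.
If you rely on ThreadLocal-bound variables, or need to pass them into the command, use semaphore isolation (or a custom HystrixConcurrencyStrategy, described in 5.2.7.2). With thread isolation, the command runs on a different thread and does not see the caller's ThreadLocal values.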
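For comparison, here is a hypothetical sketch of semaphore isolation using java.util.concurrent.Semaphore. tryAcquire() rejects immediately once the limit is reached. The task runs on the calling thread, which is why ThreadLocal values remain visible and why the call cannot be abandoned. The limit plays the role of execution.isolation.semaphore.maxConcurrentRequests, but the class is not Hystrix code.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical semaphore isolation: limits concurrency, executes on the caller's thread.
final class SemaphoreIsolation {
    private final Semaphore permits;

    SemaphoreIsolation(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    <T> T call(Supplier<T> task, Supplier<T> fallback) {
        if (!permits.tryAcquire()) { // limit reached: reject at once, no queueing
            return fallback.get();
        }
        try {
            return task.get();       // runs on the calling thread
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```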
5.2.6 Using Hystrix standalone
package com.iteng.springcloud.hystrix;
import com.netflix.hystrix.*;
public class FirstHystrixExample extends HystrixCommand<String> {
public FirstHystrixExample() {
super(
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(FirstHystrixExample.class.getSimpleName()))
.andCommandKey(HystrixCommandKey.Factory.asKey("test"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(30))
// andCommandPropertiesDefaults() replaces any previously set value, so all command
// properties are chained on a single HystrixCommandProperties.Setter
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionTimeoutInMilliseconds(1000)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(1)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(200)
.withCircuitBreakerEnabled(true))
);
}
@Override
protected String run() throws Exception {
// Thread.sleep(30000); // uncomment to simulate a slow dependency
return "hello";
}
@Override
protected String getFallback() {
return "error msg...";
}
}
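For Hystrix's configuration options, see the Configuration page of the official Hystrix wiki. Here we need to specify the Hystrix group key and command key, which can be done as follows: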
Setter.
withGroupKey(HystrixCommandGroupKey.Factory.asKey(FirstHystrixExample.class.getSimpleName())).
andCommandKey(HystrixCommandKey.Factory.asKey("test"))
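Following this convention, the class name can serve as the group key and the method name as the command key. A dynamic proxy can then build the HystrixCommand around each method call. The annotation support described in 5.2.7 uses the same defaults: the annotated method's class name for the group key, and the method name for the command key.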
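To make the "class name as group key, method name as command key" convention concrete, here is a hypothetical sketch using a JDK dynamic proxy. It is only an analogy. Javanica actually relies on the HystrixCommandAspect shown in 5.2.7.1. Also, a real implementation would create a HystrixCommand, whereas this sketch merely records the derived keys and calls a fallback object with the same method signature. Greeter is a made-up interface for the usage example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Made-up API used only to demonstrate the proxy.
interface Greeter {
    String greet(String name);
}

// Hypothetical proxy factory (not Javanica): derives keys and falls back on failure.
final class CommandProxies {
    // Keys derived so far, kept only so the behaviour is observable (not thread-safe).
    static final List<String> KEYS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> api, T target, T fallback) {
        InvocationHandler handler = (proxy, method, args) -> {
            String groupKey = target.getClass().getSimpleName(); // class name -> group key
            String commandKey = method.getName();                 // method name -> command key
            KEYS.add(groupKey + "::" + commandKey);
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                return method.invoke(fallback, args); // same signature, like fallbackMethod
            }
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
    }
}
```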
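Invocation examples:
package com.iteng.springcloud.hystrix;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import rx.schedulers.Schedulers;
public class FirstHystrixExampleClient {
// execute(): blocks until the result (or the fallback) is available
static void execute() {
FirstHystrixExample firstHystrixExample = new FirstHystrixExample();
System.out.println(firstHystrixExample.execute());
}
// queue(): returns a Future immediately; here we wait at most 2 seconds for the result
static void future() {
FirstHystrixExample firstHystrixExample = new FirstHystrixExample();
Future<String> future = firstHystrixExample.queue();
try {
String s = future.get(2, TimeUnit.SECONDS);
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
// observe(): starts the command right away; subscribers receive the result
static void observer() {
FirstHystrixExample firstHystrixExample = new FirstHystrixExample();
firstHystrixExample.observe().subscribeOn(Schedulers.newThread()).subscribe(s -> System.out.println(Thread.currentThread().getName() + ":" + s));
}
}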
5.2.7 Integrating Hystrix with Spring Cloud
To add Hystrix support to a Spring Cloud application, add the dependency with groupId org.springframework.cloud and artifactId spring-cloud-starter-netflix-hystrix to your Maven or Gradle build. Example code:
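// Application.java
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
@SpringBootApplication
@EnableCircuitBreaker
public class Application {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).web(WebApplicationType.SERVLET).run(args);
}
}
// StoreIntegration.java
import java.util.Collections;
import java.util.Map;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.stereotype.Component;
@Component
public class StoreIntegration {
@HystrixCommand(fallbackMethod = "defaultStores")
public Object getStores(Map<String, Object> parameters) {
// do stuff that might fail, e.g. call a remote service;
// this placeholder always fails, so the fallback is returned
throw new IllegalStateException("store service unavailable");
}
// the fallback method takes the same parameters as getStores
public Object defaultStores(Map<String, Object> parameters) {
return Collections.emptyList();
}
}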
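Spring Cloud wraps Spring beans whose methods are annotated with @HystrixCommand in a proxy connected to the Hystrix circuit breaker. The circuit breaker then calculates when to open and close the circuit.
5.2.7.1 Source code analysis
Let's look at the source code.
META-INF/spring.factories in spring-cloud-netflix-core-2.0.1.RELEASE.jar contains the following configuration: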
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
HystrixCircuitBreakerConfiguration contains the following implementation:
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix;
import org.apache.catalina.core.ApplicationContext;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.client.actuator.NamedFeature;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.netflix.hystrix.Hystrix;
import com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect;
/**
* @author Spencer Gibb
* @author Christian Dupuis
* @author Venil Noronha
*/
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
// ... some code omitted
/**
* {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
* {@link ApplicationContext} shuts down.
*/
private class HystrixShutdownHook implements DisposableBean {
@Override
public void destroy() throws Exception {
// Just call Hystrix to reset thread pool etc.
Hystrix.reset();
}
}
}
Here we can see that a HystrixCommandAspect is registered. The key lines of this aspect are:
// pointcut matching methods annotated with @HystrixCommand
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
// around advice
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
// resolve the method the join point refers to
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// a method must not carry both @HystrixCommand and @HystrixCollapser
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
/*
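Build a MetaHolder from the join point; it bundles the Hystrix-related information, such as configuration.
Create a HystrixInvokable from the MetaHolder; this object defines how the Hystrix command is executed.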
*/
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
// execute the Hystrix command
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
// execute in Observable (reactive) mode
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
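5.2.7.2 Propagating ThreadLocal into Hystrix
To propagate a ThreadLocal value into a @HystrixCommand method, the default configuration does not work. By default, Hystrix uses thread isolation, so run() already executes on a thread maintained by Hystrix. The simplest fix is to switch the isolation strategy to SEMAPHORE, as in this example: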
package com.iteng.springcloud.hystrix.service;
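import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HystrixService {
private ThreadLocal<String> threadLocal = new ThreadLocal<>();
// self-injection, so that the call in test() goes through the proxy created for @HystrixCommand
@Autowired
private HystrixService hystrixService;
// SEMAPHORE isolation runs the method on the caller's thread, so the ThreadLocal is visible
@HystrixCommand(fallbackMethod = "fallback",
commandProperties = @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"))
@GetMapping("/sleep/{value}")
public String index(@PathVariable Integer value) {
String r = threadLocal.get();
return Thread.currentThread().getName() + ":" + r;
}
@GetMapping
public String test() {
threadLocal.set("test");
try {
return hystrixService.index(1);
} finally {
// request threads are pooled, so always clear the value
threadLocal.remove();
}
}
public String fallback(Integer value) {
return "error msg...";
}
}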
Alternatively, the ThreadLocal problem can be handled by extending HystrixConcurrencyStrategy. The Javadoc of its wrapCallable method explicitly suggests this approach:
/**
* Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
* <p>
* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
* <p>
* <b>Default Implementation</b>
* <p>
* Pass-thru that does no wrapping.
*
* @param callable
* {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
* @return {@code Callable<T>} either as a pass-thru or wrapping the one given
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
Usage example:
package com.iteng.springcloud.hystrix.concurrencystrategy;
import com.iteng.springcloud.hystrix.FirstHystrixExample;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import java.util.concurrent.Callable;
public class TestConcurrencyStrategy extends HystrixConcurrencyStrategy {
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new Callable<T>() {
@Override
public T call() throws Exception {
// demo only: sets a fixed value on the Hystrix thread before the command runs
FirstHystrixExample.test.set("333");
return callable.call();
}
};
}
}
Hystrix can discover plugins such as HystrixConcurrencyStrategy through ServiceLoader (see HystrixPlugins). So create the file META-INF/services/com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
with the following content:
com.iteng.springcloud.hystrix.concurrencystrategy.TestConcurrencyStrategy
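The lookup itself is standard java.util.ServiceLoader behaviour. Below is a simplified, hypothetical sketch of "use a registered provider, otherwise the default". HystrixPlugins' actual lookup is more involved, and GreetingStrategy and PluginLookup are made-up names.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Made-up plugin interface for the example.
interface GreetingStrategy {
    String greet(String name);
}

// Hypothetical helper: first provider listed in
// META-INF/services/<fully qualified interface name>, or the given default if none exists.
final class PluginLookup {
    static <T> T load(Class<T> type, T defaultImpl) {
        Iterator<T> providers = ServiceLoader.load(type).iterator();
        return providers.hasNext() ? providers.next() : defaultImpl;
    }
}
```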
Now modify the example from 5.2.6:
package com.iteng.springcloud.hystrix;
import com.netflix.hystrix.*;
public class FirstHystrixExample extends HystrixCommand<String> {
// ... code omitted ...
public static ThreadLocal<String> test = new ThreadLocal<>();
// .....
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName()+":"+test.get());
return "hello";
}
}
In Spring Cloud, it is enough to configure TestConcurrencyStrategy as a bean.
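TestConcurrencyStrategy above sets a fixed value. A more typical wrapCallable implementation copies the caller's value instead. The sketch below shows that pattern with a hypothetical helper in plain Java. It assumes, as wrapCallable-based propagation usually does, that the wrapping happens on the submitting thread. The value is captured there and installed on the worker thread. Afterwards the worker's previous state is restored, because pool threads are reused.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for copying one ThreadLocal from the submitting thread to a worker.
final class ThreadLocalPropagation {
    static <T, V> Callable<T> propagate(ThreadLocal<V> local, Callable<T> callable) {
        final V captured = local.get(); // runs on the submitting (caller) thread
        return () -> {
            V previous = local.get();   // runs on the worker thread
            local.set(captured);
            try {
                return callable.call();
            } finally {
                // restore the worker's own state so nothing leaks into the next task
                if (previous == null) {
                    local.remove();
                } else {
                    local.set(previous);
                }
            }
        };
    }
}
```

Inside TestConcurrencyStrategy, wrapCallable could then return ThreadLocalPropagation.propagate(FirstHystrixExample.test, callable).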
5.2.7.3 Health information
Hystrix contributes health information that can be monitored through the /health endpoint, for example:
{
"hystrix": {
"openCircuitBreakers": [
"StoreIntegration::getStoresByLocationLink"
],
"status": "CIRCUIT_OPEN"
},
"status": "UP"
}