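Hystrix is a fault-tolerance library developed by Netflix. Active development stopped in 2018 and the project is now in maintenance mode; the maintainers recommend resilience4j for new projects (resilience4j was inspired by Hystrix, but it is designed for Java 8 and functional programming and has few dependencies).
In a distributed system the call dependencies between components can be complex. A failure in one component may affect many others and, if handled badly, can bring down the whole system. Hystrix exists to improve the system's fault tolerance.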
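Main goals of Hystrix
- Stop cascading failures
- Fail fast and recover rapidly (avoid queuing when something breaks, so that no further resources are consumed)
- Fall back and degrade gracefully (improve availability and reduce the impact on users)
- Thread and semaphore isolation (keeps a single dependency from exhausting resources), plus circuit breakers (limit the impact of a single dependency)
- Real-time monitoring and configuration changes
- Concurrent execution, with concurrency-aware request caching
- Automated batching through request collapsing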
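How Hystrix works https://github.com/Netflix/Hystrix/wiki/How-it-Works
1. Wrap the call in a HystrixCommand or HystrixObservableCommand (a command instance is single-use, so create a new one for every call, even for the same command)
2. Call one of the four execution methods (described later)
3. If request caching is enabled and the cache is hit, return the cached response directly
4. Check whether the circuit is open; if so, go to step 8 (fallback)
5. Check whether the semaphore or thread pool is exhausted; if so, go to step 8 (fallback)
6. Actually run the command; if it fails or times out, go to step 8 (fallback), otherwise go to step 9
7. Calculate the circuit health metrics
8. Fall back
9. Return the result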
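The decision order in the steps above can be sketched in plain Java. This is only an illustration under simplifying assumptions, not Hystrix code: the class `CommandFlowSketch`, its fields, and the `execute` signature are hypothetical, the timeout of step 6 is left out, and the circuit state is a plain flag instead of being derived from metrics.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the decision order only; this is not Hystrix code.
final class CommandFlowSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Semaphore permits;
    final AtomicInteger failures = new AtomicInteger(); // stand-in for the health metrics of step 7
    volatile boolean circuitOpen = false;               // a real breaker derives this from the metrics

    CommandFlowSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;                      // step 3: cache hit
        }
        if (circuitOpen) {
            return fallback.get();              // step 4 -> step 8
        }
        if (!permits.tryAcquire()) {
            return fallback.get();              // step 5 -> step 8
        }
        try {
            String result = run.get();          // step 6: run the command
            cache.put(cacheKey, result);
            return result;                      // step 9
        } catch (RuntimeException e) {
            failures.incrementAndGet();         // step 7: record the failure
            return fallback.get();              // step 8
        } finally {
            permits.release();
        }
    }
}
```

Calling `execute` twice with the same key runs the supplier only once; a supplier that throws, an open circuit, or an exhausted semaphore all end in the fallback.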
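Execution methods
- execute: calls queue().get() internally, so it blocks until the command has finished
- queue: calls toObservable().toBlocking().toFuture() internally; the command starts running right away and future.get() blocks until the result is available
- toObservable: returns a cold Observable; the command only runs once subscribe is called, and the call does not block
- observe: subscribes a ReplaySubject to toObservable() right away, which triggers the command, and returns that subject (with a doOnUnsubscribe hook that unsubscribes from the source) so that user code subscribing later still receives the result; it does not block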
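The difference between starting work eagerly (as observe() does) and only on demand (as toObservable() does) can be illustrated with JDK classes alone. The sketch below is an analogy, not RxJava or Hystrix code; `LazyVsEagerSketch`, `cold`, `hot`, and the `started` counter are hypothetical names introduced for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Analogy only: a Supplier plays the role of a cold source, a started future the role of a hot one.
final class LazyVsEagerSketch {
    static final AtomicInteger started = new AtomicInteger(); // counts how often the work has begun

    static String work(String name) {
        started.incrementAndGet();
        return "Hello " + name + "!";
    }

    // "Cold": nothing runs until the caller asks the supplier for a future.
    static Supplier<CompletableFuture<String>> cold(String name) {
        return () -> CompletableFuture.supplyAsync(() -> work(name));
    }

    // "Hot": the work is submitted immediately; the future keeps the result for later readers.
    static CompletableFuture<String> hot(String name) {
        return CompletableFuture.supplyAsync(() -> work(name));
    }
}
```

Creating the cold supplier leaves `started` untouched until `get()` is called on it, while `hot` submits the work as soon as it is invoked; in both cases the caller is not blocked until it joins the future.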
Examples https://github.com/Netflix/Hystrix/wiki/How-To-Use
<!--
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
</dependency>
-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
public class CommandHelloWorld extends HystrixCommand<String> {
    private final String name;
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }
    @Override
    protected String run() {
        return "Hello " + name + "!";
    }
}
@RestController
public class Controller {
    @RequestMapping("/hello")
    public String hello() throws InterruptedException, ExecutionException {
        // a command instance is single-use: create a new one even for the same command
        // sync
        // execute() calls queue().get() internally
        String execute_result = new CommandHelloWorld("Bob").execute();
        // async
        // queue() calls toObservable().toBlocking().toFuture() internally;
        // the command starts right away and future.get() blocks until the result is ready
        Future<String> future = new CommandHelloWorld("World").queue();
        // async
        // returns a “hot” Observable that executes the command immediately,
        // though because the Observable is filtered through a ReplaySubject you are not in danger
        // of losing any items that it emits before you have a chance to subscribe
        //
        // observe() subscribes a ReplaySubject to toObservable() right away, which starts the command,
        // and returns that subject so that user code can subscribe afterwards
        final StringBuilder observe_string = new StringBuilder();
        Observable<String> observe = new CommandHelloWorld("Hystrix").observe();
        //observe.subscribe(d -> System.out.println(d));
        observe.subscribe(d -> observe_string.append(d));
        // async
        // returns a “cold” Observable that won’t execute the command and begin emitting its results
        // until you subscribe to the Observable
        final StringBuilder observable_string = new StringBuilder();
        Observable<String> observable = new CommandHelloWorld("Spring").toObservable();
        //observable.subscribe(d -> System.out.println(d));
        observable.subscribe(d -> observable_string.append(d));
        // both observe() and toObservable() are asynchronous because subscribe() does not block,
        // so observe_string and observable_string are sometimes filled in and sometimes still empty here
        return "execute: " + execute_result + "<br>queue: " + future.get() +
                "<br>observe: " + observe_string + "<br>observable: " + observable_string;
    }
}
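With HystrixObservableCommand the code looks like this
(HystrixCommand is mainly meant for blocking calls, but it also offers observe() and toObservable() for non-blocking use)
(HystrixObservableCommand only supports the non-blocking mode, i.e. observe() and toObservable())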
public class CommandHelloWorld extends HystrixObservableCommand<String> {
    private final String name;
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }
    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // a real example would do work like a network call here
                        observer.onNext("Hello");
                        observer.onNext(name + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}
Override getFallback to provide a fallback
public class CommandHelloFailure extends HystrixCommand<String> {
    private final String name;
    public CommandHelloFailure(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }
    @Override
    protected String run() {
        throw new RuntimeException("this command always fails");
    }
    @Override
    protected String getFallback() {
        return "Hello Failure " + name + "!";
    }
}
@RequestMapping("/fail")
public String fail() {
    return new CommandHelloFailure("Hystrix").execute();
}
Override getCacheKey to use the request cache
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
    private final int value;
    public CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }
    @Override
    protected Boolean run() {
        return value == 0 || value % 2 == 0;
    }
    @Override
    protected String getCacheKey() {
        // build the cache key; commands with the same key share one cached response.
        // Here the value passed to the command is used directly as the key
        return String.valueOf(value);
    }
}
@RequestMapping("/cache")
public String cache() {
    final StringBuilder result = new StringBuilder();
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
        CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
        result.append("<br>first: " + command2a.execute());
        result.append("<br>from cache: " + command2a.isResponseFromCache());
        result.append("<br>second: " + command2b.execute());
        result.append("<br>from cache: " + command2b.isResponseFromCache());
    } finally {
        context.shutdown();
    }
    // start a new request context
    context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
        result.append("<br>third: " + command3b.execute());
        result.append("<br>from cache: " + command3b.isResponseFromCache());
    } finally {
        context.shutdown();
    }
    return result.toString();
}
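The behaviour shown by this endpoint, where a result is reused within one request context and recomputed in the next, can be modelled with a map that lives only as long as the request. The following is a minimal sketch under that assumption; `RequestScopedCacheSketch` and its members are hypothetical and do not mirror how Hystrix stores cached responses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model: one instance stands for one request context.
final class RequestScopedCacheSketch {
    private final Map<String, Object> results = new HashMap<>();
    int executions = 0; // how many times the underlying work actually ran

    @SuppressWarnings("unchecked")
    <T> T execute(String cacheKey, Supplier<T> run) {
        if (results.containsKey(cacheKey)) {
            return (T) results.get(cacheKey); // same key in the same context: reuse the result
        }
        executions++;
        T value = run.get();
        results.put(cacheKey, value);
        return value;
    }
}
```

Two calls with key "2" on the same instance run the supplier once, while a fresh instance (a new request context) runs it again.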
Request collapsing (for batch execution)
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
    private final Integer key;
    public CommandCollapserGetValueForKey(Integer key) {
        System.out.println("CommandCollapserGetValueForKey : " + key);
        this.key = key;
    }
    @Override
    public Integer getRequestArgument() {
        return key;
    }
    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        System.out.println("createCommand : " + requests);
        return new BatchCommand(requests);
    }
    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        System.out.println("mapResponseToRequests : " + batchResponse);
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }
    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;
        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            System.out.println("BatchCommand : " + requests);
            this.requests = requests;
        }
        @Override
        protected List<String> run() {
            System.out.println("BatchCommand : run");
            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}
@RequestMapping("/collapser")
public String collapser() throws InterruptedException, ExecutionException {
    final StringBuilder result = new StringBuilder();
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
    Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
    Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
    Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
    try {
        System.out.println("f1.get()");
        result.append("<br>" + f1.get());
        System.out.println("f2.get()");
        result.append("<br>" + f2.get());
        System.out.println("f3.get()");
        result.append("<br>" + f3.get());
        System.out.println("f4.get()");
        result.append("<br>" + f4.get());
    } finally {
        context.shutdown();
    }
    return result.toString();
}
The log output is as follows
CommandCollapserGetValueForKey : 1
CommandCollapserGetValueForKey : 2
CommandCollapserGetValueForKey : 3
CommandCollapserGetValueForKey : 4
f1.get()
createCommand : [com.netflix.hystrix.collapser.CollapsedRequestSubject@58a7cdbd, com.netflix.hystrix.collapser.CollapsedRequestSubject@566028d9, com.netflix.hystrix.collapser.CollapsedRequestSubject@525fa6cd, com.netflix.hystrix.collapser.CollapsedRequestSubject@175d22e2]
BatchCommand : [com.netflix.hystrix.collapser.CollapsedRequestSubject@58a7cdbd, com.netflix.hystrix.collapser.CollapsedRequestSubject@566028d9, com.netflix.hystrix.collapser.CollapsedRequestSubject@525fa6cd, com.netflix.hystrix.collapser.CollapsedRequestSubject@175d22e2]
BatchCommand : run
mapResponseToRequests : [ValueForKey: 1, ValueForKey: 2, ValueForKey: 3, ValueForKey: 4]
f2.get()
f3.get()
f4.get()
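The log shows that the four requests were collapsed into a single batch command: by the time the first get() returns, the whole batch has already been executed, and the later get() calls simply read their results.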
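The collapsing pattern seen in this log (collect the arguments, run one batch, hand each caller its own slice of the response) can be sketched with JDK classes. This is a simplified model, not the Hystrix implementation: `BatchCollapserSketch`, `submit`, and `flush` are hypothetical names, and the batch is triggered by an explicit `flush()` call here, whereas Hystrix triggers it itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of request collapsing; the caller decides when the batch runs.
final class BatchCollapserSketch {
    private final List<Integer> pendingArgs = new ArrayList<>();
    private final List<CompletableFuture<String>> pendingFutures = new ArrayList<>();
    private final Function<List<Integer>, List<String>> batchCommand;
    int batchRuns = 0; // how many batch commands were actually executed

    BatchCollapserSketch(Function<List<Integer>, List<String>> batchCommand) {
        this.batchCommand = batchCommand;
    }

    // Register one request; its future completes when the batch has run.
    synchronized CompletableFuture<String> submit(Integer key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingArgs.add(key);
        pendingFutures.add(future);
        return future;
    }

    // Run a single batch for everything collected so far and map responses back by position.
    synchronized void flush() {
        if (pendingArgs.isEmpty()) {
            return;
        }
        batchRuns++;
        List<String> responses = batchCommand.apply(new ArrayList<>(pendingArgs));
        for (int i = 0; i < pendingFutures.size(); i++) {
            pendingFutures.get(i).complete(responses.get(i));
        }
        pendingArgs.clear();
        pendingFutures.clear();
    }
}
```

As in mapResponseToRequests above, the sketch relies on the batch returning its responses in the same order as the arguments it received.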
Features such as caching and collapsing require a request context to be set up first
HystrixRequestContext context = HystrixRequestContext.initializeContext();
......
context.shutdown();
Or, in a Java web application, set it up in a Servlet Filter
public class HystrixRequestContextServletFilter implements Filter {
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            chain.doFilter(request, response);
        } finally {
            context.shutdown();
        }
    }
}
<!--web.xml-->
<filter>
    <display-name>HystrixRequestContextServletFilter</display-name>
    <filter-name>HystrixRequestContextServletFilter</filter-name>
    <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
    <filter-name>HystrixRequestContextServletFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>
Configuring a HystrixCommand
public class CommandWithProperties extends HystrixCommand<Integer> {
    private final Integer id;
    public CommandWithProperties(Integer id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withCircuitBreakerEnabled(true) // enable the circuit breaker (already true by default)
                                .withMetricsRollingStatisticalWindowInMilliseconds(15000) // 15-second statistical window (default: 10 seconds)
                                .withExecutionTimeoutInMilliseconds(900) // command timeout of 0.9 seconds (default: 1 second)
                                .withCircuitBreakerRequestVolumeThreshold(10) // the error rate is only evaluated once the 15-second window holds at least 10 requests (default: 20)
                                .withCircuitBreakerSleepWindowInMilliseconds(5000) // try again 5 seconds after the circuit opens
                                .withCircuitBreakerErrorThresholdPercentage(50) // open the circuit when the error rate reaches 50%
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // isolate with a semaphore or a thread (default: thread)
                                .withExecutionIsolationSemaphoreMaxConcurrentRequests(10))); // maximum number of concurrent requests under semaphore isolation
        this.id = id;
    }
    @Override
    protected Integer run() {
        if (this.id < 10) {
            try {
                // force a timeout
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
        return this.id;
    }
    @Override
    protected Integer getFallback() {
        return -1;
    }
}
@RequestMapping("/properties")
public String properties() throws InterruptedException, ExecutionException {
    final StringBuilder result = new StringBuilder();
    for (int i = 0; i < 20; i++) {
        // calls with i below 10 fail (they time out)
        // once at least 10 calls have been made the error rate is evaluated; it is above 50%, so the circuit opens
        // calls with i of 10 or more would normally succeed, but the open circuit sends them straight to the fallback
        // after the circuit has been open for 5 seconds it moves to the half-open state
        CommandWithProperties command = new CommandWithProperties(i);
        Integer r = command.execute();
        result.append("<br>execute " + i + ", result " + r + ", isCircuitBreakerOpen " + command.isCircuitBreakerOpen());
        if (i == 15) {
            // wait 5 seconds so that the circuit breaker becomes half-open
            Thread.sleep(5000);
        }
    }
    return result.toString();
}
The response is
execute 0, result -1, isCircuitBreakerOpen false
execute 1, result -1, isCircuitBreakerOpen false
execute 2, result -1, isCircuitBreakerOpen false
execute 3, result -1, isCircuitBreakerOpen false
execute 4, result -1, isCircuitBreakerOpen false
execute 5, result -1, isCircuitBreakerOpen false
execute 6, result -1, isCircuitBreakerOpen false
execute 7, result -1, isCircuitBreakerOpen false
execute 8, result -1, isCircuitBreakerOpen false
execute 9, result -1, isCircuitBreakerOpen true
execute 10, result -1, isCircuitBreakerOpen true
execute 11, result -1, isCircuitBreakerOpen true
execute 12, result -1, isCircuitBreakerOpen true
execute 13, result -1, isCircuitBreakerOpen true
execute 14, result -1, isCircuitBreakerOpen true
execute 15, result -1, isCircuitBreakerOpen true
execute 16, result 16, isCircuitBreakerOpen false
execute 17, result 17, isCircuitBreakerOpen false
execute 18, result 18, isCircuitBreakerOpen false
execute 19, result 19, isCircuitBreakerOpen false
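The closed, open, and half-open transitions visible in this output can be reproduced with a small state machine. The sketch below is a simplified model and not the Hystrix circuit breaker: `CircuitBreakerSketch` is a hypothetical class, it counts calls since the circuit last closed instead of using a rolling window, it only re-evaluates the error rate when a call fails, and it takes an injectable clock so that the sleep window can be simulated without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, simplified circuit breaker: no rolling window, one lock around every call.
final class CircuitBreakerSketch {
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // injectable so the sleep window can be simulated
    private int total;
    private int failed;
    private long openedAt = -1;       // -1 means the circuit is closed

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    synchronized boolean isOpen() {
        return openedAt >= 0;
    }

    synchronized <T> T execute(Supplier<T> run, Supplier<T> fallback) {
        boolean trial = false;
        if (openedAt >= 0) {
            if (clock.getAsLong() - openedAt < sleepWindowMillis) {
                return fallback.get();          // open: short-circuit straight to the fallback
            }
            trial = true;                       // half-open: let this one call through
        }
        try {
            T result = run.get();
            if (trial) {
                openedAt = -1;                  // trial succeeded: close and reset the counters
                total = 0;
                failed = 0;
            } else {
                total++;
            }
            return result;
        } catch (RuntimeException e) {
            if (trial) {
                openedAt = clock.getAsLong();   // trial failed: open again for another sleep window
            } else {
                total++;
                failed++;
                if (total >= requestVolumeThreshold
                        && failed * 100 / total >= errorThresholdPercentage) {
                    openedAt = clock.getAsLong(); // enough volume and too many errors: open
                }
            }
            return fallback.get();
        }
    }
}
```

With a threshold of 10 requests, a 50% error threshold, and a 5000 ms sleep window, ten failing calls open the circuit on the tenth call, later calls get the fallback, and the first call after the clock has advanced by 5000 ms is the trial that closes it again.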
All default settings can be found in HystrixCommandProperties
static final Integer default_metricsRollingStatisticalWindow = 10000;// default => statisticalWindow: 10000 = 10 seconds (and default of 10 buckets so each bucket is 1 second)
private static final Integer default_metricsRollingStatisticalWindowBuckets = 10;// default => statisticalWindowBuckets: 10 = 10 buckets in a 10 second window so each bucket is 1 second
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;// default => statisticalWindowVolumeThreshold: 20 requests in 10 seconds must occur before statistics matter
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;// default => sleepWindow: 5000 = 5 seconds that we will sleep before trying again after tripping the circuit
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;// default => errorThresholdPercentage = 50 = if 50%+ of requests in 10 seconds are failures or latent then we will trip the circuit
private static final Boolean default_circuitBreakerForceOpen = false;// default => forceCircuitOpen = false (we want to allow traffic)
static final Boolean default_circuitBreakerForceClosed = false;// default => ignoreErrors = false
private static final Integer default_executionTimeoutInMilliseconds = 1000; // default => executionTimeoutInMilliseconds: 1000 = 1 second
private static final Boolean default_executionTimeoutEnabled = true;
private static final ExecutionIsolationStrategy default_executionIsolationStrategy = ExecutionIsolationStrategy.THREAD;
private static final Boolean default_executionIsolationThreadInterruptOnTimeout = true;
private static final Boolean default_executionIsolationThreadInterruptOnFutureCancel = false;
private static final Boolean default_metricsRollingPercentileEnabled = true;
private static final Boolean default_requestCacheEnabled = true;
private static final Integer default_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
private static final Boolean default_fallbackEnabled = true;
private static final Integer default_executionIsolationSemaphoreMaxConcurrentRequests = 10;
private static final Boolean default_requestLogEnabled = true;
private static final Boolean default_circuitBreakerEnabled = true;
private static final Integer default_metricsRollingPercentileWindow = 60000; // default to 1 minute for RollingPercentile
private static final Integer default_metricsRollingPercentileWindowBuckets = 6; // default to 6 buckets (10 seconds each in 60 second window)
private static final Integer default_metricsRollingPercentileBucketSize = 100; // default to 100 values max per bucket
private static final Integer default_metricsHealthSnapshotIntervalInMilliseconds = 500; // default to 500ms as max frequency between allowing snapshots of health (error percentage etc)