在 Hystrix 入门中,使用 Hystrix 时创建命令并给予执行,实际上 Hystrix 有一套较为复杂的执行逻辑,简单来说明以下运作流程:
- 在命令开始执行时,会做一些准备工作,例如为命令创建响应的线程池等
- 判断是否打开了缓存,打开了缓存就直接查找缓存并返回结果。
- 判断断路器是否打开,如果打开了,就表示服务链路不可用,直接执行回退方法。
- 判断线程池、信号量(计数器)等条件,例如,线程池超负荷,则执行回退方法
- 执行命令,计算是否要对断路器进行处理,执行完成后,如果满足一定条件,则需要开启断路器。如果执行成功则返回结果,执行失败则执行回退
Hystrix 官方的执行流程图如下:
命令执行
Hystrix 命令执行可以使用以下方法来执行命令:
- toObservable:返回一个最原始的 Observable 实例,Observable 是 RxJava 的类,使用该对象可以观察命令的执行过程,并且将执行信息传递给订阅者,该方法是异步执行
- observe:调用 toObservable 方法,获取一个原始的 Observable 实例,使用 ReplaySubject 作为原始 Observable 的订阅者,该方法是异步执行
- queue:通过 toObservable 方法获取原始的 Observable 实例,在调用 Observable 的 toBlocking 方法得到一个 BlockingObservable 实例,最后调用 BlockingObservable 的 toFuture 方法返回 Future 实例,最后调用 Future 实例的 get 方法得到执行结果,该方法是异步执行
- execute:该方法调用 queue 的 get 方法返回命令的执行结果,该方法时同步执行
除了 execute 方法外,其他的方法都是异步执行,observe 与 toObservable 方法的区别是,toObservable 调用后,不会立即执行,只有当返回的 Observable 实例被订阅后,才会真正的执行命令,并且只能一次订阅,而 observe 方法返回的 Observable 可以支持多次订阅;每个命令实例对象,只能执行一次。四种命令的执行示例如下:
package org.lixue.hystrixclient;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.concurrent.Future;
public class HystrixClient{
publicstaticvoidmain(String[]args){
SpeakSleepCommand cmd=null;
try{
//调用execute方法
cmd=new SpeakSleepCommand(0);
String result=cmd.execute();
System.out.println("execute请求结果="+result);
//调用queue方法
cmd=new SpeakSleepCommand(0);
Future<String>future=cmd.queue();
result=future.get();
System.out.println("queue 请求结果="+result);
//调用observe方法
cmd=new SpeakSleepCommand(0);
Observable<String> observable=cmd.observe();
observable.subscribe(new Observer<String>(){
public void onCompleted(){
System.out.println("subscribe onCompleted 请求完成");
}
public void onError(Throwable throwable){
System.out.println("subscribe onError 请求结果错误="+throwable);
}
public void onNext(String s){
System.out.println("subscribe onNext 请求结果="+s);
}
});
observable.subscribe(new Action1<String>(){
public void call(Strings){
System.out.println("subscribe onNext call 请求结果="+s);
}
});
//调用toObservable方法
cmd=new SpeakSleepCommand(0);
Observable<String>toObservable=cmd.toObservable();
toObservable.subscribe(newObserver<String>(){
public void onCompleted(){
System.out.println("toObservable subscribe onCompleted 请求完成");
}
public void onError(Throwablet hrowable){
System.out.println("toObservable subscribe onError 请求失败");
}
public void onNext(String s){
System.out.println("toObservable subscribe onNext 请求结果="+s);
}
});
//因为是异步的,因此必须线程等待,否则会导致退出,无法获取toObservable的订阅结果
Thread.sleep(100);
}catch(Exceptionex){
ex.printStackTrace();
}
}
}
回退
根据 Hystrix 的执行流程图可以发现,有三种情况下会触发回退(fallback)操作:
- 断路器被打开
- 线程池、队列、信号量满载
- 实际执行命令失败
Hystrix 的回退机制比较灵活,可以在 A 命令的回退方法中执行 B 命令,如果 B 命令执行失败则会触发 B 命令的回退,这样就会形成一种链式的命令执行。
还有一种情况,在 A 命令中调用 B 命令和C 命令,如果 B 命令或者 C 命令调用失败,并且没有提供回退方法,则会调用 A 命令的回退方法,如果 B 命令或 C 明天提供了回退方法,则不会调用 A 命令的回退。多命令调用回退如下图所示:
断路器开启/关闭
断路器一旦开启,就会直接调用回退方法,不在执行命令,而且也不会更新链路的健康状况,断路器的开启要满足两个条件:
- 整个链路达到一定的阈值,默认情况下,10秒内产生超过20次请求,则符合第一个条件
- 满足第一个条件的情况下,如果请求的错误百分比大于阈值,则会打开断路器,默认为 50%
断路器开启相关配置:
- hystrix.command.default.metrics.rollingStats.timeInMilliseconds:默认值 10000,滚动窗口的时间范围
- hystrix.command.default.circuitBreaker.requestVolumeThreshold:默认值 20,滚动窗口期间的最小请求数
- hystrix.command.default.circuitBreaker.errorThresholdPercentage:默认值 50,滚动窗口期间的调用失败比例,高出该比例开启断路器
断路器打开后,在一段时间内,命令不会再执行,这段时间称为休眠期,休眠期默认是5秒,休眠期结束后,Hystrix 会尝试性的执行一次命令,此时断路器的状态不是开启,也不是关闭,而是一个半开的状态,如果这一次命令执行成功,则会关闭断路器并清空链路的健康信息;如果执行失败,断路器会继续保持打开的状态。
断路器关闭相关配置:
- hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds:默认值 5000,休眠期的持续时间
隔离机制
命令的真正执行,除了断路器要关闭外,还需要看执行命令的线程池或者信号量是否满载,如果满载,命令就不会执行,而是直接触发回退,这样的机制,在控制命令的执行上,实现了错误的隔离,Hystrix 提供了两种隔离策略:
- THREAD:默认值,由线程池来决定命令的执行,如果线程池满载,则不会执行命令。线程池默认大小为 10
- SEMAPHORE:由信号量来决定命令执行,当请求的并发数高于阈值时,就不再执行命令,相对与线程池,信号量的开销更小,但是该策略不支持超时以及异步。
合并请求
默认情况下,会为命令分配线程池来执行命令实例,线程池会消耗一定的性能,对于一些同类型的请求(URL相同,参数不同),Hystrix 提供了合并请求的功能,在一次请求的过程中,可以将一个时间段内的相同请求(参数不同),收集到一个命令中执行,这样节省了线程的开销,减少网络连接,从而提升了执行的性能,实现合并请求的功能,至少包含以下三个条件:
- 需要一个执行请求的命令,将全部参数进行整理,然后调用外部服务
- 需要一个合并处理器,用于收集请求,以及结果处理
- 外部接口提供支持,需要能支持批量处理的功能
合并请求只执行了一个命令,只启动了一个线程,只进行了一次网络请求,但是在收集请求、合并请求、处理结果的过程中仍然会耗费一定的时间,一般情况下,合并请求进行批量处理,比发送多个请求快。
合并请求示例:
在Hystrix 入门的示例中进行修改,增加如下代码:
-
创建执行请求的命令
主要用于处理合并后的具体处理命令,组合多个请求的参数,向批量处理接口提交。
package org.lixue.hystrixclient;
import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.*;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class SpeakCommand extends HystrixCommand<Map<String,String>>{
//CollapsedRequest泛型的第一个参数为返回对象类型,第二个参数为请求对象类型
Collection<HystrixCollapser.CollapsedRequest<String,String>> collapsedRequests;
private CloseableHttpClient httpClient;
private Strin gurl;
public SpeakCommand(Collection<HystrixCollapser.CollapsedRequest<String,String>> collapsedRequests){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BatchSpeak")));
this.collapsedRequests=collapsedRequests;
this.httpClient=HttpClients.createDefault();
this.url="http://localhost:8080/speaks?names=";
}
protected Map<String,String> run() throws Exception{
StringBuilde rstringBuilder=new StringBuilder();
for(HystrixCollapser.CollapsedRequest<String,String> entry:collapsedRequests){
if(stringBuilder.length()>0){
stringBuilder.append(",");
}
stringBuilder.append(entry.getArgument());
}
try{
HttpGet request=new HttpGet(this.url+stringBuilder.toString());
HttpResponse response=httpClient.execute(request);
String responseJson=EntityUtils.toString(response.getEntity());
Map<String,String> result=(Map<String,String>)JSONObject.parse(responseJson);
return result;
}catch(Exceptionex){
ex.printStackTrace();
returnnull;
}
}
}
-
创建合并处理器
合并处理器,用于处理请求的合并和结果的映射,而具体的请求是调用请求命令来完成的。
package org.lixue.hystrixclient;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import java.util.Collection;
import java.util.Map;
//HystrixCollapser类型泛型参数,Map<String,String>命令返回类型;String响应返回对象类型;String请求参数对象类型
public class SpeakCollapserCommand extends HystrixCollapser<Map<String,String>,String,String>{
String paramName;
public SpeakCollapserCommand(StringparamName){
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("SpeakCollapser"))
//设置合并请求的时间,在1秒内的请求进行合并
.andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(1000)));
this.paramName=paramName;
}
/**
* 返回请求参数
*
*/
public String getRequestArgument(){
return this.paramName;
}
/**
* 创建具体请求命令
*
*@param collapsedRequests
*@return
*/
protected HystrixCommand<Map<String,String>> createCommand(Collection<CollapsedRequest<String,String>> collapsedRequests){
return new SpeakCommand(collapsedRequests);
}
/**
* 处理返回结果映射到请求
*
*@param batchResponse
*@paramc ollapsedRequests
*/
protected void mapResponseToRequests(Map<String,String>batchResponse,Collection<CollapsedRequest<String,String>>collapsedRequests){
for(CollapsedRequest<String,String>entry:collapsedRequests){
String result=batchResponse.get(entry.getArgument());
entry.setResponse(result);
}
}
}
-
创建服务代码
在 server-provider(该项目提供 REST 服务) 增加批量处理 REST 服务,代码如下:
package org.lixue.webservices.services;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@RestController
public class HelloWorldController{
@Value("${server.port}")
private int port;
@RequestMapping(method=RequestMethod.GET,name="speak",path="/speaks",
produces=MediaType.APPLICATION_JSON_UTF8_VALUE)
public Map<String,String> speaks(@RequestParam(value="names")String names) throws InterruptedException {
Map<String,String> map=new HashMap<>();
if(names==null || "".equals(names)){
return map;
}
String[]splitName=names.split(",");
for(String name:splitName){
map.put(name,"HelloWorld"+name+"Port="+port);
}
return map;
}
}
-
创建启动类
需要在初始化的时候,开启 HystrixRequestContext 否则无法支持合并请求
package org.lixue.hystrixclient;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import java.util.concurrent.Future;
public class HystrixClient{
public static void main(String[]args){
//需要开启HystrixRequest上下文,合并请求和缓存必须开启
HystrixRequestContext context= HystrixRequestContext.initializeContext();
try{
Future<String> collapser01=new SpeakCollapserCommand("001").queue();
Future<String> collapser02=new SpeakCollapserCommand("002").queue();
Future<String> collapser03=new SpeakCollapserCommand("003").queue();
Future<String> collapser04=new SpeakCollapserCommand("004").queue();
System.out.println(collapser01.get());
System.out.println(collapser02.get());
System.out.println(collapser03.get());
System.out.println(collapser04.get());
}catch(Exceptionex){
ex.printStackTrace();
}
finally{
context.shutdown();
}
}
}
-
测试验证
首先启动 server-provider 项目,然后启动该项目,由于我们设置的是 1 秒内的请求进行合并,因此可以看到会延迟一秒然后返回结果,可以在服务器端增加日志,看到只接收到了一次请求。
请求缓存
Hystrix 支持缓存功能,如果在一次请求的过程中,多个地方调用同一个接口,可以考虑使用缓存,缓存打开后,下一次命令不会执行,直接重缓存中获取响应并返回,开启缓存比较简单,在命令中重写父类的 getCacheKey 方法即可。
缓存示例:
-
创建命令
重写了 getCacheKey 方法,如果返回 null 表示不启用缓存,我们使用请求参数来做为缓存key
package org.lixue.hystrixclient;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
public class SpeakCacheCommand extends HystrixCommand<String>{
Stringname;
CloseableHttpClienthttpClient;
Stringurl;
public SpeakCacheCommand(String name){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("speakCache")));
this.name=name;
this.httpClient=HttpClients.createDefault();
this.url="http://localhost:8080/speaks?names="+name;
}
@Override
protected String getCacheKey(){
return this.name;
}
protected String run() throws Exception{
try{
HttpGet request=new HttpGet(this.url);
HttpResponse response=httpClient.execute(request);
return EntityUtils.toString(response.getEntity());
}catch(Exceptionex){
ex.printStackTrace();
return null;
}
}
}
-
修改启动类
需要在初始化的时候,开启 HystrixRequestContext 否则无法支持缓存
package org.lixue.hystrixclient;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
public class HystrixClient{
public static void main(String[]args){
//需要开启HystrixRequest上下文,合并请求和缓存必须开启
HystrixRequestContextcontext=HystrixRequestContext.initializeContext();
try{
SpeakCacheCommand speakCacheCommand=new SpeakCacheCommand("lixue");
System.out.println("SpeakCacheCommand execute="+speakCacheCommand.execute()
+"is cache"+speakCacheCommand.isResponseFromCache());
SpeakCacheCommand speakCacheCommand1=new SpeakCacheCommand("lixue");
System.out.println("SpeakCacheCommand execute="+speakCacheCommand1.execute()
+"is cache"+speakCacheCommand1.isResponseFromCache());
context.shutdown();
}catch(Exceptionex){
ex.printStackTrace();
}finally{
context.shutdown();
}
}
}
-
测试验证
首先启动 server-provider 项目,然后启动该项目,第一次请求没有重缓存读取,而第二次请求由于时相同的请求参数,因此从缓存读取,输出如下:
SpeakCacheCommand execute={"lixue":"Hello World lixue Port=8080"} is cache false
SpeakCacheCommand execute={"lixue":"Hello World lixue Port=8080"} is cache true