Hystrix 详细说明

在 Hystrix 入门中,使用 Hystrix 时创建命令并给予执行,实际上 Hystrix 有一套较为复杂的执行逻辑,简单来说明以下运作流程:

  • 在命令开始执行时,会做一些准备工作,例如为命令创建响应的线程池等
  • 判断是否打开了缓存,打开了缓存就直接查找缓存并返回结果。
  • 判断断路器是否打开,如果打开了,就表示服务链路不可用,直接执行回退方法。
  • 判断线程池、信号量(计数器)等条件,例如,线程池超负荷,则执行回退方法
  • 执行命令,计算是否要对断路器进行处理,执行完成后,如果满足一定条件,则需要开启断路器。如果执行成功则返回结果,执行失败则执行回退

Hystrix 官方的执行流程图如下:

Hystrix 详细说明

命令执行

Hystrix 命令执行可以使用以下方法来执行命令:

  • toObservable:返回一个最原始的 Observable 实例,Observable 是 RxJava 的类,使用该对象可以观察命令的执行过程,并且将执行信息传递给订阅者,该方法是异步执行
  • observe:调用 toObservable 方法,获取一个原始的 Observable 实例,使用 ReplaySubject 作为原始 Observable 的订阅者,该方法是异步执行
  • queue:通过 toObservable 方法获取原始的 Observable 实例,在调用 Observable 的 toBlocking 方法得到一个 BlockingObservable 实例,最后调用 BlockingObservable 的 toFuture 方法返回 Future 实例,最后调用 Future 实例的 get 方法得到执行结果,该方法是异步执行
  • execute:该方法调用 queue 的 get 方法返回命令的执行结果,该方法时同步执行

除了 execute 方法外,其他的方法都是异步执行,observe 与 toObservable 方法的区别是,toObservable 调用后,不会立即执行,只有当返回的 Observable 实例被订阅后,才会真正的执行命令,并且只能一次订阅,而 observe 方法返回的 Observable 可以支持多次订阅;每个命令实例对象,只能执行一次。四种命令的执行示例如下:

package org.lixue.hystrixclient;

import rx.Observable;

import rx.Observer;

import rx.functions.Action0;

import rx.functions.Action1;

import java.util.concurrent.Future;

public class HystrixClient{

publicstaticvoidmain(String[]args){

SpeakSleepCommand cmd=null;

try{

//调用execute方法

cmd=new SpeakSleepCommand(0);

String result=cmd.execute();

System.out.println("execute请求结果="+result);

//调用queue方法

cmd=new SpeakSleepCommand(0);

Future<String>future=cmd.queue();

result=future.get();

System.out.println("queue 请求结果="+result);

//调用observe方法

cmd=new SpeakSleepCommand(0);

Observable<String> observable=cmd.observe();

observable.subscribe(new Observer<String>(){

public void onCompleted(){

System.out.println("subscribe onCompleted 请求完成");

}

public void onError(Throwable throwable){

System.out.println("subscribe onError 请求结果错误="+throwable);

}

public void onNext(String s){

System.out.println("subscribe onNext 请求结果="+s);

}

});

observable.subscribe(new Action1<String>(){

public void call(Strings){

System.out.println("subscribe onNext call 请求结果="+s);

}

});

//调用toObservable方法

cmd=new SpeakSleepCommand(0);

Observable<String>toObservable=cmd.toObservable();

toObservable.subscribe(newObserver<String>(){

public void onCompleted(){

System.out.println("toObservable subscribe onCompleted 请求完成");

}

public void onError(Throwablet hrowable){

System.out.println("toObservable subscribe onError 请求失败");

}

public void onNext(String s){

System.out.println("toObservable subscribe onNext 请求结果="+s);

}

});

//因为是异步的,因此必须线程等待,否则会导致退出,无法获取toObservable的订阅结果

Thread.sleep(100);

}catch(Exceptionex){

ex.printStackTrace();

}

}

}

回退

根据 Hystrix 的执行流程图可以发现,有三种情况下会触发回退(fallback)操作:

  • 断路器被打开
  • 线程池、队列、信号量满载
  • 实际执行命令失败

Hystrix 的回退机制比较灵活,可以在 A 命令的回退方法中执行 B 命令,如果 B 命令执行失败则会触发 B 命令的回退,这样就会形成一种链式的命令执行。

Hystrix 详细说明

还有一种情况,在 A 命令中调用 B 命令和C 命令,如果 B 命令或者 C 命令调用失败,并且没有提供回退方法,则会调用 A 命令的回退方法,如果 B 命令或 C 明天提供了回退方法,则不会调用 A 命令的回退。多命令调用回退如下图所示:

Hystrix 详细说明

断路器开启/关闭

断路器一旦开启,就会直接调用回退方法,不在执行命令,而且也不会更新链路的健康状况,断路器的开启要满足两个条件:

  • 整个链路达到一定的阈值,默认情况下,10秒内产生超过20次请求,则符合第一个条件
  • 满足第一个条件的情况下,如果请求的错误百分比大于阈值,则会打开断路器,默认为 50%

断路器开启相关配置:

  • hystrix.command.default.metrics.rollingStats.timeInMilliseconds:默认值 10000,滚动窗口的时间范围
  • hystrix.command.default.circuitBreaker.requestVolumeThreshold:默认值 20,滚动窗口期间的最小请求数
  • hystrix.command.default.circuitBreaker.errorThresholdPercentage:默认值 50,滚动窗口期间的调用失败比例,高出该比例开启断路器

断路器打开后,在一段时间内,命令不会再执行,这段时间称为休眠期,休眠期默认是5秒,休眠期结束后,Hystrix 会尝试性的执行一次命令,此时断路器的状态不是开启,也不是关闭,而是一个半开的状态,如果这一次命令执行成功,则会关闭断路器并清空链路的健康信息;如果执行失败,断路器会继续保持打开的状态。

断路器关闭相关配置:

  • hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds:默认值 5000,休眠期的持续时间

隔离机制

命令的真正执行,除了断路器要关闭外,还需要看执行命令的线程池或者信号量是否满载,如果满载,命令就不会执行,而是直接触发回退,这样的机制,在控制命令的执行上,实现了错误的隔离,Hystrix 提供了两种隔离策略:

  • THREAD:默认值,由线程池来决定命令的执行,如果线程池满载,则不会执行命令。线程池默认大小为 10
  • SEMAPHORE:由信号量来决定命令执行,当请求的并发数高于阈值时,就不再执行命令,相对与线程池,信号量的开销更小,但是该策略不支持超时以及异步。

合并请求

默认情况下,会为命令分配线程池来执行命令实例,线程池会消耗一定的性能,对于一些同类型的请求(URL相同,参数不同),Hystrix 提供了合并请求的功能,在一次请求的过程中,可以将一个时间段内的相同请求(参数不同),收集到一个命令中执行,这样节省了线程的开销,减少网络连接,从而提升了执行的性能,实现合并请求的功能,至少包含以下三个条件:

  • 需要一个执行请求的命令,将全部参数进行整理,然后调用外部服务
  • 需要一个合并处理器,用于收集请求,以及结果处理
  • 外部接口提供支持,需要能支持批量处理的功能

合并请求只执行了一个命令,只启动了一个线程,只进行了一次网络请求,但是在收集请求、合并请求、处理结果的过程中仍然会耗费一定的时间,一般情况下,合并请求进行批量处理,比发送多个请求快。

合并请求示例:

在Hystrix 入门的示例中进行修改,增加如下代码:

  • 创建执行请求的命令

    主要用于处理合并后的具体处理命令,组合多个请求的参数,向批量处理接口提交。

    package org.lixue.hystrixclient;

    import com.alibaba.fastjson.JSONObject;

    import com.netflix.hystrix.*;

    import org.apache.http.HttpResponse;

    import org.apache.http.client.methods.HttpGet;

    import org.apache.http.impl.client.CloseableHttpClient;

    import org.apache.http.impl.client.HttpClients;

    import org.apache.http.util.EntityUtils;

    import java.util.Collection;

    import java.util.LinkedList;

    import java.util.List;

    import java.util.Map;

    public class SpeakCommand extends HystrixCommand<Map<String,String>>{

    //CollapsedRequest泛型的第一个参数为返回对象类型,第二个参数为请求对象类型

    Collection<HystrixCollapser.CollapsedRequest<String,String>> collapsedRequests;

    private CloseableHttpClient httpClient;

    private Strin gurl;

    public SpeakCommand(Collection<HystrixCollapser.CollapsedRequest<String,String>> collapsedRequests){

    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BatchSpeak")));

    this.collapsedRequests=collapsedRequests;

    this.httpClient=HttpClients.createDefault();

    this.url="http://localhost:8080/speaks?names=";

    }

    protected Map<String,String> run() throws Exception{

    StringBuilde rstringBuilder=new StringBuilder();

    for(HystrixCollapser.CollapsedRequest<String,String> entry:collapsedRequests){

    if(stringBuilder.length()>0){

    stringBuilder.append(",");

    }

    stringBuilder.append(entry.getArgument());

    }

    try{

    HttpGet request=new HttpGet(this.url+stringBuilder.toString());

    HttpResponse response=httpClient.execute(request);

    String responseJson=EntityUtils.toString(response.getEntity());

    Map<String,String> result=(Map<String,String>)JSONObject.parse(responseJson);

    return result;

    }catch(Exceptionex){

    ex.printStackTrace();

    returnnull;

    }

    }

    }

  • 创建合并处理器

    合并处理器,用于处理请求的合并和结果的映射,而具体的请求是调用请求命令来完成的。

    package org.lixue.hystrixclient;

    import com.netflix.hystrix.HystrixCollapser;

    import com.netflix.hystrix.HystrixCollapserKey;

    import com.netflix.hystrix.HystrixCollapserProperties;

    import com.netflix.hystrix.HystrixCommand;

    import java.util.Collection;

    import java.util.Map;

    //HystrixCollapser类型泛型参数,Map<String,String>命令返回类型;String响应返回对象类型;String请求参数对象类型

    public class SpeakCollapserCommand extends HystrixCollapser<Map<String,String>,String,String>{

    String paramName;

    public SpeakCollapserCommand(StringparamName){

    super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("SpeakCollapser"))

    //设置合并请求的时间,在1秒内的请求进行合并

    .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(1000)));

    this.paramName=paramName;

    }

    /**

    * 返回请求参数

    *

    */

    public String getRequestArgument(){

    return this.paramName;

    }

    /**

    * 创建具体请求命令

    *

    *@param collapsedRequests

    *@return

    */

    protected HystrixCommand<Map<String,String>> createCommand(Collection<CollapsedRequest<String,String>> collapsedRequests){

    return new SpeakCommand(collapsedRequests);

    }

    /**

    * 处理返回结果映射到请求

    *

    *@param batchResponse

    *@paramc ollapsedRequests

    */

    protected void mapResponseToRequests(Map<String,String>batchResponse,Collection<CollapsedRequest<String,String>>collapsedRequests){

    for(CollapsedRequest<String,String>entry:collapsedRequests){

    String result=batchResponse.get(entry.getArgument());

    entry.setResponse(result);

    }

    }

    }

  • 创建服务代码

    在 server-provider(该项目提供 REST 服务) 增加批量处理 REST 服务,代码如下:

    package org.lixue.webservices.services;

    import org.springframework.beans.factory.annotation.Value;

    import org.springframework.http.MediaType;

    import org.springframework.web.bind.annotation.*;

    import java.util.Date;

    import java.util.HashMap;

    import java.util.HashSet;

    import java.util.Map;

    @RestController

    public class HelloWorldController{

    @Value("${server.port}")

    private int port;

    @RequestMapping(method=RequestMethod.GET,name="speak",path="/speaks",

    produces=MediaType.APPLICATION_JSON_UTF8_VALUE)

    public Map<String,String> speaks(@RequestParam(value="names")String names) throws InterruptedException {

    Map<String,String> map=new HashMap<>();

    if(names==null || "".equals(names)){

    return map;

    }

    String[]splitName=names.split(",");

    for(String name:splitName){

    map.put(name,"HelloWorld"+name+"Port="+port);

    }

    return map;

    }

    }

  • 创建启动类

    需要在初始化的时候,开启 HystrixRequestContext 否则无法支持合并请求

    package org.lixue.hystrixclient;

    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

    import rx.Observable;

    import rx.Observer;

    import rx.functions.Action1;

    import java.util.concurrent.Future;

    public class HystrixClient{

    public static void main(String[]args){

    //需要开启HystrixRequest上下文,合并请求和缓存必须开启

    HystrixRequestContext context= HystrixRequestContext.initializeContext();

    try{

    Future<String> collapser01=new SpeakCollapserCommand("001").queue();

    Future<String> collapser02=new SpeakCollapserCommand("002").queue();

    Future<String> collapser03=new SpeakCollapserCommand("003").queue();

    Future<String> collapser04=new SpeakCollapserCommand("004").queue();

    System.out.println(collapser01.get());

    System.out.println(collapser02.get());

    System.out.println(collapser03.get());

    System.out.println(collapser04.get());

    }catch(Exceptionex){

    ex.printStackTrace();

    }

    finally{

    context.shutdown();

    }

    }

    }

  • 测试验证

    首先启动 server-provider 项目,然后启动该项目,由于我们设置的是 1 秒内的请求进行合并,因此可以看到会延迟一秒然后返回结果,可以在服务器端增加日志,看到只接收到了一次请求。

请求缓存

Hystrix 支持缓存功能,如果在一次请求的过程中,多个地方调用同一个接口,可以考虑使用缓存,缓存打开后,下一次命令不会执行,直接重缓存中获取响应并返回,开启缓存比较简单,在命令中重写父类的 getCacheKey 方法即可。

缓存示例:

  • 创建命令

    重写了 getCacheKey 方法,如果返回 null 表示不启用缓存,我们使用请求参数来做为缓存key

    package org.lixue.hystrixclient;

    import com.netflix.hystrix.HystrixCommand;

    import com.netflix.hystrix.HystrixCommandGroupKey;

    import org.apache.http.HttpResponse;

    import org.apache.http.client.methods.HttpGet;

    import org.apache.http.impl.client.CloseableHttpClient;

    import org.apache.http.impl.client.HttpClients;

    import org.apache.http.util.EntityUtils;

    public class SpeakCacheCommand extends HystrixCommand<String>{

    Stringname;

    CloseableHttpClienthttpClient;

    Stringurl;

    public SpeakCacheCommand(String name){

    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("speakCache")));

    this.name=name;

    this.httpClient=HttpClients.createDefault();

    this.url="http://localhost:8080/speaks?names="+name;

    }

    @Override

    protected String getCacheKey(){

    return this.name;

    }

    protected String run() throws Exception{

    try{

    HttpGet request=new HttpGet(this.url);

    HttpResponse response=httpClient.execute(request);

    return EntityUtils.toString(response.getEntity());

    }catch(Exceptionex){

    ex.printStackTrace();

    return null;

    }

    }

    }

  • 修改启动类

    需要在初始化的时候,开启 HystrixRequestContext 否则无法支持缓存

    package org.lixue.hystrixclient;

    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

    public class HystrixClient{

    public static void main(String[]args){

    //需要开启HystrixRequest上下文,合并请求和缓存必须开启

    HystrixRequestContextcontext=HystrixRequestContext.initializeContext();

    try{

    SpeakCacheCommand speakCacheCommand=new SpeakCacheCommand("lixue");

    System.out.println("SpeakCacheCommand execute="+speakCacheCommand.execute()

    +"is cache"+speakCacheCommand.isResponseFromCache());

    SpeakCacheCommand speakCacheCommand1=new SpeakCacheCommand("lixue");

    System.out.println("SpeakCacheCommand execute="+speakCacheCommand1.execute()

    +"is cache"+speakCacheCommand1.isResponseFromCache());

    context.shutdown();

    }catch(Exceptionex){

    ex.printStackTrace();

    }finally{

    context.shutdown();

    }

    }

    }

  • 测试验证

    首先启动 server-provider 项目,然后启动该项目,第一次请求没有重缓存读取,而第二次请求由于时相同的请求参数,因此从缓存读取,输出如下:

    SpeakCacheCommand execute={"lixue":"Hello World lixue Port=8080"} is cache false

    SpeakCacheCommand execute={"lixue":"Hello World lixue Port=8080"} is cache true

上一篇:win10环境下安装Ubantu双系统(超详解)


下一篇:iOS开发——屏幕适配篇&Masonry详解