目录
前言
FeignClient原理及使用
FeignClient对hystrix的支持
hystrix两大功能相关配置
Hystrix原理介绍
主要流程图
getFallback()降级逻辑
HystrixCommand和HystrixObservableCommand
RxJava简单介绍
总结
hystrix异步方案
调用方启用线程池异步
参考资料
前言
之前是想做一个feignClient调用的异步化组件,然后调研后发现feignClient原生已对hystrix支持的很好,而hystrix已提供完善的异步化方案,并且提供降级/熔断等功能及策略配置。所以进而研究了下hystrix的原理,最后,掉进了rxjava的坑。
FeignClient原理及使用
首先移步官网
原理:Spring Cloud应用在启动时,Feign会扫描标有@FeignClient注解的接口,生成代理,并注册到Spring容器中。生成代理时Feign会为每个接口方法创建一个RequetTemplate对象,该对象封装了HTTP请求需要的全部信息,请求参数名、请求方法等信息都是在这个过程中确定的,Feign的模板化就体现在这里。
Feign 的精华是一种设计思想,它设计了一种全新的HTTP调用方式,屏蔽了具体的调用细节,与Spring MVC 注解的结合更是极大提高了效率(没有重复造*,又设计一套新注解。
请求判断逻辑:SynchronousMethodHandler.executeAndDecode (![200,300) || !404)
FeignClient对hystrix的支持
官网描述很清楚,两种方案
- 配置feign.hystrix.enabled=true
- 新建配置类,使用的时候在注解里加入configuration参数即可
hystrix两大功能相关配置
隔离
hystrix: command: #全局默认配置 default : #线程隔离相关(执行相关) execution: timeout: enabled: false isolation : #配置请求隔离的方式,这里是默认的线程池方式。还有一种信号量的方式semaphore,使用比较少。 strategy: THREAD thread: #方式执行的超时时间,默认为1000毫秒,在实际场景中需要根据情况设置 timeoutInMilliseconds: 1000 #发生超时时是否中断方法的执行,默认值为 true 。不要改。 interruptOnTimeout: true #是否在方法执行被取消时中断方法,默认值为 false 。没有实际意义,默认就好! interruptOnCancel: false |
熔断
hystrix: command: #全局默认配置 default : #熔断器相关配置 circuitBreaker: #说明:是否启动熔断器,默认为 true 。 enabled: false #说明1:启用熔断器功能窗口时间内的最小请求数,假设我们设置的窗口时间为10秒, #说明2:那么如果此时默认值为20的话,那么即便10秒内有19个请求都失败也不会打开熔断器。 #说明3:此配置项需要根据接口的QPS进行计算,值太小会有误打开熔断器的可能,而如果值太大超出了时间窗口内的总请求数,则熔断永远也不会被触发 #说明4:建议设置一般为:QPS*窗口描述*60% requestVolumeThreshold: 20 #说明1:熔断器被打开后,所有的请求都会被快速失败掉,但是何时恢复服务是一个问题。熔断器打开后,Hystrix会在经过一段时间后就放行一条请求 #说明2:如果请求能够执行成功,则说明此时服务可能已经恢复了正常,那么熔断器会关闭;相反执行失败,则认为服务仍然不可用,熔断器保持打开。 #说明3:所以此配置的作用是指定熔断器打开后多长时间内允许一次请求尝试执行,官方默认配置为5秒。 sleepWindowInMilliseconds: 5000 #说明1:该配置是指在通过滑动窗口获取到当前时间段内Hystrix方法执行失败的几率后,根据此配置来判断是否需要打开熔断器 #说明2:这里官方的默认配置为50,即窗口时间内超过50%的请求失败后就会打开熔断器将后续请求快速失败掉 errorThresholdPercentage: 50 |
Hystrix原理介绍
官网:https://github.com/Netflix/Hystrix/wiki
Hystrix 的设计方案是通过命令模式加 RxJava 实现的观察者模式来开发的,想完全熟悉 Hystrix 的运作流程需要熟练掌握 RxJava
主要流程图
分析后结果如下:
getFallback()降级逻辑
1,以下四种情况将触发getFallback调用:
(
1
):run()方法抛出非HystrixBadRequestException异常
(
2
):run()方法调用超时
(
3
):熔断器开启拦截调用
(
4
):线程池/队列/信号量是否跑满
2,没有实现getFallback的Command将直接抛出异常
3,fallback降级逻辑调用成功直接返回
4,降级逻辑调用失败抛出异常
HystrixCommand和HystrixObservableCommand
1、HystrixCommand提供了同步和异步两种执行方式,而HystrixObservableCommand只有异步方式
2、HystrixCommand的run方法是用内部线程池的线程来执行的,而HystrixObservableCommand则是由调用方(例如Tomcat容器)的线程来执行的,因为是异步,所以两种方式都能很好的起到资源隔离的效果。
3、HystrixCommand一次只能发送单条数据返回,而HystrixObservableCommand一次可以发送多条数据返回,从上面的示例可以看出。
HystrixCommand 的 execute 和 queue 方法,以及 HystrixObservableCommand 的 observe()和 toObserve() 方法,最后都会转化成 HystrixObservableCommand 的 toObserve() 方法。
如图:
详细:https://www.jianshu.com/p/b9af028efebb
hystrix任务执行代码:AbstractCommand.executeCommandAndObserve
冷observe和热:https://zhuanlan.zhihu.com/p/28628089
AbstractCommand.executeCommandAndObserve
RxJava简单介绍
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。
Rx基于观察者模式,他是一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
总结
hystrix异步方案
- 启用hystrix
- 给接口方法的返回值用HystrixCommand包裹
- 调用时,有以下几种方式:
Future<EpgFilterRespDTO> future = epgInfoClient.epgFilterFuture(EpgFilterReqDTO.of(Lists.newArrayList(2447052500L), "204.101.161.159" , 20000030 , "2.3" , 80000013 , "zh_tw" , "tw" , PlatformEnum.APPLE_TV)).queue(); //异步方式 Observable<EpgFilterRespDTO> observable = epgInfoClient.epgFilterFuture(EpgFilterReqDTO.of(Lists.newArrayList(2447052500L), "204.101.161.159" , 20000030 , "2.3" , 80000013 , "zh_tw" , "tw" , PlatformEnum.APPLE_TV)).observe(); //热观察 System.out.println(observable.toBlocking().single()) //直接获取 Observable<EpgFilterRespDTO> toobservable = epgInfoClient.epgFilterFuture(EpgFilterReqDTO.of(Lists.newArrayList(2447052500L), "204.101.161.159" , 20000030 , "2.3" , 80000013 , "zh_tw" , "tw" , PlatformEnum.APPLE_TV)).toObservable(); //冷观察 observable.subscribe( new Observer<EpgFilterRespDTO>() { @Override public void onCompleted() { System.out.println(); } @Override public void onError(Throwable throwable) { System.out.println(throwable); } @Override public void onNext(EpgFilterRespDTO epgFilterRespDTO) { System.out.println(epgFilterRespDTO); } }); toobservable.subscribe( new Observer<EpgFilterRespDTO>() { @Override public void onCompleted() { System.out.println(); } @Override public void onError(Throwable throwable) { System.out.println(throwable); } @Override public void onNext(EpgFilterRespDTO epgFilterRespDTO) { System.out.println(epgFilterRespDTO); } }); |
调用方启用线程池异步
参考资料