1. 背景
在微服务分布式环境下,服务被我们拆分成了许多服务单元,服务之间通过注册和订阅机制相互依赖。系统间的依赖十分的庞大和复杂,一个请求可能会经过多个依赖服务,最后完成调用。
分布式应用中存在错综复杂的相互依赖。
1.1 微服务面临的问题
当系统中某个服务出现延迟或者不可用时,那么整个用户请求都被阻塞,最终导致该用户功能不可用。依赖的服务越多,那么不可用的风险就越大。
高请求量情况下,由于网络原因或者是服务自身的不可用,导致出现故障或者延时。这些问题会导致服务调用方对外提供的服务出现延迟,此时调用方的外来请求不断增加,任务不断的积压,资源不断被占用,最后导致服务调用方自身服务瘫痪。
高并发情况下如果其中一个服务不可用,那么整个系统都可能会面临崩溃的风险。对比传统高可用,传统高可用相互独立,互不影响。
举个例子,在我们的模拟交易系统中,我们会将我们的系统拆分成学生、交易、股票、教学等一系列模块。学生端发出交易请求到交易中心,交易中心又会发请求到股票服务查询一些股票的基本信息。这个时候如果股票服务由于自身或者网络原因出现延迟,那么交易中心的请求就会阻塞等待股票服务的返回。漫长的等待之后,股票服务调用失败,返回信息给交易中心,交易中心又将失败结果返回给学生端。在高并发的情况下,这些阻塞的线程就会导致交易中心资源被大量占用,最终导致交易中心不可用。
1.2 断路器
在我们的家里,都会安装断路器或者是保险丝保护电路过载。当电流过大时自动跳闸或者熔断,避免设备损坏甚至火灾等严重后果。
那么在我们微服务体系中有没有这么一个“断路器”,当服务不可用时及时切断调用链路,快速响应失败,来保护我们服务的安全呢?
2.1 Hystrix简介
对于以上的问题,Spring Clould Hystrix实现了断路器、依赖隔离、监控等一系列功能。Hystrix是由Netflix开源的一个延迟和容错库,用于隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。Hystrix/test/advisor主要通过以下几点实现可用性与容错性。
- 包裹请求:使用HystrixCommand(或HystrixObservableCommand)包裹对依赖的调用逻辑,每个命令在独立线程中执行。这里使用了设计模式中的“命令模式”。
- 跳闸机制:当某服务的错误率超过一定阈值时,Hystrix可以自动或者手动跳闸,停止请求该服务一段时间。
- 资源隔离:Hystrix为每个依赖都维护了一个小型的线程池(或者信号量)。如果该线程池已满,发往该依赖的请求就被立即拒绝,而不是排队等候,从而加速失败判定。
- 监控:Hystrix可以近乎实时地监控运行指标和配置的变化,例如成功、失败、超时和被拒绝的请求等。
- 回退机制:当请求失败、超时、被拒绝,或当断路器打开时,执行回退逻辑。回退逻辑可由开发人员自行提供,例如返回一个缺省值。
2.2 如何使用
2.2.1 单独使用
- 引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
- 编码
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
private StockService stockService;
@GetMapping()
public Stock getStock() {
return StockService.getStock();
}
}
@Service
public class StockService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod = "getStockFallback")
public Stock getStock() {
// 如果调用多次调用失败,直接触发熔断,调用getStockFallback()
return restTemplate.getForObeject("http://STOCK-SERVICE/stock", Stock.class);
}
@GetMapping()
public Stock getStockFallback() {
// 降级逻辑不依赖网络等其他有风险的渠道
return new Stock();
}
}
- 详细配置参考
spring-cloud-netflix-hystrix-x.x.x.x.jar/MATA-INF/spring-configuration-metadata.json
2.2.2 结合Feign使用
- 引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
- 编码
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
private RestTemplate restTemplate;
@GetMapping()
public Stock getStock() {
return StockService.getStock();
}
}
@FeignClient(value = "STOCK-SERVICE", fallbackFactory = StockServiceFallback.class)
@RequestMapping("/stock")
public class StockService {
@GetMapping()
public Stock getStock();
}
@Component
public class StockServiceFallback implements StockService {
public Stock getStock() {
return new Stock();
}
}
- 详细配置参考
spring-cloud-openfeign-core-x.x.x.x.jar/MATA-INF/spring-configuration-metadata.json
3 原理分析
3.1 Hystrix在整个体系中的位置
3.2 集成Feign源码分析
在Feign源码分析中我们知道了,@EnableFeignClients
注解中导入了FeignClientsRegistrar.class
,registerBeanDefinitions()
为入口函数,我们从registerBeanDefinitions()
函数开始分析。
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
// 扫描到所有@FeignClient注解
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// 省略...
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// 省略...
Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
}
// 解析@FeignClient,生产FeignClient工厂
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
// 省略...
// 定义FeignClientFactoryBean
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
// 解析@FeignClient,读取将其中的属性配置到FeignClientFactoryBean
definition.addPropertyValue("url", getUrl(attributes));
definition.addPropertyValue("path", getPath(attributes));
// 省略...
// 注册
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
}
FeignClientsRegistrar
中我们注册了FeignClientFactoryBean
,在分析这个类之前我们先看一下配置类。
@Configuration
public class FeignClientsConfiguration {
@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
}
@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
@Configuration
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
@Configuration
@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new DefaultTargeter();
}
}
}
看完配置之后我们分析一下FeignClientFactoryBean
这个类。
FeignClientFactoryBean
实现了FactoryBean
接口。实现了FactoryBean
接口代表他是一个工厂类,最终会通过getObject()
方法返回真正的类。关于FactoryBean
的知识大家可以自行去扩展学习。
class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
@Override
public Object getObject() throws Exception {
return getTarget();
}
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
// 此处的Feign.Builde类就是配置类中调用HystrixFeign.builder()函数生成
Feign.Builder builder = feign(context);
// 省略...
// 配置了hystrix则返回HystrixTargeter
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}
}
确认了Feign.Builder
h和Targeter
之后我们看一下HystrixTargeter
这个类。
class HystrixTargeter implements Targeter {
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
// 两种处理方式对应@FeignClient注解
// 降级方法
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(factory.getName(), context, target, builder, fallback);
}
// 降级方法工厂
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
}
return feign.target(target);
}
private <T> T targetWithFallback(String feignClientName, FeignContext context,
Target.HardCodedTarget<T> target,
HystrixFeign.Builder builder, Class<?> fallback) {
T fallbackInstance = getFromContext("fallback", feignClientName, context, fallback, target.type());
return builder.target(target, fallbackInstance);
}
}
最终builder.target(target, fallbackInstance)
会生成一个HystrixInvocationHandler
动态代理对象。HystrixInvocationHandler
对象实现了InvocationHandler
接口,该接口的invoke
方法就是后续我们触发hystrix逻辑的入口。
3.3 Hystrix原理分析
-
创建
HystrixCommand
或者HystrixObservableCommand
对象,将需要的参数和其他信息包装成一个命令(命令模式)。-
HystrixCommand
:依赖服务返回单个对象。
-
-
HystrixObservableCommand
:依赖服务返回多个对象。hystrix有一个组合请求的功能,它可以把某一个时间段内的请求组合成一次请求发送给服务提供者。减少请求次数,但是增加了部分请求的等待时间。// 此处沿着feign的流程走下来,我们只看HystrixCommand的执行 final class HystrixInvocationHandler implements InvocationHandler { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { // 省略... HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { // 第6步,最终我们的方法会在这里执行 return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } // 省略... }; // 第9步,根据我们的使用方式会有下列的一些返回情况 if (Util.isDefault(method)) { return hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { return hystrixCommand; } else if (isReturnsObservable(method)) { return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } return hystrixCommand.execute(); } }
-
执行命令。四种执行方式。
-
execute()
:同步执行,返回单个结果对象。 -
queue()
:返回一个Future
对象 -
observe()
:返回一个可订阅的Observable
对象。hot observable,不管是否有订阅者都会发布事件。 -
toObservable()
:返回一个可订阅的Observable
对象。cold observable,等待直到有订阅者才会发布事件。详见RxJava。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { public Observable<R> observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject<R> subject = ReplaySubject.create(); // eagerly kick off subscription final Subscription sourceSubscription = toObservable().subscribe(subject); // return the subject that can be subscribed to later while the execution has already started return subject.doOnUnsubscribe(new Action0() { @Override public void call() { sourceSubscription.unsubscribe(); } }); } public Observable<R> toObservable() { // 省略... } } public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> { public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } public Future<R> queue() { // 省略... } }
-
-
缓存功能是否启用。如果启用并且命中缓存,那么直接返回缓存的
Observable
对象。abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { public Observable<R> toObservable() { // 省略... return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 省略... final boolean requestCacheEnabled =isRequestCachingEnabled(); final String cacheKey = getCacheKey(); // 先从缓存拿 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 执行 Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 放入缓存 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); // 省略... } else { afterCache = hystrixObservable; } return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook); } }); } }
-
检查断路器是否打开。如果打开则执行fallback逻辑,否则进入下一步。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); // 判断是否短路 if (circuitBreaker.allowRequest()) { // 省略... if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { // 拒绝请求 return handleShortCircuitViaFallback(); } } }
-
线程池、队列、信号量是否已满。如果已满则执行fallback逻辑,否则执行命令。注意hystrix会为每个依赖服务单独创建线程池,线程池之间相互独立,互不干扰。
public class HystrixContextScheduler extends Scheduler { private class HystrixContextSchedulerWorker extends Worker { @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { if (threadPool != null) { // 判断线程池是否已满 if (!threadPool.isQueueSpaceAvailable()) { // 拒绝提交 throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); } } return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit); } } }
-
请求服务。该处执行的方式取决于我们编写的方法,
HystrixCommand.run()
方法对应单个结果,HystrixObservableCommand.construct()
对应多个结果。此处请求失败或者超时都会执行fallback逻辑,请求成功则返回结果。public class HystrixContextScheduler extends Scheduler { private static class ThreadPoolWorker extends Worker { @Override public Subscription schedule(final Action0 action) { // 省略... ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); // 提交command FutureTask<?> f = (FutureTask<?>) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa; } } }
-
Hystrix向断路器报告成功、失败、拒绝和超时等信息。断路器维护一组计数器来统计这些数据。断路器会根据这些数据来控制开闭。
public class HystrixCommandMetrics extends HystrixMetrics { public static class HealthCounts { private final long totalCount; private final long errorCount; private final int errorPercentage; // 省略... // 统计数据信息 public HealthCounts plus(long[] eventTypeCounts) { long updatedTotalCount = totalCount; long updatedErrorCount = errorCount; long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()]; long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()]; long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()]; long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()]; long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()]; updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); return new HealthCounts(updatedTotalCount, updatedErrorCount); } } }
-
执行失败时进入fallback逻辑。fallback逻辑为我们手动定义的逻辑,确保最终能够稳定的返回数据(不依赖网络等其他有风险的渠道)。如果降级逻辑抛出异常,那么呢对应第2布的四种执行方式会有不同的处理逻辑,一般为抛出异常或者通知调用者终止(订阅onError())。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 省略... // 失败回退 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); // 各种失败回退情况 if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; // 省略... return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } }
-
根据第2步中定义的命令返回不同的对象。
3.4 断路器原理分析
下面是图中对应的断路器接口,我们会按流程图来依次分析。
/**
* 断路器的逻辑最原始调用对象为HystrixCommand类,如果失败数量超过阈值就会阻挡请求。每经过一个时间窗口后会有一次重试,直到重试成功后关闭断路器。
*/
public interface HystrixCircuitBreaker {
/**
* 通过配置文件中的配置,和`isOpen()`判断是否允许请求。
* 调用isOpen()判断是否打开,如果没有返回true。如果打开,判断是否已经过了休眠时间,如果是返回true,否则返回false。
*/
public boolean allowRequest();
/**
* 根据统计数据判断断路器是否打开。
* 拿到最新一个时间窗口的统计数据判断断路器是否打开。
*/
public boolean isOpen();
/**
* 增加成功数据,默认时间窗口为10。如果断路器打开,则关闭断路器。
* 判断断路器是否打开,如果打开则关闭断路器,然后重置计数器。如果关闭则增加一次成功调用。
*/
/* package */void markSuccess();
}
原理章节开头我们讲到toObservable()
方法会返回一个Observable
对象。Observable
对象包含了命令执行过程中的回调函数,我们来看下这个对象。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
public Observable<R> toObservable() {
// 所有工作完成后这个方法会被回调
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
// 注意这个方法会统计调用数据,我们后面再分析
handleCommandEnd(true); //user code did run
}
}
};
// 执行入口
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
}
}
原理分析章节第4步,我们讲到执行命令会判断断路器是否打开,其实是我们执行入口中调用的applyHystrixSemantics
函数。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 判断是否短路
if (circuitBreaker.allowRequest()) {
// 省略...
// 执行
return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
} else {
// 拒绝请求
return handleShortCircuitViaFallback();
}
}
}
HystrixCircuitBreaker
具体的实现在HystrixCircuitBreakerImpl
中。我们先看allowRequest()
和isOpen()
函数。
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
// 断路器是否打开
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
// 短路器打开一段时间后允许单个请求做一次测试,该字段记录上次断路器打开或者上次测试的时间
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
@Override
public boolean allowRequest() {
// 配置文件中断路器是否强制打开
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 配置文件中断路器是否强制关闭
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
// 判断断路器是否打开,打开情况下根据时间判断是否可做测试
return !isOpen() || allowSingleTest();
}
@Override
public boolean isOpen() {
// 打开直接返回
if (circuitOpen.get()) {
return true;
}
// 关闭的情况下获取统计数据
HealthCounts health = metrics.getHealthCounts();
// 一个时间窗口(默认10秒)内访问的次数是否小于阈值(默认20次)。请求次数少的短路器就不会打开。
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 判断失败率,默认为50%
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 失败率过高时打开断路器
if (circuitOpen.compareAndSet(false, true)) {
// 记录打开时间
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// 这个地方考虑多线程的因素
return true;
}
}
}
public boolean allowSingleTest() {
// 上次断路器打开的时间或者是上次测试的时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) 断路器打开
// 2) 上次断路器打开的时间或者是上次测试的时间大于阈值(默认5秒)
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 更新时间,允许做一个测试
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
}
接下来再看markSuccess()
。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 成功执行会回调此方法
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
// 标记执行成功
circuitBreaker.markSuccess();
}
}
};
}
}
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
public void markSuccess() {
if (circuitOpen.get()) {
// 关闭断路器
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
}
到这为止我们已经分析了短路器的所有接口,那么断路器是如何做统计的?
我们来看一下第一步中的handleCommandEnd(true)
方法,通过handleCommandEnd(true) -> markCommandDone() -> executionDone()
我们最终到executionDone()
方法。
public class HystrixThreadEventStream {
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
// 此处会发送一个通知
writeOnlyCommandCompletionSubject.onNext(event);
}
}
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {
// 该处会接收到通知
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
@Override
public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
// 统计执行情况
return healthCounts.plus(bucketEventCounts);
}
};
}
我们找到了统计入口,接下来我们看一下HealthCounts
类。
public static class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount;
long updatedErrorCount = errorCount;
// eventTypeCounts数组,不同的位置代表不同的统计数值。[EMIT(false), SUCCESS(true), FAILURE(false), TIMEOUT(false)]
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
// 计算总请求数量
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
// 计算总错误数量
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
}
桶
上面一秒代表一格,每个格子中是这一秒里的统计数据。总共10个格子,代表10秒。我们会以这10秒里的数据作为断路器是否打开的依据。
滑动窗口
每过一秒,最后一个桶被丢弃(黑色方块),白色方块为当前这一秒新建的一个桶。
3.3依赖隔离
hystrix会为每一个依赖的服务创建一个独立的线程池,线程池之间互不影响。如果其中一个服务出现故障,那么只会对这一个服务产生影响,不会影响其他依赖的服务。
-
THREAD (线程隔离) : 使用该方式,HystrixCommand将在单独的线程上执行,并发请求受到线程池中的线程数量的限制。
-
SEMAPHORE (信号量隔离) :使用该方式, HystrixCommand 将在调用线程上执行,
开销相对较小,并发请求受到信号量个数的限制。
Hystrix 中默认并且推荐使用线程隔离(THREAD),因为这种方式有一个除网络超时以外的额外保护层。
指标 | 使用线程池(ms) | 未使用线程池(ms) | 差距(ms) |
---|---|---|---|
中位数 | 2 | 2 | 2 |
90% | 5 | 8 | 3 |
99% | 28 | 37 | 9 |
使用线程池会有性能损失,大部分的情况下9ms的延迟可以忽略。
一般来说, 只有当调用负载非常高时(例如每个实例每秒调用数百次)或者延迟要求非常低才需要使用信号
量隔离, 因为在这种场景下使用THREAD 开销会比较高。信号量隔离一般仅适用于非
网络调用的隔离。
4 监控
-
Hystrix暴露/actuator/hystrix.stream 端点开放监控数据。
-
集成hystrix-dashboard 模块,将数据图形化显示。
-
集成turbine模块聚合多个微服务的监控数据。
-
可通过mq中间件收集监控数据。
数据解释
声明
本博客所有内容仅供学习,不为商用,如有侵权,请联系博主谢谢。
参考文献
[1] Hystrix简介 https://github.com/Netflix/Hystrix/wiki
[2] Hystrix原理 https://github.com/Netflix/Hystrix/wiki/How-it-Works
[3] Hystrix监控 https://github.com/Netflix-Skunkworks/hystrix-dashboard
[4] Hystrix与Sentinel的对比 https://github.com/alibaba/Sentinel/wiki/Sentinel-%E4%B8%8E-Hystrix-%E7%9A%84%E5%AF%B9%E6%AF%94
[5] 《Spring Cloud与Docker微服务架构实战(第2版)》 周立 著