Hystrix实现服务的隔离
- 这里讲解的主要是如何在RPC服务调用中防止雪崩效应
- 我们在微服务中就是使用Hystrix来实现保护服务,提高容错
- Hystrix是一个服务保护框架,在分布式中可以实现服务的容错(容错指的是服务发生不可用的时候,出错之后的处理方案,就是预备方案),他可以减少与服务的依赖关系(这里并不是业务上的依赖,指的是A调用B,B调用C,C不可用,本应该导致A和B也不可用,但是借由Hystrix框架,可以避免C服务的不可用导致A,B服务不可用的问题)
- 服务的雪崩效应:当一个服务有高并发的请求,服务器受不了这么多的并发请求,会产生服务堆积。A调用B,B调用C,C服务不可用,直接会导致B服务和A服务也不可用。
- 我们在实验的时候,可以高并发的情况下访问一个接口,该接口的大量高并发访问,不能影响该服务的其他接口的访问。
Hystrix的介绍
- 什么是服务的堆积?
就是服务请求的阻塞,大量的请求进行等待,无法访问。A服务调用B服务,B服务调用C服务,C服务不可用的时候,A,B服务也会受到影响,当请求的数量大于服务器允许的最大的请求的时候,会产生请求阻塞,服务不可调用,即服务的堆积。
举例说明,在高并发的情况下,如果tomcat允许最大的线程数是50,那么我们在发送第51个请求的时候,第51个请求会发生阻塞,因为tomcat只允许50个线程。超过的线程数就会进行等待。
- Hystrix是在什么场景下产生的?
微服务的高并发场景下产生的这个框架。
- 假设有两个服务:订单服务和会员服务,客户端调用订单服务,通过订单服务调用会员服务,会员服务返回数据给订单服务,订单服务返回数据给客户端。这里会员服务就是生产者,订单服务就是消费者。
如果会员服务产生了等待,此时订单服务也会产生等待。如果客户端有很多很多的请求访问订单服务,这时候订单服务也会有大量的请求访问会员服务,这时候假设会员服务每次响应需要等待1.5s,这时候订单服务和客户端都会产生雪崩效应,订单服务和客户端都会产生等待。- 如果微服务中没有处理好雪崩效应,那么可能导致一串有依赖关系的服务产生等待
- Hystrix的作用?
(1)服务的保护(不是安全方面的保护,而是指用户的体验性)
- 当服务产生堆积的时候
(2)服务的隔离- 不同的服务接口互相不影响,一般有信号量(不常用)和线程池的方式进行服务的隔离。
(3)服务的降级- 有了服务的熔断,就得有服务的降级。所谓的降级,就是当某个服务熔断了之后,服务器不再调用,此时客户端可以自己准备一个本地的fallback函数回调,返回一个缺省的值。这样做虽然服务水平下降,但是好歹可以用,总比直接挂掉好。可以提高用户的体验
- 目的:可以提高用户的体验,防止服务器的雪崩效应
(4)服务的限流
(5)服务的熔断- 类似于现实世界中的保险丝,当某一个异常的条件被触发的时候,直接熔断掉整个服务,而不是一直等待此服务超时。熔断的触发条件可以根据不同的场景有所不同。
- 设置一个限制,比如说:最多同时访问100个请求,超出的请求会放到缓存的队列中,如果缓存的队列满了,直接拒绝访问,访问不了(缓存队列满了才会做熔断)
- 服务熔断保护了服务。通常高并发的情况下会使用。
注意:一般情况下服务熔断和服务的降级是一起使用的,当请求超出了最大的限制的时候,会给出提醒。简单的说就是如果服务熔断了,进行服务的降级处理
- 为什么会产生服务雪崩?
- tomcat底层使用线程池的技术,tomcat底层默认会创建一个线程池帮我们处理请求,一个线程池管理所有的请求,一个线程用来管理一个请求,假设线程池最大创建50个线程,也就是说最多可以有50个请求,如果超过50个就会进行等待;如果有一个订单微服务,其有两个接口,/orderIndex和/findOrderIndex,此时我们50个线程同时去调用/orderIndex接口,这时候我们再有一个线程(也就是再来一个请求)访问/findOrderIndex的时候,因为tomcat只有一个线程池,且假设其最大为50个线程,因为所有的线程都去处理/orderIndex接口,所以没有线程可以为访问/findOrderIndex的接口服务,所以我们此时会发生服务堆积(也就是服务的阻塞)。
- 服务的雪崩效应产生服务堆积在同一个线程池中,因为在同一个线程池中,所有的请求全部到一个服务中进行访问,这时候会导致其他的服务没有线程去接受请求访问,所以就会产生服务的雪崩效应
- 服务隔离:
- 每个服务接口互相不影响。
- 服务隔离有两种实现的方式:
(1)线程池方式:每个服务接口都有自己独立的线程池,管理运行当前自己的接口。- 就是说每一个接口都有自己独立的线程池,每个接口互相不影响。
- 使用线程池隔离可以完全隔离第三方的应用,请求线程可以快速的放回(本身是一个请求过来一个线程处理,现在是一个请求过来,该请求的线程池进行处理,原本处理请求的线程可以继续接受其他请求)
- 请求线程可以继续接受新的请求,如果出现问题,线程池隔离是独立的不会影响其他的应用。
- 缺点:使用线程池的方式,会使CPU的开销非常大,
(2)计数器方式(也叫作信号量)- 使用原子计数器(或者信号量)来记录当前有多少个线程正在运行。
- 使用的不多。底层使用原子计数器,针对于每个服务都设置自己独立限制的阈值(即当前服务最多可以接收多少个请求)。当我们接口的访问次数大于该阈值的时候,我们可以自己实现拒绝策略,做服务的降级处理。若不超过阈值则进行通行,这时候计数器请求加1,返回数据后计数器减1.
项目整合Hystrix
- 启动应该放在继承Hystrix的类的run方法中。
- 调用的时候调用execute()方法,就会自动调用run方法。
- 目的就是每个接口有自己独立的线程池,用该方法实现服务的隔离,当缓存队列存满了之后,先满足服务的熔断的条件,再做服务的降级处理。
- 代码如下:
(1)代码架构如下
(1)引入pom文件
- high-bingfa的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiyou</groupId>
<artifactId>highbingfa</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>membeer</module>
<module>order</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
</project>
- order的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>highbingfa</artifactId>
<groupId>com.xiyou</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>order</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.5.12</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.12</version>
</dependency>
</dependencies>
</project>
- member的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>highbingfa</artifactId>
<groupId>com.xiyou</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>membeer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
</project>
(2)Java文件
- Member
(1)MemberController
package com.xiyou.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/member")
public class MemberController {
/**
* 调用member服务,做一个简单的测试
* @return
* @throws InterruptedException
*/
@GetMapping("/memberIndex")
public Object memberIndex() throws InterruptedException {
Map<String, Object> hashMap = new HashMap<>();
hashMap.put("code", "200");
hashMap.put("msg", "调用member返回成功");
Thread.sleep(1500);
int i = 1/0;
return hashMap;
}
}
(2)MemberApplication
package com.xiyou;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MemberApplication {
public static void main(String[] args) {
SpringApplication.run(MemberApplication.class, args);
}
}
(3)yml文件
server:
port: 8888
- Order
(1)OrderApplication
package com.xiyou;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
(2)yml文件
server:
port: 8889
# 配置最大的tomcat的线程数(最大20个)
tomcat:
max-threads: 20
(4)Controller文件
OrderController
package com.xiyou.controller;
import com.xiyou.hystrix.OrderHystrixCommand;
import com.xiyou.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 订单服务
*/
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("/orderIndexHystrix")
public Object orderIndexHystrix() {
return new OrderHystrixCommand(orderService).execute();
}
}
(4)OrderHystrixCommand
package com.xiyou.hystrix;
import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.*;
import com.xiyou.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 利用Hystrix实现服务的隔离
* 需要继承Hystrix类,其泛型就是其返回结果函数的放回结果,run方法的返回结果,因为是调用execute调用(默认执行的是run方法)
*/
@Service
public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
@Autowired
private OrderService orderService;
/**
* 有参构造
* 我们这里还需要自定义一些参数,因为Hystrix不会提供无参构造
* @param orderService
*/
public OrderHystrixCommand(OrderService orderService){
super(setter());
this.orderService = orderService;
}
/**
* 主方法,调用其execute方法的时候,会默认执行run方法
* @return
* @throws Exception
*/
@Override
protected JSONObject run() throws Exception {
JSONObject member = orderService.getMember();
System.out.println("当前线程的名字是: " + Thread.currentThread().getName() + " ,订单服务调用会员服务: member: " + member);
return member;
}
/**
* 赋值Hystrix的相关值
* @return
*/
public static Setter setter() {
// 服务分组
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("members");
// 服务标识
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("member");
// 线程池的名字
// 每个线程池的名字不应该相同,因为一个接口就应该提供一个线程池
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("member-pool");
// 配置线程池,每个线程池的大小是10,线程存活的时间是15s,队列等待的阈值是100,超过100 次就拒绝执行,其实就是说我们每个接口最多可以接110条(线程池加上缓存队列的)
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
.withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
// 设置Hystrix的相关属性
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
// 采用线程池的方式实现服务隔离
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
// 执行是否启用超时设置,这里就是说程序不管执行多久当前线程都不会超时
.withExecutionTimeoutEnabled(false);
return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties)
.andCommandPropertiesDefaults(commandProperties);
}
@Override
protected JSONObject getFallback() {
// 如果Hystrix发生了熔断,当服务不可用,直接执行FallBack方法
System.out.println("系统错误");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", "500");
jsonObject.put("msg", "系统错误!");
return jsonObject;
}
}
(5)orderService
package com.xiyou.service;
import com.alibaba.fastjson.JSONObject;
import com.xiyou.utils.HttpClientUtils;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
/**
* 远程调用 获取member端的数据
* @return
*/
public JSONObject getMember() {
JSONObject jsonObject = HttpClientUtils.httpGet("http://localhost:8888/member/memberIndex");
return jsonObject;
}
}
(6)HttpClientUtils
package com.xiyou.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* HttpClient4.3工具类
*
* @author hang.luo
*/
public class HttpClientUtils {
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录
private static RequestConfig requestConfig = null;
static {
// 设置请求和传输超时时间
requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
}
/**
* post请求传输json参数
*
* @param url
* url地址
* @param json
* 参数
* @return
*/
public static JSONObject httpPost(String url, JSONObject jsonParam) {
// post请求返回结果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
// 设置请求和传输超时时间
httpPost.setConfig(requestConfig);
try {
if (null != jsonParam) {
// 解决中文乱码问题
StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/json");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 请求发送成功,并得到响应
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 读取服务器返回过来的json字符串数据
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post请求提交失败:" + url, e);
}
}
} catch (IOException e) {
logger.error("post请求提交失败:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* post请求传输String参数 例如:name=Jack&sex=1&type=2
* Content-type:application/x-www-form-urlencoded
*
* @param url
* url地址
* @param strParam
* 参数
* @return
*/
public static JSONObject httpPost(String url, String strParam) {
// post请求返回结果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
try {
if (null != strParam) {
// 解决中文乱码问题
StringEntity entity = new StringEntity(strParam, "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/x-www-form-urlencoded");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 请求发送成功,并得到响应
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 读取服务器返回过来的json字符串数据
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post请求提交失败:" + url, e);
}
}
} catch (IOException e) {
logger.error("post请求提交失败:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* 发送get请求
*
* @param url
* 路径
* @return
*/
public static JSONObject httpGet(String url) {
// get请求返回结果
JSONObject jsonResult = null;
CloseableHttpClient client = HttpClients.createDefault();
// 发送get请求
HttpGet request = new HttpGet(url);
request.setConfig(requestConfig);
try {
CloseableHttpResponse response = client.execute(request);
// 请求发送成功,并得到响应
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
// 读取服务器返回过来的json字符串数据
HttpEntity entity = response.getEntity();
String strResult = EntityUtils.toString(entity, "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(strResult);
} else {
logger.error("get请求提交失败:" + url);
}
} catch (IOException e) {
logger.error("get请求提交失败:" + url, e);
} finally {
request.releaseConnection();
}
return jsonResult;
}
}