互联网高并发解决方案(1)-基于Hystrix实现服务隔离与降级

Hystrix实现服务的隔离

  • 这里讲解的主要是如何在RPC服务调用中防止雪崩效应
  • 我们在微服务中就是使用Hystrix来实现保护服务,提高容错
  • Hystrix是一个服务保护框架,在分布式中可以实现服务的容错(容错指的是服务发生不可用的时候,出错之后的处理方案,就是预备方案),他可以减少与服务的依赖关系(这里并不是业务上的依赖,指的是A调用B,B调用C,C不可用,本应该导致A和B也不可用,但是借由Hystrix框架,可以避免C服务的不可用导致A,B服务不可用的问题)
  • 服务的雪崩效应:当一个服务有高并发的请求,服务器受不了这么多的并发请求,会产生服务堆积。A调用B,B调用C,C服务不可用,直接会导致B服务和A服务也不可用。
  • 我们在实验的时候,可以高并发的情况下访问一个接口,该接口的大量高并发访问,不能影响该服务的其他接口的访问。

Hystrix的介绍

  • 什么是服务的堆积?

就是服务请求的阻塞,大量的请求进行等待,无法访问。A服务调用B服务,B服务调用C服务,C服务不可用的时候,A,B服务也会受到影响,当请求的数量大于服务器允许的最大的请求的时候,会产生请求阻塞,服务不可调用,即服务的堆积。
举例说明,在高并发的情况下,如果tomcat允许最大的线程数是50,那么我们在发送第51个请求的时候,第51个请求会发生阻塞,因为tomcat只允许50个线程。超过的线程数就会进行等待。

  • Hystrix是在什么场景下产生的?

微服务的高并发场景下产生的这个框架。
互联网高并发解决方案(1)-基于Hystrix实现服务隔离与降级

  • 假设有两个服务:订单服务和会员服务,客户端调用订单服务,通过订单服务调用会员服务,会员服务返回数据给订单服务,订单服务返回数据给客户端。这里会员服务就是生产者,订单服务就是消费者。
    如果会员服务产生了等待,此时订单服务也会产生等待。如果客户端有很多很多的请求访问订单服务,这时候订单服务也会有大量的请求访问会员服务,这时候假设会员服务每次响应需要等待1.5s,这时候订单服务和客户端都会产生雪崩效应,订单服务和客户端都会产生等待。
  • 如果微服务中没有处理好雪崩效应,那么可能导致一串有依赖关系的服务产生等待
  • Hystrix的作用?

(1)服务的保护(不是安全方面的保护,而是指用户的体验性)

  • 当服务产生堆积的时候
    (2)服务的隔离
  • 不同的服务接口互相不影响,一般有信号量(不常用)和线程池的方式进行服务的隔离。
    (3)服务的降级
  • 有了服务的熔断,就得有服务的降级。所谓的降级,就是当某个服务熔断了之后,服务器不再调用,此时客户端可以自己准备一个本地的fallback函数回调,返回一个缺省的值。这样做虽然服务水平下降,但是好歹可以用,总比直接挂掉好。可以提高用户的体验
  • 目的:可以提高用户的体验,防止服务器的雪崩效应
    (4)服务的限流
    (5)服务的熔断
  • 类似于现实世界中的保险丝,当某一个异常的条件被触发的时候,直接熔断掉整个服务,而不是一直等待此服务超时。熔断的触发条件可以根据不同的场景有所不同。
  • 设置一个限制,比如说:最多同时访问100个请求,超出的请求会放到缓存的队列中,如果缓存的队列满了,直接拒绝访问,访问不了(缓存队列满了才会做熔断)
  • 服务熔断保护了服务。通常高并发的情况下会使用。

注意:一般情况下服务熔断和服务的降级是一起使用的,当请求超出了最大的限制的时候,会给出提醒。简单的说就是如果服务熔断了,进行服务的降级处理

  • 为什么会产生服务雪崩?
  • tomcat底层使用线程池的技术,tomcat底层默认会创建一个线程池帮我们处理请求,一个线程池管理所有的请求,一个线程用来管理一个请求,假设线程池最大创建50个线程,也就是说最多可以有50个请求,如果超过50个就会进行等待;如果有一个订单微服务,其有两个接口,/orderIndex和/findOrderIndex,此时我们50个线程同时去调用/orderIndex接口,这时候我们再有一个线程(也就是再来一个请求)访问/findOrderIndex的时候,因为tomcat只有一个线程池,且假设其最大为50个线程,因为所有的线程都去处理/orderIndex接口,所以没有线程可以为访问/findOrderIndex的接口服务,所以我们此时会发生服务堆积(也就是服务的阻塞)。
  • 服务的雪崩效应产生服务堆积在同一个线程池中,因为在同一个线程池中,所有的请求全部到一个服务中进行访问,这时候会导致其他的服务没有线程去接受请求访问,所以就会产生服务的雪崩效应
  • 服务隔离:
  • 每个服务接口互相不影响。
  • 服务隔离有两种实现的方式:
    (1)线程池方式:每个服务接口都有自己独立的线程池,管理运行当前自己的接口。
  • 就是说每一个接口都有自己独立的线程池,每个接口互相不影响。
  • 使用线程池隔离可以完全隔离第三方的应用,请求线程可以快速的放回(本身是一个请求过来一个线程处理,现在是一个请求过来,该请求的线程池进行处理,原本处理请求的线程可以继续接受其他请求)
  • 请求线程可以继续接受新的请求,如果出现问题,线程池隔离是独立的不会影响其他的应用。
    互联网高并发解决方案(1)-基于Hystrix实现服务隔离与降级
  • 缺点:使用线程池的方式,会使CPU的开销非常大,
    (2)计数器方式(也叫作信号量)
  • 使用原子计数器(或者信号量)来记录当前有多少个线程正在运行。
  • 使用的不多。底层使用原子计数器,针对于每个服务都设置自己独立限制的阈值(即当前服务最多可以接收多少个请求)。当我们接口的访问次数大于该阈值的时候,我们可以自己实现拒绝策略,做服务的降级处理。若不超过阈值则进行通行,这时候计数器请求加1,返回数据后计数器减1.

项目整合Hystrix

  • 启动应该放在继承Hystrix的类的run方法中。
  • 调用的时候调用execute()方法,就会自动调用run方法。
  • 目的就是每个接口有自己独立的线程池,用该方法实现服务的隔离,当缓存队列存满了之后,先满足服务的熔断的条件,再做服务的降级处理。
  • 代码如下:
    (1)代码架构如下
    互联网高并发解决方案(1)-基于Hystrix实现服务隔离与降级

 

(1)引入pom文件

  • high-bingfa的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiyou</groupId>
    <artifactId>highbingfa</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>membeer</module>
        <module>order</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

</project>
  • order的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>highbingfa</artifactId>
        <groupId>com.xiyou</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>order</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.5.12</version>
        </dependency>
    </dependencies>

</project>
  • member的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>highbingfa</artifactId>
        <groupId>com.xiyou</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>membeer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

</project>

(2)Java文件

  1. Member
    (1)MemberController
package com.xiyou.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/member")
public class MemberController {

    /**
     * 调用member服务,做一个简单的测试
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/memberIndex")
    public Object memberIndex() throws InterruptedException {
        Map<String, Object> hashMap = new HashMap<>();
        hashMap.put("code", "200");
        hashMap.put("msg", "调用member返回成功");
        Thread.sleep(1500);
        int i = 1/0;
        return hashMap;
    }

}

(2)MemberApplication

package com.xiyou;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MemberApplication {
    public static void main(String[] args) {
        SpringApplication.run(MemberApplication.class, args);
    }
}

(3)yml文件

server:
  port: 8888
  1. Order
    (1)OrderApplication
package com.xiyou;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

(2)yml文件

server:
  port: 8889
  # 配置最大的tomcat的线程数(最大20个)
  tomcat:
    max-threads: 20

(4)Controller文件

OrderController

package com.xiyou.controller;

import com.xiyou.hystrix.OrderHystrixCommand;
import com.xiyou.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 订单服务
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @GetMapping("/orderIndexHystrix")
    public Object orderIndexHystrix() {
        return new OrderHystrixCommand(orderService).execute();
    }

}

(4)OrderHystrixCommand

package com.xiyou.hystrix;

import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.*;
import com.xiyou.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 利用Hystrix实现服务的隔离
 * 需要继承Hystrix类,其泛型就是其返回结果函数的放回结果,run方法的返回结果,因为是调用execute调用(默认执行的是run方法)
 */
@Service
public class OrderHystrixCommand extends HystrixCommand<JSONObject> {

    @Autowired
    private OrderService orderService;

    /**
     * 有参构造
     * 我们这里还需要自定义一些参数,因为Hystrix不会提供无参构造
     * @param orderService
     */
    public OrderHystrixCommand(OrderService orderService){
        super(setter());
        this.orderService = orderService;
    }

    /**
     * 主方法,调用其execute方法的时候,会默认执行run方法
     * @return
     * @throws Exception
     */
    @Override
    protected JSONObject run() throws Exception {
        JSONObject member = orderService.getMember();
        System.out.println("当前线程的名字是: " + Thread.currentThread().getName() + " ,订单服务调用会员服务: member: " + member);
        return member;
    }

    /**
     * 赋值Hystrix的相关值
     * @return
     */
    public static Setter setter() {
        // 服务分组
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("members");
        // 服务标识
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("member");
        // 线程池的名字
        // 每个线程池的名字不应该相同,因为一个接口就应该提供一个线程池
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("member-pool");
        // 配置线程池,每个线程池的大小是10,线程存活的时间是15s,队列等待的阈值是100,超过100 次就拒绝执行,其实就是说我们每个接口最多可以接110条(线程池加上缓存队列的)
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
            .withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
        // 设置Hystrix的相关属性
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                // 采用线程池的方式实现服务隔离
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                // 执行是否启用超时设置,这里就是说程序不管执行多久当前线程都不会超时
                .withExecutionTimeoutEnabled(false);
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties)
                .andCommandPropertiesDefaults(commandProperties);
    }

    @Override
    protected JSONObject getFallback() {
        // 如果Hystrix发生了熔断,当服务不可用,直接执行FallBack方法
        System.out.println("系统错误");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", "500");
        jsonObject.put("msg", "系统错误!");
        return jsonObject;
    }
}

(5)orderService

package com.xiyou.service;

import com.alibaba.fastjson.JSONObject;
import com.xiyou.utils.HttpClientUtils;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    /**
     * 远程调用 获取member端的数据
     * @return
     */
    public JSONObject getMember() {
        JSONObject jsonObject = HttpClientUtils.httpGet("http://localhost:8888/member/memberIndex");
        return jsonObject;
    }

}

(6)HttpClientUtils

package com.xiyou.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * HttpClient4.3工具类
 * 
 * @author hang.luo
 */
public class HttpClientUtils {
	private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录

	private static RequestConfig requestConfig = null;

	static {
		// 设置请求和传输超时时间
		requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
	}

	/**
	 * post请求传输json参数
	 * 
	 * @param url
	 *            url地址
	 * @param json
	 *            参数
	 * @return
	 */
	public static JSONObject httpPost(String url, JSONObject jsonParam) {
		// post请求返回结果
		CloseableHttpClient httpClient = HttpClients.createDefault();
		JSONObject jsonResult = null;
		HttpPost httpPost = new HttpPost(url);
		// 设置请求和传输超时时间
		httpPost.setConfig(requestConfig);
		try {
			if (null != jsonParam) {
				// 解决中文乱码问题
				StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
				entity.setContentEncoding("UTF-8");
				entity.setContentType("application/json");
				httpPost.setEntity(entity);
			}
			CloseableHttpResponse result = httpClient.execute(httpPost);
			// 请求发送成功,并得到响应
			if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
				String str = "";
				try {
					// 读取服务器返回过来的json字符串数据
					str = EntityUtils.toString(result.getEntity(), "utf-8");
					// 把json字符串转换成json对象
					jsonResult = JSONObject.parseObject(str);
				} catch (Exception e) {
					logger.error("post请求提交失败:" + url, e);
				}
			}
		} catch (IOException e) {
			logger.error("post请求提交失败:" + url, e);
		} finally {
			httpPost.releaseConnection();
		}
		return jsonResult;
	}

	/**
	 * post请求传输String参数 例如:name=Jack&sex=1&type=2
	 * Content-type:application/x-www-form-urlencoded
	 * 
	 * @param url
	 *            url地址
	 * @param strParam
	 *            参数
	 * @return
	 */
	public static JSONObject httpPost(String url, String strParam) {
		// post请求返回结果
		CloseableHttpClient httpClient = HttpClients.createDefault();
		JSONObject jsonResult = null;
		HttpPost httpPost = new HttpPost(url);
		httpPost.setConfig(requestConfig);
		try {
			if (null != strParam) {
				// 解决中文乱码问题
				StringEntity entity = new StringEntity(strParam, "utf-8");
				entity.setContentEncoding("UTF-8");
				entity.setContentType("application/x-www-form-urlencoded");
				httpPost.setEntity(entity);
			}
			CloseableHttpResponse result = httpClient.execute(httpPost);
			// 请求发送成功,并得到响应
			if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
				String str = "";
				try {
					// 读取服务器返回过来的json字符串数据
					str = EntityUtils.toString(result.getEntity(), "utf-8");
					// 把json字符串转换成json对象
					jsonResult = JSONObject.parseObject(str);
				} catch (Exception e) {
					logger.error("post请求提交失败:" + url, e);
				}
			}
		} catch (IOException e) {
			logger.error("post请求提交失败:" + url, e);
		} finally {
			httpPost.releaseConnection();
		}
		return jsonResult;
	}

	/**
	 * 发送get请求
	 * 
	 * @param url
	 *            路径
	 * @return
	 */
	public static JSONObject httpGet(String url) {
		// get请求返回结果
		JSONObject jsonResult = null;
		CloseableHttpClient client = HttpClients.createDefault();
		// 发送get请求
		HttpGet request = new HttpGet(url);
		request.setConfig(requestConfig);
		try {
			CloseableHttpResponse response = client.execute(request);

			// 请求发送成功,并得到响应
			if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
				// 读取服务器返回过来的json字符串数据
				HttpEntity entity = response.getEntity();
				String strResult = EntityUtils.toString(entity, "utf-8");
				// 把json字符串转换成json对象
				jsonResult = JSONObject.parseObject(strResult);
			} else {
				logger.error("get请求提交失败:" + url);
			}
		} catch (IOException e) {
			logger.error("get请求提交失败:" + url, e);
		} finally {
			request.releaseConnection();
		}
		return jsonResult;
	}

}
上一篇:Spring Cloud教程 第四弹 Hystrix熔断器


下一篇:Spring Cloud构建微服务架构:服务容错保护(Hystrix依赖隔离)【Dalston版】