springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

文章目录

1、sentinel网关限流组件

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
Sentinel 网关流控

  • 支持针对不同的路由和自定义的 API 分组进行流控,
  • 支持针对请求属性(如 URL 参数,Client IP,Header 等)进行流控。

Sentinel 1.6.3 引入了网关Gateway 的流控控制台的支持,

  • route维度 :用户可以直接在 Sentinel 控制台上查看 API Gateway 实时的 route

    也就是对单个路由进行限流springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

  • 自定义api维度:用户自定义 API 分组监控,管理网关规则和 API 分组配置。

    对路由进行分组限流

1.1、route维度

第一步,在api-gateway微服务中加入依赖

        <!-- 限流插件 -->
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
        </dependency>

第二步,编写配置类GatewayConfiguration.java

import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.*;

@Configuration
public class GatewayConfiguration {
    private final List<ViewResolver> viewResolvers;
    private final ServerCodecConfigurer serverCodecConfigurer;
    public GatewayConfiguration(ObjectProvider<List<ViewResolver>> olv, ServerCodecConfigurer serverCodecConfigurer) {
        this.viewResolvers = olv.getIfAvailable(Collections::emptyList);
        this.serverCodecConfigurer = serverCodecConfigurer;
    }

    // 初始化一个限流的过滤器
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public GlobalFilter sentinelGatewayFilter(){
        return new SentinelGatewayFilter();
    }

    // 配置初始化限流参数
    @PostConstruct
    public void initGatewayRules(){
        Set<GatewayFlowRule> rules = new HashSet<>();
        rules.add(
                new GatewayFlowRule("product_route")   // 路由ID
                        .setCount(1)         // 限流阈值
                        .setIntervalSec(1)   // 统计时间窗口,单位秒
        );
        GatewayRuleManager.loadRules(rules);
    }

    // 限流异常处理器
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler(){
        return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
    }

    // 自定义异常页面
    @PostConstruct
    public void initBlockHandlers(){
        BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
            @Override
            public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
                Map map = new HashMap<>();
                map.put("code",0);
                map.put("message","接口被限流了");
                return ServerResponse.status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON_UTF8).body(BodyInserters.fromObject(map));
            }
        };
        GatewayCallbackManager.setBlockHandler(blockRequestHandler);
    }
}

重启OrderApplication 、ProductApplication、ApiGatewayApplication
访问时一秒内快速刷新多次页面,就会出现限流
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
我们在yaml配置文件配置一个order的路由
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

        - id: order_route
          uri: lb://service-order
          order: 1
          predicates:
            - Path=/order-serv/**
          filters:
            - StripPrefix=1

再访问order试试,发现它没被限流
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
网关流控实现原理
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

1.2、自定义api维度

自定义api分组其实是一种更细腻度的限流规则

在GatewayConfiguration.java下面继续添加方法initCustomizedApis(){}

    // 自定义API分组
    public void initCustomizedApis(){
        Set<ApiDefinition> definitions = new HashSet<>();
        ApiDefinition api1 = new ApiDefinition("product_api1")
                .setPredicateItems(new HashSet<ApiPredicateItem>(){{
                    // 以/product_serv/product/api1
                    add(new ApiPathPredicateItem()
                            .setPattern("/product-serv/product/api1/**")
                            .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
                }});
        ApiDefinition api2 = new ApiDefinition("product_api2")
                .setPredicateItems(new HashSet<ApiPredicateItem>(){{
                    // 以/product_serv/product/api1
                    add(new ApiPathPredicateItem().setPattern("/product-serv/product/api2/demo1"));
                }});
        definitions.add(api1);
        definitions.add(api2);
        GatewayApiDefinitionManager.loadApiDefinitions(definitions);
    }

再往public void initGatewayRules()里面添加限流规则

        Set<GatewayFlowRule> rules = new HashSet<>();
        rules.add(new GatewayFlowRule("product_api1").setCount(1).setIntervalSec(1));
        rules.add(new GatewayFlowRule("product_api2").setCount(1).setIntervalSec(1));
        GatewayRuleManager.loadRules(rules);

GatewayConfiguration.java完整代码

import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.*;

@Configuration
public class GatewayConfiguration {
    private final List<ViewResolver> viewResolvers;
    private final ServerCodecConfigurer serverCodecConfigurer;
    public GatewayConfiguration(List<ViewResolver> viewResolvers, ServerCodecConfigurer serverCodecConfigurer) {
        this.viewResolvers = viewResolvers;
        this.serverCodecConfigurer = serverCodecConfigurer;
    }
    // 初始化一个限流的过滤器
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public GlobalFilter sentinelGatewayFilter() {
        return new SentinelGatewayFilter();
    }

    // 配置初始化限流参数
    @PostConstruct
    public void initGatewayRules() {
//        Set<GatewayFlowRule> rules = new HashSet<>();
//        rules.add(
//                new GatewayFlowRule("product_route")   // 路由ID
//                        .setCount(1)         // 限流阈值
//                        .setIntervalSec(1)   // 统计时间窗口,单位秒
//        );
//        GatewayRuleManager.loadRules(rules);
        Set<GatewayFlowRule> rules = new HashSet<>();
        // 限制访问数量 1,时间1秒
        rules.add(new
                GatewayFlowRule("product_api1").setCount(1).setIntervalSec(1));
        // 限制访问数量 1,时间1秒
        rules.add(new
                GatewayFlowRule("product_api2").setCount(1).setIntervalSec(1));
        GatewayRuleManager.loadRules(rules);
    }

    // 限流异常处理器
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public SentinelGatewayBlockExceptionHandler
    sentinelGatewayBlockExceptionHandler() {
        return new SentinelGatewayBlockExceptionHandler(viewResolvers,
                serverCodecConfigurer);
    }

    // 自定义异常页面
    @PostConstruct
    public void initBlockHandlers() {
        BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
            public Mono<ServerResponse> handleRequest(ServerWebExchange
                                                              serverWebExchange, Throwable throwable) {
                System.out.println("限流提醒");
                Map map = new HashMap<>();
                map.put("code", 0);
                map.put("message", "接口被限流了");
                return ServerResponse.status(HttpStatus.OK).
                        contentType(MediaType.APPLICATION_JSON_UTF8).
                        body(BodyInserters.fromObject(map));
            }
        };
        GatewayCallbackManager.setBlockHandler(blockRequestHandler);
    }
    // 自定义API分组
    @PostConstruct
    private void initCustomizedApis() {
        System.out.println("自定义API分组,这个方法");
        Set<ApiDefinition> definitions = new HashSet<>();
        ApiDefinition api1 = new ApiDefinition("product_api1")
                .setPredicateItems(new HashSet<ApiPredicateItem>() {{
                    // 以/product-serv/product/api1/ 开头的请求
                    add(new ApiPathPredicateItem().setPattern("/product-serv/product/api1/**").
                            setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
                }});
        ApiDefinition api2 = new ApiDefinition("product_api2")
                .setPredicateItems(new HashSet<ApiPredicateItem>() {{
                    // 以/product-serv/product/api2/demo1 完整的url路径匹配
                    add(new ApiPathPredicateItem().setPattern("/product-serv/product/api2/demo1/"));
                }});
        definitions.add(api1);
        definitions.add(api2);
        GatewayApiDefinitionManager.loadApiDefinitions(definitions);
    }
}

这里可能会报如下错误,不用管,编译不会出错
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

接下来模拟api1的路径,shop-product里面
ProductController.java

    @RequestMapping("/product/api1/demo1/")
    public String demo1(){
        return "api1/demo1";
    }

    @RequestMapping("/product/api1/demo2/")
    public String demo2(){
        return "api1/demo2";
    }

    @RequestMapping("/product/api2/demo1/")
    public String demo3(){
        return "api2/demo1";
    }

    @RequestMapping("/product/api2/demo2/")
    public String demo4(){
        return "api2/demo2";
    }

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
接下来就可以重启api-gateway了
重启之前先改一下配置文件
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
关闭全局过滤器AuthGlobalFilter.java
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

重启ApiGatewayApplication、ProductApplication
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
访问http://localhost:7000/product-serv/product/api1/demo1/

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
连续刷新
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
访问
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
连续刷新

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
http://localhost:7000/product-serv/product/api2/demo2/ 不会被限流,不管刷新多少次。

2、链路追踪技术

如下图,目前我们访问如下网址,它是从网关到订单再到商品这么一个流程。
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
它链路如下图
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
现在我们要之际追踪这个链路,看看整个链路上哪儿最耗时。

链路追踪技术可以解决的问题:

  • 快速发现链路中的问题
  • 判断故障影响范围
  • 梳理服务依赖,及依赖合理性
  • 分析链路性能问题及实时容量规划

常见的链路追踪技术:

  • cat:由大众点评基于java开发,通过埋点方式监控、对代码侵入很大。
  • zipkin:开源的分布式的跟踪系统,结合spring-cloud-sleuth使用,功能简单。
  • pinpoint:基于字节码注入,接入端无代码侵入。
  • skyWalking:本土开源的基于字节码注入的调用链分析,接入端无代码侵入,ui功能强。
  • Sleuth:springCloud提供的分布式系统中的链路追踪解决方案。

注意:springcloud alibaba技术栈中并没有提供自己的网关,我们可以采用Sleuth来做链路追踪解决方案。

所以我们使用Sleuth的时候:是先使用Sleuth做链路追踪,然后把信息输出给zipkin,zipkin进行数据的存储、收集、展示、查找。

2.1、Sleuth相关知识

相关术语:

  • Trace:一条链路,Trace id 链路的唯一标识。
  • Span:链路上的一个单元,Span id单元的唯一标识。
  • Annotation:记录一段时间内的事件,内部使用的重要注释
    • cs :客户端发出请求时
    • sr :服务端接收到请求时,sr-cs = 网络延迟(服务调用时间)
    • ss : 服务端处理完毕准备发送到客户端时,ss-sr = 服务器处理的时间
    • cr :客户端接收到服务端的响应时,cr-sr = 请求的总时间

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

2.2、Sleuth入门案例

第一步加入依赖

因为我们每个一个微服务都应该拥有Sleuth,所以我们将依赖加入到父工程的pom.xml中

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
    </dependencies>

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

这里顺便展示一下父工程的pom.xml到目前的所有内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.haha</groupId>
    <artifactId>mycloud</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>shop-common</module>
        <module>shop-user</module>
        <module>shop-product</module>
        <module>shop-order</module>
        <module>api-gateway</module>
    </modules>
    <!-- 父工程 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.13.RELEASE</version>
        <artifactId>spring-boot-starter-parent</artifactId>
    </parent>

    <!-- 版本依赖的锁定 -->
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
        <spring-cloud-alibaba.version>2.1.3.RELEASE</spring-cloud-alibaba.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
    </dependencies>
</project>

然后把所有微服务都停掉,再重开如下三个微服务
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
接下来我们访问一下http://localhost:7000/order-serv/order/prod/1
然后再查看后台,sleuth相关的日志就被打印出来了
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
现在我们使用zipkin输出链路结果

2.3、Sleuth继承zipkin

zipkin提供一个管理、展示的ui界面。
zipkin分为两个端:服务端(jar包)、客户端(注入依赖)。
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
第一步,下载服务端jar包

https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec

启动指令

java -jar zipkin-server-2.12.9-exec.jar

web界面

http://localhost:9411/zipkin/

第二步,客户端加入依赖
我们也直接放在客户端的主项目pom.xml中

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
        </dependency>

然后在api-gateway添加配置

spring:
  zipkin:
    base-url: http://127.0.0.1:9411/   #zipkin server的请求地址
    discovery-client-enabled: false    #让nacos把他当成一个URL,而不要当做服务名
  sleuth:
    sampler:
      probability: 1.0   #采样的百分比

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
在shop-order、shop-pruduct里面也加上如上配置
配置完成把三个微服务重启
然后访问http://localhost:7000/order-serv/order/prod/2

首先我们看到的后台打印的链路第三方输出已经开启了
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后在zipkin的后台界面可以看到结果
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
点击其中一次请求的数据可以看到具体信息
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

2.4、zipkin数据持久化

zipkin服务端停止后所有的数据都会丢掉,下面我们将其存在mysql中

2.4.1、用mysql做数据持久化

第一步,创建mysql的数据环境
首先创建一个叫zipkin的库,然后执行以下代码建表

CREATE TABLE IF NOT EXISTS zipkin_spans (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL,
  `id` BIGINT NOT NULL,
  `name` VARCHAR(255) NOT NULL,
  `parent_id` BIGINT,
  `debug` BIT(1),
  `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
  `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
 
ALTER TABLE zipkin_spans ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `id`) COMMENT 'ignore insert on duplicate';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`, `id`) COMMENT 'for joining with zipkin_annotations';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';
 
CREATE TABLE IF NOT EXISTS zipkin_annotations (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
  `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
  `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
  `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
  `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
  `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
  `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
  `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
 
ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces';
 
CREATE TABLE IF NOT EXISTS zipkin_dependencies (
  `day` DATE NOT NULL,
  `parent` VARCHAR(255) NOT NULL,
  `child` VARCHAR(255) NOT NULL,
  `call_count` BIGINT
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
 
ALTER TABLE zipkin_dependencies ADD UNIQUE KEY(`day`, `parent`, `child`);

第二步,重启zipkin服务端,

java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=mysql --MYSQL_HOST=127.0.0.1 --MYSQL_TCP_PORT=3306 --MYSQL_USER=root --MYSQL_PASS=root --MYSQL_DB=zipkin

注意如上代码包含了mysql的账号密码

这样数据就能持久化了

2.4.2、用elasticsearch做数据持久化

elasticsearch下载地址:

https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-8-4

下载完启动,双击elasticsearch.bat就可以启动
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后重启zipkin客户端
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3、RocketMQ 消息驱动

3.1、MQ简介

MQ : 跨进程的通信机制,用于传递消息。是一种先进先出的数据结构。

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.1.1、MQ第一种应用场景

先来看看MQ对传统的用户注册的优化
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.1.2、MQ第二种应用场景

在针对一些大流量、暴增流量进入系统后,可以利用MQ进行流量削峰。
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.1.3、MQ常见产品

  • ZeroMQ:号称最快的消息队列系统,c语言开发,适应它需要开发大量代码,且它仅提供非持久性队列。
  • RabbitMQ:使用erlang开发,性能好,但不利于二次开发和维护
  • ActiveMQ:实现了JMS1.1规范,可以和spring-jms轻松融合,可以持久化,对队列数较多支持情况不好。
  • RocketMQ:阿里巴巴的MQ中间间,Java开发,性能好,能够撑住双十一的大流量。
  • Kafka:apache下的一个子项目,高性能分布式消息队列系统,相对于ActiveMQ是一个轻量级的。

3.2、RocketMQ

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
名词解释:

  • Broker:核心,负责消息的接收、存储、投递等。相当于邮递员
  • NameServer:Broker向它注册路由信息,同时Producker和Consumer向其获取路由信息。相当于邮局
  • Producer:消息生产者。相当于发件人
  • Consumer:消息消费者。相当于收件人
  • Topic: 用来区分不同类型的消息,发送和接收消息前都需要创建Topic。相当于地区
  • Message Queue: 一个Topic里面有多个Message Queue,消息可以并行往各个Message Queue发消息,消费者可以并行从多个Message Queue读消息。相当于邮件
  • message:消息的载体。相当于邮件里的内容。
  • Producer Group:生产组
  • Consumer Group:消费组

3.2.1、RocketMQ 的启动

下载RocketMQ

RocketMQ官网:http://rocketmq.apache.org/

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
下载二进制包
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
环境要求

  • Linux 64位操作系统
  • jdk 1.8+

1、上传文件到linux系统,解压并移动到安装目录

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

2、切换到安装目录rocketmq,启动NameServer
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

查看是否启动成功:netstat -an | grep 9876
9876是监听的端口

3、启动Broker
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

然后就可以测试RocketMQ了

3.2.2、RocketMQ 的测试

1、测试消息发送
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
2、测试消息接收
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.3、RocketMQ控制台

安装RocketMQ控制台
1、下载
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
下载完解压,我们发现它是一个maven工程,接下来我们只需要用maven命令进行打包,然后运行

2、修改配置文件
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3、打包成jar,并启动
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

打包过程可能会联网下载很多依赖,需要耐心等待

4、访问localhost:7777
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.4、spring微服务 + RocketMQ

3.4.1、测试在微服务中 发送消息

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
首先往生成者中加入依赖
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
接下来在shop-order的test中建一个测试发送消息的类
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

如上代码中,注意ip地址

然后启动这个测试类
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
在web界面就可以看到
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.4.2、测试在微服务中 接收消息

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
还是在shop-order的test中建一个测试接收消息的类
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
启动这个测试类
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
再点击一下消息生产者
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.4.3、下单案例

案例:用户下单,下单成功后向用户发送短信
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

shop-order 发送消息

1、在shop-order中添加rocketmq的依赖
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
2、添加配置
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
3、在OrderController中找到下单成功地方,向mq中投递下单成功的消息
先注入一个RocketMQTemplate
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
再投递消息
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后重启order微服务,并访问
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
再看一下后台有没有收到消息:localhost:7777
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
点击消息详情,就能看到消息已经过来了
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

shop-user 接收消息

1、在shop-user中加入依赖
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
2、在UserApplication.java中加入注解

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
3、修改配置
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

4、写一个实现类,消费消息
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后启动UserApplication.java
再重新下一个订单
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
再查看后台,就发现已经接收了一个消息
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.4.4、消息的类型

普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送

  • 可靠同步发送:一个消息结束后才发下一个消息
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
  • 可靠异步发送:一个消息发送,不管结没结束,下一个接着发
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
  • 单向发送:发送了不管服务器接到与否
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
    可靠同步发送例:
    1、添加依赖到shop-order中
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
    写一个测试类:
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
    启动这个测试类:同步消息成功发送
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
    我们可以在web中看到这个消息的主题“test-topic-1”
    springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
    添加一个标签:直接在主题后面 ” :标签 “

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后再重发一条消息(重新启动这个test方法)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
可靠异步发送例:

还是在刚刚那个测试类里面,新建一个测试方法testAsyncSend()
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
如上Thread.sleep(300000),是为了让回调方法onSuccess成功回调,否则程序执行完就直接结束了。
启动该测试方法
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
在Rocketmq页面查看
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
单向发送例:
还是在刚刚那个测试类里面,新建一个测试方法testOneWay(){}
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
启动该测试方法
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

顺序消息

Rocketmq本身是不保证消息的顺序的
Rocketmq会把消息随机放到一个Message Queue里面,消费者也是随机取。
那顺序消息是怎么实现的呢?
顺序消息是把需要排序的消息放在同一个Message Queue里面,就有了顺序了。
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
我们先来看看MQ的各个主体的状态
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后,我们以单向消息为例,发送10个消息,观察每个消息都落在了哪个Message Queue里面
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
启动这个测试方法,再查看主题test-topic-1的状态
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
我们发现每个Message Queue里面都有新加入的消息
接下来我们发送顺序消息(只需要使用sendOneWayOrderly方法就可以了)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
上面的sendOneWayOrderly的第三个参数hashKey的作用是用来决定这些消息发送到哪个队列上的
启动该测试方法
查看结果
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
我们发现都落在了第一个队列上了
同样的,除了单向消息有Orderly,其他的同步、异步消息也都有

事务消息

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
第一步:发送半事务消息
在shop-order里面新建一个OrderServiceImpl4.java
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
第二步是服务端接收半事务消息(不需要我们写什么代码),发送成功后,消息方接收到成功的消息,进行第三步执行本地事务。
先在OrderServiceImpl4.java中写一个本地事务
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
再单独新建一个OrderServiceImpl4Listener.java(第三步执行本地事务)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

接下来第四步,提交一个Commit或Rollback
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
第五步消息回查
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
为了记录消息回查的成功失败,我们需要建立一张表
在shop-common这个微服务里面新增实体类TxLog.java
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
在shop-order里添加TxLogDao.java
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
消息的成功与失败实际上是本地事务控制的,所以我们在OrderServiceImpl4.java的createOrder方法里面插入TxLog
首先自动注入TxLogDao
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
插入一条TxLog前,需要为TxLog设置一个TxId,我们用uuid取一个值
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
在OrderServiceImpl4Listener.java中拿到这个TxId,并通过orderServiceImpl4.createOrder传过去
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
最后在消息回查函数里,就可以查到数据库里记录的成功与否
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
最后需要注意OrderServiceImpl4Listener.java上需要加这个注解
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
接下来写一个调用的代码
在shop-order微服务里面新建一个OrderController4.java
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
然后重启product、order
接下来访问一个连接
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

3.4.5、消息消费要注意的细节

springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)
我们新建一个SmsService.java举例
springcloud alibaba 3(sentinel网关限流组件、链路追踪、Sleuth、zipkin、Rocketmq)

上一篇:zipkin2 应用trace


下一篇:30、Spring Cloud 中整合Zipkin进行服务跟踪zipkin-server