SpringCloud gateway源码走读(顺带聊聊响应式)

1.概述

最近公司海外要搭建一套网关系统,调研相关开源组件,最终选择了springcloud gateway(scg)的网关框架。为了更好的使用,便走读了其核心源代码。本文对其重点源码就行剖析。scg是基于spring webflux实现的。如果 响应式基础不错的话会更容易理解。

2.聊聊响应式(事件循环)

其实对响应式开发一直很感兴趣,如果整个链路都支持响应式api,那么对于cpu的吞吐量会有很大的提升。毕竟更少的线程能负载高的qps,是非常吸引人的事情。说个玩笑话,对于泛娱乐c端业务,完全可以采用此架构,只不过学习成本确实很高,这一点就可以把这个方案否决掉。但是对于网关来讲,webflux还是非常迷人的。至少对我公司业务,网关鉴权通过响应式的rpc call可以大幅提升吞吐量。

两个问题:为什么可以减少线程数?是否可以提高响应速度?

响应式确实不能降低一个请求的处理时间,比如一个请求查询数据库300ms,这个除非在db优化,不然没有人能降低这个时间。响应式也是一个道理。那么响应式有什么用,这也就来到第二个问题。

非响应式的场景

对于一个http请求,接受请求是tomcat线程(默认500个),在处理redis操作的时候,用的redis的连接池,但是redis要经过一次网络io,阻塞的过程中,这个tomcat线程什么都干不了。

响应式

netty的eventloop接收http请求,执行redis操作的时候通过redis响应式API,eventloop会立刻返回(将数据丢给redis客户端线程),redis底层基于epol进行非阻塞IO,redis客户端线程将数据丢给网络然后返回,这个过程都非阻塞的。等redis 处理完成之后,epoll的机制去通知redis线程执行回调时间,完成整个网络。整个执行过程中线程各司其职(完全cpu密集操作),eventloop只负责接收以及传递,redis负责丢网络以及监听返回。当然你可以理解将阻塞交给了操作系统。

3.源码解析

底层通过netty启动的http服务器。所有请求进来之后先由DispatcherHandler的handle方法处理。handle方法会调用所有HandlerMapping的getHandlerInternal方法,选择一个支持该请求的Handler执行后续逻辑。

RoutePredicateHandlerMapping

我们重点关心scg实现的HandlerMapping:
RoutePredicateHandlerMapping

@Bean

public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,

      RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {

   return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);

}


RoutePredicateHandlerMappinggetHandlerInternal会根据配置的路由信息对请求进行匹配,如果有一个配置的路由和请求匹配成功(该请求为我们gateway路由的请求),则将该路由信息绑定到请求,最终返回FilteringWebHandler。执行后续逻辑。

RoutePredicateHandlerMapping#lookupRoute

通过该方法迭代所有路由进行判断。

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    return r.getPredicate().apply(exchange);
                })
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });
    }

DispatcherHandler#invokeHandler

处理路由结束并返回FilteringWebHandler,DispatcherHandler会执行invokeHandler方法。

private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
   if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
      return Mono.empty();  // CORS rejection

   }
   if (this.handlerAdapters != null) {

      for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
         if (handlerAdapter.supports(handler)) {
            return handlerAdapter.handle(exchange, handler);
         }
      }
   }
   return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));

}

该方法遍历所有适配器,执行对应的handle方法。如果是gateway路由的请求,则会由SimpleHandlerAdapter处理。也就是调用SimpleHandlerAdapter的handle方法。该方法其实就是调用FilteringWebHandler的handle方法。

FilteringWebHandler#handle

public Mono<Void> handle(ServerWebExchange exchange) {
   Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
   List<GatewayFilter> gatewayFilters = route.getFilters();
   List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
   combined.addAll(gatewayFilters);
   // TODO: needed or cached?
   AnnotationAwareOrderComparator.sort(combined);
   return new DefaultGatewayFilterChain(combined).filter(exchange);
}

这个方法主要是执行所有过滤器的逻辑。scg有两种过滤器,一个是GlobalFilter,一个是GatewayFilter。在用户上的区别:GatewayFilter是针对某个或者某组路由,而GlobalFilter是所有路由都会执行。

GlobalFilter在FilteringWebHandler初始化的时候调用loadFilters加载。核心其实就是将其包装成GatewayFilter存储到FilteringWebHandler的全局变量中。

private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
   return filters.stream().map(filter -> {
      GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
      if (filter instanceof Ordered) {
         int order = ((Ordered) filter).getOrder();
         return new OrderedGatewayFilter(gatewayFilter, order);
      }
      return gatewayFilter;
   }).collect(Collectors.toList());
}

在FilteringWebHandler的handle执行的时候,会从路由中取出对应路由配置的GatewayFilter。构造成DefaultGatewayFilterChain(filter链)执行。

后续其实就是执行过滤逻辑。过滤器可以自己实现,gateway也有默认的实现。比如转发代理服务的过滤器NettyRoutingFilter。以及负责负载均衡的过滤器
ReactiveLoadBalancerClientFilter。


ReactiveLoadBalancerClientFilter其实就是根据路由选择对应的负载均衡实现,返回一个可用的server。当然有意义的是LoadBalancerLifecycle。在完成后调用成功或者失败都会回调LoadBalancerLifecycle的对应方法。我们可以做一些监控等等。

因为是业务初期,对于熔断降级限流的事情还没有做。后续会详细介绍

4.总结

scg的代码并不多,只不过大部分是响应式代码,读起来比较吃力,但是如果熟悉之后还是比较简单的。

作者:大远哥
链接:https://juejin.cn/post/7044447160821088263
来源:稀土掘金

上一篇:利用Kong API Gateway实现反向代理


下一篇:Anchor Free系列模型13