Spring-Cloud-Gateway: From Upgrade to Giving Up

1 Why upgrade to spring-cloud-gateway?

Spring Cloud Gateway features:

  • Built on Spring Framework 5, Project Reactor and Spring Boot 2.0

  • Able to match routes on any request attribute.

  • Predicates and filters are specific to routes.

  • Hystrix Circuit Breaker integration.

  • Spring Cloud DiscoveryClient integration

  • Easy to write Predicates and Filters

  • Request Rate Limiting

  • Path Rewriting

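That is the official pitch. Spring Cloud Gateway is much newer than Zuul and offers a better development experience. In my own testing, though, what stood out most against Zuul was its performance, and performance is also what eventually made me give up on it.

Others online have compared Gateway and Zuul performance as well; most conclude that Gateway is faster than Zuul and also more stable.

But we should not take that on faith, so I ran my own tests. Skip this part if you are not interested. I did not test Zuul.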

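2. Initial test of spring-cloud-gateway

  step.1: Test preparation:

    1. gateway version: 2.0.1

    2. Server host: 10.1.4.32, 16 GB RAM, 4-core VM

    3. Test client: 10.1.4.34, 16 GB RAM, 4-core VM

    4. Test tool: wrk

  step.2: Create a gateway project and write two test HTTP endpoints:

    1.http://10.1.4.32:14077/hello [POST]

    2.http://10.1.4.32:14077/test [GET]

  step.3: Run the test

  step.4: Test results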

[wrk@localhost wrk]$ ./wrk  -t  -c500 -d  --latency  http://10.1.4.32:14077/test
Running 10s test @ http://10.1.4.32:14077/test
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .38ms .26ms .45ms 95.76%
Req/Sec .84k .44k .48k 89.50%
Latency Distribution
% .79ms
% .51ms
% .21ms
% .23ms
requests in .10s, .79MB read
Requests/sec: 160961.07
Transfer/sec: .05MB

And:

[wrk@localhost wrk]$ ./wrk  -t  -c500 -d  --latency -s scripts/gateway.lua  http://10.1.4.32:14077/hello
Running 10s test @ http://10.1.4.32:14077/hello
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .21ms .96ms .59ms 96.75%
Req/Sec .62k 604.79 .72k 88.48%
Latency Distribution
% .78ms
% .55ms
% .32ms
% .87ms
requests in .10s, .59MB read
Requests/sec: 98471.14
Transfer/sec: .43MB

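Note: if your results differ a lot from these, the test tool may be the cause.

The results show about 100k requests/s for POST and about 160k requests/s for GET.

This looks incredible: for an ordinary microservice, 20k requests/s is already good, so 100k is hard to believe. But as mentioned earlier, spring-cloud-gateway is built on Project Reactor's reactive programming model, which targets exactly this kind of high-concurrency workload.

Still, however pleasant the surprise, adopting spring-cloud-gateway on this basis alone would be hasty. After all, what is a gateway for? Routing and filtering. So the testing continues.

  step.5: Add a route and a filter by putting the following into the configuration file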

spring:
  cloud:
    gateway:
      routes:
      - id: test
        uri: http://10.1.4.32:14077/test
        predicates:
        - Path=/tt
        filters:
        - AddRequestParameter=foo, bar

This adds a route to the test endpoint together with an official built-in filter: AddRequestParameter=foo, bar
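To make the effect of that filter concrete, here is a minimal plain-Java sketch of what "add request parameter foo=bar" means for the forwarded URI. It is only an illustration: `AddParamSketch` and `addRequestParameter` are hypothetical names, and this is not the gateway's own implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration only; not part of Spring Cloud Gateway.
class AddParamSketch {

    // Returns a copy of the URI with name=value appended to its query string.
    static URI addRequestParameter(URI uri, String name, String value) {
        String original = uri.getQuery();
        String added = name + "=" + value;
        String query = (original == null || original.isEmpty()) ? added : original + "&" + added;
        try {
            return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), query, uri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // The target URI of the "test" route, with the parameter from the filter definition.
        URI target = URI.create("http://10.1.4.32:14077/test");
        System.out.println(addRequestParameter(target, "foo", "bar"));
    }
}
```

The backend therefore sees one extra query parameter on every request that matches the route, which is cheap compared with filters that have to read a body.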

  step.6: Run the test; results below:

[wrk@localhost wrk]$ ./wrk  -t  -c500 -d  --latency  http://10.1.4.32:14077/tt
Running 10s test @ http://10.1.4.32:14077/tt
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .99ms .15ms .69ms 70.84%
Req/Sec .82k 155.77 .36k 73.94%
Latency Distribution
% .03ms
% .49ms
% .02ms
% .13ms
requests in .10s, .25MB read
Requests/sec: 27182.88
Transfer/sec: .20MB

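Throughput is down to about 27,000 requests/s, which looks like a big drop but is still well ahead of Zuul: on this machine Zuul might not even reach 20k.

So, should we go with spring-cloud-gateway?

3. Starting to use spring-cloud-gateway

Having adopted spring-cloud-gateway, I started writing my own filters. The requirement called for two: one that modifies the request body and one that modifies the response body.

Because the filters should apply only to specific requests, I used gateway filters here. Some of the code comes from the official project and some from other developers online. The two filters look roughly like this:

Decryption filter (pre):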

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Created by garfield on 2019/2/26.
 */
@Component
public class DecryptGatewayFilterFactory extends AbstractGatewayFilterFactory<DecryptGatewayFilterFactory.Config> {

    private static Logger log = LoggerFactory.getLogger(DecryptGatewayFilterFactory.class);

    public static final String DECRYPT_HEADER = "decrypt_header";

    private Gson gson = new GsonBuilder().serializeNulls().create();

    public DecryptGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new DecryptGatewayFilter(config);
    }

    public class DecryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        DecryptGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            log.debug(config.toString());
            ServerHttpRequest request = exchange.getRequest();
            MediaType contentType = request.getHeaders().getContentType();
            // Guard against a missing Content-Type header, which would otherwise throw a NullPointerException
            boolean multipart = contentType != null && contentType.toString().contains("multipart/form-data");
            boolean postRequest = "POST".equalsIgnoreCase(request.getMethodValue()) && !multipart;
            // Only POST requests are processed
            if (postRequest) {
                Flux<DataBuffer> body = request.getBody();
                // Holds the request body once it has been read
                AtomicReference<String> bodyRef = new AtomicReference<>();
                // Read the request body into the holder
                body.subscribe(dataBuffer -> {
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                    DataBufferUtils.release(dataBuffer);
                    bodyRef.set(charBuffer.toString());
                });
                // Fetch the request body
                String bodyStr = bodyRef.get();
                log.debug(bodyStr);
                // This is where our own processing goes
                RequestDto requestDto = gson.fromJson(bodyStr, RequestDto.class);
                log.debug("decrypt filter");
                // save header to response header
                RequestHeaderVo headerVo = requestDto.getHeader();
                headerVo.setAppVersion("");
                // Variables can be passed along here
                exchange.getResponse().getHeaders().add(DECRYPT_HEADER, gson.toJson(headerVo));

                DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

                // Wrap the request so downstream filters read our body
                request = new ServerHttpRequestDecorator(request) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return bodyFlux;
                    }
                };
            }
            return chain.filter(exchange.mutate().request(request.mutate().header("a", "").build()).build());
        }

        @Override
        public int getOrder() {
            // The digit after the minus sign was lost in the source; -1 is an assumed placeholder
            return -1;
        }
    }

    public static DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }

    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {
        private boolean decrypt;

        public boolean isDecrypt() {
            return decrypt;
        }

        public void setDecrypt(boolean decrypt) {
            this.decrypt = decrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("decrypt", decrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("decrypt");
    }
}

Encryption filter (post), using the modification approach provided in the source code:

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;

/**
 * @Auther: garfield
 * @Date: 2019/3/5 15:33
 * @Description:
 */
@Component
public class AnGatewayFilterFactory extends AbstractGatewayFilterFactory<AnGatewayFilterFactory.Config> {

    private Gson gson = new Gson();

    public AnGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        // Delegate to the official response-body filter with our own rewrite function
        ModifyResponseBodyGatewayFilterFactory m1 = new ModifyResponseBodyGatewayFilterFactory(null);
        ModifyResponseBodyGatewayFilterFactory.Config c1 = new ModifyResponseBodyGatewayFilterFactory.Config();
        c1.setInClass(String.class);
        c1.setOutClass(String.class);
        c1.setNewContentType("application/json");

        c1.setRewriteFunction((exchange, body) -> {
            ServerWebExchange ex = (ServerWebExchange) exchange;
            // Modify the response body here
            RequestHeaderVo requestHeaderVo = new RequestHeaderVo();
            RequestDto requestDto = gson.fromJson(body.toString(), RequestDto.class);
            requestDto.setHeader(requestHeaderVo);
            body = gson.toJson(requestDto);
            return Mono.just(body);
        });
        return m1.apply(c1);
    }

    public static class Config {
        private boolean decrypt;

        public boolean isDecrypt() {
            return decrypt;
        }

        public void setDecrypt(boolean decrypt) {
            this.decrypt = decrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", decrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }
}

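A short digression: there are several ways to write this kind of modifying filter. You can write it yourself or reuse the example in the source code. Both versions above have been tested and work. I also have two other variants; they are much the same, but I will post them too, along with the problems I hit:

The following is essentially the example from the source code, written out by hand rather than referenced: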

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

            ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    ServerHttpRequest request = exchange.getRequest();

                    MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    HttpHeaders httpHeaders = new HttpHeaders();
                    httpHeaders.setContentType(originalResponseContentType);
                    ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);
                    DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());

                    Mono<DataBuffer> modifiedBody = clientResponse.bodyToMono(DataBuffer.class).map(encrypt(config, new RequestHeaderVo()));

                    BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, DataBuffer.class);
                    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
                    return bodyInserter.insert(outputMessage, new BodyInserterContext())
                            .then(Mono.defer(() -> {
                                Flux<DataBuffer> messageBody = outputMessage.getBody();
                                HttpHeaders headers = getDelegate().getHeaders();
                                // Set Content-Length only when it is unknown and the response is not chunked
                                if (headers.getContentLength() < 0 && !headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                }
                                return this.getDelegate().writeWith(messageBody);
                            }));
                }

                @Override
                public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                    return writeWith(Flux.from(body)
                            .flatMapSequential(p -> p));
                }
            };
            return chain.filter(exchange.mutate().response(responseDecorator).build());
        }

        @Override
        public int getOrder() {
            // Run just before the filter that writes the response, as the official filter does
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }

    private Function<DataBuffer, DataBuffer> encrypt(Config config, RequestHeaderVo headerVo) {
        if (config.encrypt) {
            return (i) -> {
                InputStream inputStream = i.asInputStream();

                byte[] bytes = new byte[0];
                try {
                    bytes = new byte[inputStream.available()];
                    inputStream.read(bytes);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Our own processing goes here
                String body = new String(bytes);
                log.debug("this is response encrypt");
                log.debug(body);
                log.debug(headerVo.toString());
                // body = encryptService.responseEncrypt(body, headerVo);
                // Our own processing goes here
                return i.write(TokenGatewayFilterFactory.stringBuffer(body));
                // return i.write(new String(body).getBytes());
            };
        } else {
            return i -> i;
        }
    }

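With this version I found that modifying the response body makes execution enter the run method of the NioEventLoop class and spin in a loop that never exits. I do not know why, so modify with care.

The other variant is close to what this author wrote, although I did not test it thoroughly: https://www.jianshu.com/p/9f00e0e1681c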

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return chain.filter(exchange.mutate().response(new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
                    if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
                        Flux<? extends DataBuffer> fluxBody = Flux.first(body);
                        return super.writeWith(fluxBody.map(dataBuffer -> {
                            System.out.println(dataBuffer.readableByteCount());

                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            // Release the buffer memory
                            DataBufferUtils.release(dataBuffer);
                            // responseData is what the downstream system returned; inspect or modify it here
                            String responseData = new String(content, Charset.forName("UTF-8"));

                            log.debug("response body: {}", responseData);
                            log.debug("this is response encrypt");
                            System.out.println(responseData);

                            byte[] newContent = responseData.getBytes();
                            // body = encryptService.responseEncrypt(body, headerVo);
                            byte[] uppedContent = new String(newContent, Charset.forName("UTF-8")).getBytes();
                            return bufferFactory.wrap(uppedContent);
                        }));
                    } else {
                        log.error("unexpected response code: {}", getStatusCode());
                    }
                    return super.writeWith(body);
                }
            }).build());
        }

        @Override
        public int getOrder() {
            // The digit was lost in the source; 1 is assumed, matching the official filter
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }

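This approach has a problem: the body it captures is often incomplete.

I went to that page looking for an answer, and the author replied:

  The above is only a simple sample. A Flux emits multiple items; a long message is split, so each invocation sees only part of it. You can aggregate the data first with Flux.toArray and then process it, or refer to the response handling in https://www.jianshu.com/p/9b781fb1aaa0.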
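The truncation described in that reply can be reproduced without any reactive library. The sketch below is plain Java and only imitates the situation: `ChunkJoinSketch`, the 16-byte chunk size and the sample JSON are all made up for illustration. It shows that decoding a single chunk yields a partial body, while concatenating every chunk first yields the full one. In a real filter the aggregation would be done on the Flux itself, for example with Reactor's `collectList()` or, if I remember correctly, Spring's `DataBufferUtils.join`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: byte[] chunks stand in for the DataBuffer items of a Flux.
class ChunkJoinSketch {

    // Splits a payload into fixed-size chunks, imitating a long response arriving in pieces.
    static List<byte[]> split(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < payload.length; i += chunkSize) {
            int end = Math.min(payload.length, i + chunkSize);
            chunks.add(Arrays.copyOfRange(payload, i, end));
        }
        return chunks;
    }

    // Concatenates all chunks before decoding, so the whole body is available at once.
    static String joinAndDecode(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String body = "{\"header\":{\"appVersion\":\"1.0\"},\"data\":\"hello gateway\"}";
        List<byte[]> chunks = split(body.getBytes(StandardCharsets.UTF_8), 16);

        // Handling one chunk in isolation sees only a fragment of the JSON.
        System.out.println("first chunk only: " + new String(chunks.get(0), StandardCharsets.UTF_8));
        // Joining first gives back the complete body.
        System.out.println("joined:           " + joinAndDecode(chunks));
    }
}
```

Note that joining bytes before decoding also avoids cutting a multi-byte UTF-8 character in half at a chunk boundary.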

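That is indeed the problem, so we can also follow his other example, which you can find on his Jianshu blog. One caveat: his example also targets version 2.0.1, and it will not work if you move to 2.1 or later!

I'll paste it here anyway: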

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.service.SecurityService;
import com.newland.dc.log.kafka.KafkaLog;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.*;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;

/**
 * @Auther: garfield
 * @Date: 2019/2/28 10:45 AM
 * @Description:
 */
@Component
public class EncryptGatewayFilterFactory extends AbstractGatewayFilterFactory<EncryptGatewayFilterFactory.Config> {

    private static Logger log = LoggerFactory.getLogger(EncryptGatewayFilterFactory.class);

    @Autowired
    private SecurityService encryptService;

    // private Gson gson = new GsonBuilder().serializeNulls().create();
    private Gson gson = new Gson();

    @Value("${server.host:10.10.10.10}")
    private String serverHost;

    @Value("${server.port}")
    private String serverPort;

    public EncryptGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new EncryptGatewayFilter(config);
    }

    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {

        private boolean encrypt;

        public boolean isEncrypt() {
            return encrypt;
        }

        public Config setEncrypt(boolean encrypt) {
            this.encrypt = encrypt;
            return this;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", encrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }

    public class EncryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        EncryptGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            String trace = exchange.getRequest().getHeaders().getFirst("trace");
            ServerRequest serverRequest = new DefaultServerRequest(exchange);
            return serverRequest.bodyToMono(String.class).flatMap(reqBody -> {
                // Rewrite the original request
                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public HttpHeaders getHeaders() {
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.putAll(super.getHeaders());
                        return httpHeaders;
                    }

                    @Override
                    public Flux<DataBuffer> getBody() {
                        // Log the original request
                        log.info("[Trace:{}]-gateway request:headers=[{}],body=[{}]", trace, getHeaders(), reqBody);
                        return Flux.just(reqBody).map(bx -> exchange.getResponse().bufferFactory().wrap(bx.getBytes()));
                    }
                };
                // Rewrite the original response
                BodyHandlerServerHttpResponseDecorator responseDecorator = new BodyHandlerServerHttpResponseDecorator(
                        initBodyHandler(exchange), exchange.getResponse());

                return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build());
            });
        }

        @Override
        public int getOrder() {
            // The digit was lost in the source; 1 is assumed, matching the official filter
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }

    public interface BodyHandlerFunction
            extends BiFunction<ServerHttpResponse, Publisher<? extends DataBuffer>, Mono<Void>> {
    }

    protected BodyHandlerFunction initBodyHandler(ServerWebExchange exchange) {
        return (resp, body) -> {
            // Intercept the response
            MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.setContentType(originalResponseContentType);
            DefaultClientResponseAdapter clientResponseAdapter = new DefaultClientResponseAdapter(body, httpHeaders);
            Mono<String> bodyMono = clientResponseAdapter.bodyToMono(String.class);
            // Values stored earlier can be retrieved here
            return bodyMono.flatMap((respBody) -> {
                // Log the response
                System.out.println(respBody);
                return resp.writeWith(Flux.just(respBody).map(bx -> resp.bufferFactory().wrap(bx.getBytes())));
            }).then();
        };
    }

    public static class DefaultClientResponseAdapter extends DefaultClientResponse {

        /**
         * @param body
         * @param httpHeaders
         */
        public DefaultClientResponseAdapter(Publisher<? extends DataBuffer> body,
                                            HttpHeaders httpHeaders) {
            this(new ResponseAdapter(body, httpHeaders),
                    ExchangeStrategies.withDefaults());
        }

        /**
         * @param response
         * @param strategies
         */
        public DefaultClientResponseAdapter(ClientHttpResponse response,
                                            ExchangeStrategies strategies) {
            super(response, strategies);
        }

        /**
         * ClientHttpResponse adapter
         */
        static class ResponseAdapter implements ClientHttpResponse {
            /**
             * Response data
             */
            private final Flux<DataBuffer> flux;

            private final HttpHeaders headers;

            public ResponseAdapter(Publisher<? extends DataBuffer> body,
                                   HttpHeaders headers) {
                this.headers = headers;
                if (body instanceof Flux) {
                    flux = (Flux) body;
                } else {
                    flux = ((Mono) body).flux();
                }
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return flux;
            }

            @Override
            public HttpHeaders getHeaders() {
                return headers;
            }

            @Override
            public HttpStatus getStatusCode() {
                return null;
            }

            @Override
            public int getRawStatusCode() {
                // The value was lost in the source; 0 is assumed, as in the official ResponseAdapter
                return 0;
            }

            @Override
            public MultiValueMap<String, ResponseCookie> getCookies() {
                return null;
            }
        }
    }

    class BodyHandlerServerHttpResponseDecorator extends ServerHttpResponseDecorator {

        /**
         * Body handling interceptor
         */
        private BodyHandlerFunction bodyHandler = initDefaultBodyHandler();

        /**
         * Constructor
         *
         * @param bodyHandler
         * @param delegate
         */
        public BodyHandlerServerHttpResponseDecorator(BodyHandlerFunction bodyHandler, ServerHttpResponse delegate) {
            super(delegate);
            if (bodyHandler != null) {
                this.bodyHandler = bodyHandler;
            }
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // Let the body handler process the response
            return bodyHandler.apply(getDelegate(), body);
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMapSequential(p -> p));
        }

        /**
         * Default body handler: passes the body through unchanged
         *
         * @return
         */
        private BodyHandlerFunction initDefaultBodyHandler() {
            return (resp, body) -> resp.writeWith(body);
        }
    }
}

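With everything in place and the code written, it is time for another performance test. Keep in mind that I used the official example here; I tried the other versions as well, and the results were about the same.

4. Testing spring-cloud-gateway routing performance again

  step.1: Performance test. Change the configuration to add the filter. Only one filter is shown because it is the one with the serious problem; I'll skip how I narrowed it down.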

      - id: an
        uri: http://10.1.4.32:14077/hello
        predicates:
        - Path=/an
        filters:
        - An
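
The route refers to the filter simply as An, while the class is AnGatewayFilterFactory. As far as I understand, the gateway derives the short name by dropping the GatewayFilterFactory suffix from the factory's class name. The sketch below only imitates that naming convention in plain Java; `FilterNameSketch` and `shortName` are hypothetical names, not the library's code.

```java
// Illustration of the naming convention only; not Spring Cloud Gateway's implementation.
class FilterNameSketch {

    static final String SUFFIX = "GatewayFilterFactory";

    // Derives the name used in the route configuration from a factory class name.
    static String shortName(String simpleClassName) {
        return simpleClassName.replace(SUFFIX, "");
    }

    public static void main(String[] args) {
        System.out.println(shortName("AnGatewayFilterFactory"));
        System.out.println(shortName("DecryptGatewayFilterFactory"));
        System.out.println(shortName("EncryptGatewayFilterFactory"));
    }
}
```

By the same convention the other two factories in this post would be configured as Decrypt and Encrypt.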

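  After many test runs, the other filters were fine. Only the filter that modifies the response body hurts performance badly and also produces read/write errors.

  step.2: Test and results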

[wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency -s scripts/gateway.lua  http://10.1.4.32:14077/an
Running 10s test @ http://10.1.4.32:14077/an
15 threads and 500 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.03s 488.75ms 2.00s 60.62%
Req/Sec 26.59 13.84 80.00 67.60%
Latency Distribution
50% 931.54ms
75% 1.45s
90% 1.76s
99% 1.97s
3848 requests in 10.10s, 1.64MB read
Socket errors: connect 0, read 0, write 0, timeout 458
Requests/sec: 381.05
Transfer/sec: 166.71KB

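The output has one extra line: socket errors, all of them timeouts. The log shows errors too (断开的管道 is the localized "Broken pipe" message):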

--06T16::,|INFO ||AsyncResolver-bootstrap-executor-||||Resolving eureka endpoints via configuration
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: Connection has been closed, response already set (status=)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: Connection has been closed
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: null, response already set (status=)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: null
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: syscall:write(..) failed: 断开的管道
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: syscall:write(..) failed: 断开的管道
--06T16::,|INFO ||AsyncResolver-bootstrap-executor-||||Resolving eureka endpoints via configuration

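This is a serious problem, because a single request never triggers the error. It appears only under high-concurrency load, which makes it impossible to trace. Most importantly, throughput is down to a few hundred requests/s (381 in this run), which is completely unacceptable, least of all in production.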
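
To put the drop in proportion, here is a small worked calculation using only the Requests/sec figures reported in this post. `ThroughputSketch` and its method names are hypothetical; the numbers are copied from the wrk output above.

```java
// Worked arithmetic on the Requests/sec values reported by wrk in this post.
class ThroughputSketch {

    // How many times faster the baseline is than the measured run.
    static double slowdown(double baseline, double measured) {
        return baseline / measured;
    }

    // Measured throughput as a percentage of the baseline.
    static double percentOf(double baseline, double measured) {
        return measured / baseline * 100.0;
    }

    public static void main(String[] args) {
        double directGet = 160961.07;   // GET /test, no route
        double directPost = 98471.14;   // POST /hello, no route
        double routedGet = 27182.88;    // GET /tt, route + AddRequestParameter
        double filteredPost = 381.05;   // POST /an, route + response-body filter

        System.out.printf("routed GET keeps %.1f%% of direct GET throughput%n",
                percentOf(directGet, routedGet));
        System.out.printf("response-body filter is %.0fx slower than direct POST%n",
                slowdown(directPost, filteredPost));
        System.out.printf("response-body filter is %.0fx slower than routed GET%n",
                slowdown(routedGet, filteredPost));
    }
}
```

Plain routing costs roughly a factor of six here, while the response-body filter costs more than two orders of magnitude on top of that, which is why it is singled out.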

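It is hard to explain, because we used the approach the official project provides. Let's go back and look at the official response-modifying class. Actually, never mind, because: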

package org.springframework.cloud.gateway.filter.factory.rewrite;
/**
* This filter is BETA and may be subject to change in a future release.
*/
public class ModifyResponseBodyGatewayFilterFactory
extends AbstractGatewayFilterFactory<ModifyResponseBodyGatewayFilterFactory.Config> {

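The official source already says it: this filter is a beta, not something to rely on.

Not ready to give up, I remembered that gateway also provides GlobalFilter and moved the same code into a global filter to try again. Same result!

Dead end...

I would be grateful for any write-up that reaches a different conclusion. Otherwise I can only wait for the next release.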
