1. Why upgrade to spring-cloud-gateway?
Spring Cloud Gateway features:
Built on Spring Framework 5, Project Reactor and Spring Boot 2.0
Able to match routes on any request attribute.
Predicates and filters are specific to routes.
Hystrix Circuit Breaker integration.
Spring Cloud DiscoveryClient integration
Easy to write Predicates and Filters
Request Rate Limiting
Path Rewriting
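That is the official description. Spring Cloud Gateway is much newer than Zuul and offers a better development experience. In my tests, though, what really set spring-cloud-gateway apart from Zuul was its performance, and in the end performance was also the reason I gave up on it.
Others online have also compared gateway and Zuul performance, and most conclude that gateway is faster than Zuul and more stable.
We should not take that on trust, so I ran my own tests. Skip this part if it does not interest you. I did not test Zuul.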
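2. Initial tests of spring-cloud-gateway
step.1: Test setup:
1. gateway version: 2.0.1
2. Server host: 10.1.4.32, 16 GB RAM, 4-core virtual machine
3. Test client: 10.1.4.34, 16 GB RAM, 4-core virtual machine
4. Test tool: wrk
step.2: Create a gateway project and write two test HTTP endpoints: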
1.http://10.1.4.32:14077/hello [POST]
2.http://10.1.4.32:14077/test [GET]
step.3: Run the test
step.4: Test results
[wrk@localhost wrk]$ ./wrk -t -c500 -d --latency http://10.1.4.32:14077/test
Running 10s test @ http://10.1.4.32:14077/test
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .38ms .26ms .45ms 95.76%
Req/Sec .84k .44k .48k 89.50%
Latency Distribution
% .79ms
% .51ms
% .21ms
% .23ms
requests in .10s, .79MB read
Requests/sec: 160961.07
Transfer/sec: .05MB
And:
[wrk@localhost wrk]$ ./wrk -t -c500 -d --latency -s scripts/gateway.lua http://10.1.4.32:14077/hello
Running 10s test @ http://10.1.4.32:14077/hello
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .21ms .96ms .59ms 96.75%
Req/Sec .62k 604.79 .72k 88.48%
Latency Distribution
% .78ms
% .55ms
% .32ms
% .87ms
requests in .10s, .59MB read
Requests/sec: 98471.14
Transfer/sec: .43MB
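Note: if your results differ considerably, the test tool may be the cause.
The results show roughly 100,000 requests/s for the POST endpoint and roughly 160,000 requests/s for the GET endpoint.
That looks incredible: an ordinary microservice is doing well at 20,000 requests/s, so 100,000 is hard to believe. But as mentioned above, spring-cloud-gateway is built on Project Reactor's reactive programming model, which is designed for exactly this kind of high-concurrency load.
Still, however pleasant the surprise, adopting spring-cloud-gateway on that basis alone would be hasty. After all, what is a gateway for? Routing and filtering. So the testing continues.
step.5: Add a route and a filter by adding the following to the configuration file: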
spring:
  cloud:
    gateway:
      routes:
      - id: test
        uri: http://10.1.4.32:14077/test
        predicates:
        - Path=/tt
        filters:
        - AddRequestParameter=foo, bar
This adds a route for the test endpoint and applies the officially provided filter AddRequestParameter=foo, bar.
step.6: Test again; results:
[wrk@localhost wrk]$ ./wrk -t -c500 -d --latency http://10.1.4.32:14077/tt
Running 10s test @ http://10.1.4.32:14077/tt
threads and connections
Thread Stats Avg Stdev Max +/- Stdev
Latency .99ms .15ms .69ms 70.84%
Req/Sec .82k 155.77 .36k 73.94%
Latency Distribution
% .03ms
% .49ms
% .02ms
% .13ms
requests in .10s, .25MB read
Requests/sec: 27182.88
Transfer/sec: .20MB
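Throughput drops to about 27,000 requests/s, which looks like a big loss, but it is still well ahead of Zuul: on this machine Zuul would probably not even reach 20,000.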
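To put the drop in proportion, here is a small sketch that compares the two "Requests/sec" figures reported by wrk above (160961.07 for /test and 27182.88 for /tt). The class and method names are only illustrative.

```java
public class ThroughputDrop {

    /** Fraction of the baseline throughput that is retained, e.g. 0.25 means 25%. */
    static double retained(double baselineRps, double measuredRps) {
        return measuredRps / baselineRps;
    }

    public static void main(String[] args) {
        double directGet = 160961.07; // Requests/sec for /test, no route or filter
        double routedGet = 27182.88;  // Requests/sec for /tt, route + AddRequestParameter
        double kept = retained(directGet, routedGet);
        System.out.printf("retained: %.1f%%, lost: %.1f%%%n", kept * 100, (1 - kept) * 100);
    }
}
```

In other words, adding one route and one built-in filter keeps roughly 17% of the direct throughput.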
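So, should we go ahead and use spring-cloud-gateway?
3. Starting to use spring-cloud-gateway
Once spring-cloud-gateway was in place, I started writing my own filters. The requirement called for two of them: one to modify the request body and one to modify the response body.
Because the filters must apply only to specific requests, I used per-route gateway filters (GatewayFilter). Some of the code comes from the official project and some from other people online. The two filters look roughly like this:
Decrypt filter (pre):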
package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Created by garfield on 2019/2/26.
 */
@Component
public class DecryptGatewayFilterFactory extends AbstractGatewayFilterFactory<DecryptGatewayFilterFactory.Config> {

    private static Logger log = LoggerFactory.getLogger(DecryptGatewayFilterFactory.class);

    public static final String DECRYPT_HEADER = "decrypt_header";

    public DecryptGatewayFilterFactory() {
        super(Config.class);
    }

    private Gson gson = new GsonBuilder().serializeNulls().create();

    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new DecryptGatewayFilter(config);
    }

    public class DecryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        DecryptGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            log.debug(config.toString());
            ServerHttpRequest request = exchange.getRequest();
            MediaType contentType = request.getHeaders().getContentType();

            // Only handle POST requests that are not multipart uploads
            // (a missing Content-Type header is treated as non-multipart)
            boolean postRequest = "POST".equalsIgnoreCase(request.getMethodValue())
                    && (contentType == null || !contentType.toString().contains("multipart/form-data"));
            if (postRequest) {

                Flux<DataBuffer> body = request.getBody();
                // Holds the request body once it has been read
                AtomicReference<String> bodyRef = new AtomicReference<>();
                // Read the request body into bodyRef
                body.subscribe(dataBuffer -> {
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                    DataBufferUtils.release(dataBuffer);
                    bodyRef.set(charBuffer.toString());
                });
                // Note: bodyRef is read right after subscribe(), so this relies on the
                // body having been delivered by the time subscribe() returns
                String bodyStr = bodyRef.get();
                log.debug(bodyStr);
                // This is where our own processing goes
                RequestDto requestDto = gson.fromJson(bodyStr, RequestDto.class);
                log.debug("decrypt filter");
                // save header to response header
                RequestHeaderVo headerVo = requestDto.getHeader();
                headerVo.setAppVersion("");
                // Values can be passed along to later filters here
                exchange.getResponse().getHeaders().add(DECRYPT_HEADER, gson.toJson(headerVo));

                DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

                // Wrap the request so that downstream filters see the re-created body
                request = new ServerHttpRequestDecorator(request) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return bodyFlux;
                    }
                };
            }
            return chain.filter(exchange.mutate().request(request.mutate().header("a", "").build()).build());
        }

        @Override
        public int getOrder() {
            return -;
        }
    }

    public static DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }

    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {
        private boolean decrypt;

        public boolean isDecrypt() {
            return decrypt;
        }

        public void setDecrypt(boolean decrypt) {
            this.decrypt = decrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("decrypt", decrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("decrypt");
    }
}
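One thing to watch in the filter above: `bodyRef.get()` is called right after `body.subscribe(...)`, which only yields the body if it has already been delivered by the time `subscribe` returns. The sketch below reproduces that ordering hazard with the JDK's `CompletableFuture` standing in for the reactive body. It is an analogy under that assumption, not gateway code, and `AsyncReadHazard` is a made-up name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncReadHazard {

    /** Mirrors the filter: register a callback, then read the reference immediately. */
    static String readImmediately(CompletableFuture<String> body) {
        AtomicReference<String> ref = new AtomicReference<>();
        body.thenAccept(ref::set); // plays the role of body.subscribe(buf -> bodyRef.set(...))
        return ref.get();          // null unless the body had already arrived
    }

    /** Safer shape: keep the processing inside the asynchronous pipeline. */
    static CompletableFuture<String> process(CompletableFuture<String> body) {
        return body.thenApply(String::trim);
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        // The body has not been delivered yet, so the immediate read sees nothing.
        System.out.println(readImmediately(pending));

        CompletableFuture<String> result = process(pending);
        pending.complete("  {\"header\":{}}  ");
        // The pipeline version sees the full value once it arrives.
        System.out.println(result.join());
    }
}
```

In reactive code the counterpart of `process` is to build the rest of the filter from the body publisher itself (for example with `flatMap`) instead of reading a side variable.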
Encrypt filter (post), using the response-modification facility provided in the source:
package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;

/**
 * @Auther: garfield
 * @Date: 2019/3/5 15:33
 * @Description:
 */
@Component
public class AnGatewayFilterFactory extends AbstractGatewayFilterFactory<AnGatewayFilterFactory.Config> {

    private Gson gson = new Gson();

    public AnGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {

        // Delegate to the built-in (BETA) response body rewriting filter
        ModifyResponseBodyGatewayFilterFactory m1 = new ModifyResponseBodyGatewayFilterFactory(null);
        ModifyResponseBodyGatewayFilterFactory.Config c1 = new ModifyResponseBodyGatewayFilterFactory.Config();
        c1.setInClass(String.class);
        c1.setOutClass(String.class);
        c1.setNewContentType("application/json");

        c1.setRewriteFunction((exchange, body) -> {
            ServerWebExchange ex = (ServerWebExchange) exchange;
            // Modify the response body here
            RequestHeaderVo requestHeaderVo = new RequestHeaderVo();
            RequestDto requestDto = gson.fromJson(body.toString(), RequestDto.class);
            requestDto.setHeader(requestHeaderVo);
            body = gson.toJson(requestDto);
            return Mono.just(body);
        });
        return m1.apply(c1);
    }

    public static class Config {
        // Named "encrypt" so that it matches shortcutFieldOrder() below
        private boolean encrypt;

        public boolean isEncrypt() {
            return encrypt;
        }

        public void setEncrypt(boolean encrypt) {
            this.encrypt = encrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", encrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }
}
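A digression: there are several ways to write this kind of filter. You can write it yourself or reuse the example shipped in the source. Both versions above have been tested and work. I also have two other variants; they are much the same, but I will paste them too and record the problems I hit:
The following is essentially the example from the source, written out by hand instead of being referenced: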
        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

            ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    ServerHttpRequest request = exchange.getRequest();

                    MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    HttpHeaders httpHeaders = new HttpHeaders();
                    httpHeaders.setContentType(originalResponseContentType);
                    ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);
                    DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());

                    Mono<DataBuffer> modifiedBody = clientResponse.bodyToMono(DataBuffer.class).map(encrypt(config, new RequestHeaderVo()));

                    BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, DataBuffer.class);
                    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
                    return bodyInserter.insert(outputMessage, new BodyInserterContext())
                            .then(Mono.defer(() -> {
                                Flux<DataBuffer> messageBody = outputMessage.getBody();
                                HttpHeaders headers = getDelegate().getHeaders();
                                if (headers.getContentLength() < 0 && !headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                }
                                return this.getDelegate().writeWith(messageBody);
                            }));
                }

                @Override
                public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                    return writeWith(Flux.from(body)
                            .flatMapSequential(p -> p));
                }
            };
            return chain.filter(exchange.mutate().response(responseDecorator).build());
        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }

    private Function<DataBuffer, DataBuffer> encrypt(Config config, RequestHeaderVo headerVo) {
        if (config.encrypt) {
            return (i) -> {
                InputStream inputStream = i.asInputStream();
                byte[] bytes = new byte[0];
                try {
                    bytes = new byte[inputStream.available()];
                    inputStream.read(bytes);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Our own processing goes here
                String body = new String(bytes);
                log.debug("this is response encrypt");
                log.debug(body);
                log.debug(headerVo.toString());
                // body = encryptService.responseEncrypt(body, headerVo); // our own processing
                return i.write(TokenGatewayFilterFactory.stringBuffer(body));
                // return i.write(new String(body).getBytes());
            };
        } else {
            return i -> i;
        }
    }
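With this version, I found that modifying the response body sends execution into the run method of the NioEventLoop class, where it loops forever and never exits. I do not know why, so modify with caution.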
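Independently of the loop problem, the `encrypt` helper above sizes its buffer with `InputStream.available()`, calls `read` once, and decodes with the platform default charset. None of those steps is guaranteed to return the whole body or the right characters. A minimal JDK-only sketch of a more defensive read follows; the class and method names are illustrative, and UTF-8 is assumed because the other filters in this post use it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ReadFully {

    /** Reads the stream until end-of-stream and decodes the bytes as UTF-8. */
    static String readUtf8(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            int n;
            // read() may return fewer bytes than requested, so loop until it reports -1
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"msg\":\"h\u00e9llo\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(readUtf8(new ByteArrayInputStream(payload)));
    }
}
```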
The other variant is much like the one in this post, though I did not test it thoroughly: https://www.jianshu.com/p/9f00e0e1681c
        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return chain.filter(exchange.mutate().response(new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
                    if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
                        Flux<? extends DataBuffer> fluxBody = Flux.first(body);
                        return super.writeWith(fluxBody.map(dataBuffer -> {
                            System.out.println(dataBuffer.readableByteCount());

                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            // Release the buffer's memory
                            DataBufferUtils.release(dataBuffer);
                            // responseData is what the downstream system returned; inspect or modify it here
                            String responseData = new String(content, Charset.forName("UTF-8"));

                            log.debug("response body: {}", responseData);
                            log.debug("this is response encrypt");
                            System.out.println(responseData);

                            byte[] newContent = responseData.getBytes();
                            // body = encryptService.responseEncrypt(body, headerVo);
                            byte[] uppedContent = new String(newContent, Charset.forName("UTF-8")).getBytes();
                            return bufferFactory.wrap(uppedContent);
                        }));
                    } else {
                        log.error("unexpected response code: {}", getStatusCode());
                    }
                    return super.writeWith(body);
                }
            }).build());
        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }
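This approach has a problem: the body it reads is often incomplete.
I went to that page looking for an answer, and the author replied:
The above is only a simple sample. A Flux emits multiple data items; a long message is split up, so each invocation only sees part of it. You can use Flux.toArray to aggregate the data before processing it, or refer to the response handling in https://www.jianshu.com/p/9b781fb1aaa0.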
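The effect described in that reply can be reproduced without any gateway code. The sketch below uses plain byte arrays to stand in for the chunks a long response arrives in; the 16-byte chunk size and the sample JSON are made up for illustration. Handling a single chunk sees a truncated document, while joining all chunks first gives back the whole body.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedBody {

    /** Splits a payload into fixed-size chunks, the way a long response arrives in pieces. */
    static List<byte[]> split(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < payload.length; i += chunkSize) {
            chunks.add(Arrays.copyOfRange(payload, i, Math.min(payload.length, i + chunkSize)));
        }
        return chunks;
    }

    /** Joins all chunks first, then decodes once. */
    static String aggregate(List<byte[]> chunks) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            all.write(chunk, 0, chunk.length);
        }
        return new String(all.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"header\":{\"appVersion\":\"1.0\"},\"body\":\"ok\"}"
                .getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = split(payload, 16);

        // Processing one chunk in isolation sees only a fragment of the JSON.
        System.out.println(new String(chunks.get(0), StandardCharsets.UTF_8));
        // Aggregating first restores the complete document.
        System.out.println(aggregate(chunks));
    }
}
```

Decoding per chunk has a second pitfall: a multi-byte UTF-8 character can be split across two chunks, so aggregation should happen on bytes, before decoding.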
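That is indeed the problem, so we can model our code on his other example; see his Jianshu blog. One caveat: his example also targets version 2.0.1 and does not work on 2.1 or later.
Pasting it here: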
package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.service.SecurityService;
import com.newland.dc.log.kafka.KafkaLog;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.*;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;

/**
 * @Auther: garfield
 * @Date: 2019/2/28 10:45 AM
 * @Description:
 */
@Component
public class EncryptGatewayFilterFactory extends AbstractGatewayFilterFactory<EncryptGatewayFilterFactory.Config> {

    private static Logger log = LoggerFactory.getLogger(EncryptGatewayFilterFactory.class);

    @Autowired
    private SecurityService encryptService;

    public EncryptGatewayFilterFactory() {
        super(Config.class);
    }

    // private Gson gson = new GsonBuilder().serializeNulls().create();
    private Gson gson = new Gson();

    @Value("${server.host:10.10.10.10}")
    private String serverHost;

    @Value("${server.port}")
    private String serverPort;

    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new EncryptGatewayFilter(config);
    }

    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {

        private boolean encrypt;

        public boolean isEncrypt() {
            return encrypt;
        }

        public Config setEncrypt(boolean encrypt) {
            this.encrypt = encrypt;
            return this;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", encrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }

    public class EncryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        EncryptGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            String trace = exchange.getRequest().getHeaders().getFirst("trace");
            ServerRequest serverRequest = new DefaultServerRequest(exchange);
            return serverRequest.bodyToMono(String.class).flatMap(reqBody -> {
                // Wrap the original request
                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public HttpHeaders getHeaders() {
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.putAll(super.getHeaders());
                        return httpHeaders;
                    }

                    @Override
                    public Flux<DataBuffer> getBody() {
                        // Log the original request
                        log.info("[Trace:{}]-gateway request:headers=[{}],body=[{}]", trace, getHeaders(), reqBody);
                        return Flux.just(reqBody).map(bx -> exchange.getResponse().bufferFactory().wrap(bx.getBytes()));
                    }
                };
                // Wrap the original response
                BodyHandlerServerHttpResponseDecorator responseDecorator = new BodyHandlerServerHttpResponseDecorator(
                        initBodyHandler(exchange), exchange.getResponse());

                return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build());
            });
        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }

    }

    public interface BodyHandlerFunction
            extends BiFunction<ServerHttpResponse, Publisher<? extends DataBuffer>, Mono<Void>> {
    }

    protected BodyHandlerFunction initBodyHandler(ServerWebExchange exchange) {
        return (resp, body) -> {
            // Intercept the response body
            MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.setContentType(originalResponseContentType);
            DefaultClientResponseAdapter clientResponseAdapter = new DefaultClientResponseAdapter(body, httpHeaders);
            Mono<String> bodyMono = clientResponseAdapter.bodyToMono(String.class);
            // Values stored earlier in the exchange can be retrieved here
            return bodyMono.flatMap((respBody) -> {
                // Log the response that is being returned
                System.out.println(respBody);
                return resp.writeWith(Flux.just(respBody).map(bx -> resp.bufferFactory().wrap(bx.getBytes())));
            }).then();
        };
    }

    public static class DefaultClientResponseAdapter extends DefaultClientResponse {

        /**
         * @param body
         * @param httpHeaders
         */
        public DefaultClientResponseAdapter(Publisher<? extends DataBuffer> body,
                                            HttpHeaders httpHeaders) {
            this(new ResponseAdapter(body, httpHeaders),
                    ExchangeStrategies.withDefaults());
        }

        /**
         * @param response
         * @param strategies
         */
        public DefaultClientResponseAdapter(ClientHttpResponse response,
                                            ExchangeStrategies strategies) {
            super(response, strategies);
        }

        /**
         * ClientHttpResponse adapter
         */
        static class ResponseAdapter implements ClientHttpResponse {
            /**
             * Response data
             */
            private final Flux<DataBuffer> flux;
            /**
             * Response headers
             */
            private final HttpHeaders headers;

            public ResponseAdapter(Publisher<? extends DataBuffer> body,
                                   HttpHeaders headers) {
                this.headers = headers;
                if (body instanceof Flux) {
                    flux = (Flux) body;
                } else {
                    flux = ((Mono) body).flux();
                }
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return flux;
            }

            @Override
            public HttpHeaders getHeaders() {
                return headers;
            }

            @Override
            public HttpStatus getStatusCode() {
                return null;
            }

            @Override
            public int getRawStatusCode() {
                return 0;
            }

            @Override
            public MultiValueMap<String, ResponseCookie> getCookies() {
                return null;
            }
        }
    }

    class BodyHandlerServerHttpResponseDecorator extends ServerHttpResponseDecorator {

        /**
         * Body handler (interceptor)
         */
        private BodyHandlerFunction bodyHandler = initDefaultBodyHandler();

        /**
         * Constructor
         *
         * @param bodyHandler
         * @param delegate
         */
        public BodyHandlerServerHttpResponseDecorator(BodyHandlerFunction bodyHandler, ServerHttpResponse delegate) {
            super(delegate);
            if (bodyHandler != null) {
                this.bodyHandler = bodyHandler;
            }
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // Let the body handler process the response
            return bodyHandler.apply(getDelegate(), body);
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMapSequential(p -> p));
        }

        /**
         * Default body handler: writes the body through unchanged
         *
         * @return
         */
        private BodyHandlerFunction initDefaultBodyHandler() {
            return (resp, body) -> resp.writeWith(body);
        }
    }
}
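Now everything is ready and the code is written, so it is time for another round of performance tests. Keep in mind that I used the official example here; I tried the other versions as well, and the results were about the same.
4. Testing spring-cloud-gateway routing performance again
step.1: Performance test. Change the configuration to add the filter. Why only one filter? Because this is the one with the serious problem; I will skip the process of narrowing it down.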
      - id: an
        uri: http://10.1.4.32:14077/hello
        predicates:
        - Path=/an
        filters:
        - An
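After many runs, the other filters were all fine. Only the filter that modifies the response body seriously hurts performance, and it also produces read/write errors.
step.2: Test and results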
[wrk@localhost wrk]$ ./wrk -t 15 -c500 -d 10 --latency -s scripts/gateway.lua http://10.1.4.32:14077/an
Running 10s test @ http://10.1.4.32:14077/an
15 threads and 500 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.03s 488.75ms 2.00s 60.62%
Req/Sec 26.59 13.84 80.00 67.60%
Latency Distribution
50% 931.54ms
75% 1.45s
90% 1.76s
99% 1.97s
3848 requests in 10.10s, 1.64MB read
Socket errors: connect 0, read 0, write 0, timeout 458
Requests/sec: 381.05
Transfer/sec: 166.71KB
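The results contain an extra line reporting socket errors, all of them timeouts. The log contains errors as well (the localized message 断开的管道 below means "Broken pipe"):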
--06T16::,|INFO ||AsyncResolver-bootstrap-executor-||||Resolving eureka endpoints via configuration
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: Connection has been closed, response already set (status=)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: Connection has been closed
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: null, response already set (status=)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: null
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: syscall:write(..) failed: 断开的管道
--06T16::,|ERROR||reactor-http-server-epoll-||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
--06T16::,|WARN ||reactor-http-server-epoll-||||Handling completed with error: syscall:write(..) failed: 断开的管道
--06T16::,|INFO ||AsyncResolver-bootstrap-executor-||||Resolving eureka endpoints via configuration
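This is a serious problem. A single request never produces the error; it only appears under high-concurrency load, which makes it very hard to trace. Most importantly, throughput has fallen to about 380 requests/s, which is completely unacceptable, all the more so in production.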
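A rough sanity check on these numbers, assuming each of the 500 wrk connections has at most one request in flight at a time (that is an assumption about how the test script behaves): by Little's law, throughput is bounded by the number of connections divided by the average latency. The names below are illustrative only.

```java
public class LittlesLaw {

    /** Upper bound on requests/sec when each connection carries one request at a time. */
    static double maxThroughput(int connections, double avgLatencySeconds) {
        return connections / avgLatencySeconds;
    }

    public static void main(String[] args) {
        int connections = 500;     // -c500 in the wrk command
        double avgLatency = 1.03;  // average latency in seconds, from the wrk output
        System.out.printf("bound: %.0f requests/s%n", maxThroughput(connections, avgLatency));
    }
}
```

With an average latency of 1.03 s the bound is a little under 500 requests/s, the same order of magnitude as the measured 381.05 (requests that timed out are not counted). So the collapse in throughput is what you would expect once each request takes about a second; the thing to explain is the latency added by the response body filter.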
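It is hard to explain, since we are using the officially provided approach. Let us go back and look at the official response-modification class. Actually, there is no need to look far, because: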
package org.springframework.cloud.gateway.filter.factory.rewrite;
/**
* This filter is BETA and may be subject to change in a future release.
*/
public class ModifyResponseBodyGatewayFilterFactory
extends AbstractGatewayFilterFactory<ModifyResponseBodyGatewayFilterFactory.Config> {
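The official source says it itself: this filter is BETA, so it cannot be relied on yet.
Not ready to give up, I remembered that gateway also offers GlobalFilter and tried moving the same code into a global filter. The result was the same!
So much for that...
If anyone has documentation or experience pointing to a different conclusion, I would love to see it. Otherwise, all I can do is wait for the next release.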