我使用的gateway版本是2020.0.1,在通信时使用到了websocket。本来用得好好的,结果在某天出现了异常:Max frame length of 65536 has been exceeded。
看报错信息就知道是因为websocket的帧超过了默认的65536限制,这个限制可以在源码中的这个类 reactor.netty.http.websocket.WebsocketSpec 中看得到。
本来我也没想着自己去解决,因为这种问题一般来说大家都可能会遇到,网上应该有很多的解决办法。然而事实上,我在网上搜了半天,搜索到的结果几乎都是同一篇文章,而且过程相当麻烦,又是继承重写,又是引包改配置,直接把我劝退了。
于是,我开始了自己的探索。
在我的debug大法下,从升级握手开始追踪。找到了我们的适配器,这个一般是写在配置中
@Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); }
里面有个handle方法,用来提交数据
public WebSocketHandlerAdapter() { this(new HandshakeWebSocketService()); } public WebSocketHandlerAdapter(WebSocketService webSocketService) { Assert.notNull(webSocketService, "'webSocketService' is required"); this.webSocketService = webSocketService; } @Override public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { WebSocketHandler webSocketHandler = (WebSocketHandler) handler; return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty()); }
注意到,还有两个构造方法,其中一个可以传入一个WebSocketService实例,这里举例的实例是HandshakeWebSocketService实例。
我们按照代码追踪,进入到handleRequest方法内部
@Override public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) { ServerHttpRequest request = exchange.getRequest(); HttpMethod method = request.getMethod(); HttpHeaders headers = request.getHeaders(); if (HttpMethod.GET != method) { return Mono.error(new MethodNotAllowedException( request.getMethodValue(), Collections.singleton(HttpMethod.GET))); } if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) { return handleBadRequest(exchange, "Invalid 'Upgrade' header: " + headers); } List<String> connectionValue = headers.getConnection(); if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) { return handleBadRequest(exchange, "Invalid 'Connection' header: " + headers); } String key = headers.getFirst(SEC_WEBSOCKET_KEY); if (key == null) { return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header"); } String protocol = selectProtocol(headers, handler); return initAttributes(exchange).flatMap(attributes -> this.upgradeStrategy.upgrade(exchange, handler, protocol, () -> createHandshakeInfo(exchange, request, protocol, attributes)) ); }
注意到,最后调用了upgradeStrategy的upgrade的方法,而这个upgradeStrategy是RequestUpgradeStrategy的一个实现类,也是是可以在创建WebSocketService实例的时候传入的。
public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) { Assert.notNull(upgradeStrategy, "RequestUpgradeStrategy is required"); this.upgradeStrategy = upgradeStrategy; }
那么,我们就进入这个upgrade方法看看,我这里举例的实例是RequestUpgradeStrategy的一个实现类ReactorNettyRequestUpgradeStrategy。
@Override public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) { ServerHttpResponse response = exchange.getResponse(); HttpServerResponse reactorResponse = ServerHttpResponseDecorator.getNativeResponse(response); HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); URI uri = exchange.getRequest().getURI(); // Trigger WebFlux preCommit actions and upgrade return response.setComplete() .then(Mono.defer(() -> { WebsocketServerSpec spec = buildSpec(subProtocol); return reactorResponse.sendWebsocket((in, out) -> { ReactorNettyWebSocketSession session = new ReactorNettyWebSocketSession( in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength()); return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]"); }, spec); })); }
到了这里,并没有发现什么异样。但是注意到这样一句代码
WebsocketServerSpec spec = buildSpec(subProtocol);
而这个叫buildSpec的实现就就在类里面
WebsocketServerSpec buildSpec(@Nullable String subProtocol) { WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get(); if (subProtocol != null) { builder.protocols(subProtocol); } if (this.maxFramePayloadLength != null) { builder.maxFramePayloadLength(this.maxFramePayloadLength); } if (this.handlePing != null) { builder.handlePing(this.handlePing); } return builder.build(); }
可以看到,WebsocketServerSpec是由一个specBuilderSupplier构建出来的,那么specBuilderSupplier它是怎么来的呢?答案就在代码上面的构造方法中。是的,这个specBuilderSupplier他除了默认的构建之外,也能从外部传进来
public ReactorNettyRequestUpgradeStrategy() { this(WebsocketServerSpec::builder); } public ReactorNettyRequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) { Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required"); this.specBuilderSupplier = builderSupplier; }
点进这个WebsocketServerSpec看一下
public interface WebsocketServerSpec extends WebsocketSpec { static WebsocketServerSpec.Builder builder() { return new WebsocketServerSpec.Builder(); } public static final class Builder extends reactor.netty.http.websocket.WebsocketSpec.Builder<WebsocketServerSpec.Builder> { private Builder() { } public final WebsocketServerSpec build() { return new WebsocketServerSpecImpl(this); } } }
进入其父类WebsocketSpec看一下,是不是发现了我们想要的东西
public interface WebsocketSpec { ...... public static class Builder<SPEC extends WebsocketSpec.Builder<SPEC>> implements Supplier<SPEC> { int maxFramePayloadLength = 65536; ...... } ...... }
那么,这一长串的路径就理通畅了,如果没看明白的话,请自己去追踪一下代码。
- WebSocketHandlerAdapter内部有一个WebSocketService实例,调用handleRequest方法提交数据
- WebSocketService实例内部有一个RequestUpgradeStrategy实例,调用RequestUpgradeStrategy的upgrade方法
- RequestUpgradeStrategy内部有一个WebsocketServerSpec.Builder的实例,用来构建参数
- 并且,也是最重要的,每一环,都可以将需要的实例作为构造参数直接传进去,而不使用默认的。
一通分析之后,最终的解决办法也很简单,在你的配置类中修改配置成如下状态
@Bean public WebSocketHandlerAdapter handlerAdapter() {
// 这里可以根据自己的实际情况决定使用哪种实现类和实现方式,数字可以改成可配置的 注意,这个websocketProperties是我自己的写的配置属性类,不要照搬过去啊 int frameSizeLimit = webSocketProperties().getFrameSizeLimit() <= 0 ? 65536 : webSocketProperties().getFrameSizeLimit(); WebsocketServerSpec.Builder builder = WebsocketServerSpec.builder().maxFramePayloadLength(frameSizeLimit); RequestUpgradeStrategy upgradeStrategy = new ReactorNettyRequestUpgradeStrategy(builder); return new WebSocketHandlerAdapter(new HandshakeWebSocketService(upgradeStrategy)); }
到这里,你的项目应该不会再报出Max frame length of 65536 has been exceeded这种问题了,希望我的解决办法能够帮到大家!