Gateway自定义全局过滤器

背景

最近在做项目架构重构,主要工作是把原有的 PHP 框架迁移到 Java 框架,同时需要兼容旧接口。旧接口的请求参数经过了 Base64 编码,因此需要在网关层统一解码;另外还要对所有请求记录访问日志。这两个需求都通过自定义全局过滤器(GlobalFilter)来实现。由于过滤器中需要多次读取并修改请求体,而较高版本的 Spring Boot 中请求体数据流只能被读取一次(RequestBody 不能重复读),所以还需要先把请求体缓存起来。

缓存数据流

package com.feibaiedu.common.gateway.filter;


import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Global filter that caches the request body so that later filters can read it more than once.
 *
 * <p>The body is joined into a single buffer, copied into a byte array, and the request is
 * replaced by a decorator whose {@code getBody()} emits a fresh buffer over that array on every
 * subscription.
 *
 * <p>Requests without a {@code Content-Type} header, and POST requests whose
 * {@code Content-Length} is 0, are passed through untouched.
 */
@Component
public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {

    /**
     * Caches the request body (when there is one) and continues the filter chain.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining gateway filter chain
     * @return a {@code Mono} that completes when the rest of the chain has completed
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        boolean noContentType = request.getHeaders().getContentType() == null;
        boolean emptyPost = request.getMethod() == HttpMethod.POST
                && request.getHeaders().getContentLength() == 0;
        if (noContentType || emptyPost) {
            return chain.filter(exchange);
        }

        return DataBufferUtils.join(request.getBody())
                .map(dataBuffer -> {
                    // Copy the bytes out and release the joined buffer right away, so that no
                    // reference-counted buffer has to outlive this callback.
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);

                    // Every subscriber gets its own buffer wrapping the cached bytes.
                    Flux<DataBuffer> cachedFlux = Flux.defer(
                            () -> Flux.just(exchange.getResponse().bufferFactory().wrap(bytes)));
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(request) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return exchange.mutate().request(mutatedRequest).build();
                })
                // join() completes empty when the body publisher emits nothing; without this
                // fallback the chain would never be invoked for such requests.
                .defaultIfEmpty(exchange)
                .flatMap(chain::filter);
    }

    /**
     * @return {@code 0}, the order value of this filter
     */
    @Override
    public int getOrder() {
        return 0;
    }
}

全局过滤器

package com.feibaiedu.common.gateway.filter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;


/**
 * @Author: Liu Jibin
 * @Date: Created in 18:09 2021/6/4
 * @ModifiedBy:
 */
@Slf4j
@Component
public class DecodeGlobalGatewayFilter implements GlobalFilter, Ordered {
    private final DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);

    @Autowired
    private ObjectMapper objectMapper;

    boolean headerChangeFlag = false;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        System.out.println("DecodeGlobalGatewayFilter");
        ServerHttpRequest serverHttpRequest = exchange.getRequest();
        HttpMethod method = serverHttpRequest.getMethod();
        String contentType = serverHttpRequest.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
        long contentLength = serverHttpRequest.getHeaders().getContentLength();

        //如果没有参数,直接放行
        if (method == HttpMethod.POST && contentLength == 0) {
            return chain.filter(exchange);
        }
        //json类型,直接放行
        if (method == HttpMethod.POST && isInclusionRelation(MediaType.APPLICATION_JSON_VALUE, contentType)) {
            return chain.filter(exchange);
        }

        //post请求时,如果是文件上传之类的请求,不修改请求消息体
        if (method == HttpMethod.POST && (isInclusionRelation(MediaType.APPLICATION_FORM_URLENCODED_VALUE, contentType) ||
                isInclusionRelation(MediaType.APPLICATION_JSON_VALUE, contentType) )) {
            //从请求里获取Post请求体
            String bodyStr = resolveBodyFromRequest(serverHttpRequest);

            // 这种处理方式,必须保证post请求时,原始post表单必须有数据过来,不然会报错
            if (StringUtils.isBlank(bodyStr)) {
                /*ServerHttpResponse response = exchange.getResponse();
                response.setStatusCode(HttpStatus.BAD_REQUEST);
                return response.setComplete();*/
                bodyStr = "time=MTIzNA%3D%3D";
            }

            //application/x-www-form-urlencoded和application/json才添加参数
            //其他上传文件之类的,不做参数处理,因为文件流添加参数,文件原格式就会出问题了
            if (isInclusionRelation(MediaType.APPLICATION_JSON_VALUE, contentType)) {
                // json,增加参数
                JSONObject json = null;
                if (StringUtils.isBlank(bodyStr)) {
                    json = new JSONObject();
                } else {
                    json = JSON.parseObject(bodyStr); //创建jsonObject对象
                }
//                json.addProperty(ServiceConstants.COMMON_PARAMETER_ENTERPRISEID,authenticationVO.getEnterpriseId());
//                json.addProperty(ServiceConstants.COMMON_PARAMETER_USERID,authenticationVO.getUserId());
                // 转换回字符串
                bodyStr = JSON.toJSONString(json);
            }
            if (isInclusionRelation(MediaType.APPLICATION_FORM_URLENCODED_VALUE, contentType)) {
                // 普通键值对,取出参数,拼接成json传输
                URIBuilder newBuilder = null;
                try {
                    newBuilder = new URIBuilder("http://example.com/?" + bodyStr);
                    //获取键值对列表
                    JSONObject jsonObject = new JSONObject();
                    List<NameValuePair> params = newBuilder.getQueryParams();
                    Base64.Decoder decoder = Base64.getDecoder();
                    for (NameValuePair item : params) {
                        byte[] decode = decoder.decode(item.getValue());
                        //jsonObject.put(item.getName(), new String(decode));
                        //jsonObject.put(item.getName(), s);
                        if (toJsonArray(decode) != null) {
                            jsonObject.put(item.getName(), toJsonArray(decode));
                        }
                        else {
                            jsonObject.put(item.getName(), new String(decode));
                        }
                    }
                    bodyStr = JSON.toJSONString(jsonObject);
                    System.out.println(bodyStr);
                    VisitLogOut(exchange, bodyStr);
                    contentType = MediaType.APPLICATION_JSON_VALUE;
                    headerChangeFlag = true;
                    //serverHttpRequest = serverHttpRequest.mutate().header("Content-Type","application/json").build();

                } catch (URISyntaxException e) {
                    e.printStackTrace();
                }

                //bodyStr = String.format(bodyStr+"&%s=%s&%s=%s",ServiceConstants.COMMON_PARAMETER_ENTERPRISEID,authenticationVO.getEnterpriseId(),ServiceConstants.COMMON_PARAMETER_USERID,authenticationVO.getUserId());

            }

            //记录日志
            //logger.info("全局参数处理: {} url:{} 参数:{}", method.toString(), serverHttpRequest.getURI().getRawPath(), bodyStr);

            //下面的将请求体再次封装写回到request里,传到下一级,否则,由于请求体已被消费,后续的服务将取不到值
            URI uri = serverHttpRequest.getURI();
            URI newUri = UriComponentsBuilder.fromUri(uri).build(true).toUri();
            ServerHttpRequest request = exchange.getRequest().mutate().uri(newUri).build();
            DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
            Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

            // 定义新的消息头
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());

            // 添加消息头
            //headers.set(ServiceConstants.SHIRO_SESSION_PRINCIPALS,GsonUtils.toJson(authenticationVO));

            // 由于修改了传递参数,需要重新设置CONTENT_LENGTH,长度是字节长度,不是字符串长度
            int length = bodyStr.getBytes().length;
            headers.remove(HttpHeaders.CONTENT_LENGTH);
            headers.setContentLength(length);

            // 设置CONTENT_TYPE
            if (StringUtils.isNotBlank(contentType)) {
                headers.set(HttpHeaders.CONTENT_TYPE, contentType);
            }

            // 由于post的body只能订阅一次,由于上面代码中已经订阅过一次body。所以要再次封装请求到request才行,不然会报错请求已经订阅过
            request = new ServerHttpRequestDecorator(request) {
                @Override
                public HttpHeaders getHeaders() {
                    long contentLength = headers.getContentLength();
                    HttpHeaders httpHeaders = new HttpHeaders();
                    httpHeaders.putAll(super.getHeaders());
                    //封装request,传给下一级
                    if (headerChangeFlag) {
                        httpHeaders.set(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                    }
                    if (contentLength > 0) {
                        httpHeaders.setContentLength(contentLength);
                    } else {
                        httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                    }
                    return httpHeaders;
                }

                @Override
                public Flux<DataBuffer> getBody() {
                    return bodyFlux;
                }
            };
            request.mutate().header(HttpHeaders.CONTENT_LENGTH, Integer.toString(bodyStr.length()));
            return chain.filter(exchange.mutate().request(request).build());
        } else if (method == HttpMethod.GET) {
            //记录日志
            //logger.info("全局参数处理: {} url:{} 参数:{}",method.toString(),serverHttpRequest.getURI().getRawPath(),newRequestQueryParams.toString());
            // 获取原参数
            URI uri = serverHttpRequest.getURI();
            StringBuilder query = new StringBuilder();
            String originalQuery = uri.getRawQuery();
            if (org.springframework.util.StringUtils.hasText(originalQuery)) {
                query.append(originalQuery);
                if (originalQuery.charAt(originalQuery.length() - 1) != ‘&‘) {
                    query.append(‘&‘);
                }
            }
            // 添加查询参数
            //query.append(ServiceConstants.COMMON_PARAMETER_ENTERPRISEID+"="+authenticationVO.getEnterpriseId() +"&"+ServiceConstants.COMMON_PARAMETER_USERID+"="+authenticationVO.getUserId());

            // 替换查询参数
            URI newUri = UriComponentsBuilder.fromUri(uri)
                    .replaceQuery(query.toString())
                    .build(true)
                    .toUri();

            ServerHttpRequest request = exchange.getRequest().mutate().uri(newUri).build();
            return chain.filter(exchange.mutate().request(request).build());
        }

        return chain.filter(exchange);

    }

    // 访问记录: 打印入参日志
    private void VisitLogOut(ServerWebExchange exchange, String bodyStr) {
        String split = "\n-----------------------------------------------------------------------------";

        StringBuilder reqMsg = new StringBuilder();
        // 获取请求信息
        ServerHttpRequest request = exchange.getRequest();
        InetSocketAddress address = request.getRemoteAddress();
        String method = request.getMethodValue();
        URI uri = request.getURI();
        HttpHeaders headers = request.getHeaders();
        // 获取请求query
        Map queryMap = request.getQueryParams();
        String query = JSON.toJSONString(queryMap);
        // 拼接请求日志
        reqMsg.append("\n header=").append(headers);
        reqMsg.append("\n query=").append(query);
        reqMsg.append("\n params=").append(bodyStr);
        reqMsg.append("\n address=").append(address.getAddress()).append(" & port=" + address.getPort());
        reqMsg.append("\n method=").append(method);
        reqMsg.append("\n url=").append(uri.getPath());
        reqMsg.append(split);
        log.trace(reqMsg.toString());
    }

    private class InputStreamHolder {
        InputStream inputStream;
    }

    private boolean isInclusionRelation(String source, String target) {
        if (source.length() == target.length() && source.equalsIgnoreCase(target)) {
            return true;
        }
        if (source.length() < target.length()) {
            String sub = target.substring(0, source.length());
            if (source.equalsIgnoreCase(sub)) {
                return true;
            }
        }
        return false;
    }


    /**
     * 从Flux<DataBuffer>中获取字符串的方法
     *
     * @return 请求体
     */
    private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
        //获取请求体
        Flux<DataBuffer> body = serverHttpRequest.getBody();
        AtomicReference<String> bodyRef = new AtomicReference<>();
        body.subscribe(buffer -> {
            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
            DataBufferUtils.release(buffer);
            bodyRef.set(charBuffer.toString());
        });
        //获取request body
        return bodyRef.get();
    }

    /**
     * 字符串转DataBuffer
     *
     * @param value
     * @return
     */
    private DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }

    /**
     * json数组转byte []
     */
    public byte[] toByteArray(JSONArray obj) {
        return obj.toString().getBytes();
    }

    /**
     * byte []转json数组
     */
    public JSONArray toJsonArray(byte [] bytes) {
        try {
            return JSON.parseArray(new String(bytes));
        }
        catch (Exception e) {
            return null;
        }
    }


    @Override
    public int getOrder() {
        return 1;
    }
}

Gateway自定义全局过滤器

上一篇:格式转换器:将任意文件类型转换为 "dds"等格式


下一篇:UVa1200 - A DP Problem 题解