如何在Netty服务器上实施传入HTTP请求的GZIP减压
#java #springboot #netty #gzip

免责声明

本文代表了我对该解决方案的看法,因此,任何建议,修复或讨论都将受到高度赞赏。

短篇小说

我们使用Spring Boot和Spring WebFlux使用Netty HTTP服务器和客户端运行我们的平台。如您所知,Netty没有开箱即用的解决方案将传入的HTTP请求解压缩到HTTP服务器,因此我需要使用Netty功能来“重新发明轮子”。

实施

有几种方法可以在Netty中对传入请求进行减压:

  • 通过WebFilter
  • 通过添加到Netty事件循环的自定义处理程序
  • 通过控制器本身

最方便,最灵活的选项是使用具有高优先配置的WebFilter接口,因为这种方法使我们不仅可以通过其元数据来过滤请求,还可以通过我们从客户端收到的请求数据来过滤。

根据MSDN文档(Compression in HTTP),为了启动有效负载的减压,服务器必须从客户端获得HTTP Header Content-Encoding: gzip。因此,我们的第一个目标是认识到在请求中存在这样的标头。让我们创建一个实用程序类CompressionUtils,该类将拥有一种静态方法来验证该标头的存在和另一种静态方法,该方法将将GZIP输入流转换为字节数组,以进一步将此数组传输到NetTy Pipeline中:

import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;

import lombok.AccessLevel;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;

import static java.util.Objects.nonNull;
import static org.apache.commons.io.IOUtils.copy;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.util.CollectionUtils.isEmpty;

@Component
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CompressionUtils {

    public static final String GZIP = "gzip";
    public static final String UNKNOWN = "unknown";

    public static byte[] getDeflatedBytes(InputStream inputStream) throws IOException {
        String string = IOUtils.toString(inputStream, UTF_8);
        return string.getBytes();
    }

    public static boolean isGzipRequest(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, CONTENT_ENCODING);
    }

    public static boolean isGzipResponseRequired(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, ACCEPT_ENCODING);
    }

    private static boolean containsGzip(ServerHttpRequest serverHttpRequest, String headerName) {
        HttpHeaders headers = serverHttpRequest.getHeaders();
        if (!isEmpty(headers)) {
            String header = headers.getFirst(headerName);
            return nonNull(header) && header.contains(GZIP);
        }

        return false;
    }

}

为了防止将传入输入流的转换转换为字节数组,我使用了Apache Commons IO Project,并且static toString方法使用StringBuilder类。

我们实施的第二阶段是将传入的ServerHttpRequest实例包含在我们的实现中。这样,我们可以保留传入请求的源数据并覆盖getBody方法。 Netty管道调用此方法将传入的数据转换为Netty的DataBuffer包装器。因此,这是一种交换流量的方便方法,而不是将其与自定义处理程序集成到Netty流中。通过WebFilter自定义它是一种更清洁的方法,它不会影响所有传入请求,并允许我们控制解压缩过程(使用错误处理和指标)。

所以,我们的包装器类GzipServerHttpRequest具有以下逻辑的主要方法getBody

  1. 获取源服务器http请求的主体。
  2. 将其转换为InputStream
  3. 将反应性的输入流转换为SequenceInputStream,以获取整个帖子请求主体。
  4. GZIPInputStream解压缩gzip身体。
  5. 再次将最终字节阵列包装到Netty DataBuffer中。
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.SslInfo;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

import lombok.RequiredArgsConstructor;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Instant;
import java.util.zip.GZIPInputStream;

@SuppressWarnings("NullableProblems")
@RequiredArgsConstructor
public class GzipServerHttpRequest implements ServerHttpRequest {

    private final ServerHttpRequest serverHttpRequest;

    @Override
    public String getId() {
        return serverHttpRequest.getId();
    }

    @Override
    public RequestPath getPath() {
        return serverHttpRequest.getPath();
    }

    @Override
    public MultiValueMap<String, String> getQueryParams() {
        return serverHttpRequest.getQueryParams();
    }

    @Override
    public MultiValueMap<String, HttpCookie> getCookies() {
        return serverHttpRequest.getCookies();
    }

    @Override
    public String getMethodValue() {
        return serverHttpRequest.getMethodValue();
    }

    @Override
    public URI getURI() {
        return serverHttpRequest.getURI();
    }

    @Override
    public HttpHeaders getHeaders() {
        return serverHttpRequest.getHeaders();
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        return serverHttpRequest.getRemoteAddress();
    }

    @Override
    public SslInfo getSslInfo() {
        return serverHttpRequest.getSslInfo();
    }

    @Override
    public HttpMethod getMethod() {
        return serverHttpRequest.getMethod();
    }

    @Override
    public Flux<DataBuffer> getBody() {
        final Instant startTime = Instant.now();
        return serverHttpRequest.getBody()
                .map(dataBuffer -> dataBuffer.asInputStream(true))
                .reduce(SequenceInputStream::new)
                .handle(this::decompress)
                .flux();
    }

    private void decompress(InputStream inputStream, SynchronousSink<DataBuffer> sink) {
        try (var gzipInputStream = new GZIPInputStream(inputStream)) {
            byte[] deflatedBytes = getDeflatedBytes(gzipInputStream);
            sink.next(new DefaultDataBufferFactory().wrap(deflatedBytes));
        } catch (Exception exception) {
            sink.error(getException());
        }
    }

    private IllegalGzipRequestException getException() {
        String exceptionMessage = String.format("Decompression of a gzip content failed, URI: [%s]", serverHttpRequest.getURI());
        return new IllegalGzipRequestException(exceptionMessage);
    }

}

一旦我们拥有主要的实现,我们现在可以添加GzipDecompressionFilter以使用包装的ServerHttpRequest的新实例将我们的ServerWebExchange变异:

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

import java.time.Instant;

import static java.util.Objects.isNull;
import static org.apache.commons.lang3.exception.ExceptionUtils.getMessage;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.util.CollectionUtils.isEmpty;

@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GzipDecompressionFilter implements WebFilter {

    @SuppressWarnings("NullableProblems")
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        if (!isGzipRequest(serverWebExchange.getRequest()))
            return webFilterChain.filter(serverWebExchange);

        final Instant startTime = Instant.now();
        ServerWebExchange mutatedWebExchange = getMutatedWebExchange(serverWebExchange);
        return webFilterChain
                .filter(mutatedWebExchange)
                .onErrorResume(this::logError);
    }

    private ServerWebExchange getMutatedWebExchange(ServerWebExchange serverWebExchange) {
        ServerHttpRequest mutatedHttpRequest = new GzipServerHttpRequest(serverWebExchange.getRequest());
        return serverWebExchange
                .mutate()
                .request(mutatedHttpRequest)
                .build();
    }

    private Mono<Void> logError(Throwable exception) {
        log.error("Gzip decompressed HTTP request failed, exception: [{}]", getMessage(exception));
        return Mono.empty();
    }

}

我们将添加的最后一类是我们的自定义RuntimeException IllegalGzipRequestException,以识别我们使用的第三方系统中此类型的错误,我们用于拦截服务中的异常:

public class IllegalGzipRequestException extends RuntimeException {

    public IllegalGzipRequestException(String message) {
        super(message);
    }

}

如果我们的客户希望获得压缩响应,我们可以通过application.properties文件轻松添加此实现:

server.compression.enabled=true
server.compression.min-response-size=1KB

资源

  1. Compression in HTTP
  2. Interface WebFilter
  3. Project Lombok
  4. Class Compression
  5. Netty HttpContentEncoder