免责声明
本文代表了我对该解决方案的看法,因此,任何建议,修复或讨论都将受到高度赞赏。
短篇小说
我们使用Spring Boot和Spring WebFlux使用Netty HTTP服务器和客户端运行我们的平台。如您所知,Netty没有开箱即用的解决方案将传入的HTTP请求解压缩到HTTP服务器,因此我需要使用Netty功能来“重新发明轮子”。
实施
有几种方法可以在Netty中对传入请求进行减压:
- 通过WebFilter
- 通过添加到Netty事件循环的自定义处理程序
- 通过控制器本身
最方便,最灵活的选项是使用具有高优先配置的WebFilter
接口,因为这种方法使我们不仅可以通过其元数据来过滤请求,还可以通过我们从客户端收到的请求数据来过滤。
根据MSDN文档(Compression in HTTP),为了启动有效负载的减压,服务器必须从客户端获得HTTP Header Content-Encoding: gzip
。因此,我们的第一个目标是认识到在请求中存在这样的标头。让我们创建一个实用程序类CompressionUtils
,该类将拥有一种静态方法来验证该标头的存在和另一种静态方法,该方法将将GZIP输入流转换为字节数组,以进一步将此数组传输到NetTy Pipeline中:
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import lombok.AccessLevel;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import static java.util.Objects.nonNull;
import static org.apache.commons.io.IOUtils.copy;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.util.CollectionUtils.isEmpty;
@Component
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CompressionUtils {
public static final String GZIP = "gzip";
public static final String UNKNOWN = "unknown";
public static byte[] getDeflatedBytes(InputStream inputStream) throws IOException {
String string = IOUtils.toString(inputStream, UTF_8);
return string.getBytes();
}
public static boolean isGzipRequest(ServerHttpRequest serverHttpRequest) {
return containsGzip(serverHttpRequest, CONTENT_ENCODING);
}
public static boolean isGzipResponseRequired(ServerHttpRequest serverHttpRequest) {
return containsGzip(serverHttpRequest, ACCEPT_ENCODING);
}
private static boolean containsGzip(ServerHttpRequest serverHttpRequest, String headerName) {
HttpHeaders headers = serverHttpRequest.getHeaders();
if (!isEmpty(headers)) {
String header = headers.getFirst(headerName);
return nonNull(header) && header.contains(GZIP);
}
return false;
}
}
为了防止将传入输入流的转换转换为字节数组,我使用了Apache Commons IO Project,并且static toString
方法使用StringBuilder
类。
我们实施的第二阶段是将传入的ServerHttpRequest
实例包含在我们的实现中。这样,我们可以保留传入请求的源数据并覆盖getBody
方法。 Netty管道调用此方法将传入的数据转换为Netty的DataBuffer
包装器。因此,这是一种交换流量的方便方法,而不是将其与自定义处理程序集成到Netty流中。通过WebFilter
自定义它是一种更清洁的方法,它不会影响所有传入请求,并允许我们控制解压缩过程(使用错误处理和指标)。
所以,我们的包装器类GzipServerHttpRequest
具有以下逻辑的主要方法getBody
:
- 获取源服务器http请求的主体。
- 将其转换为
InputStream
。 - 将反应性的输入流转换为
SequenceInputStream
,以获取整个帖子请求主体。 - 用
GZIPInputStream
解压缩gzip身体。 - 再次将最终字节阵列包装到Netty
DataBuffer
中。
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.SslInfo;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import lombok.RequiredArgsConstructor;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Instant;
import java.util.zip.GZIPInputStream;
@SuppressWarnings("NullableProblems")
@RequiredArgsConstructor
public class GzipServerHttpRequest implements ServerHttpRequest {
private final ServerHttpRequest serverHttpRequest;
@Override
public String getId() {
return serverHttpRequest.getId();
}
@Override
public RequestPath getPath() {
return serverHttpRequest.getPath();
}
@Override
public MultiValueMap<String, String> getQueryParams() {
return serverHttpRequest.getQueryParams();
}
@Override
public MultiValueMap<String, HttpCookie> getCookies() {
return serverHttpRequest.getCookies();
}
@Override
public String getMethodValue() {
return serverHttpRequest.getMethodValue();
}
@Override
public URI getURI() {
return serverHttpRequest.getURI();
}
@Override
public HttpHeaders getHeaders() {
return serverHttpRequest.getHeaders();
}
@Override
public InetSocketAddress getRemoteAddress() {
return serverHttpRequest.getRemoteAddress();
}
@Override
public SslInfo getSslInfo() {
return serverHttpRequest.getSslInfo();
}
@Override
public HttpMethod getMethod() {
return serverHttpRequest.getMethod();
}
@Override
public Flux<DataBuffer> getBody() {
final Instant startTime = Instant.now();
return serverHttpRequest.getBody()
.map(dataBuffer -> dataBuffer.asInputStream(true))
.reduce(SequenceInputStream::new)
.handle(this::decompress)
.flux();
}
private void decompress(InputStream inputStream, SynchronousSink<DataBuffer> sink) {
try (var gzipInputStream = new GZIPInputStream(inputStream)) {
byte[] deflatedBytes = getDeflatedBytes(gzipInputStream);
sink.next(new DefaultDataBufferFactory().wrap(deflatedBytes));
} catch (Exception exception) {
sink.error(getException());
}
}
private IllegalGzipRequestException getException() {
String exceptionMessage = String.format("Decompression of a gzip content failed, URI: [%s]", serverHttpRequest.getURI());
return new IllegalGzipRequestException(exceptionMessage);
}
}
一旦我们拥有主要的实现,我们现在可以添加GzipDecompressionFilter
以使用包装的ServerHttpRequest
的新实例将我们的ServerWebExchange
变异:
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import java.time.Instant;
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.exception.ExceptionUtils.getMessage;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.util.CollectionUtils.isEmpty;
@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GzipDecompressionFilter implements WebFilter {
@SuppressWarnings("NullableProblems")
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
if (!isGzipRequest(serverWebExchange.getRequest()))
return webFilterChain.filter(serverWebExchange);
final Instant startTime = Instant.now();
ServerWebExchange mutatedWebExchange = getMutatedWebExchange(serverWebExchange);
return webFilterChain
.filter(mutatedWebExchange)
.onErrorResume(this::logError);
}
private ServerWebExchange getMutatedWebExchange(ServerWebExchange serverWebExchange) {
ServerHttpRequest mutatedHttpRequest = new GzipServerHttpRequest(serverWebExchange.getRequest());
return serverWebExchange
.mutate()
.request(mutatedHttpRequest)
.build();
}
private Mono<Void> logError(Throwable exception) {
log.error("Gzip decompressed HTTP request failed, exception: [{}]", getMessage(exception));
return Mono.empty();
}
}
我们将添加的最后一类是我们的自定义RuntimeException IllegalGzipRequestException
,以识别我们使用的第三方系统中此类型的错误,我们用于拦截服务中的异常:
public class IllegalGzipRequestException extends RuntimeException {
public IllegalGzipRequestException(String message) {
super(message);
}
}
如果我们的客户希望获得压缩响应,我们可以通过application.properties
文件轻松添加此实现:
server.compression.enabled=true
server.compression.min-response-size=1KB