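Problem description
After running for a while, a Spring Cloud Gateway (WebFlux) service fails because Netty can no longer allocate direct (off-heap) memory. The log shows: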
```
reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 2038431751, max: 2040135680)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
Original Stack Trace:
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 2038431751, max: 2040135680)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:806)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:735)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
```
This suggests that the service is leaking direct (off-heap) memory.
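The figures in the error message can be checked directly. The sketch below only redoes that arithmetic with the three values copied from the log; the comparison mirrors, as we understand it, the check in Netty's `PlatformDependent.incrementMemoryCounter`, which rejects an allocation when the new total would exceed the limit.

```java
/** Re-checks the figures reported by OutOfDirectMemoryError (values copied from the log). */
final class DirectMemoryHeadroom {
    static final long REQUESTED = 4_194_304L;   // "failed to allocate 4194304 byte(s)" (4 MiB)
    static final long USED = 2_038_431_751L;    // "used: 2038431751"
    static final long MAX = 2_040_135_680L;     // "max: 2040135680" (about 1.9 GiB)

    /** Bytes still free under the direct-memory limit. */
    static long headroom(long used, long max) {
        return max - used;
    }

    /** The allocation is rejected when the new total would exceed the limit. */
    static boolean wouldFail(long used, long requested, long max) {
        return used + requested > max;
    }

    public static void main(String[] args) {
        long free = headroom(USED, MAX);
        System.out.printf("free: %d bytes (%.2f MiB), requested: %.2f MiB, rejected: %b%n",
                free, free / 1048576.0, REQUESTED / 1048576.0, wouldFail(USED, REQUESTED, MAX));
    }
}
```

Only 1,703,929 bytes (about 1.6 MiB) remain under the limit, so the 4 MiB request cannot fit: nearly the whole direct-memory budget is already held, which is what points to a leak rather than a single oversized allocation.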
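Analysis
To locate the leak, enable Netty's leak detector at the advanced level by adding this JVM option: `-Dio.netty.leakDetectionLevel=advanced` (`io.netty.leakDetection.level` is the newer name of the same property; Netty still reads the old one).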
The property works as follows.
`io.netty.leakDetectionLevel` controls Netty's resource leak detector. A leak here is a reference-counted object, usually a pooled `ByteBuf`, that is garbage-collected before `release()` has brought its reference count to 0, so its memory is never returned to the pool. The detector can only report such a leak after the buffer has been garbage-collected.
The property accepts the following values:
DISABLED: leak detection is turned off completely (Netty does not recommend this).
SIMPLE (the default): about 1% of buffers are sampled, and the log only says whether a leak occurred.
ADVANCED: about 1% of buffers are sampled, and the log shows where each leaked buffer was created and most recently accessed.
PARANOID: same as ADVANCED, but every buffer is tracked.
ADVANCED and PARANOID record stack traces and add noticeable overhead (PARANOID most of all), so they are best suited to development, testing, or a short diagnostic run like this one rather than permanent production use.
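If changing the launch command is inconvenient, the same level can be set from code. This is a minimal sketch, assuming it runs before any Netty class is loaded (Netty reads the property once, when `ResourceLeakDetector` is initialized); `GatewayApplication` in the comment is a hypothetical boot class. With Netty on the classpath, `ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED)` can also change the level at runtime.

```java
/**
 * Sketch: programmatic equivalent of -Dio.netty.leakDetectionLevel=advanced.
 * Netty reads the level once, when ResourceLeakDetector is initialized, so enable()
 * must run before any Netty class is loaded (e.g. as the first line of main()).
 */
final class LeakDetectionSetup {
    /** Current property name; takes precedence over the old one if both are set. */
    static final String PROP_LEVEL = "io.netty.leakDetection.level";
    /** Older name used in this article; still read by Netty as a fallback. */
    static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel";

    static void enable(String level) {
        System.setProperty(PROP_LEVEL, level);
    }

    public static void main(String[] args) {
        enable("advanced");
        // Then start the application, e.g. (hypothetical boot class):
        // SpringApplication.run(GatewayApplication.class, args);
    }
}
```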
With advanced detection enabled, the service log reported the following leak source:
```
# case 1
2023-07-07 18:35:00.030 [reactor-http-epoll-4] ERROR io.netty.util.ResourceLeakDetector - [] - [TID: N/A] - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
```
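Conclusion
Comparing the leak report with the code traced the problem to the following two filters.
Where the reference count is incremented: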
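```java
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// CACHED_REQUEST_BODY_OBJECT and ORDER_CACHE_BODY_FILTER are project-defined constants (not shown)
@Component
public class TestFilter1 implements Ordered, GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
        DefaultDataBuffer defaultDataBuffer = defaultDataBufferFactory.allocateBuffer(0);
        Flux<DataBuffer> bodyDataBuffer = exchange.getRequest().getBody() // README: reference count +1
                .defaultIfEmpty(defaultDataBuffer);
        return DataBufferUtils.join(bodyDataBuffer)
                .flatMap(dataBuffer -> {
                    DataBufferUtils.retain(dataBuffer); // README: reference count +1
                    // slice() shares the parent's reference count and does not retain it
                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    exchange.getAttributes().put(CACHED_REQUEST_BODY_OBJECT, cachedFlux);
                    return chain.filter(exchange.mutate().request(mutatedRequest).build());
                });
    }

    @Override
    public int getOrder() {
        return ORDER_CACHE_BODY_FILTER;
    }
}
```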
Where the reference is released:
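```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// CACHED_REQUEST_BODY_OBJECT and ORDER_INPUT_LOG_FILTER are project-defined constants (not shown)
@Component
public class TestFilter2 implements Ordered, GlobalFilter {
    private static final Logger logger = LoggerFactory.getLogger(TestFilter2.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        logger.warn(resolveBodyFromRequest(exchange));
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER_INPUT_LOG_FILTER;
    }

    private String resolveBodyFromRequest(ServerWebExchange exchange) {
        Flux<DataBuffer> body = exchange.getAttribute(CACHED_REQUEST_BODY_OBJECT);
        if (body == null) {
            return null;
        }
        AtomicReference<String> rawRef = new AtomicReference<>();
        // cachedFlux emits synchronously (Flux.just), so rawRef is set before subscribe() returns
        body.subscribe(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer); // README: reference count -1
            rawRef.set(new String(bytes, StandardCharsets.UTF_8));
        });
        return rawRef.get();
    }
}
```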
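Tallying the operations annotated with `README` comments (two increments, one release) shows that the reference count is incremented one more time than it is released, so the buffer is never returned to the pool.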
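The tally can be replayed with a toy counter. This is a hypothetical model, not Netty's `AbstractReferenceCountedByteBuf`: it covers only the three operations marked with `README` comments in the two filters and, like `slice()`, lets the slice share its parent's count.

```java
/**
 * Toy reference counter replaying the operations annotated in TestFilter1 / TestFilter2.
 * Hypothetical model for illustration; not Netty's implementation.
 */
final class OriginalFilterTally {

    static final class ToyBuffer {
        private int refCnt;

        ToyBuffer(int initial) {
            this.refCnt = initial;
        }

        void retain() {
            refCnt++;
        }

        /** Returns true when the count reaches 0, i.e. the memory would go back to the pool. */
        boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("double release");
            }
            return --refCnt == 0;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Replays the original code path and returns the final reference count. */
    static int replay() {
        ToyBuffer body = new ToyBuffer(1); // getBody(): reference handed to TestFilter1 (+1)
        body.retain();                      // DataBufferUtils.retain(dataBuffer) (+1)
        // TestFilter2 releases a slice(), which shares the parent's count
        body.release();                     // DataBufferUtils.release(buffer) (-1)
        return body.refCnt();               // anything above 0 is never freed
    }

    public static void main(String[] args) {
        System.out.println("final refCnt = " + replay());
    }
}
```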
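Solution
1. Move the retain into `Flux.defer` (by using `retainedSlice`), so that every read of the cached body retains its own slice and each retain is paired with the consumer's release;
2. Release the reference obtained from `getBody` once the filter chain completes (`doFinally`).
Sample code: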
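```java
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// CACHED_REQUEST_BODY_OBJECT and ORDER_CACHE_BODY_FILTER are project-defined constants (not shown)
@Component
public class TestFilter1 implements Ordered, GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
        DefaultDataBuffer defaultDataBuffer = defaultDataBufferFactory.allocateBuffer(0);
        Flux<DataBuffer> bodyDataBuffer = exchange.getRequest().getBody()
                .defaultIfEmpty(defaultDataBuffer);
        return DataBufferUtils.join(bodyDataBuffer)
                .flatMap(dataBuffer -> {
                    // 1. Retain inside defer: each subscriber gets its own retained slice to release
                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.retainedSlice(0, dataBuffer.readableByteCount())));
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    exchange.getAttributes().put(CACHED_REQUEST_BODY_OBJECT, cachedFlux);
                    return chain.filter(exchange.mutate().request(mutatedRequest).build())
                            // 2. Release the reference obtained via getBody() once the chain completes
                            .doFinally(unused -> DataBufferUtils.release(dataBuffer));
                });
    }

    @Override
    public int getOrder() {
        return ORDER_CACHE_BODY_FILTER;
    }
}
```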
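Why the fix balances can be shown with a toy model (hypothetical, not Netty or Spring code): every subscription to `cachedFlux` takes its own retained slice that the consumer of that slice releases, and `doFinally` drops the filter's own reference. The number of times the cached body is read is a parameter.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy model of the fixed pattern: retainedSlice per subscription (+1), released by
 * that slice's consumer (-1), plus the owner's release in doFinally (-1).
 * Hypothetical illustration; not Netty or Spring code.
 */
final class FixedFilterTally {

    /** Final reference count after the cached body has been read `reads` times. */
    static int replay(int reads) {
        AtomicInteger refCnt = new AtomicInteger(1); // joined body owned by TestFilter1
        for (int i = 0; i < reads; i++) {
            refCnt.incrementAndGet(); // Flux.defer -> retainedSlice(...) for this subscriber
            refCnt.decrementAndGet(); // the consumer releases its slice (TestFilter2 does so explicitly)
        }
        refCnt.decrementAndGet();     // doFinally: DataBufferUtils.release(dataBuffer)
        return refCnt.get();
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 3; n++) {
            System.out.println(n + " read(s) -> final refCnt = " + replay(n));
        }
    }
}
```

Because each retain sits next to its own release, the count ends at 0 whether the body is read zero, one, or several times, so correctness no longer depends on how many filters consume the cached body.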
Sample code (Spring Cloud Gateway sources)
org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter#filter
org.springframework.cloud.gateway.handler.predicate.ReadBodyRoutePredicateFactory
org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter
References
https://netty.io/wiki/reference-counted-objects.html