Bug Report
Versions
- Driver: 0.8.12.BUILD-SNAPSHOT
- Database: (brought by testcontainers 1.16.3)
- Java: 1.8.0
- OS: OSX 10.15.7
Current Behavior
Following a report from a user, first in Spring Framework and then in Reactor Core, I investigated a memory leak where reducing a large dataset into a single array via `Flux#reduce` led to an `OutOfMemoryError`.
In addition to the reproducer in the above issues from the original author (in Kotlin, using both Spring and Spring Data R2DBC), I've managed to create a self-contained test class that can run from the `r2dbc-postgresql` test suite with minimal effort (see below).
In a nutshell, the reduced `byte[]` arrays from previous loops are retained, preventing garbage collection, through Netty channels/selectors. One major component in that retention appears to be the `ReactorNettyClient` `requestProcessor`: this `EmitterProcessor` instance keeps a single subscriber, which is left in place even after the query has completed. This is congruent with what could be observed in the OP's original repro.
Could this point to a pooling issue in the OP's reproducer, somewhat simulated here by the fact that the `Connection` is not closed? I mention this because at first I didn't close the `Connection` in my repro below, and I saw very similar paths to GC roots for the retained `byte[]` arrays...
It also appears that setting `Statement#fetchSize` higher than the number of returned rows makes the issue go away.
Edit: `fetchSize` doesn't help if the `Connection` is not closed.
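For reference, a minimal sketch of that fetch-size variant, assuming the `Statement#fetchSize(int)` method from r2dbc-spi 0.8.x. The query is the one from the reproducer below; `FetchSizeVariant` is just an illustrative wrapper class, not part of the driver:

```java
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import reactor.core.publisher.Flux;

// Illustrative only: same query as the reproducer, but with an explicit fetch
// size larger than the number of returned rows, which made the retained
// byte[] go away -- provided the Connection is eventually closed.
class FetchSizeVariant {

    Flux<Result> query(Connection connection, int limit) {
        return Flux.from(connection
                .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT " + limit)
                .fetchSize(limit + 1) // higher than the number of returned rows
                .execute());
    }
}
```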
Table schema

```sql
CREATE SCHEMA IF NOT EXISTS master_data AUTHORIZATION postgres;

CREATE TABLE master_data.csv (
    id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
    column1 text NOT NULL,
    column2 text NOT NULL,
    column3 text NOT NULL,
    CONSTRAINT csv_pkey PRIMARY KEY (id)
);

INSERT INTO master_data.csv (id, column1, column2, column3)
SELECT g.id, 'value11', 'value21', 'value31'
FROM generate_series(1, 1000000) AS g(id);
```
Steps to reproduce
In addition to the OP's own Kotlin reproducer here, my own self-contained reproducer is below.
At `LIMIT 30000`, the OOM doesn't occur. The 20s pause at the end can be leveraged to trigger a heap dump from e.g. JVisualVM, which can then be inspected for retained `ErasableByteArrayOutputStream`s (or their internal `byte[]`). At that limit, the `byte[]` arrays should be the top objects by retained size.
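As an aside on the `ErasableByteArrayOutputStream` trick used in the reproducer: a plain `ByteArrayOutputStream.reset()` only rewinds the count and keeps the (potentially large) internal buffer alive, which is why the subclass replaces the buffer outright. A standalone, JDK-only sketch of the difference (class names here are illustrative):

```java
import java.io.ByteArrayOutputStream;

// Demonstrates why reset() alone is not enough: ByteArrayOutputStream.reset()
// only sets count back to 0 and keeps the (possibly huge) internal buf alive.
// Subclassing exposes the buffer size so that retention is observable.
public class ResetDemo {

    static class InspectableBaos extends ByteArrayOutputStream {

        int capacity() {
            return buf.length;
        }

        // Same workaround as ErasableByteArrayOutputStream in the reproducer:
        // replace the internal array so the large one becomes collectable.
        synchronized void erase() {
            super.reset();
            buf = new byte[32];
        }
    }

    public static void main(String[] args) {
        InspectableBaos out = new InspectableBaos();
        out.write(new byte[1_000_000], 0, 1_000_000); // grows buf to at least 1,000,000 bytes

        out.reset(); // plain reset: the large buf is still referenced
        System.out.println("capacity after reset(): " + out.capacity());

        out.erase(); // erase: buf replaced by a fresh 32-byte array
        System.out.println("capacity after erase(): " + out.capacity());
    }
}
```

This is exactly the "dirty workaround" noted in the reproducer's comments: calling the overriding `reset()` after `toByteArray()` makes the large buffers collectable even while the leak keeps the stream object itself reachable.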
Self-contained reproducer
```java
package io.r2dbc.postgresql;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Testcontainers
public class MemoryLeakIntegrationTests {

    Logger logger = LoggerFactory.getLogger(MemoryLeakIntegrationTests.class);

    @Container
    PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:13-alpine")
            .withDatabaseName("tpp")
            .withUsername("postgres")
            .withPassword("postgres")
            .withInitScript("leak-init.sql")
            .withExposedPorts(5432);

    static class ErasableByteArrayOutputStream extends ByteArrayOutputStream {

        @Override
        public synchronized void reset() {
            super.reset();
            buf = new byte[32];
        }
    }

    @Test
    void test() throws InterruptedException {
        logger.info("Will connect to port {}", postgres.getMappedPort(5432));
        final int LIMIT = 30_000; // originally, no LIMIT => 1_000_000 elements
        // the retention is observable via a heap dump even with a LIMIT 30_000 (although the OOM doesn't occur)
        ConnectionFactory connectionFactory = ConnectionFactories.get(PostgreSQLR2DBCDatabaseContainer.getOptions(postgres));
        for (int i = 0; i < 10; i++) {
            final int round = i;
            Mono.usingWhen(connectionFactory.create(),
                connection -> {
                    logger.info("Connection acquired {}", connection);
                    return Flux.from(connection
                            .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT " + LIMIT)
                            .execute()
                    )
                    .publishOn(Schedulers.boundedElastic())
                    .flatMap(r -> r.map((row, rowmetadata) -> {
                        int nbColumns = rowmetadata.getColumnNames().size();
                        List<String> rowContent = new ArrayList<>(nbColumns);
                        for (int colId = 0; colId < nbColumns; colId++) {
                            if (colId == 0) {
                                long id = row.get(colId, Long.class);
                                if (id % (LIMIT / 3) == 0) {
                                    System.out.println("passed id " + id);
                                }
                                rowContent.add("" + id);
                            }
                            else {
                                rowContent.add(row.get(colId, String.class));
                            }
                        }
                        return rowContent;
                    }))
                    .flatMapIterable(Function.identity())
                    // having an ErasableByteArrayOutputStream class is useful to spot
                    // the relevant objects in profilers, plus it allows a dirty workaround with reset().
                    .reduce(new ErasableByteArrayOutputStream(), (output, el) -> {
                        try {
                            output.write(el.getBytes(StandardCharsets.UTF_8));
                            output.write(" ".getBytes(StandardCharsets.UTF_8));
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                        return output;
                    })
                    .map(it -> {
                        byte[] result = it.toByteArray();
                        // custom reset is a dirty workaround to avoid retaining.
                        //it.reset();
                        return result;
                    });
                },
                // connection not closed: the byte[] will all stay in memory
                connection -> Mono.empty()
                // on the other hand, closing the connection ensures they get GCed
                // connection -> Mono.from(connection.close())
                //     .doFinally(sig -> {
                //         logger.info("Connection closed {}", connection);
                //     })
            )
            .doOnNext(it -> logger.info("ByteArray with size {} was created in round {}", it.length, round))
            .block(Duration.ofMinutes(5));
        }
        System.gc();
        System.out.println("Will sleep for 20s");
        // capture a heap dump here, or even after 2-3 rounds, to observe the issue
        Thread.sleep(20_000);
    }
}
```