
ReactorNettyClient requestProcessor can retain data from queries #492

@simonbasle

Description


Bug Report

Versions

  • Driver: 0.8.12.BUILD-SNAPSHOT
  • Database: PostgreSQL (postgres:13-alpine, brought by Testcontainers 1.16.3)
  • Java: 1.8.0
  • OS: OSX 10.15.7

Current Behavior

Following a report from a user, first in Spring Framework and then in Reactor Core, I investigated a memory leak where reducing a large dataset into a single array via Flux#reduce led to an OutOfMemoryError.
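
For context, the failing shape boils down to the pattern below. This is a standalone sketch with stand-in data (Flux.range instead of database rows), not the OP's code:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Flux;

public class ReducePatternSketch {

    public static void main(String[] args) {
        // many small payloads reduced into one ever-growing buffer, run in a
        // loop: the reduced byte[] from earlier rounds should become garbage,
        // but in the leak scenario they stay reachable
        for (int round = 0; round < 10; round++) {
            byte[] reduced = Flux.range(1, 1_000_000)
                .map(i -> ("row-" + i + " ").getBytes(StandardCharsets.UTF_8))
                .reduce(new ByteArrayOutputStream(), (out, bytes) -> {
                    out.write(bytes, 0, bytes.length); // this overload does not throw IOException
                    return out;
                })
                .map(ByteArrayOutputStream::toByteArray)
                .block();
            System.out.println("round " + round + ": " + reduced.length + " bytes");
        }
    }
}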

In addition to the reproducer in the above issues from the original author (in Kotlin, using both Spring and Spring Data R2DBC), I've managed to create a self-contained test class that can be run from the r2dbc-postgresql test suite with minimal effort (see below).

In a nutshell, the reduced byte[] arrays from previous loop iterations are retained, preventing garbage collection; their paths to GC roots go through Netty channels / selectors. One major component in that retention appears to be the ReactorNettyClient requestProcessor: this EmitterProcessor instance keeps a single subscriber attached even after the query has completed. This is congruent with what could be observed in the OP's original repro.
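
The lingering-subscriber part can be illustrated in isolation. A minimal sketch using plain reactor-core, not the driver's actual code:

import reactor.core.publisher.EmitterProcessor;

public class LingeringSubscriberSketch {

    public static void main(String[] args) {
        // an EmitterProcessor whose subscriber is never cancelled still
        // reports a live downstream once the "conversation" is over
        EmitterProcessor<String> processor = EmitterProcessor.create();
        processor.subscribe(frame -> { /* backend frames would flow here */ });

        processor.onNext("QUERY");

        // the query is done, yet the subscriber is still attached, keeping
        // the processor -> subscriber reference chain alive
        System.out.println("downstreams: " + processor.downstreamCount()); // prints 1
    }
}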

This could point to a pooling issue in the OP's reproducer, simulated here by the fact that the Connection is not closed?
I'm mentioning this because at first I didn't close the Connection in my repro below, and I was seeing very similar paths to GC roots for the retained byte[]... (see the close sketch just below).
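
For contrast, the resource pattern that made the arrays collectable in my repro is simply closing the Connection in the usingWhen cleanup; runQuery(...) here is a hypothetical stand-in for the query pipeline shown further down:

Mono.usingWhen(
        connectionFactory.create(),
        connection -> runQuery(connection),          // hypothetical: the select/reduce pipeline
        connection -> Mono.from(connection.close())  // closing frees the paths to GC roots
    )
    .block(Duration.ofMinutes(5));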

It also appears that setting Statement#fetchSize higher than the number of returned rows makes the issue go away (a sketch follows below).
edit: fetchSize doesn't help if the Connection is not closed.
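
A minimal sketch of that workaround, assuming a connection obtained as in the reproducer; 50_000 is an arbitrary value above the LIMIT:

Flux.from(connection
        .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT 30000")
        .fetchSize(50_000) // larger than the number of returned rows
        .execute())
    .flatMap(result -> result.map((row, meta) -> row.get(0, Long.class)))
    .blockLast();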

Table schema

Input Code
CREATE SCHEMA IF NOT EXISTS master_data AUTHORIZATION postgres;

CREATE TABLE master_data.csv (
  id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
  column1 text NOT NULL,
  column2 text NOT NULL,
  column3 text NOT NULL,
  CONSTRAINT csv_pkey PRIMARY KEY (id)
);

INSERT INTO master_data.csv (id, column1, column2, column3)
  SELECT g.id, 'value11', 'value21', 'value31'
  FROM generate_series(1, 1000000) AS g(id);

Steps to reproduce

In addition to the OP's own Kotlin reproducer (from the issues referenced above), my own self-contained reproducer is below.

At LIMIT 30000, the OOM doesn't occur. The 20s pause at the end can be leveraged to trigger a heap dump from e.g. JVisualVM, which can then be inspected for retained ErasableByteArrayOutputStream instances (or their internal byte[]). At that limit, the byte[] arrays should be the top objects by retained size.

Typically:
[screenshot: heap dump histogram showing byte[] as the top objects by retained size]
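
If JVisualVM is inconvenient, the dump can also be captured programmatically during the 20s pause. A sketch using the HotSpot diagnostic MXBean (HotSpot-only API; the file name is arbitrary), to be dropped into the test class:

import java.io.IOException;
import java.lang.management.ManagementFactory;

import com.sun.management.HotSpotDiagnosticMXBean;

// live=true only dumps strongly reachable objects, which is exactly what
// we want when checking whether the reduced byte[] are still retained
static void dumpHeap(String path) throws IOException {
    HotSpotDiagnosticMXBean diagnostics = ManagementFactory.newPlatformMXBeanProxy(
        ManagementFactory.getPlatformMBeanServer(),
        "com.sun.management:type=HotSpotDiagnostic",
        HotSpotDiagnosticMXBean.class);
    diagnostics.dumpHeap(path, true);
}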

Self-contained reproducer
package io.r2dbc.postgresql;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Testcontainers
public class MemoryLeakIntegrationTests {

    Logger logger = LoggerFactory.getLogger(MemoryLeakIntegrationTests.class);

    @Container
    PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:13-alpine")
        .withDatabaseName("tpp")
        .withUsername("postgres")
        .withPassword("postgres")
        .withInitScript("leak-init.sql")
        .withExposedPorts(5432);


    static class ErasableByteArrayOutputStream extends ByteArrayOutputStream {

        @Override
        public synchronized void reset() {
            super.reset();
            buf = new byte[32];
        }
    }

    @Test
    void test() throws InterruptedException {
        logger.info("Will connect to port {}", postgres.getMappedPort(5432));

        final int LIMIT = 30_000; //originally, no LIMIT => 1_000_000 elements
        //the retaining is observable via a heap dump even with a LIMIT 30_000 (although the OOM doesn't occur)

        ConnectionFactory connectionFactory = ConnectionFactories.get(PostgreSQLR2DBCDatabaseContainer.getOptions(postgres));

        for (int i = 0; i < 10; i++) {
            final int round = i;

            Mono.usingWhen(connectionFactory.create(),
                    connection -> {
                        logger.info("Connection acquired {}", connection);

                        return Flux.from(connection
                                .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT " + LIMIT)
                                .execute()
                            )
                            .publishOn(Schedulers.boundedElastic())
                            .flatMap(r -> r.map((row, rowmetadata) -> {
                                int nbColumns = rowmetadata.getColumnNames().size();
                                List<String> rowContent = new ArrayList<>(nbColumns);
                                for (int colId = 0; colId < nbColumns; colId++) {
                                    if (colId == 0) {
                                        long id = row.get(colId, Long.class);
                                        if (id % (LIMIT / 3) == 0) {
                                            System.out.println("passed id " + id);
                                        }
                                        rowContent.add("" + id);
                                    }
                                    else {
                                        rowContent.add(row.get(colId, String.class));
                                    }
                                }
                                return rowContent;
                            }))
                            .flatMapIterable(Function.identity())
                            // having a ErasableByteArrayOutputStream class is useful to spot
                            // the relevant objects in profilers, plus it allows dirty workaround with reset().
                            .reduce(new ErasableByteArrayOutputStream(), (output, el) -> {
                                try {
                                    output.write(el.getBytes(StandardCharsets.UTF_8));
                                    output.write(" ".getBytes(StandardCharsets.UTF_8));
                                }
                                catch (IOException e) {
                                    e.printStackTrace();
                                }
                                return output;
                            })
                            .map(it -> {
                                byte[] result = it.toByteArray();
                                // custom reset is a dirty workaround to avoid retaining.
                                //it.reset();
                                return result;
                            });
                    },
                
                    // connection not closed: the byte[] will all stay in memory
                    connection -> Mono.empty()
                    // on the other hand, closing the connection ensures they get GCed
//                    connection -> Mono.from(connection.close())
//                        .doFinally(sig -> {
//                            logger.info("Connection closed {}", connection);
//                        })
                )
                .doOnNext(it -> logger.info("ByteArray with size {} was created in round {}", it.length, round))
                .block(Duration.ofMinutes(5));
        }

        System.gc();
        System.out.println("Will sleep for 20s");
        // capture a Heap Dump here, or even after 2-3 rounds, to observe the issue
        Thread.sleep(20_000);
    }
}
