BigQueryStorageWrite: StreamWriter (via ConnectionWorkerPool) leaks memory on connection failures #2479

@yzhaoa

Description

Environment details

  1. Specify the API at the beginning of the title. For example, "BigQuery: ...").
    General, Core, and Other are also allowed as types
  2. OS type and version: GKE 1.26.14-gke.1044000 (x86-64) running amazoncorretto:11-al2023-headless
  3. Java version: 11
  4. version(s): 26.17.0 (com.google.cloud:libraries-bom:26.17.0)

Steps to reproduce

  1. Create a StreamWriter with .setEnableConnectionPool(true).
  2. Write to a BigQuery table in a way that fails, e.g. because the table doesn't exist or the schema is wrong. (Why? Because sometimes the schema is out of sync. Nevertheless, such a condition should not cause a memory leak.) Keep sending this failing traffic persistently.
  3. Observe that com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool continually creates new ConnectionWorkers (expected), but old ConnectionWorkers for which isConnectionInUnrecoverableState() is true are retained in the field connectionToWriteStream (bug) and are never closed (bug).

Code example

The problem is caused by the cleanup operation: it neither removes the ConnectionWorker from connectionToWriteStream, the other field that tracks the ConnectionWorker object, nor closes it.
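
To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern described above. It is not the library's code: `PoolLeakSketch`, `Worker`, `Pool`, `cleanupLeaky` and `cleanupFixed` are hypothetical names, and the collection types are assumptions. Only the names `connectionWorkerPool`, `connectionToWriteStream` and `isConnectionInUnrecoverableState()` are taken from this report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PoolLeakSketch {

  /** Hypothetical stand-in for ConnectionWorker; not the real class. */
  static final class Worker implements AutoCloseable {
    private final boolean unrecoverable;
    private boolean closed;

    Worker(boolean unrecoverable) {
      this.unrecoverable = unrecoverable;
    }

    boolean isConnectionInUnrecoverableState() {
      return unrecoverable;
    }

    boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Hypothetical pool that tracks each worker in two places (types are assumed). */
  static final class Pool {
    final Set<Worker> connectionWorkerPool = new HashSet<>();
    final Map<Worker, Set<String>> connectionToWriteStream = new HashMap<>();

    void add(Worker worker, String streamId) {
      connectionWorkerPool.add(worker);
      connectionToWriteStream.computeIfAbsent(worker, w -> new HashSet<>()).add(streamId);
    }

    /** Models the reported bug: the worker leaves the set but stays in the map, unclosed. */
    void cleanupLeaky() {
      connectionWorkerPool.removeIf(Worker::isConnectionInUnrecoverableState);
    }

    /** One possible remedy: drop dead workers from both structures and close them. */
    void cleanupFixed() {
      List<Worker> dead = new ArrayList<>();
      // Scan the map too, so workers already dropped from the set are still found.
      for (Worker w : connectionToWriteStream.keySet()) {
        if (w.isConnectionInUnrecoverableState()) {
          dead.add(w);
        }
      }
      for (Worker w : dead) {
        connectionWorkerPool.remove(w);
        connectionToWriteStream.remove(w);
        w.close();
      }
    }
  }

  public static void main(String[] args) {
    Pool pool = new Pool();
    for (int i = 0; i < 1000; i++) {
      pool.add(new Worker(true), "stream-" + i); // every append fails permanently
      pool.cleanupLeaky();
    }
    System.out.println("after leaky cleanup, map entries: " + pool.connectionToWriteStream.size());
    pool.cleanupFixed();
    System.out.println("after fixed cleanup, map entries: " + pool.connectionToWriteStream.size());
  }
}
```

In this model the leaky cleanup empties `connectionWorkerPool` on every pass while `connectionToWriteStream` keeps growing, which matches the retention observed in step 3. Whether the real fix should live in the cleanup path or elsewhere is a design decision for the maintainers.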

Stack trace

None captured

External references such as API reference guides

Any additional information below

The following is the diff between 26.17.0 and the current main branch version:

@@ -18,6 +18,7 @@
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.retrying.RetrySettings;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
@@ -41,6 +42,7 @@
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;

 /** Pool of connections to accept appends and distirbute to different connections. */
@@ -65,6 +67,11 @@
   private final java.time.Duration maxRetryDuration;

   /*
+   * Retry settings for in-stream retries.
+   */
+  private RetrySettings retrySettings;
+
+  /*
    * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
    */
   private final FlowController.LimitExceededBehavior limitExceededBehavior;
@@ -91,6 +98,10 @@
    * TraceId for debugging purpose.
    */
   private final String traceId;
+  /*
+   * Sets the compression to use for the calls
+   */
+  private String compressorName;

   /** Used for test on the number of times createWorker is called. */
   private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
@@ -199,14 +210,18 @@
       java.time.Duration maxRetryDuration,
       FlowController.LimitExceededBehavior limitExceededBehavior,
       String traceId,
-      BigQueryWriteSettings clientSettings) {
+      @Nullable String comperssorName,
+      BigQueryWriteSettings clientSettings,
+      RetrySettings retrySettings) {
     this.maxInflightRequests = maxInflightRequests;
     this.maxInflightBytes = maxInflightBytes;
     this.maxRetryDuration = maxRetryDuration;
     this.limitExceededBehavior = limitExceededBehavior;
     this.traceId = traceId;
+    this.compressorName = comperssorName;
     this.clientSettings = clientSettings;
     this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
+    this.retrySettings = retrySettings;
   }

   /**
@@ -379,7 +394,9 @@
             maxRetryDuration,
             limitExceededBehavior,
             traceId,
-            clientSettings);
+            compressorName,
+            clientSettings,
+            retrySettings);
     connectionWorkerPool.add(connectionWorker);
     log.info(
         String.format(

I don't think this issue is fixed by upgrading.

Metadata

    Labels

    api: bigquerystorage (Issues related to the googleapis/java-bigquerystorage API.)
