Open
Labels
api: bigquerystorage (Issues related to the googleapis/java-bigquerystorage API.)
Description
Environment details
- OS type and version: GKE 1.26.14-gke.1044000 (x86-64) running amazoncorretto:11-al2023-headless
- Java version: 11
- version(s): 26.17.0 (com.google.cloud:libraries-bom:26.17.0)
Steps to reproduce
- Create a StreamWriter with .setEnableConnectionPool(true).
- Write to a BigQuery table in a way that fails, e.g. because the table doesn't exist or the schema is wrong. (Why? Because sometimes the schema is out of sync. Nevertheless, such a condition should not cause a memory leak.) Keep sending this failing traffic persistently.
- Observe that com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool is continually creating new ConnectionWorkers (expected), but old ConnectionWorkers that report isConnectionInUnrecoverableState() are retained in the field connectionToWriteStream (bug) and never closed (bug).
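The retention described in the last step can be modeled without BigQuery. The following is only a toy sketch under stated assumptions: LeakyPoolSketch and ToyWorker are hypothetical names, not classes from the library, and the map merely stands in for a pool that adds workers but never removes them.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the reported retention pattern; every name here is hypothetical. */
class LeakyPoolSketch {

  /** Stand-in for a connection worker: it can break permanently and it can be closed. */
  static final class ToyWorker implements AutoCloseable {
    private boolean unrecoverable;
    private boolean closed;

    void failPermanently() { unrecoverable = true; }
    boolean isUnrecoverable() { return unrecoverable; }
    boolean isClosed() { return closed; }
    @Override public void close() { closed = true; }
  }

  /** Workers keyed by an id; this pool only ever adds entries. */
  private final Map<Integer, ToyWorker> workers = new HashMap<>();
  private int nextId = 0;

  /** Simulates one append to a table that always fails. */
  void appendToBrokenTable() {
    ToyWorker worker = new ToyWorker();
    workers.put(nextId++, worker);
    // The write fails, so the worker becomes unusable,
    // but nothing removes it from the map or closes it.
    worker.failPermanently();
  }

  int retainedWorkers() { return workers.size(); }

  /** Counts workers that are broken, still referenced, and never closed. */
  long retainedUnclosedBrokenWorkers() {
    return workers.values().stream()
        .filter(w -> w.isUnrecoverable() && !w.isClosed())
        .count();
  }

  public static void main(String[] args) {
    LeakyPoolSketch pool = new LeakyPoolSketch();
    for (int i = 0; i < 1000; i++) {
      pool.appendToBrokenTable();
    }
    System.out.println("retained workers: " + pool.retainedWorkers());
  }
}
```

In a real process the equivalent check would be a heap dump or histogram showing the number of ConnectionWorker instances growing for as long as the failing traffic continues.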
Code example
Problem is caused by clearFinalizedConnectionWorker() (line 338 in d8d5278):
private void clearFinalizedConnectionWorker() {
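One possible shape for a cleanup that avoids the retention is sketched below. This is not the library's implementation and not a proposed patch: EvictingPoolSketch, ToyWorker, and evictUnrecoverableWorkers are hypothetical names, and the Map<ToyWorker, Set<String>> shape is an assumption standing in for connectionToWriteStream, whose real type is not shown in this issue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy sketch of evict-then-close cleanup; every name here is hypothetical. */
class EvictingPoolSketch {

  /** Stand-in for a connection worker: it can break permanently and it can be closed. */
  static final class ToyWorker implements AutoCloseable {
    private boolean unrecoverable;
    private boolean closed;

    void failPermanently() { unrecoverable = true; }
    boolean isUnrecoverable() { return unrecoverable; }
    boolean isClosed() { return closed; }
    @Override public void close() { closed = true; }
  }

  /** Assumed shape: each worker mapped to the stream names it serves. */
  private final Map<ToyWorker, Set<String>> workerToStreams = new HashMap<>();

  /** Registers a new worker serving a single stream. */
  ToyWorker addWorker(String streamName) {
    ToyWorker worker = new ToyWorker();
    Set<String> streams = new HashSet<>();
    streams.add(streamName);
    workerToStreams.put(worker, streams);
    return worker;
  }

  int size() { return workerToStreams.size(); }

  /** Removes every unrecoverable worker from the map, then closes it. Returns the count. */
  int evictUnrecoverableWorkers() {
    List<ToyWorker> toClose = new ArrayList<>();
    Iterator<Map.Entry<ToyWorker, Set<String>>> it = workerToStreams.entrySet().iterator();
    while (it.hasNext()) {
      ToyWorker worker = it.next().getKey();
      if (worker.isUnrecoverable()) {
        it.remove(); // drop the reference so the worker can be garbage collected
        toClose.add(worker);
      }
    }
    // Close after the map has been updated, so a slow close() never runs mid-iteration.
    for (ToyWorker worker : toClose) {
      worker.close();
    }
    return toClose.size();
  }
}
```

The sketch removes the map entry and closes the worker as two separate steps because the report names two distinct bugs: the reference is retained, and the worker is never closed. A real fix would also have to respect whatever locking the pool uses, which this toy model ignores.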
Stack trace
None captured
Any additional information below
The following is the diff between 26.17.0 and the current main branch version:
@@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
@@ -41,6 +42,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/** Pool of connections to accept appends and distirbute to different connections. */
@@ -65,6 +67,11 @@
private final java.time.Duration maxRetryDuration;
/*
+ * Retry settings for in-stream retries.
+ */
+ private RetrySettings retrySettings;
+
+ /*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;
@@ -91,6 +98,10 @@
* TraceId for debugging purpose.
*/
private final String traceId;
+ /*
+ * Sets the compression to use for the calls
+ */
+ private String compressorName;
/** Used for test on the number of times createWorker is called. */
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
@@ -199,14 +210,18 @@
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
- BigQueryWriteSettings clientSettings) {
+ @Nullable String comperssorName,
+ BigQueryWriteSettings clientSettings,
+ RetrySettings retrySettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
+ this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
+ this.retrySettings = retrySettings;
}
/**
@@ -379,7 +394,9 @@
maxRetryDuration,
limitExceededBehavior,
traceId,
- clientSettings);
+ compressorName,
+ clientSettings,
+ retrySettings);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
None of these hunks touch clearFinalizedConnectionWorker(), so I don't think this issue is fixed by upgrading.