BigQuerySinkTask.java

@@ -639,7 +639,8 @@ private void initializeStorageApiMode() {
           autoCreateTables,
           errantRecordHandler,
           getSchemaManager(),
-          attemptSchemaUpdate
+          attemptSchemaUpdate,
+          config
       );
       storageApiWriter = writer;

@@ -659,7 +660,8 @@ private void initializeStorageApiMode() {
           autoCreateTables,
           errantRecordHandler,
           getSchemaManager(),
-          attemptSchemaUpdate
+          attemptSchemaUpdate,
+          config
       );
     }
   }
BigQuerySinkConfig.java

@@ -87,6 +87,7 @@ public enum DecimalHandlingMode {
   public static final String TOPICS_REGEX_DEFAULT = "";
   public static final String ENABLE_BATCH_CONFIG = "enableBatchLoad";
   public static final String BATCH_LOAD_INTERVAL_SEC_CONFIG = "batchLoadIntervalSec";
+  public static final String CONNECTOR_NAME_CONFIG = "name";
   public static final String GCS_BUCKET_NAME_CONFIG = "gcsBucketName";
   public static final String GCS_FOLDER_NAME_CONFIG = "gcsFolderName";
   public static final String GCS_FOLDER_NAME_DEFAULT = "";

@@ -947,6 +948,11 @@ public boolean visible(String s, Map<String, Object> map) {
         ConfigDef.Type.BOOLEAN,
         false,
         ConfigDef.Importance.LOW
+    ).defineInternal(
+        CONNECTOR_NAME_CONFIG,
+        ConfigDef.Type.STRING,
+        null,
+        ConfigDef.Importance.LOW
     ).define(
         DECIMAL_HANDLING_MODE_CONFIG,
         DECIMAL_HANDLING_MODE_TYPE,
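A note on the `defineInternal` hunk above: the Kafka Connect runtime includes the connector's `name` property in the configuration map it hands to the connector and its tasks, so registering `name` as an internal config lets `BigQuerySinkConfig` parse it without documenting it as a user-facing setting. A minimal sketch of how the value surfaces, assuming `BigQuerySinkConfig` is constructible from a raw properties map (the wrapper class here is illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

public class ConnectorNameSketch {
  public static void main(String[] args) {
    // Sketch only: a real config map must also carry the connector's other
    // required settings (project, dataset, credentials, ...) to validate.
    Map<String, String> props = new HashMap<>();
    props.put("name", "my-bigquery-sink"); // injected by the Connect runtime

    BigQuerySinkConfig config = new BigQuerySinkConfig(props);

    // Prints "my-bigquery-sink"; would be null if the runtime supplied no
    // name, since the internal definition defaults to null.
    System.out.println(config.getString(BigQuerySinkConfig.CONNECTOR_NAME_CONFIG));
  }
}
```

Because the default is `null`, downstream code must handle the missing-name case explicitly, which is what `generateTraceId` in `StorageWriteApiBase` does below.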
StorageWriteApiBase.java

@@ -35,6 +35,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.wepay.kafka.connect.bigquery.ErrantRecordHandler;
 import com.wepay.kafka.connect.bigquery.SchemaManager;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
 import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiErrorResponses;
 import com.wepay.kafka.connect.bigquery.utils.Time;

@@ -61,7 +62,10 @@ public abstract class StorageWriteApiBase {
   private static final Logger logger = LoggerFactory.getLogger(StorageWriteApiBase.class);
   private static final double RETRY_DELAY_MULTIPLIER = 1.1;
   private static final int MAX_RETRY_DELAY_MINUTES = 1;
+  public static final String TRACE_ID_FORMAT = "AivenKafkaConnector:%s";
   protected final JsonStreamWriterFactory jsonWriterFactory;
+  private final String traceId;
+
   protected final int retry;
   protected final long retryWait;
   private final boolean autoCreateTables;

@@ -79,7 +83,47 @@ public abstract class StorageWriteApiBase {
    * @param writeSettings Write Settings for stream which carry authentication and other header information
    * @param autoCreateTables boolean flag set if table should be created automatically
    * @param errantRecordHandler Used to handle errant records
+   * @param config Connector configurations
    */
+  protected StorageWriteApiBase(int retry,
+                                long retryWait,
+                                BigQueryWriteSettings writeSettings,
+                                boolean autoCreateTables,
+                                ErrantRecordHandler errantRecordHandler,
+                                SchemaManager schemaManager,
+                                boolean attemptSchemaUpdate,
+                                BigQuerySinkConfig config) {
+    this.retry = retry;
+    this.retryWait = retryWait;
+    this.autoCreateTables = autoCreateTables;
+    this.writeSettings = writeSettings;
+    this.errantRecordHandler = errantRecordHandler;
+    this.schemaManager = schemaManager;
+    this.attemptSchemaUpdate = attemptSchemaUpdate;
+    try {
+      this.writeClient = getWriteClient();
+    } catch (IOException e) {
+      logger.error("Failed to create Big Query Storage Write API write client due to {}", e.getMessage());
+      throw new BigQueryStorageWriteApiConnectException("Failed to create Big Query Storage Write API write client", e);
+    }
+    this.jsonWriterFactory = getJsonWriterFactory();
+    this.traceId = generateTraceId(config.getString(BigQuerySinkConfig.CONNECTOR_NAME_CONFIG));
+    this.time = Time.SYSTEM;
+  }
+
+  /**
+   * @deprecated This constructor does not support configuration of additional write settings.
+   * Use {@link #StorageWriteApiBase(int retry, long retryWait, BigQueryWriteSettings writeSettings,
+   * boolean autoCreateTables, ErrantRecordHandler errantRecordHandler, SchemaManager schemaManager,
+   * boolean attemptSchemaUpdate, BigQuerySinkConfig config)} instead.
+   *
+   * @param retry How many retries to make in the event of a retriable error.
+   * @param retryWait How long to wait in between retries.
+   * @param writeSettings Write Settings for stream which carry authentication and other header information
+   * @param autoCreateTables boolean flag set if table should be created automatically
+   * @param errantRecordHandler Used to handle errant records
+   */
+  @Deprecated
   protected StorageWriteApiBase(int retry,
                                 long retryWait,
                                 BigQueryWriteSettings writeSettings,

@@ -101,6 +145,7 @@ protected StorageWriteApiBase(int retry,
       throw new BigQueryStorageWriteApiConnectException("Failed to create Big Query Storage Write API write client", e);
     }
     this.jsonWriterFactory = getJsonWriterFactory();
+    this.traceId = generateTraceId(null);
     this.time = Time.SYSTEM;
   }

@@ -312,6 +357,7 @@ protected JsonStreamWriterFactory getJsonWriterFactory() {
         .build();
     return streamOrTableName -> JsonStreamWriter.newBuilder(streamOrTableName, writeClient)
         .setRetrySettings(retrySettings)
+        .setTraceId(traceId)
         .build();
   }

@@ -451,4 +497,10 @@ protected void failTask(RuntimeException failure) {
     throw failure;
   }

+  private String generateTraceId(String connectorName) {
+    String suffix = (connectorName != null)
+        ? connectorName
+        : "default";
+    return String.format(TRACE_ID_FORMAT, suffix);
+  }
 }
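Taken together, `TRACE_ID_FORMAT`, `generateTraceId`, and the `setTraceId` call in `getJsonWriterFactory` mean every `JsonStreamWriter` this class creates tags its Storage Write API append requests with a connector-scoped trace ID. A small illustrative check of the resulting IDs (the package in the import is assumed; the connector name is a placeholder):

```java
import com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase;

public class TraceIdSketch {
  public static void main(String[] args) {
    // New constructor path: the configured connector name is used.
    System.out.println(String.format(StorageWriteApiBase.TRACE_ID_FORMAT, "my-bigquery-sink"));
    // -> AivenKafkaConnector:my-bigquery-sink

    // Deprecated constructor path: generateTraceId(null) falls back to "default".
    System.out.println(String.format(StorageWriteApiBase.TRACE_ID_FORMAT, "default"));
    // -> AivenKafkaConnector:default
  }
}
```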
StorageWriteApiBatchApplicationStream.java

@@ -31,6 +31,7 @@
 import com.google.protobuf.Descriptors;
 import com.wepay.kafka.connect.bigquery.ErrantRecordHandler;
 import com.wepay.kafka.connect.bigquery.SchemaManager;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
 import java.io.IOException;
 import java.util.HashMap;

@@ -86,22 +87,53 @@ public StorageWriteApiBatchApplicationStream(
       boolean autoCreateTables,
       ErrantRecordHandler errantRecordHandler,
       SchemaManager schemaManager,
-      boolean attemptSchemaUpdate) {
+      boolean attemptSchemaUpdate,
+      BigQuerySinkConfig config) {
     super(
         retry,
         retryWait,
         writeSettings,
         autoCreateTables,
         errantRecordHandler,
         schemaManager,
-        attemptSchemaUpdate
+        attemptSchemaUpdate,
+        config
     );
     streams = new ConcurrentHashMap<>();
     currentStreams = new ConcurrentHashMap<>();
     tableLocks = new ConcurrentHashMap<>();
     streamLocks = new ConcurrentHashMap<>();
   }

+  /**
+   * @deprecated This constructor does not support configuration of additional write settings.
+   * Use {@link #StorageWriteApiBatchApplicationStream(int retry, long retryWait, BigQueryWriteSettings writeSettings,
+   * boolean autoCreateTables, ErrantRecordHandler errantRecordHandler, SchemaManager schemaManager,
+   * boolean attemptSchemaUpdate, BigQuerySinkConfig config)} instead.
+   */
+  @Deprecated
+  public StorageWriteApiBatchApplicationStream(
+      int retry,
+      long retryWait,
+      BigQueryWriteSettings writeSettings,
+      boolean autoCreateTables,
+      ErrantRecordHandler errantRecordHandler,
+      SchemaManager schemaManager,
+      boolean attemptSchemaUpdate) {
+    super(
+        retry,
+        retryWait,
+        writeSettings,
+        autoCreateTables,
+        errantRecordHandler,
+        schemaManager,
+        attemptSchemaUpdate);
+    streams = new ConcurrentHashMap<>();
+    currentStreams = new ConcurrentHashMap<>();
+    tableLocks = new ConcurrentHashMap<>();
+    streamLocks = new ConcurrentHashMap<>();
+  }
+
   /**
    * Takes care of resource cleanup
    */
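Caller-side, the only change for the batch stream is the trailing `BigQuerySinkConfig` argument; the deprecated overload keeps existing integrations compiling but chains to the deprecated base constructor, so its writers carry the `AivenKafkaConnector:default` trace ID. A hypothetical wiring sketch, assuming it sits in the same package as the stream classes; the helper and its numeric values are placeholders, not the connector's actual defaults:

```java
import com.wepay.kafka.connect.bigquery.ErrantRecordHandler;
import com.wepay.kafka.connect.bigquery.SchemaManager;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

public class BatchStreamWiringSketch {
  static StorageWriteApiBatchApplicationStream create(BigQueryWriteSettings writeSettings,
                                                      ErrantRecordHandler errantRecordHandler,
                                                      SchemaManager schemaManager,
                                                      BigQuerySinkConfig config) {
    return new StorageWriteApiBatchApplicationStream(
        3,        // retry attempts for retriable errors (placeholder)
        1000L,    // wait between retries, in milliseconds (placeholder)
        writeSettings,
        true,     // autoCreateTables
        errantRecordHandler,
        schemaManager,
        true,     // attemptSchemaUpdate
        config);  // new: supplies the connector name for the trace ID
  }
}
```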
StorageWriteApiDefaultStream.java

@@ -32,6 +32,7 @@
 import com.google.protobuf.Descriptors;
 import com.wepay.kafka.connect.bigquery.ErrantRecordHandler;
 import com.wepay.kafka.connect.bigquery.SchemaManager;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
 import java.io.IOException;
 import java.util.List;

@@ -54,18 +55,44 @@ public StorageWriteApiDefaultStream(int retry,
                                       boolean autoCreateTables,
                                       ErrantRecordHandler errantRecordHandler,
                                       SchemaManager schemaManager,
-                                      boolean attemptSchemaUpdate) {
+                                      boolean attemptSchemaUpdate,
+                                      BigQuerySinkConfig config) {
     super(
         retry,
         retryWait,
         writeSettings,
         autoCreateTables,
         errantRecordHandler,
         schemaManager,
-        attemptSchemaUpdate
+        attemptSchemaUpdate,
+        config
     );
   }

+  /**
+   * @deprecated This constructor does not support configuration of additional write settings.
+   * Use {@link #StorageWriteApiDefaultStream(int retry, long retryWait, BigQueryWriteSettings writeSettings,
+   * boolean autoCreateTables, ErrantRecordHandler errantRecordHandler, SchemaManager schemaManager,
+   * boolean attemptSchemaUpdate, BigQuerySinkConfig config)} instead.
+   */
+  @Deprecated
+  public StorageWriteApiDefaultStream(int retry,
+                                      long retryWait,
+                                      BigQueryWriteSettings writeSettings,
+                                      boolean autoCreateTables,
+                                      ErrantRecordHandler errantRecordHandler,
+                                      SchemaManager schemaManager,
+                                      boolean attemptSchemaUpdate) {
+    super(
+        retry,
+        retryWait,
+        writeSettings,
+        autoCreateTables,
+        errantRecordHandler,
+        schemaManager,
+        attemptSchemaUpdate);
+  }
+
   @Override
   public void preShutdown() {
     logger.info("Closing all writer for default stream on all tables");