Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
private boolean enableClientTransactionId;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY)
private boolean enableCreateIdempotency;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1174,6 +1178,10 @@ public boolean getIsClientTransactionIdEnabled() {
return enableClientTransactionId;
}

public boolean getIsCreateIdempotencyEnabled() {
return enableCreateIdempotency;
}

/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ public static String containerProperty(String property, String fsName, String ac
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
/**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
/**Flag to enable/disable create idempotency during create operation: {@value}*/
public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = "fs.azure.enable.create.blob.idempotency";

private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,7 @@ public final class FileSystemConfigurations {

public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;

public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation op;
if (isFileCreation) {
// Create a file with the specified parameters
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
contextEncryptionAdapter, tracingContext);
if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) {
AbfsRestOperation statusOp = null;
try {
// Check if the file already exists by calling GetPathStatus
statusOp = getPathStatus(path, tracingContext, null, false);
} catch (AbfsRestOperationException ex) {
// If the path does not exist, continue with file creation
// For other errors, rethrow the exception
if (ex.getStatusCode() != HTTP_NOT_FOUND) {
throw ex;
}
}
// If the file exists and overwrite is not allowed, throw conflict
if (statusOp != null && statusOp.hasResult() && !overwrite) {
throw new AbfsRestOperationException(
HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
PATH_EXISTS,
null);
} else {
// Proceed with file creation (force overwrite = true)
op = createFile(path, true, permissions, isAppendBlob, eTag,
contextEncryptionAdapter, tracingContext);
}
} else {
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
contextEncryptionAdapter, tracingContext);
}
} else {
// Create a directory with the specified parameters
op = createDirectory(path, permissions, isAppendBlob, eTag,
Expand Down Expand Up @@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path,
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.PutBlob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
if (key == null) {
key = "HTTP Response";
}
String values = StringUtils.join(";", entry.getValue());
List<String> valuesList = entry.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to null pointer exceptions on enabling AbfsIoUtils logging if value is null.

if (valuesList == null) {
valuesList = Collections.emptyList();
} else {
valuesList = valuesList.stream()
.map(v -> v == null ? "" : v) // replace null with empty string
.collect(Collectors.toList());
}
String values = StringUtils.join(";", valuesList);
if (key.contains("Cookie")) {
values = "*cookie info*";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -88,6 +89,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
Expand All @@ -96,6 +98,7 @@
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -464,6 +467,7 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite",
Boolean.toString(enableConditionalCreateOverwrite));
config.set("fs.azure.enable.create.idempotency", "false");
AzureBlobFileSystemStore store = currentFs.getAbfsStore();
AbfsClient client = store.getClientHandler().getIngressClient();

Expand Down Expand Up @@ -1087,6 +1091,7 @@ public void testParallelCreateOverwriteFalse()
throws Exception {
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
configuration.set(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, "false");
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Expand Down Expand Up @@ -2236,6 +2241,98 @@ public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
}
}

/**
* Test to simulate a successful create operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the create operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the create
* operation and the connection reset. It then verifies that the create
* operation is retried once before succeeding.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testCreateIdempotencyForNonHnsBlob() throws Exception {
assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse();
assumeHnsDisabled();
assumeBlobServiceType();
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());

// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
fs.getAbfsStore().setClient(blobClient);
fs.getAbfsStore().setClientHandler(clientHandler);
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

AtomicInteger createCount = new AtomicInteger(0);

Mockito.doAnswer(answer -> {
// Set up the mock for the create operation
AbfsClientTestUtil.setMockAbfsRestOperationForCreateOperation(blobClient,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();

int currentCount = createCount.incrementAndGet();
if (currentCount == 2) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);

return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).createPath(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(AzureBlobFileSystemStore.Permissions.class),
Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class)
);

Path path = new Path("/test/file");
fs.create(path, false);
Mockito.verify(blobClient, Mockito.times(1)).createPath(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(AzureBlobFileSystemStore.Permissions.class),
Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class));

Mockito.verify(blobClient, Mockito.times(2)).createPathRestOp(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class));
assertIsFile(fs, path);
}
}

/**
* Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem.
* This method sets up the necessary mock behavior for the client handler and ingress client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
Expand All @@ -74,6 +75,7 @@
import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
Expand Down Expand Up @@ -108,6 +110,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assumptions.assumeThat;

/**
* Test rename operation.
Expand Down Expand Up @@ -1702,6 +1705,85 @@ public void testRenamePathRetryIdempotency() throws Exception {
}
}

/**
* Test to simulate a successful copy blob operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the copy blob operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the copy blob
* operation and the connection reset. It then verifies that the create
* operation is retried once before succeeding.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testRenameIdempotencyForNonHnsBlob() throws Exception {
assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse();
assumeHnsDisabled();
assumeBlobServiceType();
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());

// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
fs.getAbfsStore().setClient(blobClient);
fs.getAbfsStore().setClientHandler(clientHandler);
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

AtomicInteger copyBlobCount = new AtomicInteger(0);
Path sourceDir = path("/testSrc");
assertMkdirs(fs, sourceDir);
String filename = "file1";
Path sourceFilePath = new Path(sourceDir, filename);
touch(sourceFilePath);
Path destFilePath = new Path(sourceDir, "file2");
Mockito.doAnswer(answer -> {
// Set up the mock for the create operation
AbfsClientTestUtil.setMockAbfsRestOperationForCopyBlobOperation(blobClient, sourceFilePath, destFilePath,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();

int currentCount = copyBlobCount.incrementAndGet();
if (currentCount == 1) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);

return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).copyBlob(
Mockito.any(Path.class),
Mockito.any(Path.class),
Mockito.nullable(String.class),
Mockito.any(TracingContext.class)
);
Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
.describedAs("Rename should succeed.")
.isTrue();
}
}

/**
* Test to verify that the client transaction ID is included in the response header
* after renaming a file in Azure Blob Storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ public void testScenario8() throws Exception {
}
Assertions.assertThat(e.getMessage())
.as("Expected error message to contain 'AlreadyExists'")
.contains("AlreadyExists");
.containsIgnoringCase("Exists");
}

// Remove file
Expand Down
Loading