Skip to content

Commit ffc0a6d

Browse files
HADOOP-19343. Add Google Cloud Storage Connector
Add implementation for create() API
1 parent 4235dc6 commit ffc0a6d

19 files changed

+3529
-2
lines changed

hadoop-project/pom.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686

8787
<!-- Protobuf version for backward compatibility -->
8888
<!-- This is used in hadoop-common for compilation only -->
89-
<protobuf.version>2.5.0</protobuf.version>
89+
<protobuf.version>3.25.3</protobuf.version>
9090
<!-- Protobuf scope in hadoop common -->
9191
<!-- set to "provided" so protobuf2 is no longer exported as a dependency -->
9292
<common.protobuf2.scope>provided</common.protobuf2.scope>
@@ -108,7 +108,7 @@
108108
<findbugs.version>3.0.5</findbugs.version>
109109
<dnsjava.version>3.6.1</dnsjava.version>
110110

111-
<guava.version>27.0-jre</guava.version>
111+
<guava.version>33.1.0-jre</guava.version>
112112
<guice.version>5.1.0</guice.version>
113113

114114
<bouncycastle.version>1.78.1</bouncycastle.version>
@@ -2141,6 +2141,11 @@
21412141
<artifactId>failsafe</artifactId>
21422142
<version>2.4.4</version>
21432143
</dependency>
2144+
<dependency>
2145+
<groupId>com.google.cloud</groupId>
2146+
<artifactId>google-cloud-storage</artifactId>
2147+
<version>2.44.1</version>
2148+
</dependency>
21442149
</dependencies>
21452150
</dependencyManagement>
21462151

hadoop-tools/hadoop-gcp/pom.xml

Lines changed: 616 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.gs;
20+
21+
class Constants {
22+
// URI scheme for GCS.
23+
static final String SCHEME = "gs";
24+
static final String PATH_DELIMITER = "/";
25+
26+
static final String GCS_CONFIG_PREFIX = "fs.gs";
27+
28+
static final String BASE_KEY_PREFIX = "google.cloud";
29+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.gs;
20+
21+
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
22+
23+
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
24+
25+
import java.time.Duration;
26+
import java.util.Map;
27+
import javax.annotation.Nullable;
28+
29+
/**
30+
* Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}.
31+
*/
32+
class CreateOptions {
33+
private final ImmutableMap<String, byte[]> attributes;
34+
private final String contentType;
35+
private final boolean ensureNoDirectoryConflict;
36+
private final Duration interval;
37+
private final long overwriteGenerationId;
38+
private final WriteMode mode;
39+
40+
public static final CreateOptions DEFAULT = builder().build();
41+
42+
public String getContentEncoding() {
43+
return contentEncoding;
44+
}
45+
46+
private final String contentEncoding;
47+
48+
private CreateOptions(CreateOperationOptionsBuilder builder) {
49+
this.attributes = ImmutableMap.copyOf(builder.attributes);
50+
this.contentType = builder.contentType;
51+
this.ensureNoDirectoryConflict = builder.ensureNoDirectoryConflict;
52+
this.interval = builder.interval;
53+
this.overwriteGenerationId = builder.overwriteGenerationId;
54+
this.mode = builder.mode;
55+
this.contentEncoding = builder.contentEncoding;
56+
}
57+
58+
public boolean isOverwriteExisting() {
59+
return this.mode == WriteMode.OVERWRITE;
60+
}
61+
62+
enum WriteMode {
63+
/**
64+
* Write new bytes to the end of the existing file rather than the beginning.
65+
*/
66+
APPEND,
67+
/**
68+
* Creates a new file for write and fails if file already exists.
69+
*/
70+
CREATE_NEW,
71+
/**
72+
* Creates a new file for write or overwrites an existing file if it already exists.
73+
*/
74+
OVERWRITE
75+
}
76+
77+
public static CreateOperationOptionsBuilder builder() {
78+
return new CreateOperationOptionsBuilder();
79+
}
80+
81+
/**
82+
* Extended attributes to set when creating a file.
83+
*/
84+
public ImmutableMap<String, byte[]> getAttributes() {
85+
return attributes;
86+
}
87+
88+
/**
89+
* Content-type to set when creating a file.
90+
*/
91+
@Nullable
92+
public String getContentType() {
93+
return contentType;
94+
}
95+
96+
/**
97+
* Configures the minimum time interval (milliseconds) between consecutive sync/flush calls
98+
*/
99+
public Duration getMinSyncInterval() {
100+
return interval;
101+
}
102+
103+
/**
104+
* If true, makes sure there isn't already a directory object of the same name. If false, you run
105+
* the risk of creating hard-to-cleanup/access files whose names collide with directory names. If
106+
* already sure no such directory exists, then this is safe to set for improved performance.
107+
*/
108+
public boolean isEnsureNoDirectoryConflict() {
109+
return ensureNoDirectoryConflict;
110+
}
111+
112+
/**
113+
* Whether to overwrite an existing file with the same name.
114+
*/
115+
public WriteMode getWriteMode() {
116+
return mode;
117+
}
118+
119+
/**
120+
* Generation of existing object to overwrite. Ignored if set to {@link
121+
* StorageResourceId#UNKNOWN_GENERATION_ID}, but otherwise this is used instead of {@code
122+
* overwriteExisting}, where 0 indicates no existing object, and otherwise an existing object will
123+
* only be overwritten by the newly created file if its generation matches this provided
124+
* generationId.
125+
*/
126+
public long getOverwriteGenerationId() {
127+
return overwriteGenerationId;
128+
}
129+
130+
static class CreateOperationOptionsBuilder {
131+
private Map<String, byte[]> attributes = ImmutableMap.of();
132+
private String contentType = "application/octet-stream";
133+
private boolean ensureNoDirectoryConflict = true;
134+
private Duration interval = Duration.ZERO;
135+
private long overwriteGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
136+
private WriteMode mode = WriteMode.CREATE_NEW;
137+
138+
private String contentEncoding = null;
139+
140+
public CreateOperationOptionsBuilder setAttributes(Map<String, byte[]> attributes) {
141+
this.attributes = attributes;
142+
return this;
143+
}
144+
145+
public CreateOperationOptionsBuilder setContentType(String contentType) {
146+
this.contentType = contentType;
147+
return this;
148+
}
149+
150+
public CreateOperationOptionsBuilder setEnsureNoDirectoryConflict(
151+
boolean ensureNoDirectoryConflict) {
152+
this.ensureNoDirectoryConflict = ensureNoDirectoryConflict;
153+
return this;
154+
}
155+
156+
public CreateOperationOptionsBuilder setMinSyncInterval(Duration interval) {
157+
this.interval = interval;
158+
return this;
159+
}
160+
161+
public CreateOperationOptionsBuilder setOverwriteGenerationId(long overwriteGenerationId) {
162+
this.overwriteGenerationId = overwriteGenerationId;
163+
return this;
164+
}
165+
166+
public CreateOperationOptionsBuilder setWriteMode(WriteMode mode) {
167+
this.mode = mode;
168+
return this;
169+
}
170+
171+
CreateOptions build() {
172+
CreateOptions options = new CreateOptions(this);
173+
174+
checkArgument(!options.getAttributes().containsKey("Content-Type"),
175+
"The Content-Type attribute must be set via the contentType option");
176+
if (options.getWriteMode() != WriteMode.OVERWRITE) {
177+
checkArgument(options.getOverwriteGenerationId() == StorageResourceId.UNKNOWN_GENERATION_ID,
178+
"overwriteGenerationId is set to %s but it can be set only in OVERWRITE mode",
179+
options.getOverwriteGenerationId());
180+
}
181+
182+
return options;
183+
}
184+
}
185+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.hadoop.fs.gs;
18+
19+
import io.grpc.Status;
20+
import io.grpc.StatusRuntimeException;
21+
22+
import javax.annotation.Nullable;
23+
24+
/**
25+
* Implementation for {@link ErrorTypeExtractor} for exception specifically thrown from gRPC path.
26+
*/
27+
class ErrorTypeExtractor {
28+
29+
enum ErrorType {
30+
NOT_FOUND, OUT_OF_RANGE, ALREADY_EXISTS, FAILED_PRECONDITION, INTERNAL, RESOURCE_EXHAUSTED, UNAVAILABLE, UNKNOWN
31+
}
32+
33+
// public static final ErrorTypeExtractor INSTANCE = new ErrorTypeExtractor();
34+
35+
private static final String BUCKET_ALREADY_EXISTS_MESSAGE =
36+
"FAILED_PRECONDITION: Your previous request to create the named bucket succeeded and you already own it.";
37+
38+
private ErrorTypeExtractor() {
39+
}
40+
41+
static ErrorType getErrorType(Exception error) {
42+
switch (Status.fromThrowable(error).getCode()) {
43+
case NOT_FOUND:
44+
return ErrorType.NOT_FOUND;
45+
case OUT_OF_RANGE:
46+
return ErrorType.OUT_OF_RANGE;
47+
case ALREADY_EXISTS:
48+
return ErrorType.ALREADY_EXISTS;
49+
case FAILED_PRECONDITION:
50+
return ErrorType.FAILED_PRECONDITION;
51+
case RESOURCE_EXHAUSTED:
52+
return ErrorType.RESOURCE_EXHAUSTED;
53+
case INTERNAL:
54+
return ErrorType.INTERNAL;
55+
case UNAVAILABLE:
56+
return ErrorType.UNAVAILABLE;
57+
default:
58+
return ErrorType.UNKNOWN;
59+
}
60+
}
61+
62+
static boolean bucketAlreadyExists(Exception e) {
63+
ErrorType errorType = getErrorType(e);
64+
if (errorType == ErrorType.ALREADY_EXISTS) {
65+
return true;
66+
}
67+
// The gRPC API currently throws a FAILED_PRECONDITION status code instead of ALREADY_EXISTS,
68+
// so we handle both these conditions in the interim.
69+
// TODO: remove once the status codes are fixed.
70+
else if (errorType == ErrorType.FAILED_PRECONDITION) {
71+
StatusRuntimeException statusRuntimeException = getStatusRuntimeException(e);
72+
return statusRuntimeException != null && BUCKET_ALREADY_EXISTS_MESSAGE.equals(
73+
statusRuntimeException.getMessage());
74+
}
75+
return false;
76+
}
77+
78+
/**
79+
* Extracts StatusRuntimeException from the Exception, if it exists.
80+
*/
81+
@Nullable
82+
static private StatusRuntimeException getStatusRuntimeException(Exception e) {
83+
Throwable cause = e;
84+
// Keeping a counter to break early from the loop to avoid infinite loop condition due to
85+
// cyclic exception chains.
86+
int currentExceptionDepth = 0, maxChainDepth = 1000;
87+
while (cause != null && currentExceptionDepth < maxChainDepth) {
88+
if (cause instanceof StatusRuntimeException) {
89+
return (StatusRuntimeException) cause;
90+
}
91+
cause = cause.getCause();
92+
currentExceptionDepth++;
93+
}
94+
return null;
95+
}
96+
}

0 commit comments

Comments
 (0)