131
131
import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .DATA_BLOCKS_BUFFER_DEFAULT ;
132
132
import static org .apache .hadoop .fs .azurebfs .constants .InternalConstants .CAPABILITY_SAFE_READAHEAD ;
133
133
import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .ERR_CREATE_ON_ROOT ;
134
+ import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .ERR_INVALID_ABFS_STATE ;
134
135
import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .UNAUTHORIZED_SAS ;
135
136
import static org .apache .hadoop .fs .impl .PathCapabilitiesSupport .validatePathCapabilityArgs ;
136
137
import static org .apache .hadoop .fs .statistics .IOStatisticsLogging .logIOStatisticsAtLevel ;
@@ -148,7 +149,11 @@ public class AzureBlobFileSystem extends FileSystem
148
149
private URI uri ;
149
150
private Path workingDir ;
150
151
private AzureBlobFileSystemStore abfsStore ;
151
- private boolean isClosed ;
152
+
153
+ /**
154
+ * Flag to indicate whether the file system is closed or not initiated.
155
+ */
156
+ private boolean isClosed = true ;
152
157
private final String fileSystemId = UUID .randomUUID ().toString ();
153
158
154
159
private boolean delegationTokenEnabled = false ;
@@ -312,6 +317,7 @@ public void initialize(URI uri, Configuration configuration)
312
317
}
313
318
314
319
rateLimiting = RateLimitingFactory .create (abfsConfiguration .getRateLimit ());
320
+ isClosed = false ;
315
321
LOG .debug ("Initializing AzureBlobFileSystem for {} complete" , uri );
316
322
}
317
323
@@ -329,8 +335,8 @@ public String toString() {
329
335
final StringBuilder sb = new StringBuilder (
330
336
"AzureBlobFileSystem{" );
331
337
sb .append ("uri=" ).append (fullPathUri );
332
- sb .append (", user='" ).append (abfsStore .getUser ()).append ('\'' );
333
- sb .append (", primaryUserGroup='" ).append (abfsStore .getPrimaryGroup ()).append ('\'' );
338
+ sb .append (", user='" ).append (getAbfsStore () .getUser ()).append ('\'' );
339
+ sb .append (", primaryUserGroup='" ).append (getAbfsStore () .getPrimaryGroup ()).append ('\'' );
334
340
sb .append ("[" + CAPABILITY_SAFE_READAHEAD + "]" );
335
341
sb .append ('}' );
336
342
return sb .toString ();
@@ -354,7 +360,7 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
354
360
// bufferSize is unused.
355
361
LOG .debug (
356
362
"AzureBlobFileSystem.open path: {} bufferSize as configured in 'fs.azure.read.request.size': {}" ,
357
- path , abfsStore .getAbfsConfiguration ().getReadBufferSize ());
363
+ path , getAbfsStore () .getAbfsConfiguration ().getReadBufferSize ());
358
364
return open (path , Optional .empty ());
359
365
}
360
366
@@ -517,7 +523,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
517
523
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
518
524
fileSystemId , FSOperationType .APPEND , tracingHeaderFormat ,
519
525
listener );
520
- OutputStream outputStream = abfsStore
526
+ OutputStream outputStream = getAbfsStore ()
521
527
.openFileForWrite (qualifiedPath , statistics , false , tracingContext );
522
528
return new FSDataOutputStream (outputStream , statistics );
523
529
} catch (AzureBlobFileSystemException ex ) {
@@ -782,7 +788,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce
782
788
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
783
789
fileSystemId , FSOperationType .MKDIR , false , tracingHeaderFormat ,
784
790
listener );
785
- abfsStore .createDirectory (qualifiedPath ,
791
+ getAbfsStore () .createDirectory (qualifiedPath ,
786
792
permission == null ? FsPermission .getDirDefault () : permission ,
787
793
FsPermission .getUMask (getConf ()), tracingContext );
788
794
statIncrement (DIRECTORIES_CREATED );
@@ -795,10 +801,10 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce
795
801
796
802
@ Override
797
803
public synchronized void close () throws IOException {
798
- if (isClosed ) {
804
+ if (isClosed () ) {
799
805
return ;
800
806
}
801
- if (abfsStore .getClient ().isMetricCollectionEnabled ()) {
807
+ if (getAbfsStore () .getClient ().isMetricCollectionEnabled ()) {
802
808
TracingContext tracingMetricContext = new TracingContext (
803
809
clientCorrelationId ,
804
810
fileSystemId , FSOperationType .GET_ATTR , true ,
@@ -819,7 +825,7 @@ public synchronized void close() throws IOException {
819
825
IOSTATISTICS_LOGGING_LEVEL_DEFAULT );
820
826
logIOStatisticsAtLevel (LOG , iostatisticsLoggingLevel , getIOStatistics ());
821
827
}
822
- IOUtils .cleanupWithLogger (LOG , abfsStore , delegationTokenManager ,
828
+ IOUtils .cleanupWithLogger (LOG , getAbfsStore () , delegationTokenManager ,
823
829
getAbfsClient ());
824
830
this .isClosed = true ;
825
831
if (LOG .isDebugEnabled ()) {
@@ -866,7 +872,7 @@ public void breakLease(final Path f) throws IOException {
866
872
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
867
873
fileSystemId , FSOperationType .BREAK_LEASE , tracingHeaderFormat ,
868
874
listener );
869
- abfsStore .breakLease (qualifiedPath , tracingContext );
875
+ getAbfsStore () .breakLease (qualifiedPath , tracingContext );
870
876
} catch (AzureBlobFileSystemException ex ) {
871
877
checkException (f , ex );
872
878
}
@@ -884,6 +890,8 @@ public void breakLease(final Path f) throws IOException {
884
890
*/
885
891
@ Override
886
892
public Path makeQualified (Path path ) {
893
+ // Every API works on qualified paths. If store is null better to fail early.
894
+ Preconditions .checkState (getAbfsStore () != null );
887
895
// To support format: abfs://{dfs.nameservices}/file/path,
888
896
// path need to be first converted to URI, then get the raw path string,
889
897
// during which {dfs.nameservices} will be omitted.
@@ -917,7 +925,7 @@ public String getScheme() {
917
925
public Path getHomeDirectory () {
918
926
return makeQualified (new Path (
919
927
FileSystemConfigurations .USER_HOME_DIRECTORY_PREFIX
920
- + "/" + abfsStore .getUser ()));
928
+ + "/" + getAbfsStore () .getUser ()));
921
929
}
922
930
923
931
/**
@@ -939,7 +947,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file,
939
947
if (file .getLen () < start ) {
940
948
return new BlockLocation [0 ];
941
949
}
942
- final String blobLocationHost = abfsStore .getAbfsConfiguration ().getAzureBlockLocationHost ();
950
+ final String blobLocationHost = getAbfsStore () .getAbfsConfiguration ().getAzureBlockLocationHost ();
943
951
944
952
final String [] name = {blobLocationHost };
945
953
final String [] host = {blobLocationHost };
@@ -973,15 +981,15 @@ protected void finalize() throws Throwable {
973
981
* @return the short name of the user who instantiated the FS
974
982
*/
975
983
public String getOwnerUser () {
976
- return abfsStore .getUser ();
984
+ return getAbfsStore () .getUser ();
977
985
}
978
986
979
987
/**
980
988
* Get the group name of the owner of the FS.
981
989
* @return primary group name
982
990
*/
983
991
public String getOwnerUserPrimaryGroup () {
984
- return abfsStore .getPrimaryGroup ();
992
+ return getAbfsStore () .getPrimaryGroup ();
985
993
}
986
994
987
995
private boolean deleteRoot () throws IOException {
@@ -1053,7 +1061,7 @@ public void setOwner(final Path path, final String owner, final String group)
1053
1061
Path qualifiedPath = makeQualified (path );
1054
1062
1055
1063
try {
1056
- abfsStore .setOwner (qualifiedPath ,
1064
+ getAbfsStore () .setOwner (qualifiedPath ,
1057
1065
owner ,
1058
1066
group ,
1059
1067
tracingContext );
@@ -1090,15 +1098,15 @@ public void setXAttr(final Path path,
1090
1098
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1091
1099
fileSystemId , FSOperationType .SET_ATTR , true , tracingHeaderFormat ,
1092
1100
listener );
1093
- Hashtable <String , String > properties = abfsStore
1101
+ Hashtable <String , String > properties = getAbfsStore ()
1094
1102
.getPathStatus (qualifiedPath , tracingContext );
1095
1103
String xAttrName = ensureValidAttributeName (name );
1096
1104
boolean xAttrExists = properties .containsKey (xAttrName );
1097
1105
XAttrSetFlag .validate (name , xAttrExists , flag );
1098
1106
1099
- String xAttrValue = abfsStore .decodeAttribute (value );
1107
+ String xAttrValue = getAbfsStore () .decodeAttribute (value );
1100
1108
properties .put (xAttrName , xAttrValue );
1101
- abfsStore .setPathProperties (qualifiedPath , properties , tracingContext );
1109
+ getAbfsStore () .setPathProperties (qualifiedPath , properties , tracingContext );
1102
1110
} catch (AzureBlobFileSystemException ex ) {
1103
1111
checkException (path , ex );
1104
1112
}
@@ -1130,12 +1138,12 @@ public byte[] getXAttr(final Path path, final String name)
1130
1138
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1131
1139
fileSystemId , FSOperationType .GET_ATTR , true , tracingHeaderFormat ,
1132
1140
listener );
1133
- Hashtable <String , String > properties = abfsStore
1141
+ Hashtable <String , String > properties = getAbfsStore ()
1134
1142
.getPathStatus (qualifiedPath , tracingContext );
1135
1143
String xAttrName = ensureValidAttributeName (name );
1136
1144
if (properties .containsKey (xAttrName )) {
1137
1145
String xAttrValue = properties .get (xAttrName );
1138
- value = abfsStore .encodeAttribute (xAttrValue );
1146
+ value = getAbfsStore () .encodeAttribute (xAttrValue );
1139
1147
}
1140
1148
} catch (AzureBlobFileSystemException ex ) {
1141
1149
checkException (path , ex );
@@ -1173,7 +1181,7 @@ public void setPermission(final Path path, final FsPermission permission)
1173
1181
Path qualifiedPath = makeQualified (path );
1174
1182
1175
1183
try {
1176
- abfsStore .setPermission (qualifiedPath , permission , tracingContext );
1184
+ getAbfsStore () .setPermission (qualifiedPath , permission , tracingContext );
1177
1185
} catch (AzureBlobFileSystemException ex ) {
1178
1186
checkException (path , ex );
1179
1187
}
@@ -1210,7 +1218,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
1210
1218
Path qualifiedPath = makeQualified (path );
1211
1219
1212
1220
try {
1213
- abfsStore .modifyAclEntries (qualifiedPath , aclSpec , tracingContext );
1221
+ getAbfsStore () .modifyAclEntries (qualifiedPath , aclSpec , tracingContext );
1214
1222
} catch (AzureBlobFileSystemException ex ) {
1215
1223
checkException (path , ex );
1216
1224
}
@@ -1245,7 +1253,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
1245
1253
Path qualifiedPath = makeQualified (path );
1246
1254
1247
1255
try {
1248
- abfsStore .removeAclEntries (qualifiedPath , aclSpec , tracingContext );
1256
+ getAbfsStore () .removeAclEntries (qualifiedPath , aclSpec , tracingContext );
1249
1257
} catch (AzureBlobFileSystemException ex ) {
1250
1258
checkException (path , ex );
1251
1259
}
@@ -1273,7 +1281,7 @@ public void removeDefaultAcl(final Path path) throws IOException {
1273
1281
Path qualifiedPath = makeQualified (path );
1274
1282
1275
1283
try {
1276
- abfsStore .removeDefaultAcl (qualifiedPath , tracingContext );
1284
+ getAbfsStore () .removeDefaultAcl (qualifiedPath , tracingContext );
1277
1285
} catch (AzureBlobFileSystemException ex ) {
1278
1286
checkException (path , ex );
1279
1287
}
@@ -1303,7 +1311,7 @@ public void removeAcl(final Path path) throws IOException {
1303
1311
Path qualifiedPath = makeQualified (path );
1304
1312
1305
1313
try {
1306
- abfsStore .removeAcl (qualifiedPath , tracingContext );
1314
+ getAbfsStore () .removeAcl (qualifiedPath , tracingContext );
1307
1315
} catch (AzureBlobFileSystemException ex ) {
1308
1316
checkException (path , ex );
1309
1317
}
@@ -1340,7 +1348,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec)
1340
1348
Path qualifiedPath = makeQualified (path );
1341
1349
1342
1350
try {
1343
- abfsStore .setAcl (qualifiedPath , aclSpec , tracingContext );
1351
+ getAbfsStore () .setAcl (qualifiedPath , aclSpec , tracingContext );
1344
1352
} catch (AzureBlobFileSystemException ex ) {
1345
1353
checkException (path , ex );
1346
1354
}
@@ -1368,7 +1376,7 @@ public AclStatus getAclStatus(final Path path) throws IOException {
1368
1376
Path qualifiedPath = makeQualified (path );
1369
1377
1370
1378
try {
1371
- return abfsStore .getAclStatus (qualifiedPath , tracingContext );
1379
+ return getAbfsStore () .getAclStatus (qualifiedPath , tracingContext );
1372
1380
} catch (AzureBlobFileSystemException ex ) {
1373
1381
checkException (path , ex );
1374
1382
return null ;
@@ -1395,7 +1403,7 @@ public void access(final Path path, final FsAction mode) throws IOException {
1395
1403
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1396
1404
fileSystemId , FSOperationType .ACCESS , tracingHeaderFormat ,
1397
1405
listener );
1398
- this .abfsStore .access (qualifiedPath , mode , tracingContext );
1406
+ this .getAbfsStore () .access (qualifiedPath , mode , tracingContext );
1399
1407
} catch (AzureBlobFileSystemException ex ) {
1400
1408
checkCheckAccessException (path , ex );
1401
1409
}
@@ -1417,11 +1425,11 @@ public boolean exists(Path f) throws IOException {
1417
1425
public RemoteIterator <FileStatus > listStatusIterator (Path path )
1418
1426
throws IOException {
1419
1427
LOG .debug ("AzureBlobFileSystem.listStatusIterator path : {}" , path );
1420
- if (abfsStore .getAbfsConfiguration ().enableAbfsListIterator ()) {
1428
+ if (getAbfsStore () .getAbfsConfiguration ().enableAbfsListIterator ()) {
1421
1429
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1422
1430
fileSystemId , FSOperationType .LISTSTATUS , true , tracingHeaderFormat , listener );
1423
1431
AbfsListStatusRemoteIterator abfsLsItr =
1424
- new AbfsListStatusRemoteIterator (path , abfsStore ,
1432
+ new AbfsListStatusRemoteIterator (path , getAbfsStore () ,
1425
1433
tracingContext );
1426
1434
return RemoteIterators .typeCastingRemoteIterator (abfsLsItr );
1427
1435
} else {
@@ -1503,7 +1511,7 @@ private boolean fileSystemExists() throws IOException {
1503
1511
try {
1504
1512
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1505
1513
fileSystemId , FSOperationType .TEST_OP , tracingHeaderFormat , listener );
1506
- abfsStore .getFilesystemProperties (tracingContext );
1514
+ getAbfsStore () .getFilesystemProperties (tracingContext );
1507
1515
} catch (AzureBlobFileSystemException ex ) {
1508
1516
try {
1509
1517
checkException (null , ex );
@@ -1522,7 +1530,7 @@ private void createFileSystem(TracingContext tracingContext) throws IOException
1522
1530
LOG .debug (
1523
1531
"AzureBlobFileSystem.createFileSystem uri: {}" , uri );
1524
1532
try {
1525
- abfsStore .createFilesystem (tracingContext );
1533
+ getAbfsStore () .createFilesystem (tracingContext );
1526
1534
} catch (AzureBlobFileSystemException ex ) {
1527
1535
checkException (null , ex );
1528
1536
}
@@ -1745,14 +1753,21 @@ public boolean failed() {
1745
1753
1746
1754
@ VisibleForTesting
1747
1755
public AzureBlobFileSystemStore getAbfsStore () {
1756
+ if (abfsStore == null ) {
1757
+ throw new IllegalStateException (ERR_INVALID_ABFS_STATE );
1758
+ }
1748
1759
return abfsStore ;
1749
1760
}
1750
1761
1751
1762
@ VisibleForTesting
1752
1763
AbfsClient getAbfsClient () {
1753
- return abfsStore .getClient ();
1764
+ return getAbfsStore () .getClient ();
1754
1765
}
1755
1766
1767
+ @ VisibleForTesting
1768
+ boolean isClosed () {
1769
+ return isClosed ;
1770
+ }
1756
1771
/**
1757
1772
* Get any Delegation Token manager created by the filesystem.
1758
1773
* @return the DT manager or null.
0 commit comments