Description
Support S3 backend for TieredStorage
Target
- Implement Amazon S3 backend for TieredStorage.
- Optimize upload and download efficiency.
Implementation
Core Idea
When the upper layer calls `commit0`, the data is directly uploaded; our upload buffer and pre-read mechanism are implemented in `TieredFileSegment` and `TieredMessageFetcher`, respectively. We do not need to do any optimization at this layer and treat `commit0` as the method that actually persists data to S3. When we fill an entire `Segment`, we can use `UploadPartCopy` to organize these objects of different sizes into one large object.
Complete Process
We need to maintain metadata `S3SegmentMetadata` for this `Segment`, which maps each `position` within the logical `Segment` to the path and size of the corresponding file on S3. For example, the chunk metadata of the first `Segment` of the `CommitLog` of `queue1` could look like this:
```
0    -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-0, 1024>
1024 -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-1024, 4096>
5120 -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-5120, 5120>
```
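For illustration, a minimal sketch of what `S3SegmentMetadata` could look like in Java, assuming a sorted map keyed by the logical position; the class name comes from this proposal, while the fields and methods shown here are only assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative sketch: maps the logical position inside a segment to the S3 object
// key and size of the chunk that starts at that position.
public class S3SegmentMetadata {

    public static class Chunk {
        public final String objectKey; // e.g. baseDir/chunk-0
        public final long size;        // chunk size in bytes

        public Chunk(String objectKey, long size) {
            this.objectKey = objectKey;
            this.size = size;
        }
    }

    // position -> <objectKey, size>, kept sorted so range lookups stay cheap
    private final ConcurrentSkipListMap<Long, Chunk> chunks = new ConcurrentSkipListMap<>();

    public void addChunk(long position, String objectKey, long size) {
        chunks.put(position, new Chunk(objectKey, size));
    }

    // Returns the entry of the chunk that covers the given logical position, or null.
    public Map.Entry<Long, Chunk> locate(long position) {
        return chunks.floorEntry(position);
    }
}
```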
Create the S3SegmentFile object
That is, call the `S3Segment#S3Segment` constructor. The path of the logical `Segment` file is concatenated according to fixed rules in the format `{clusterName}/{brokerName}/{topic}/{queue}/{type}/seg-{baseOffset}`. The path below this point is called `baseDir`.
Build S3SegmentMetadata
List the objects under `baseDir` and construct the metadata from them.
Commit
That is, the upper layer calls the `S3Segment#commit0()` method. Suppose we write data with `position=0` and `length=1024`, that is, write to the object path `baseDir/chunk-0` with a size of 1024 bytes. Upload it directly and asynchronously through the S3 client. After the upload completes, update the `S3SegmentMetadata`:
```
0 -> <baseDir/chunk-0, 1024>
```
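As a sketch of the asynchronous upload in `commit0`, using the AWS SDK for Java v2 `S3AsyncClient`; the bucket name, the `ChunkUploader` class, and the reuse of the `S3SegmentMetadata` sketch above are assumptions, not existing code:

```java
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// Hypothetical helper: uploads one chunk and records it in the metadata afterwards.
public class ChunkUploader {
    private final S3AsyncClient s3 = S3AsyncClient.create();

    public CompletableFuture<Void> uploadChunk(String bucket, String baseDir, long position,
                                               byte[] data, S3SegmentMetadata metadata) {
        String key = baseDir + "/chunk-" + position;
        PutObjectRequest request = PutObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build();
        return s3.putObject(request, AsyncRequestBody.fromBytes(data))
                // only update S3SegmentMetadata after S3 has acknowledged the upload
                .thenAccept(resp -> metadata.addChunk(position, key, data.length));
    }
}
```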
Read
That is, call the `S3Segment#read0()` method. Suppose we read 1 KB of data with `position=0` and `length=1024`. Through `S3SegmentMetadata` we find the object `baseDir/chunk-0` starting at position 0 and download it directly through the S3 client.
Commit
Suppose we write data with `position=1024` and `length=4096`, that is, we commit 4 KB of data starting at position 1 KB. Upload it directly and asynchronously through the S3 client. After the upload completes, update the `S3SegmentMetadata`:
```
0    -> <baseDir/chunk-0, 1024>
1024 -> <baseDir/chunk-1024, 4096>
```
Read
Assuming we read with `position = 512` and `length = 1024`, according to `S3SegmentMetadata` we need to fetch bytes `[512, 1024)` from `baseDir/chunk-0` and `[1024, 1536)` from `baseDir/chunk-1024`.
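A sketch of how such a read could be resolved into per-chunk byte ranges via the sorted metadata; the `ReadPlanner` helper is hypothetical and builds on the `S3SegmentMetadata` sketch above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: translates a logical read into (objectKey, offsetInChunk, length)
// pieces that can each be fetched with a ranged GetObject request.
public class ReadPlanner {

    public static class Piece {
        public final String objectKey;
        public final long offsetInChunk;
        public final long length;

        Piece(String objectKey, long offsetInChunk, long length) {
            this.objectKey = objectKey;
            this.offsetInChunk = offsetInChunk;
            this.length = length;
        }
    }

    // Example from the text: position=512, length=1024 resolves to
    // <chunk-0, offset 512, length 512> and <chunk-1024, offset 0, length 512>.
    public List<Piece> plan(S3SegmentMetadata metadata, long position, long length) {
        List<Piece> pieces = new ArrayList<>();
        long pos = position;
        long remaining = length;
        while (remaining > 0) {
            Map.Entry<Long, S3SegmentMetadata.Chunk> entry = metadata.locate(pos);
            if (entry == null) {
                break; // the requested range is not covered by any chunk
            }
            S3SegmentMetadata.Chunk chunk = entry.getValue();
            long offsetInChunk = pos - entry.getKey();
            long take = Math.min(chunk.size - offsetInChunk, remaining);
            if (take <= 0) {
                break; // gap in the metadata, nothing more to read here
            }
            pieces.add(new Piece(chunk.objectKey, offsetInChunk, take));
            pos += take;
            remaining -= take;
        }
        return pieces;
    }
}
```

Each resulting piece can then be fetched with an S3 ranged GET (`Range: bytes=start-end`), so a read does not need to download a whole chunk object.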
Commit
Assuming we write data with `position = 5120` and `length = 5120`, we upload the object `baseDir/chunk-5120`, and the `S3SegmentMetadata` becomes:
```
0    -> <baseDir/chunk-0, 1024>
1024 -> <baseDir/chunk-1024, 4096>
5120 -> <baseDir/chunk-5120, 5120>
```
Segment full
Assuming our `Segment` size is 10240 bytes, the `Segment` above is now full. We can asynchronously trigger an `uploadPartCopy` to consolidate all the chunks into one large segment object at the path `baseDir/segment`. After the copy succeeds, we can asynchronously delete all `baseDir/chunk-*` objects and update `S3SegmentMetadata` to:
```
0 -> <baseDir/segment, 10240>
```
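A sketch of the consolidation step with the AWS SDK for Java v2, using a multipart upload whose parts are copied from the chunk objects; bucket and key names are assumptions. Note that S3 requires every part of a multipart upload except the last one to be at least 5 MB, which constrains how small the chunks may be if this approach is used as-is:

```java
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;

// Illustrative sketch: merges chunk objects into one baseDir/segment object server-side.
public class SegmentConsolidator {
    private final S3Client s3 = S3Client.create();

    public void consolidate(String bucket, String baseDir, List<String> chunkKeys) {
        String segmentKey = baseDir + "/segment";
        String uploadId = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(segmentKey).build())
                .uploadId();

        List<CompletedPart> parts = new ArrayList<>();
        int partNumber = 1;
        for (String chunkKey : chunkKeys) { // chunk keys in logical-position order
            UploadPartCopyResponse resp = s3.uploadPartCopy(UploadPartCopyRequest.builder()
                    .copySource(bucket + "/" + chunkKey)   // copy the chunk as one part
                    .bucket(bucket).key(segmentKey)
                    .uploadId(uploadId).partNumber(partNumber)
                    .build());
            parts.add(CompletedPart.builder()
                    .partNumber(partNumber)
                    .eTag(resp.copyPartResult().eTag())
                    .build());
            partNumber++;
        }

        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(segmentKey).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
        // After this succeeds, the chunk objects can be deleted asynchronously.
    }
}
```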
Restart - Consolidated
On restart, we first concatenate the path `baseDir/segment` and check whether the object exists. If it exists, it is the consolidated large `Segment` object, so we record the corresponding metadata locally, and `read0()` can read the object directly based on the offset. We then check whether any `baseDir/chunk-*` objects still exist; if they do, the asynchronous deletion has not yet succeeded, and we retry it asynchronously.
Restart - Not Consolidated
If the concatenated path `baseDir/segment` does not exist, either consolidation failed or the current `Segment` has not been fully written. We can list all the `chunk-*` paths under `baseDir` and then determine whether the segment is full (an interface can be added for this check during recovery). If it is full, we consolidate the chunks and delete them asynchronously. If it is not full, we simply restore the `S3SegmentMetadata`.
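A recovery sketch covering both restart cases, using the AWS SDK for Java v2; the `SegmentRecovery` class, bucket handling, and the reuse of the `S3SegmentMetadata` sketch above are assumptions:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Exception;

// Illustrative sketch: check for the consolidated object first, otherwise rebuild
// S3SegmentMetadata from the chunk objects still present under baseDir.
public class SegmentRecovery {
    private final S3Client s3 = S3Client.create();

    public S3SegmentMetadata recover(String bucket, String baseDir) {
        S3SegmentMetadata metadata = new S3SegmentMetadata();
        String segmentKey = baseDir + "/segment";
        try {
            long size = s3.headObject(
                    HeadObjectRequest.builder().bucket(bucket).key(segmentKey).build())
                    .contentLength();
            // Consolidated: the whole segment maps to a single large object.
            metadata.addChunk(0, segmentKey, size);
            // Any leftover baseDir/chunk-* objects can be deleted again asynchronously.
            return metadata;
        } catch (S3Exception e) {
            if (e.statusCode() != 404) {
                throw e; // not a "segment object missing" case
            }
        }
        // Not consolidated: list the chunk-* objects and rebuild the position mapping.
        s3.listObjectsV2Paginator(
                ListObjectsV2Request.builder().bucket(bucket).prefix(baseDir + "/chunk-").build())
                .contents()
                .forEach(obj -> metadata.addChunk(parsePosition(obj.key()), obj.key(), obj.size()));
        return metadata;
    }

    // Extracts the logical position from a key such as baseDir/chunk-1024.
    private long parsePosition(String key) {
        return Long.parseLong(key.substring(key.lastIndexOf('-') + 1));
    }
}
```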
Possible Optimizations
Upload Buffer Pooling
A general `UploadBufferPool` can be used as the upload buffer. Each time `commit0` is called, the data is first put into the corresponding `Buffer` in the pool. When the overall buffer pool reaches the set threshold, the actual data upload is triggered (a rough sketch follows the list below):
- When `commit0` is called, the data is written into the corresponding `Buffer` of its queue.
- When the upload buffer pool reaches the threshold, the data is actually uploaded to S3. All the data in each queue's `Buffer` forms one object.
- Update `S3SegmentMetadata` and `TieredFileSegment#commitPosition`.
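A rough sketch of such a pool, assuming per-queue buffers and a global byte threshold; the `UploadBufferPool` name comes from this proposal, everything else here is hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: commit0 appends into a per-queue buffer, and a flush is
// triggered once the pooled bytes exceed a global threshold.
public class UploadBufferPool {
    private final long flushThresholdBytes;
    private final AtomicLong pooledBytes = new AtomicLong();
    private final Map<String, ByteBuffer> buffers = new ConcurrentHashMap<>(); // key: queue id

    public UploadBufferPool(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // Called from commit0: stage the data instead of uploading it immediately.
    // (Capacity and concurrency handling are omitted in this sketch.)
    public void append(String queueId, byte[] data) {
        buffers.computeIfAbsent(queueId, k -> ByteBuffer.allocate(4 * 1024 * 1024))
               .put(data);
        if (pooledBytes.addAndGet(data.length) >= flushThresholdBytes) {
            flush();
        }
    }

    // Upload each queue's buffered data as one object, then update S3SegmentMetadata
    // and TieredFileSegment#commitPosition (both steps omitted here).
    private void flush() {
        buffers.forEach((queueId, buffer) -> {
            buffer.flip();
            // ... upload buffer contents to S3 as a single object ...
            buffer.clear();
        });
        pooledBytes.set(0);
    }
}
```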
Tasks
- Implement and run the basic solution.
- Test the performance of the basic solution.
- If performance depends on uploading in batches, implement an optimization with upload buffer pooling and compare its performance.
Eliminated implementation
New configuration

Configuration | Type | Unit | Default Value | Function
---|---|---|---|---
s3Region | String | | | Region name of the S3 service
s3Bucket | String | | | S3 bucket where the objects are stored
s3AccessKey | String | | | IAM access key
s3SecretAccessKey | String | | | IAM secret access key
s3ChunkSize | long | bytes | `4 * 1024 * 1024` | Size of each chunk in one S3Segment
readaheadChunkNum | int | | 8 | Number of chunks read ahead on each `read0` call
s3ClientThreadNum | int | | `tieredStoreGroupCommitSize / s3ChunkSize` | Number of threads in the S3 client's thread pool
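As a sketch, the options above could surface as fields of a configuration class; the field names and defaults follow the table, while the class itself is an assumption:

```java
// Illustrative sketch of the proposed configuration options.
public class S3BackendConfig {
    private String s3Region;
    private String s3Bucket;
    private String s3AccessKey;
    private String s3SecretAccessKey;

    // Size of each chunk object, in bytes (default 4 MB).
    private long s3ChunkSize = 4L * 1024 * 1024;

    // Number of chunks read ahead on each read0 call.
    private int readaheadChunkNum = 8;

    // Thread pool size for the S3 client; the table suggests deriving it from
    // tieredStoreGroupCommitSize / s3ChunkSize.
    private int s3ClientThreadNum;
}
```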
A segment is treated as a logical file and is divided into multiple physical files, i.e. multiple physical objects, from the S3 point of view. We assume that each physical object has a default size of 4 MB and call it a `chunk`. For ease of presentation, we assume that `readaheadChunkNum` is 2 in the following.
Process
Create S3SegmentFile object
This is done in the `S3Segment#S3Segment` constructor. The path of the logical segment file is constructed by concatenating the following components according to a set of rules: `clusterName/brokerName/topic/queue/type-baseOffset`. The path below this point is referred to as `baseDir`.
Create the real logical segment
That is, the `S3Segment#createFile()` method is called. Since no data has been written yet, we need to create the first `chunk` object and allocate 4 MB of memory to cache the data for this chunk. We request the creation of an object from S3 in the format `baseDir/chunk-startOffset`, which means creating a `baseDir/chunk-0` object in S3 now.
Commit
The `Segment#commit0()` method is called. Suppose we write 2 MB of data this time. The data is written directly into `chunk-0` and uploaded to S3.
Read
That is, the `S3Segment#read0()` method is called. Suppose we are currently reading 1 MB of data with `position = 0` and `length = 1024` (positions and lengths in this example are in KB). The read hits directly in the local `chunk-0` buffer and returns.
Commit
Suppose this time we write data with `position = 2048` and `length = 12 * 1024`, that is, we commit 12 MB of data starting from the 2 MB position.
At this point the first 2 MB of `chunk-0` is cached locally, so we can concatenate it with the first 2 MB of the incoming stream to form a complete `chunk-0`. The remaining data fills `chunk-4096` and `chunk-8192` completely and the first 2 MB of `chunk-12288`; these chunks are then uploaded to S3. When multiple chunks are uploaded at the same time, we upload them asynchronously via a thread pool. If some chunks fail to upload, they are cached and retried asynchronously in the background; if they fail repeatedly, appropriate handling logic is applied.
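A sketch of the splitting logic that this commit implies, with positions and lengths expressed in the same KB unit as the example; the `ChunkSplitter` helper is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a write at (position, length) into per-chunk slices, so
// each slice can be merged with any locally cached prefix of its chunk and uploaded.
public class ChunkSplitter {

    public static class Slice {
        public final long chunkStart;    // logical start position of the target chunk
        public final long offsetInChunk; // where this slice lands inside the chunk
        public final long length;

        Slice(long chunkStart, long offsetInChunk, long length) {
            this.chunkStart = chunkStart;
            this.offsetInChunk = offsetInChunk;
            this.length = length;
        }
    }

    // With chunkSize=4096, position=2048 and length=12*1024 (the example above), this
    // yields the second half of chunk-0, all of chunk-4096 and chunk-8192, and the
    // first half of chunk-12288.
    public List<Slice> split(long position, long length, long chunkSize) {
        List<Slice> slices = new ArrayList<>();
        long pos = position;
        long remaining = length;
        while (remaining > 0) {
            long chunkStart = (pos / chunkSize) * chunkSize;
            long offsetInChunk = pos - chunkStart;
            long take = Math.min(chunkSize - offsetInChunk, remaining);
            slices.add(new Slice(chunkStart, offsetInChunk, take));
            pos += take;
            remaining -= take;
        }
        return slices;
    }
}
```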
Read
After the above commit, only the first 2 MB of `chunk-12288` is cached locally. Now we read `length = 4096` (4 MB) of data starting from `position = 2048`, which means reading the second half of `chunk-0` and the first half of `chunk-4096`. Since the pre-reading mechanism is enabled with a parameter of 2, we would read two more chunks; considering that we only read half of `chunk-4096`, we only need to read one additional chunk, `chunk-8192`.
Then we read `chunk-0`, `chunk-4096`, and `chunk-8192` from S3. According to the pre-reading mechanism, we do not save `chunk-0` and only save `chunk-4096` and `chunk-8192` in memory.
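A sketch of how the read-ahead count could be derived for this example, following the rule described above that a partially read trailing chunk consumes part of the read-ahead budget; the helper and its signature are assumptions:

```java
// Hypothetical helper for the read-ahead decision described above.
public class ReadaheadPlanner {

    // Returns the start positions of the chunks to prefetch beyond the requested range.
    // Example: position=2048, length=4096, chunkSize=4096, readaheadChunkNum=2
    // -> the read already touches chunk-4096 partially, so only chunk-8192 is prefetched.
    public long[] chunksToPrefetch(long position, long length, long chunkSize, int readaheadChunkNum) {
        long end = position + length;                        // exclusive end of the read
        long lastChunkStart = ((end - 1) / chunkSize) * chunkSize;
        boolean lastChunkPartial = end % chunkSize != 0;     // trailing chunk read only partially
        int extra = Math.max(readaheadChunkNum - (lastChunkPartial ? 1 : 0), 0);
        long[] prefetch = new long[extra];
        for (int i = 0; i < extra; i++) {
            prefetch[i] = lastChunkStart + (i + 1) * chunkSize;
        }
        return prefetch;
    }
}
```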
Read
Now we read `length = 4096` (4 MB) of data starting from `position = 6144`, which means reading the second half of `chunk-4096` and the first half of `chunk-8192`. Since `chunk-4096` and `chunk-8192` have already been pre-loaded into memory, we can return the data directly without reading from S3.
Segment is full
At this point, we can asynchronously trigger an `uploadPartCopy` operation to consolidate all the chunks into a single large segment object and record the basic information of the segment in the object metadata. The object path is `clusterName/brokerName/topic/queue/type-baseOffset-seg`. After the copy succeeds, we can asynchronously delete the parent path of the chunks.
Restart with segment
We concatenate the path `clusterName/brokerName/topic/queue/type-baseOffset-seg` and check whether the object exists. If it exists, it is the already consolidated large `Segment` object; we record the corresponding metadata locally, and `read0()` can read the object directly based on the offset. Next, we check whether any object still exists under `.../type-baseOffset`; if so, the asynchronous deletion has not yet succeeded and we re-attempt it asynchronously.
Restart without segment
If the path `.../type-baseOffset-seg` does not exist, either consolidation failed or the current segment has not been written to capacity. In this case, we can list all the chunk files under the path and then determine whether the segment has been fully written (this can be checked by adding an interface that is called during recovery). If the segment has been fully written, we can consolidate the chunks asynchronously and then delete them. If it has not been fully written, we simply recover the latest `chunk` into the cache.
Advantages and disadvantages
Advantages
- The pre-fetching mechanism and the caching of incomplete chunks can help reduce network requests.
- Optimal use case for this design is sequential read access, which fully utilizes the prefetching mechanism.
Disadvantages
- `chunk` caches can lead to excessive memory usage: with 1000 queues, even if only one `chunk` is cached per queue, memory usage can reach 4 GB.