
Support Amazon S3 backend in TieredStorage #6154

@TheR1sing3un

Description

Support S3 backend for TieredStorage

Target

  1. Implement Amazon S3 backend for TieredStorage.
  2. Optimize upload and download efficiency.

Implementation

Core Idea

When the upper layer calls commit0, the data is uploaded directly; the upload buffer and read-ahead mechanisms are already implemented in TieredFileSegment and TieredMessageFetcher, respectively. We therefore do not need to add optimization at this layer and simply treat commit0 as the method that actually persists data to S3. Once an entire Segment is filled, we can use UploadPartCopy to consolidate these objects of different sizes into one large object.

Complete Process

We need to maintain a piece of metadata, S3SegmentMetadata, for each Segment. It records the mapping from a position within the logical Segment to the path and size of the corresponding object on S3. For example, the chunk metadata of the first CommitLog Segment of queue1 might look like this (a minimal sketch of the structure follows the list):

  • 0 -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-0, 1024>
  • 1024 -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-1024, 4096>
  • 5120 -> <clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-5120, 5120>
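
As a rough illustration, S3SegmentMetadata could be a sorted map keyed by logical position, so any offset can be resolved to its chunk object with a floor lookup. The sketch below is illustrative only; class and member names are assumptions, not the actual implementation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch: maps a logical position inside the Segment to the S3 object
// (path and size) that stores the bytes starting at that position.
public class S3SegmentMetadata {

    public static class Chunk {
        public final String objectPath;
        public final long size;

        Chunk(String objectPath, long size) {
            this.objectPath = objectPath;
            this.size = size;
        }
    }

    // Sorted by start position so floorEntry() finds the chunk covering any offset.
    private final TreeMap<Long, Chunk> chunks = new TreeMap<>();

    public void addChunk(long position, String objectPath, long size) {
        chunks.put(position, new Chunk(objectPath, size));
    }

    // Returns the entry whose chunk contains the given logical position, or null.
    public Map.Entry<Long, Chunk> locate(long position) {
        Map.Entry<Long, Chunk> e = chunks.floorEntry(position);
        if (e == null || position >= e.getKey() + e.getValue().size) {
            return null;
        }
        return e;
    }

    // Read-only view, useful for consolidation and recovery.
    public SortedMap<Long, Chunk> chunks() {
        return Collections.unmodifiableSortedMap(chunks);
    }

    // Logical bytes covered so far; doubles as the next commit position.
    public long size() {
        Map.Entry<Long, Chunk> last = chunks.lastEntry();
        return last == null ? 0 : last.getKey() + last.getValue().size;
    }
}
```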

Create the S3SegmentFile object.

That is, the S3Segment#S3Segment constructor is called. The path of the logical Segment file is concatenated according to a fixed rule, in the format {clusterName}/{brokerName}/{topic}/{queue}/{type}/seg-{baseOffset}. This path is referred to as baseDir below.

Build S3SegmentMetadata

Get the objects under baseDir and construct the metadata.
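
A sketch of how this could be done with the AWS SDK for Java v2: list everything under baseDir and register each chunk-{startOffset} object in the metadata. Method and variable names are illustrative, and pagination of large listings is omitted.

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

// Sketch: list the objects under baseDir and rebuild S3SegmentMetadata from the chunk-* keys.
static S3SegmentMetadata buildMetadata(S3Client s3, String bucket, String baseDir) {
    S3SegmentMetadata metadata = new S3SegmentMetadata();
    ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
        .bucket(bucket)
        .prefix(baseDir + "/")
        .build());
    for (S3Object object : response.contents()) {
        String name = object.key().substring(object.key().lastIndexOf('/') + 1);
        if (name.startsWith("chunk-")) {
            long position = Long.parseLong(name.substring("chunk-".length()));
            metadata.addChunk(position, object.key(), object.size());
        }
    }
    return metadata;
}
```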

Commit

That is, the upper layer calls the S3Segment#commit0() method. Suppose it writes data with position=0 and length=1024; that is, 1024 bytes are written to the object path baseDir/chunk-0. The data is uploaded directly and asynchronously through the S3 client. After the upload completes, S3SegmentMetadata is updated:

0 -> <baseDir/chunk-0, 1024>
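
A minimal sketch of this direct upload with the AWS SDK for Java v2 async client; the method shape, parameters, and error handling are illustrative assumptions rather than the actual TieredStorage API.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// Sketch: upload the committed bytes as baseDir/chunk-{position}, then record them in the metadata.
static CompletableFuture<Boolean> commitChunk(S3AsyncClient s3, String bucket, String baseDir,
                                              S3SegmentMetadata metadata, long position, ByteBuffer data) {
    int length = data.remaining();
    String key = baseDir + "/chunk-" + position;
    PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(key).build();
    return s3.putObject(request, AsyncRequestBody.fromByteBuffer(data))
        .thenApply(response -> {
            // Only update the metadata once the object is durably stored.
            metadata.addChunk(position, key, length);
            return true;
        });
}
```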

Read

That is, the S3Segment#read0() method is called. Suppose we read data with position=0 and length=1024. Through S3SegmentMetadata we find that the object baseDir/chunk-0 starts at position 0, and we download it directly through the S3 client.

Commit

Suppose we write data with position=1024 and length=4096, that is, 4 KB of data is committed starting from the 1 KB position. It is uploaded directly and asynchronously through the S3 client. After the upload completes, S3SegmentMetadata is updated:

0 -> <baseDir/chunk-0, 1024>
1024 -> <baseDir/chunk-1024, 4096>

Read

Suppose we read with position = 512 and length = 1024. According to S3SegmentMetadata, we need to fetch [512, 1024) from baseDir/chunk-0 and [1024, 1536) from baseDir/chunk-1024.
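
A sketch of how read0 could serve such a cross-chunk read: walk S3SegmentMetadata from the chunk containing position and issue ranged GetObject calls until length bytes have been collected (AWS SDK for Java v2; names are illustrative).

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// Sketch: read `length` bytes starting at `position`, possibly spanning several chunk objects.
static byte[] readRange(S3Client s3, String bucket, S3SegmentMetadata metadata, long position, int length) {
    ByteArrayOutputStream result = new ByteArrayOutputStream(length);
    long pos = position;
    long end = position + length;
    while (pos < end) {
        Map.Entry<Long, S3SegmentMetadata.Chunk> entry = metadata.locate(pos);
        long offsetInChunk = pos - entry.getKey();
        long bytesFromChunk = Math.min(entry.getValue().size - offsetInChunk, end - pos);
        GetObjectRequest request = GetObjectRequest.builder()
            .bucket(bucket)
            .key(entry.getValue().objectPath)
            // The HTTP Range header is inclusive on both ends.
            .range("bytes=" + offsetInChunk + "-" + (offsetInChunk + bytesFromChunk - 1))
            .build();
        ResponseBytes<GetObjectResponse> bytes = s3.getObjectAsBytes(request);
        byte[] part = bytes.asByteArray();
        result.write(part, 0, part.length);
        pos += bytesFromChunk;
    }
    return result.toByteArray();
}
```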

Commit

Suppose we write data with position = 5120 and length = 5120; we upload the object baseDir/chunk-5120, and S3SegmentMetadata becomes:

0 -> <baseDir/chunk-0, 1024>
1024 -> <baseDir/chunk-1024, 4096>
5120 -> <baseDir/chunk-5120, 5120>

Segment full

Assume our Segment size is 10240 bytes; the Segment above is now full.
We can asynchronously trigger an uploadPartCopy to consolidate all the chunks into a large segment object. The path is baseDir/segment. After the copy is successful, we can asynchronously delete all baseDir/chunk-*, and update S3SegmentMetadata to:

0 -> <baseDir/segment, 10240>
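
A sketch of the consolidation using S3 multipart upload with UploadPartCopy (AWS SDK for Java v2; names are illustrative). One caveat worth noting: S3 requires every part except the last to be at least 5 MB, so small chunks may need to be grouped before they can be copied as parts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;

// Sketch: server-side copy of all chunk objects into a single baseDir/segment object.
static void consolidate(S3Client s3, String bucket, String baseDir, S3SegmentMetadata metadata) {
    String segmentKey = baseDir + "/segment";
    String uploadId = s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
        .bucket(bucket).key(segmentKey).build()).uploadId();

    List<CompletedPart> parts = new ArrayList<>();
    int partNumber = 1;
    for (Map.Entry<Long, S3SegmentMetadata.Chunk> entry : metadata.chunks().entrySet()) {
        UploadPartCopyResponse copy = s3.uploadPartCopy(UploadPartCopyRequest.builder()
            .sourceBucket(bucket).sourceKey(entry.getValue().objectPath)
            .destinationBucket(bucket).destinationKey(segmentKey)
            .uploadId(uploadId).partNumber(partNumber)
            .build());
        parts.add(CompletedPart.builder().partNumber(partNumber).eTag(copy.copyPartResult().eTag()).build());
        partNumber++;
    }

    s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
        .bucket(bucket).key(segmentKey).uploadId(uploadId)
        .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
        .build());
    // The chunk-* objects can now be deleted asynchronously, and the metadata replaced
    // with the single entry 0 -> <baseDir/segment, segmentSize>.
}
```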

Restart - Consolidated

On restart, we first construct the path baseDir/segment and check whether that object exists. If it does, it is the consolidated large Segment object, so we record the corresponding metadata locally, and read0() can read the object directly based on the offset. We then check whether any baseDir/chunk-* objects still exist; if they do, the asynchronous deletion did not complete, so we retry it asynchronously.

Restart - Not Consolidated

If the path baseDir/segment does not exist, consolidation may have failed or the current Segment may not have been fully written. We can list all the chunk-* objects under baseDir and then determine whether the Segment is full (an interface can be added for this check during recovery). If it is full, we consolidate the chunks and delete them asynchronously; if not, we simply restore S3SegmentMetadata from the listing.
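
A sketch of the recovery decision that covers both restart cases above. The headObject and listing calls are AWS SDK for Java v2; deleteChunksAsync and isFull are assumed helpers standing in for the asynchronous deletion and the proposed "is this Segment full" recovery interface.

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

// Sketch: on restart, decide whether the Segment was already consolidated.
static S3SegmentMetadata recover(S3Client s3, String bucket, String baseDir) {
    String segmentKey = baseDir + "/segment";
    try {
        HeadObjectResponse head = s3.headObject(
            HeadObjectRequest.builder().bucket(bucket).key(segmentKey).build());
        // Consolidated: one entry covers the whole Segment; retry deleting leftover chunk-* objects.
        S3SegmentMetadata metadata = new S3SegmentMetadata();
        metadata.addChunk(0, segmentKey, head.contentLength());
        deleteChunksAsync(s3, bucket, baseDir);                          // assumed helper
        return metadata;
    } catch (S3Exception e) {
        if (e.statusCode() != 404) {
            throw e;
        }
        // Not consolidated: rebuild metadata from the chunk-* objects; if the Segment
        // turns out to be full, consolidation can be triggered again asynchronously.
        S3SegmentMetadata metadata = buildMetadata(s3, bucket, baseDir); // see the listing sketch above
        if (isFull(metadata)) {                                          // assumed recovery-time check
            consolidate(s3, bucket, baseDir, metadata);
        }
        return metadata;
    }
}
```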

Possible Optimizations

Upload Buffer Pooling

A general UploadBufferPool can serve as the upload buffer. Each time commit0 is called, the data is first placed into the corresponding Buffer in the pool; when the overall buffer pool reaches the configured threshold, the actual upload is triggered (see the sketch after the list below).

  • When commit0 is called, the data is read into the corresponding Buffer in the queue.
  • When the Upload Buffer Pool reaches the threshold, the actual data is uploaded to S3. All the data in each queue's Buffer forms an object.
  • Update S3SegmentMetadata and TieredFileSegment#commitPosition.
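
A very rough sketch of the pooled-upload idea described above, assuming a per-queue buffer plus a shared size counter that triggers a flush once the configured threshold is crossed; all names are illustrative and concurrency handling is simplified.

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of an UploadBufferPool: commit0 stages data into a per-queue buffer; once the
// pool-wide size crosses the threshold, each buffer is flushed to S3 as one object.
public class UploadBufferPool {

    private final long flushThresholdBytes;
    private final AtomicLong pooledBytes = new AtomicLong();
    private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();

    public UploadBufferPool(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // Called from commit0: stage the data instead of uploading it immediately.
    public void append(String segmentBaseDir, byte[] data) {
        buffers.computeIfAbsent(segmentBaseDir, k -> new ByteArrayOutputStream())
               .write(data, 0, data.length);
        if (pooledBytes.addAndGet(data.length) >= flushThresholdBytes) {
            flush();
        }
    }

    // Upload each queue's buffered data as a single object, then update S3SegmentMetadata
    // and TieredFileSegment#commitPosition for the affected segments (both omitted here).
    private synchronized void flush() {
        buffers.forEach((baseDir, buffer) -> {
            byte[] merged = buffer.toByteArray();
            // uploadAsync(baseDir, merged);  // assumed helper wrapping the S3 client
        });
        buffers.clear();
        pooledBytes.set(0);
    }
}
```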

Tasks

  1. Implement and run the basic solution.
  2. Test the performance of the basic solution.
  3. If performance turns out to depend on batching uploads, implement the upload buffer pooling optimization and compare its performance.

Eliminated implementation

new configuration

| Configuration | Type | Unit | Default Value | Function |
| --- | --- | --- | --- | --- |
| s3Region | String | | | Region name of the S3 service |
| s3Bucket | String | | | Bucket in S3 that stores the objects |
| s3AccessKey | String | | | IAM access key |
| s3SecretAccessKey | String | | | IAM secret access key |
| s3ChunkSize | long | bytes | 4 * 1024 * 1024 | Size of each chunk in one S3Segment |
| readaheadChunkNum | int | | 8 | Number of chunks read ahead on each read0 call |
| s3ClientThreadNum | int | | tieredStoreGroupCommitSize / s3ChunkSize | Number of threads in the S3 client's thread pool |

A Segment is treated as one logical file but is divided into multiple physical objects from the S3 point of view. Each physical object, called a chunk, has a default size of 4 MB.
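
Under this convention, the chunk that any logical position belongs to can be derived directly from s3ChunkSize; a small illustrative sketch:

```java
// Sketch: derive the chunk object key that covers a given logical position.
static String chunkKey(String baseDir, long position, long s3ChunkSize) {
    long chunkStartOffset = (position / s3ChunkSize) * s3ChunkSize;
    return baseDir + "/chunk-" + chunkStartOffset;
}
// With the units used in the walkthrough below (s3ChunkSize = 4096):
//   chunkKey(baseDir, 2048, 4096) -> baseDir + "/chunk-0"
//   chunkKey(baseDir, 5000, 4096) -> baseDir + "/chunk-4096"
```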

For ease of process representation, we assume that readaheadChunkNum is 2 in the following.

Process

Create S3SegmentFile object.

This is done in the S3Segment#S3Segment constructor. The path of the logical segment file is constructed by concatenating the following components according to a set of rules: clusterName/brokerName/topic/queue/type-baseOffset. The path below this point is referred to as baseDir.

Create real logical segment

That is, the S3Segment#createFile() method is called. Since no data has been written yet, we need to create the first chunk object and allocate 4MB of memory to cache the data for this chunk. We request the creation of an object from S3 in the format baseDir/chunk-startOffset, which means creating a baseDir/chunk-0 object in S3 now.

commit

That is, the S3Segment#commit0() method is called. Suppose we write 2 MB of data this time.

The data is written directly into chunk-0 and uploaded to S3.

read

That is, the S3Segment#read0() method is called. Suppose we are currently reading 1 MB of data with position = 0 and length = 1024. The read hits the local chunk-0 buffer directly and returns.

commit

Suppose this time we write data with position = 2048 and length = 12 * 1024, that is, we commit 12 MB of data starting from the 2 MB position.

At this point, the first 2 MB of chunk-0 is already cached locally, so we can concatenate it with the first 2 MB of the incoming data to form a complete chunk-0. The next 8 MB fills chunk-4096 and chunk-8192 completely, and the remaining 2 MB becomes the first half of chunk-12288; these chunks are then uploaded to S3. When multiple chunks are uploaded at the same time, we upload them asynchronously through a thread pool. If some chunks fail to upload, they are cached and retried asynchronously in the background; if they fail repeatedly, appropriate handling logic is applied.
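
A sketch of how such a write could be cut into chunk-aligned slices before the concurrent uploads, matching the example above; the ChunkSlice holder and its fields are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: cut a write starting at `position` into slices that each stay inside one chunk,
// so a slice either completes a partially cached chunk or starts a new one.
static List<ChunkSlice> splitIntoChunks(long position, byte[] data, long chunkSize) {
    List<ChunkSlice> slices = new ArrayList<>();
    long pos = position;
    int offset = 0;
    while (offset < data.length) {
        long chunkStartOffset = (pos / chunkSize) * chunkSize;
        int roomInChunk = (int) (chunkStartOffset + chunkSize - pos);
        int len = Math.min(roomInChunk, data.length - offset);
        slices.add(new ChunkSlice(chunkStartOffset, pos, Arrays.copyOfRange(data, offset, offset + len)));
        pos += len;
        offset += len;
    }
    return slices;
}

// Illustrative holder: which chunk the slice belongs to and where it starts logically.
static class ChunkSlice {
    final long chunkStartOffset;
    final long position;
    final byte[] bytes;

    ChunkSlice(long chunkStartOffset, long position, byte[] bytes) {
        this.chunkStartOffset = chunkStartOffset;
        this.position = position;
        this.bytes = bytes;
    }
}
// In the example above (chunkSize = 4096): splitting a write of length 12 * 1024 at
// position 2048 yields slices of 2048, 4096, 4096, and 2048 for chunk-0, chunk-4096,
// chunk-8192, and chunk-12288 respectively.
```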

read

After the above commit, only the first 2 MB of chunk-12288 is cached locally. Now we read data with position = 2048 and length = 4096, which means reading the second half of chunk-0 and the first half of chunk-4096. Since the read-ahead mechanism is enabled with a parameter of 2, we need to read two more chunks; considering that we only read half of chunk-4096, we only need to read one more chunk, chunk-8192.

Then we read chunk-0, chunk-4096, and chunk-8192 from S3. According to the pre-reading mechanism, we do not save chunk-0 and only save chunk-4096 and chunk-8192 in memory.
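
One way to read the rule above: after serving a read, keep readaheadChunkNum chunks cached starting from the chunk that contains the end position of the read. A small sketch of that interpretation (bounds-checking against the Segment size is omitted):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: chunks to keep cached after a read of [position, position + length).
static List<Long> readaheadChunkOffsets(long position, long length, long chunkSize, int readaheadChunkNum) {
    List<Long> chunkStartOffsets = new ArrayList<>();
    long firstCached = ((position + length) / chunkSize) * chunkSize;
    for (int i = 0; i < readaheadChunkNum; i++) {
        chunkStartOffsets.add(firstCached + i * chunkSize);
    }
    return chunkStartOffsets;
}
// Example above (chunkSize = 4096, readaheadChunkNum = 2):
//   readaheadChunkOffsets(2048, 4096, 4096, 2) -> [4096, 8192], i.e. chunk-4096 and chunk-8192.
```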

read

Now we read data with position = 6144 and length = 4096, which means reading the second half of chunk-4096 and the first half of chunk-8192. Since chunk-4096 and chunk-8192 are already pre-loaded into memory, we can return the data directly without reading from S3.

Segment is full

At this point, we can asynchronously trigger an uploadPartCopy operation to consolidate all the chunks into a single large segment object, and record the basic information of the segment in the object metadata. The object path is clusterName/brokerName/topic/queue/type-baseOffset-seg. After the copy operation is successful, we can asynchronously delete the parent path of the chunks.

restart with segment

Now we concatenate the path clusterName/brokerName/topic/queue/type-baseOffset-seg and check whether the object exists. If it exists, it is the already consolidated large Segment object, so we record the corresponding metadata locally, and read0() can read the object directly based on the offset. Next, we check whether any objects still exist under .../type-baseOffset; if they do, the asynchronous deletion has not completed, so we retry it asynchronously.

restart with unexist segment

If the path .../type-baseOffset-seg does not exist, it may be due to failed consolidation or the current segment has not been written to capacity. In this case, we can list all the chunk files under the path and then determine if the segment has been fully written (this can be checked by adding an interface that is called during recovery). If the segment has been fully written, we can consolidate the chunks asynchronously and then delete them. If the segment has not been fully written, we can simply recover the latest chunk by caching it.

Advantages and disadvantages

Advantages

  1. The pre-fetching mechanism and the caching of incomplete chunks can help reduce network requests.
  2. The optimal use case for this design is sequential reads, which fully utilize the prefetching mechanism.

Disadvantages

  1. Chunk caches can lead to excessive memory usage. With 1000 queues, even if only one chunk is cached per queue, memory usage can reach 4 GB.
