Commit 0353890

service/s3/s3manager: Use sync.Pool for reuse of part buffers for streaming payloads
1 parent d592c40 commit 0353890

3 files changed: +45 -19 lines changed

CHANGELOG_PENDING.md

Lines changed: 4 additions & 1 deletion

@@ -29,6 +29,9 @@ SDK Enhancements
   * Related to [aws/aws-sdk-go#2310](https://github.com/aws/aws-sdk-go/pull/2310)
   * Fixes [#251](https://github.com/aws/aws-sdk-go-v2/issues/251)
 * `aws/request` : Retryer is now a named field on Request. ([#393](https://github.com/aws/aws-sdk-go-v2/pull/393))
-
+* `service/s3/s3manager`: Adds `sync.Pool` to allow reuse of part buffers for streaming payloads ([#404](https://github.com/aws/aws-sdk-go-v2/pull/404))
+  * Fixes [#402](https://github.com/aws/aws-sdk-go-v2/issues/402)
+  * Uses the new behavior introduced in V1 [#2863](https://github.com/aws/aws-sdk-go/pull/2863), which allows reuse of the sync.Pool across multiple Upload requests that match part sizes.
+
 SDK Bugs
 ---
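
The reuse described in this changelog entry is visible from the caller's side: a single `Uploader` now owns a pool of part-sized buffers, so back-to-back uploads stop allocating a fresh `PartSize` buffer for every part. A minimal usage sketch, assuming placeholder bucket, keys, and file paths (this is an illustration, not code from the commit):

```go
package main

import (
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/external"
	"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
)

func main() {
	cfg, err := external.LoadDefaultAWSConfig()
	if err != nil {
		panic(err)
	}

	// One Uploader, one part pool: buffers drawn for the first Upload
	// are returned to the pool and reused by later Uploads, as long as
	// PartSize stays the same between calls.
	uploader := s3manager.NewUploader(cfg)

	for _, key := range []string{"object-1", "object-2"} { // placeholder keys
		f, err := os.Open("/tmp/" + key) // placeholder paths
		if err != nil {
			panic(err)
		}
		_, err = uploader.Upload(&s3manager.UploadInput{
			Bucket: aws.String("my-bucket"), // placeholder bucket
			Key:    aws.String(key),
			Body:   f,
		})
		f.Close()
		if err != nil {
			panic(err)
		}
	}
}
```

If `PartSize` is overridden between calls, the `init` hunk in upload.go below swaps in a freshly sized pool, since pooled buffers are only valid for a matching part size.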

service/s3/s3manager/download.go

Lines changed: 1 addition & 1 deletion

@@ -433,7 +433,7 @@ func (d *downloader) downloadChunk(chunk dlchunk) error {
     // Check if the returned error is an errReadingBody.
     // If err is errReadingBody this indicates that an error
     // occurred while copying the http response body.
-    // If this occurs we unwrap the error to set the underling error
+    // If this occurs we unwrap the error to set the underlying error
     // and attempt any remaining retries.
     if bodyErr, ok := err.(*errReadingBody); ok {
         err = bodyErr.Unwrap()
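
The corrected comment describes a wrap/unwrap pattern: read failures from the response body are wrapped in a typed error so the retry loop can recognize them and retry on the underlying cause. The SDK's `errReadingBody` is unexported; the sketch below is a hypothetical reconstruction of the pattern, not the SDK's actual type:

```go
// Hypothetical stand-in for s3manager's unexported errReadingBody.
type errReadingBody struct {
	err error // the underlying copy error
}

func (e *errReadingBody) Error() string {
	return "failed to read part body: " + e.err.Error()
}

// Unwrap exposes the underlying error so retry logic can act on it.
func (e *errReadingBody) Unwrap() error {
	return e.err
}

// handle mirrors the diff: when copying the HTTP response body failed,
// surface the underlying error and let any remaining retries proceed.
func handle(err error) error {
	if bodyErr, ok := err.(*errReadingBody); ok {
		return bodyErr.Unwrap()
	}
	return err
}
```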

service/s3/s3manager/upload.go

Lines changed: 40 additions & 17 deletions

@@ -162,6 +162,9 @@ type Uploader struct {

     // Defines the buffer strategy used when uploading a part
     BufferProvider ReadSeekerWriteToProvider
+
+    // partPool allows for the re-usage of streaming payload part buffers between upload calls
+    partPool *partPool
 }

 // NewUploader creates a new Uploader instance to upload objects to S3. Pass In

@@ -179,8 +182,12 @@ type Uploader struct {
 // u.PartSize = 64 * 1024 * 1024 // 64MB per part
 // })
 func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
+    return newUploader(s3.New(cfg), options...)
+}
+
+func newUploader(client s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
     u := &Uploader{
-        S3:                s3.New(cfg),
+        S3:                client,
         PartSize:          DefaultUploadPartSize,
         Concurrency:       DefaultUploadConcurrency,
         LeavePartsOnError: false,

@@ -192,6 +199,8 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
         option(u)
     }

+    u.partPool = newPartPool(u.PartSize)
+
     return u
 }


@@ -214,20 +223,7 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
 // u.PartSize = 64 * 1024 * 1024 // 64MB per part
 // })
 func NewUploaderWithClient(svc s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
-    u := &Uploader{
-        S3:                svc,
-        PartSize:          DefaultUploadPartSize,
-        Concurrency:       DefaultUploadConcurrency,
-        LeavePartsOnError: false,
-        MaxUploadParts:    MaxUploadParts,
-        BufferProvider:    defaultUploadBufferProvider(),
-    }
-
-    for _, option := range options {
-        option(u)
-    }
-
-    return u
+    return newUploader(svc, options...)
 }

 // Upload uploads an object to S3, intelligently buffering large files into
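
With the duplication removed, `NewUploader` and `NewUploaderWithClient` now funnel through the same unexported `newUploader` and behave identically apart from where the client comes from. A hedged fragment (assuming the imports from the earlier sketch, plus the `service/s3` package):

```go
svc := s3.New(cfg) // reuse an existing client

// Equivalent construction paths after this refactor:
u1 := s3manager.NewUploader(cfg) // builds its own s3.New(cfg) internally
u2 := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
	u.PartSize = 10 * 1024 * 1024 // 10 MiB parts; the pool is sized to match
})
_, _ = u1, u2
```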

@@ -287,6 +283,7 @@ func (u Uploader) UploadWithContext(ctx context.Context, input *UploadInput, opt
     for _, opt := range opts {
         opt(&i.cfg)
     }
+
     i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))

     return i.upload()

@@ -391,6 +388,15 @@ func (u *uploader) init() error {
     if u.cfg.PartSize == 0 {
         u.cfg.PartSize = DefaultUploadPartSize
     }
+    if u.cfg.MaxUploadParts == 0 {
+        u.cfg.MaxUploadParts = MaxUploadParts
+    }
+
+    // If PartSize was changed or partPool was never set up, then we need to allocate a new pool
+    // so that we return []byte slices of the correct size
+    if u.cfg.partPool == nil || u.cfg.partPool.partSize != u.cfg.PartSize {
+        u.cfg.partPool = newPartPool(u.cfg.PartSize)
+    }

     // Try to get the total size for some optimizations
     return u.initSize()
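
The guard in `init` is what makes cross-request reuse safe: a pool only ever hands out buffers of its own `partSize`, so a changed `PartSize` demands a fresh pool. A small hypothetical helper restating the check (`ensurePool` is not in the commit):

```go
// ensurePool restates the init() guard: keep the current pool only if
// its buffer size still matches the configured part size.
func ensurePool(current *partPool, partSize int64) *partPool {
	if current == nil || current.partSize != partSize {
		return newPartPool(partSize) // buffers of the old size would yield wrongly sized parts
	}
	return current
}
```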

@@ -460,11 +466,13 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
         return reader, int(n), cleanup, err

     default:
-        part := make([]byte, u.cfg.PartSize)
+        part := u.cfg.partPool.Get().([]byte)
         n, err := readFillBuf(r, part)
         u.readerPos += int64(n)

-        cleanup := func() {}
+        cleanup := func() {
+            u.cfg.partPool.Put(part)
+        }

         return bytes.NewReader(part[0:n]), n, cleanup, err
     }
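
This hunk is where the pooling pays off: instead of `make([]byte, PartSize)` per part, the buffer is leased from the pool, and the `cleanup` callback returns it once the part upload completes. A minimal restatement of the lease pattern (hypothetical `leaseBuffer` helper, using the `partPool` defined at the end of this diff):

```go
// leaseBuffer takes a part-sized []byte from the pool and returns a
// cleanup that hands it back. The caller must not run cleanup until the
// bytes.Reader built on the buffer has been fully consumed, or the
// buffer could be recycled while still in use.
func leaseBuffer(pool *partPool) (buf []byte, cleanup func()) {
	buf = pool.Get().([]byte)
	cleanup = func() { pool.Put(buf) }
	return buf, cleanup
}
```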

@@ -751,3 +759,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {

     return resp.CompleteMultipartUploadOutput
 }
+
+type partPool struct {
+    partSize int64
+    sync.Pool
+}
+
+func newPartPool(partSize int64) *partPool {
+    p := &partPool{partSize: partSize}
+
+    p.New = func() interface{} {
+        return make([]byte, p.partSize)
+    }
+
+    return p
+}
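
The `New` closure captures `p`, so every pool miss allocates a buffer of exactly `p.partSize` bytes, while embedding `sync.Pool` gives `partPool` its `Get`/`Put` methods directly. A self-contained sketch of the semantics (sizes are arbitrary; reuse is likely but never guaranteed by `sync.Pool`):

```go
package main

import (
	"fmt"
	"sync"
)

// partPool and newPartPool as defined in the diff above.
type partPool struct {
	partSize int64
	sync.Pool
}

func newPartPool(partSize int64) *partPool {
	p := &partPool{partSize: partSize}
	p.New = func() interface{} {
		return make([]byte, p.partSize)
	}
	return p
}

func main() {
	pool := newPartPool(8)

	a := pool.Get().([]byte) // pool empty: New allocates make([]byte, 8)
	fmt.Println(len(a))      // 8
	pool.Put(a)              // return the buffer for reuse

	b := pool.Get().([]byte) // may get a's backing array back, but
	fmt.Println(len(b))      // sync.Pool makes no guarantee of reuse
}
```

Embedding `sync.Pool` keeps the wrapper thin: `partPool` adds only the `partSize` bookkeeping that `init()` uses to decide whether an existing pool can be reused.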
