@@ -6,11 +6,16 @@ import (
66 "io"
77 "net/http"
88 "strconv"
9+ "time"
910
1011 "github.com/OpenListTeam/OpenList/v4/drivers/base"
1112 "github.com/OpenListTeam/OpenList/v4/internal/driver"
1213 "github.com/OpenListTeam/OpenList/v4/internal/model"
14+ "github.com/OpenListTeam/OpenList/v4/internal/stream"
15+ "github.com/OpenListTeam/OpenList/v4/pkg/errgroup"
16+ "github.com/OpenListTeam/OpenList/v4/pkg/singleflight"
1317 "github.com/OpenListTeam/OpenList/v4/pkg/utils"
18+ "github.com/avast/retry-go"
1419 "github.com/go-resty/resty/v2"
1520)
1621
@@ -69,18 +74,15 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
6974}
7075
7176func (d * Pan123 ) newUpload (ctx context.Context , upReq * UploadResp , file model.FileStreamer , up driver.UpdateProgress ) error {
72- tmpF , err := file .CacheFullInTempFile ()
73- if err != nil {
74- return err
75- }
7677 // fetch s3 pre signed urls
7778 size := file .GetSize ()
78- chunkSize := min (size , 16 * utils .MB )
79- chunkCount := int (size / chunkSize )
79+ chunkSize := int64 (16 * utils .MB )
80+ chunkCount := 1
81+ if size > chunkSize {
82+ chunkCount = int ((size + chunkSize - 1 ) / chunkSize )
83+ }
8084 lastChunkSize := size % chunkSize
81- if lastChunkSize > 0 {
82- chunkCount ++
83- } else {
85+ if lastChunkSize == 0 {
8486 lastChunkSize = chunkSize
8587 }
8688 // only 1 batch is allowed
@@ -90,73 +92,103 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
9092 batchSize = 10
9193 getS3UploadUrl = d .getS3PreSignedUrls
9294 }
95+ ss , err := stream .NewStreamSectionReader (file , int (chunkSize ))
96+ if err != nil {
97+ return err
98+ }
99+
100+ thread := min (int (chunkCount ), d .UploadThread )
101+ threadG , uploadCtx := errgroup .NewOrderedGroupWithContext (ctx , thread ,
102+ retry .Attempts (3 ),
103+ retry .Delay (time .Second ),
104+ retry .DelayType (retry .BackOffDelay ))
93105 for i := 1 ; i <= chunkCount ; i += batchSize {
94- if utils .IsCanceled (ctx ) {
95- return ctx . Err ()
106+ if utils .IsCanceled (uploadCtx ) {
107+ break
96108 }
97109 start := i
98110 end := min (i + batchSize , chunkCount + 1 )
99- s3PreSignedUrls , err := getS3UploadUrl (ctx , upReq , start , end )
111+ s3PreSignedUrls , err := getS3UploadUrl (uploadCtx , upReq , start , end )
100112 if err != nil {
101113 return err
102114 }
103115 // upload each chunk
104- for j := start ; j < end ; j ++ {
105- if utils .IsCanceled (ctx ) {
106- return ctx . Err ()
116+ for cur := start ; cur < end ; cur ++ {
117+ if utils .IsCanceled (uploadCtx ) {
118+ break
107119 }
120+ offset := int64 (cur - 1 ) * chunkSize
108121 curSize := chunkSize
109- if j == chunkCount {
122+ if cur == chunkCount {
110123 curSize = lastChunkSize
111124 }
112- err = d .uploadS3Chunk (ctx , upReq , s3PreSignedUrls , j , end , io .NewSectionReader (tmpF , chunkSize * int64 (j - 1 ), curSize ), curSize , false , getS3UploadUrl )
113- if err != nil {
114- return err
115- }
116- up (float64 (j ) * 100 / float64 (chunkCount ))
125+ var reader * stream.SectionReader
126+ var rateLimitedRd io.Reader
127+ threadG .GoWithLifecycle (errgroup.Lifecycle {
128+ Before : func (ctx context.Context ) error {
129+ if reader == nil {
130+ var err error
131+ reader , err = ss .GetSectionReader (offset , curSize )
132+ if err != nil {
133+ return err
134+ }
135+ rateLimitedRd = driver .NewLimitedUploadStream (ctx , reader )
136+ }
137+ return nil
138+ },
139+ Do : func (ctx context.Context ) error {
140+ reader .Seek (0 , io .SeekStart )
141+ uploadUrl := s3PreSignedUrls .Data .PreSignedUrls [strconv .Itoa (cur )]
142+ if uploadUrl == "" {
143+ return fmt .Errorf ("upload url is empty, s3PreSignedUrls: %+v" , s3PreSignedUrls )
144+ }
145+ reader .Seek (0 , io .SeekStart )
146+ req , err := http .NewRequestWithContext (ctx , http .MethodPut , uploadUrl , rateLimitedRd )
147+ if err != nil {
148+ return err
149+ }
150+ req .ContentLength = curSize
151+ //req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
152+ res , err := base .HttpClient .Do (req )
153+ if err != nil {
154+ return err
155+ }
156+ defer res .Body .Close ()
157+ if res .StatusCode == http .StatusForbidden {
158+ singleflight .AnyGroup .Do (fmt .Sprintf ("Pan123.newUpload_%p" , threadG ), func () (any , error ) {
159+ newS3PreSignedUrls , err := getS3UploadUrl (ctx , upReq , cur , end )
160+ if err != nil {
161+ return nil , err
162+ }
163+ s3PreSignedUrls .Data .PreSignedUrls = newS3PreSignedUrls .Data .PreSignedUrls
164+ return nil , nil
165+ })
166+ if err != nil {
167+ return err
168+ }
169+ return fmt .Errorf ("upload s3 chunk %d failed, status code: %d" , cur , res .StatusCode )
170+ }
171+ if res .StatusCode != http .StatusOK {
172+ body , err := io .ReadAll (res .Body )
173+ if err != nil {
174+ return err
175+ }
176+ return fmt .Errorf ("upload s3 chunk %d failed, status code: %d, body: %s" , cur , res .StatusCode , body )
177+ }
178+ progress := 10.0 + 85.0 * float64 (threadG .Success ())/ float64 (chunkCount )
179+ up (progress )
180+ return nil
181+ },
182+ After : func (err error ) {
183+ ss .RecycleSectionReader (reader )
184+ },
185+ })
117186 }
118187 }
119- // complete s3 upload
120- return d .completeS3 (ctx , upReq , file , chunkCount > 1 )
121- }
122-
123- func (d * Pan123 ) uploadS3Chunk (ctx context.Context , upReq * UploadResp , s3PreSignedUrls * S3PreSignedURLs , cur , end int , reader * io.SectionReader , curSize int64 , retry bool , getS3UploadUrl func (ctx context.Context , upReq * UploadResp , start int , end int ) (* S3PreSignedURLs , error )) error {
124- uploadUrl := s3PreSignedUrls .Data .PreSignedUrls [strconv .Itoa (cur )]
125- if uploadUrl == "" {
126- return fmt .Errorf ("upload url is empty, s3PreSignedUrls: %+v" , s3PreSignedUrls )
127- }
128- req , err := http .NewRequest ("PUT" , uploadUrl , driver .NewLimitedUploadStream (ctx , reader ))
129- if err != nil {
188+ if err := threadG .Wait (); err != nil {
130189 return err
131190 }
132- req = req .WithContext (ctx )
133- req .ContentLength = curSize
134- //req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
135- res , err := base .HttpClient .Do (req )
136- if err != nil {
137- return err
138- }
139- defer res .Body .Close ()
140- if res .StatusCode == http .StatusForbidden {
141- if retry {
142- return fmt .Errorf ("upload s3 chunk %d failed, status code: %d" , cur , res .StatusCode )
143- }
144- // refresh s3 pre signed urls
145- newS3PreSignedUrls , err := getS3UploadUrl (ctx , upReq , cur , end )
146- if err != nil {
147- return err
148- }
149- s3PreSignedUrls .Data .PreSignedUrls = newS3PreSignedUrls .Data .PreSignedUrls
150- // retry
151- reader .Seek (0 , io .SeekStart )
152- return d .uploadS3Chunk (ctx , upReq , s3PreSignedUrls , cur , end , reader , curSize , true , getS3UploadUrl )
153- }
154- if res .StatusCode != http .StatusOK {
155- body , err := io .ReadAll (res .Body )
156- if err != nil {
157- return err
158- }
159- return fmt .Errorf ("upload s3 chunk %d failed, status code: %d, body: %s" , cur , res .StatusCode , body )
160- }
161- return nil
191+ defer up (100 )
192+ // complete s3 upload
193+ return d .completeS3 (ctx , upReq , file , chunkCount > 1 )
162194}
0 commit comments