@@ -3,6 +3,7 @@ package net
33import (
44 "bytes"
55 "context"
6+ "errors"
67 "fmt"
78 "io"
89 "net/http"
@@ -119,7 +120,7 @@ type ConcurrencyLimit struct {
119120 Limit int // 需要大于0
120121}
121122
122- var ErrExceedMaxConcurrency = fmt . Errorf ("ExceedMaxConcurrency" )
123+ var ErrExceedMaxConcurrency = errors . New ("ExceedMaxConcurrency" )
123124
124125func (l * ConcurrencyLimit ) sub () error {
125126 l ._m .Lock ()
@@ -279,10 +280,9 @@ func (d *downloader) interrupt() error {
279280 err := fmt .Errorf ("interrupted" )
280281 d .err = err
281282 }
282- if d .chunkChannel != nil {
283+ close (d .chunkChannel )
284+ if d .bufs != nil {
283285 d .cancel (err )
284- close (d .chunkChannel )
285- d .chunkChannel = nil
286286 for _ , buf := range d .bufs {
287287 buf .Close ()
288288 }
@@ -291,8 +291,6 @@ func (d *downloader) interrupt() error {
291291 d .concurrency = - d .concurrency
292292 }
293293 log .Debugf ("maxConcurrency:%d" , d .cfg .Concurrency + d .concurrency )
294- } else {
295- log .Debug ("close of closed channel" )
296294 }
297295 return err
298296}
@@ -314,31 +312,35 @@ func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) {
314312// downloadPart is an individual goroutine worker reading from the ch channel
315313// and performing Http request on the data with a given byte range.
316314func (d * downloader ) downloadPart () {
317- // defer d.wg.Done ()
315+ defer d .concurrencyFinish ()
318316 for {
319- c , ok := <- d .chunkChannel
320- if ! ok {
321- break
322- }
323- if d .getErr () != nil {
324- // Drain the channel if there is an error, to prevent deadlocking
325- // of download producer.
326- break
327- }
328- if err := d .downloadChunk (& c ); err != nil {
329- if err == errCancelConcurrency {
330- break
317+ select {
318+ case <- d .ctx .Done ():
319+ return
320+ case c , ok := <- d .chunkChannel :
321+ if ! ok {
322+ return
331323 }
332- if err == context .Canceled {
333- if e := context .Cause (d .ctx ); e != nil {
334- err = e
324+ if d .getErr () != nil {
325+ // Drain the channel if there is an error, to prevent deadlocking
326+ // of download producer.
327+ return
328+ }
329+ if err := d .downloadChunk (& c ); err != nil {
330+ if err == errCancelConcurrency {
331+ return
332+ }
333+ if err == context .Canceled {
334+ if e := context .Cause (d .ctx ); e != nil {
335+ err = e
336+ }
335337 }
338+ d .setErr (err )
339+ d .cancel (err )
340+ return
336341 }
337- d .setErr (err )
338- d .cancel (err )
339342 }
340343 }
341- d .concurrencyFinish ()
342344}
343345
344346// downloadChunk downloads the chunk
@@ -390,8 +392,8 @@ func (d *downloader) downloadChunk(ch *chunk) error {
390392 return err
391393}
392394
393- var errCancelConcurrency = fmt . Errorf ("cancel concurrency" )
394- var errInfiniteRetry = fmt . Errorf ("infinite retry" )
395+ var errCancelConcurrency = errors . New ("cancel concurrency" )
396+ var errInfiniteRetry = errors . New ("infinite retry" )
395397
396398func (d * downloader ) tryDownloadChunk (params * HttpRequestParams , ch * chunk ) (int64 , error ) {
397399 resp , err := d .cfg .HttpClient (d .ctx , params )
0 commit comments