11package net
22
33import (
4- "bytes"
54 "context"
65 "errors"
76 "fmt"
@@ -15,6 +14,7 @@ import (
1514 "github.com/OpenListTeam/OpenList/v4/internal/conf"
1615 "github.com/OpenListTeam/OpenList/v4/internal/model"
1716 "github.com/OpenListTeam/OpenList/v4/pkg/utils"
17+ "github.com/rclone/rclone/lib/mmap"
1818
1919 "github.com/OpenListTeam/OpenList/v4/pkg/http_range"
2020 "github.com/aws/aws-sdk-go/aws/awsutil"
@@ -255,7 +255,10 @@ func (d *downloader) sendChunkTask(newConcurrency bool) error {
255255 finalSize += firstSize - minSize
256256 }
257257 }
258- buf .Reset (int (finalSize ))
258+ err := buf .Reset (int (finalSize ))
259+ if err != nil {
260+ return err
261+ }
259262 ch := chunk {
260263 start : d .pos ,
261264 size : finalSize ,
@@ -645,11 +648,13 @@ func (mr MultiReadCloser) Close() error {
645648}
646649
647650type Buf struct {
648- buffer * bytes.Buffer
649- size int //expected size
650- ctx context.Context
651- off int
652- rw sync.Mutex
651+ size int //expected size
652+ ctx context.Context
653+ offR int
654+ offW int
655+ rw sync.Mutex
656+ buf []byte
657+ mmap bool
653658
654659 readSignal chan struct {}
655660 readPending bool
@@ -658,89 +663,122 @@ type Buf struct {
658663// NewBuf is a buffer that can have 1 read & 1 write at the same time.
659664// when read is faster write, immediately feed data to read after written
660665func NewBuf (ctx context.Context , maxSize int ) * Buf {
661- return & Buf {
662- ctx : ctx ,
663- buffer : bytes .NewBuffer (make ([]byte , 0 , maxSize )),
664- size : maxSize ,
665-
666+ br := & Buf {
667+ ctx : ctx ,
668+ size : maxSize ,
666669 readSignal : make (chan struct {}, 1 ),
667670 }
671+ if conf .MmapThreshold > 0 && maxSize >= conf .MmapThreshold {
672+ m , err := mmap .Alloc (maxSize )
673+ if err == nil {
674+ br .buf = m
675+ br .mmap = true
676+ return br
677+ }
678+ }
679+ br .buf = make ([]byte , maxSize )
680+ return br
668681}
669- func (br * Buf ) Reset (size int ) {
682+
683+ func (br * Buf ) Reset (size int ) error {
670684 br .rw .Lock ()
671685 defer br .rw .Unlock ()
672- if br .buffer == nil {
673- return
686+ if br .buf == nil {
687+ return io .ErrClosedPipe
688+ }
689+ if size > cap (br .buf ) {
690+ return fmt .Errorf ("reset size %d exceeds max size %d" , size , cap (br .buf ))
674691 }
675- br .buffer .Reset ()
676692 br .size = size
677- br .off = 0
693+ br .offR = 0
694+ br .offW = 0
695+ return nil
678696}
679697
680- func (br * Buf ) Read (p []byte ) (n int , err error ) {
698+ func (br * Buf ) Read (p []byte ) (int , error ) {
681699 if err := br .ctx .Err (); err != nil {
682700 return 0 , err
683701 }
684702 if len (p ) == 0 {
685703 return 0 , nil
686704 }
687- if br .off >= br .size {
705+ if br .offR >= br .size {
688706 return 0 , io .EOF
689707 }
690708 for {
691709 br .rw .Lock ()
692- if br .buffer != nil {
693- n , err = br .buffer .Read (p )
694- } else {
695- err = io .ErrClosedPipe
710+ if br .buf == nil {
711+ br .rw .Unlock ()
712+ return 0 , io .ErrClosedPipe
696713 }
697- if err != nil && err != io .EOF {
714+
715+ if br .offW < br .offR {
698716 br .rw .Unlock ()
699- return
717+ return 0 , io . ErrUnexpectedEOF
700718 }
701- if n > 0 {
702- br .off += n
719+ if br . offW == br . offR {
720+ br .readPending = true
703721 br .rw .Unlock ()
704- return n , nil
722+ select {
723+ case <- br .ctx .Done ():
724+ return 0 , br .ctx .Err ()
725+ case _ , ok := <- br .readSignal :
726+ if ! ok {
727+ return 0 , io .ErrClosedPipe
728+ }
729+ continue
730+ }
705731 }
706- br .readPending = true
732+
733+ n := copy (p , br .buf [br .offR :br .offW ])
734+ br .offR += n
707735 br .rw .Unlock ()
708- // n==0, err==io.EOF
709- select {
710- case <- br .ctx .Done ():
711- return 0 , br .ctx .Err ()
712- case _ , ok := <- br .readSignal :
713- if ! ok {
714- return 0 , io .ErrClosedPipe
715- }
716- continue
736+ if n < len (p ) && br .offR >= br .size {
737+ return n , io .EOF
717738 }
739+ return n , nil
718740 }
719741}
720742
721- func (br * Buf ) Write (p []byte ) (n int , err error ) {
743+ func (br * Buf ) Write (p []byte ) (int , error ) {
722744 if err := br .ctx .Err (); err != nil {
723745 return 0 , err
724746 }
747+ if len (p ) == 0 {
748+ return 0 , nil
749+ }
725750 br .rw .Lock ()
726751 defer br .rw .Unlock ()
727- if br .buffer == nil {
752+ if br .buf == nil {
728753 return 0 , io .ErrClosedPipe
729754 }
730- n , err = br .buffer .Write (p )
755+ if br .offW >= br .size {
756+ return 0 , io .ErrShortWrite
757+ }
758+ n := copy (br .buf [br .offW :], p [:min (br .size - br .offW , len (p ))])
759+ br .offW += n
731760 if br .readPending {
732761 br .readPending = false
733762 select {
734763 case br .readSignal <- struct {}{}:
735764 default :
736765 }
737766 }
738- return
767+ if n < len (p ) {
768+ return n , io .ErrShortWrite
769+ }
770+ return n , nil
739771}
740772
741- func (br * Buf ) Close () {
773+ func (br * Buf ) Close () error {
742774 br .rw .Lock ()
743775 defer br .rw .Unlock ()
744- br .buffer = nil
776+ var err error
777+ if br .mmap {
778+ err = mmap .Free (br .buf )
779+ br .mmap = false
780+ }
781+ br .buf = nil
745782 close (br .readSignal )
783+ return err
746784}
0 commit comments