88 "fmt"
99 "io"
1010 "net/http"
11+ "os"
1112
1213 "github.com/OpenListTeam/OpenList/v4/internal/conf"
1314 "github.com/OpenListTeam/OpenList/v4/internal/model"
@@ -151,32 +152,58 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
151152 return tmpF , hex .EncodeToString (h .Sum (nil )), nil
152153}
153154
154- type StreamSectionReader struct {
155- file model.FileStreamer
156- off int64
157- bufPool * pool.Pool [[]byte ]
155+ type StreamSectionReaderIF interface {
156+ // 线程不安全
157+ GetSectionReader (off , length int64 ) (io.ReadSeeker , error )
158+ FreeSectionReader (sr io.ReadSeeker )
159+ // 线程不安全
160+ DiscardSection (off int64 , length int64 ) error
158161}
159162
160- func NewStreamSectionReader (file model.FileStreamer , maxBufferSize int , up * model.UpdateProgress ) (* StreamSectionReader , error ) {
161- ss := & StreamSectionReader {file : file }
163+ func NewStreamSectionReader (file model.FileStreamer , maxBufferSize int , up * model.UpdateProgress ) (StreamSectionReaderIF , error ) {
162164 if file .GetFile () != nil {
163- return ss , nil
165+ return & cachedSectionReader { file . GetFile ()} , nil
164166 }
165167
166168 maxBufferSize = min (maxBufferSize , int (file .GetSize ()))
167169 if maxBufferSize > conf .MaxBufferLimit {
168- _ , err := file . CacheFullAndWriter ( up , nil )
170+ f , err := os . CreateTemp ( conf . Conf . TempDir , "file-*" )
169171 if err != nil {
170172 return nil , err
171173 }
174+
175+ if f .Truncate ((file .GetSize ()+ int64 (maxBufferSize - 1 ))/ int64 (maxBufferSize )* int64 (maxBufferSize )) != nil {
176+ // fallback to full cache
177+ _ , _ = f .Close (), os .Remove (f .Name ())
178+ cache , err := file .CacheFullAndWriter (up , nil )
179+ if err != nil {
180+ return nil , err
181+ }
182+ return & cachedSectionReader {cache }, nil
183+ }
184+
185+ ss := & fileSectionReader {Reader : file , temp : f }
186+ ss .bufPool = & pool.Pool [* offsetWriterWithBase ]{
187+ New : func () * offsetWriterWithBase {
188+ base := ss .fileOff
189+ ss .fileOff += int64 (maxBufferSize )
190+ return & offsetWriterWithBase {io .NewOffsetWriter (ss .temp , base ), base }
191+ },
192+ }
193+ file .Add (utils .CloseFunc (func () error {
194+ ss .bufPool .Reset ()
195+ return errors .Join (ss .temp .Close (), os .Remove (ss .temp .Name ()))
196+ }))
172197 return ss , nil
173198 }
199+
200+ ss := & directSectionReader {file : file }
174201 if conf .MmapThreshold > 0 && maxBufferSize >= conf .MmapThreshold {
175202 ss .bufPool = & pool.Pool [[]byte ]{
176203 New : func () []byte {
177204 buf , err := mmap .Alloc (maxBufferSize )
178205 if err == nil {
179- file .Add (utils .CloseFunc (func () error {
206+ ss . file .Add (utils .CloseFunc (func () error {
180207 return mmap .Free (buf )
181208 }))
182209 } else {
@@ -200,53 +227,113 @@ func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *mode
200227 return ss , nil
201228}
202229
230+ type cachedSectionReader struct {
231+ cache io.ReaderAt
232+ }
233+
234+ func (* cachedSectionReader ) DiscardSection (off int64 , length int64 ) error {
235+ return nil
236+ }
237+ func (s * cachedSectionReader ) GetSectionReader (off , length int64 ) (io.ReadSeeker , error ) {
238+ return io .NewSectionReader (s .cache , off , length ), nil
239+ }
240+ func (* cachedSectionReader ) FreeSectionReader (sr io.ReadSeeker ) {}
241+
242+ type fileSectionReader struct {
243+ io.Reader
244+ off int64
245+ temp * os.File
246+ fileOff int64
247+ bufPool * pool.Pool [* offsetWriterWithBase ]
248+ }
249+
250+ type offsetWriterWithBase struct {
251+ * io.OffsetWriter
252+ base int64
253+ }
254+
203255// 线程不安全
204- func (ss * StreamSectionReader ) DiscardSection (off int64 , length int64 ) error {
205- if ss .file .GetFile () == nil {
206- if off != ss .off {
207- return fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
208- }
209- _ , err := utils .CopyWithBufferN (io .Discard , ss .file , length )
210- if err != nil {
211- return fmt .Errorf ("failed to skip data: (expect =%d) %w" , length , err )
212- }
256+ func (ss * fileSectionReader ) DiscardSection (off int64 , length int64 ) error {
257+ if off != ss .off {
258+ return fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
259+ }
260+ _ , err := utils .CopyWithBufferN (io .Discard , ss .Reader , length )
261+ if err != nil {
262+ return fmt .Errorf ("failed to skip data: (expect =%d) %w" , length , err )
213263 }
214264 ss .off += length
215265 return nil
216266}
217267
218- // 线程不安全
219- func (ss * StreamSectionReader ) GetSectionReader (off , length int64 ) (* SectionReader , error ) {
220- var cache io.ReaderAt = ss .file .GetFile ()
221- var buf []byte
222- if cache == nil {
223- if off != ss .off {
224- return nil , fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
225- }
226- tempBuf := ss .bufPool .Get ()
227- buf = tempBuf [:length ]
228- n , err := io .ReadFull (ss .file , buf )
229- if int64 (n ) != length {
230- return nil , fmt .Errorf ("failed to read all data: (expect =%d, actual =%d) %w" , length , n , err )
231- }
232- ss .off += int64 (n )
233- off = 0
234- cache = bytes .NewReader (buf )
268+ type fileBufferSectionReader struct {
269+ io.ReadSeeker
270+ fileBuf * offsetWriterWithBase
271+ }
272+
273+ func (ss * fileSectionReader ) GetSectionReader (off , length int64 ) (io.ReadSeeker , error ) {
274+ if off != ss .off {
275+ return nil , fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
276+ }
277+ fileBuf := ss .bufPool .Get ()
278+ _ , _ = fileBuf .Seek (0 , io .SeekStart )
279+ n , err := utils .CopyWithBufferN (fileBuf , ss .Reader , length )
280+ if err != nil {
281+ return nil , fmt .Errorf ("failed to read all data: (expect =%d, actual =%d) %w" , length , n , err )
235282 }
236- return & SectionReader {io .NewSectionReader (cache , off , length ), buf }, nil
283+ ss .off += length
284+ return & fileBufferSectionReader {io .NewSectionReader (ss .temp , fileBuf .base , length ), fileBuf }, nil
237285}
238286
239- func (ss * StreamSectionReader ) FreeSectionReader (sr * SectionReader ) {
240- if sr != nil {
241- if sr .buf != nil {
242- ss .bufPool .Put (sr .buf [0 :cap (sr .buf )])
243- sr .buf = nil
244- }
287+ func (ss * fileSectionReader ) FreeSectionReader (rs io.ReadSeeker ) {
288+ if sr , ok := rs .(* fileBufferSectionReader ); ok {
289+ ss .bufPool .Put (sr .fileBuf )
290+ sr .fileBuf = nil
245291 sr .ReadSeeker = nil
246292 }
247293}
248294
249- type SectionReader struct {
295+ type directSectionReader struct {
296+ file model.FileStreamer
297+ off int64
298+ bufPool * pool.Pool [[]byte ]
299+ }
300+
301+ // 线程不安全
302+ func (ss * directSectionReader ) DiscardSection (off int64 , length int64 ) error {
303+ if off != ss .off {
304+ return fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
305+ }
306+ _ , err := utils .CopyWithBufferN (io .Discard , ss .file , length )
307+ if err != nil {
308+ return fmt .Errorf ("failed to skip data: (expect =%d) %w" , length , err )
309+ }
310+ ss .off += length
311+ return nil
312+ }
313+
314+ type bufferSectionReader struct {
250315 io.ReadSeeker
251316 buf []byte
252317}
318+
319+ // 线程不安全
320+ func (ss * directSectionReader ) GetSectionReader (off , length int64 ) (io.ReadSeeker , error ) {
321+ if off != ss .off {
322+ return nil , fmt .Errorf ("stream not cached: request offset %d != current offset %d" , off , ss .off )
323+ }
324+ tempBuf := ss .bufPool .Get ()
325+ buf := tempBuf [:length ]
326+ n , err := io .ReadFull (ss .file , buf )
327+ if int64 (n ) != length {
328+ return nil , fmt .Errorf ("failed to read all data: (expect =%d, actual =%d) %w" , length , n , err )
329+ }
330+ ss .off += int64 (n )
331+ return & bufferSectionReader {bytes .NewReader (buf ), buf }, nil
332+ }
333+ func (ss * directSectionReader ) FreeSectionReader (rs io.ReadSeeker ) {
334+ if sr , ok := rs .(* bufferSectionReader ); ok {
335+ ss .bufPool .Put (sr .buf [0 :cap (sr .buf )])
336+ sr .buf = nil
337+ sr .ReadSeeker = nil
338+ }
339+ }
0 commit comments