@@ -6,11 +6,9 @@ import (
66	"fmt" 
77	"io" 
88	"math/rand" 
9- 	"mime" 
109	"os" 
1110	stdpath "path" 
1211	"path/filepath" 
13- 	"strconv" 
1412	"strings" 
1513	"time" 
1614
@@ -21,30 +19,22 @@ import (
2119	"github.com/OpenListTeam/OpenList/v4/internal/op" 
2220	"github.com/OpenListTeam/OpenList/v4/internal/stream" 
2321	"github.com/OpenListTeam/OpenList/v4/internal/task" 
22+ 	"github.com/OpenListTeam/OpenList/v4/internal/task_group" 
23+ 	"github.com/OpenListTeam/OpenList/v4/pkg/utils" 
24+ 	"github.com/OpenListTeam/OpenList/v4/server/common" 
2425	"github.com/OpenListTeam/tache" 
2526	"github.com/pkg/errors" 
2627	log "github.com/sirupsen/logrus" 
2728)
2829
2930type  ArchiveDownloadTask  struct  {
30- 	task. TaskExtension 
31+ 	TaskData 
3132	model.ArchiveDecompressArgs 
32- 	status        string 
33- 	SrcObjPath    string 
34- 	DstDirPath    string 
35- 	srcStorage    driver.Driver 
36- 	dstStorage    driver.Driver 
37- 	SrcStorageMp  string 
38- 	DstStorageMp  string 
3933}
4034
4135func  (t  * ArchiveDownloadTask ) GetName () string  {
42- 	return  fmt .Sprintf ("decompress [%s](%s)[%s] to [%s](%s) with password <%s>" , t .SrcStorageMp , t .SrcObjPath ,
43- 		t .InnerPath , t .DstStorageMp , t .DstDirPath , t .Password )
44- }
45- 
46- func  (t  * ArchiveDownloadTask ) GetStatus () string  {
47- 	return  t .status 
36+ 	return  fmt .Sprintf ("decompress [%s](%s)[%s] to [%s](%s) with password <%s>" , t .SrcStorageMp , t .SrcActualPath ,
37+ 		t .InnerPath , t .DstStorageMp , t .DstActualPath , t .Password )
4838}
4939
5040func  (t  * ArchiveDownloadTask ) Run () error  {
@@ -58,16 +48,21 @@ func (t *ArchiveDownloadTask) Run() error {
5848	if  err  !=  nil  {
5949		return  err 
6050	}
51+ 	uploadTask .groupID  =  stdpath .Join (uploadTask .DstStorageMp , uploadTask .DstActualPath )
52+ 	task_group .TransferCoordinator .AddTask (uploadTask .groupID , nil )
6153	ArchiveContentUploadTaskManager .Add (uploadTask )
6254	return  nil 
6355}
6456
6557func  (t  * ArchiveDownloadTask ) RunWithoutPushUploadTask () (* ArchiveContentUploadTask , error ) {
6658	var  err  error 
67- 	if  t .srcStorage  ==  nil  {
68- 		t .srcStorage , err  =  op .GetStorageByMountPath (t .SrcStorageMp )
59+ 	if  t .SrcStorage  ==  nil  {
60+ 		t .SrcStorage , err  =  op .GetStorageByMountPath (t .SrcStorageMp )
61+ 		if  err  !=  nil  {
62+ 			return  nil , err 
63+ 		}
6964	}
70- 	srcObj , tool , ss , err  :=  op .GetArchiveToolAndStream (t .Ctx (), t .srcStorage , t .SrcObjPath , model.LinkArgs {})
65+ 	srcObj , tool , ss , err  :=  op .GetArchiveToolAndStream (t .Ctx (), t .SrcStorage , t .SrcActualPath , model.LinkArgs {})
7166	if  err  !=  nil  {
7267		return  nil , err 
7368	}
@@ -87,7 +82,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
8782			total  +=  s .GetSize ()
8883		}
8984		t .SetTotalBytes (total )
90- 		t .status  =  "getting src object" 
85+ 		t .Status  =  "getting src object" 
9186		for  _ , s  :=  range  ss  {
9287			if  s .GetFile () ==  nil  {
9388				_ , err  =  stream .CacheFullInTempFileAndWriter (s , func (p  float64 ) {
@@ -104,7 +99,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
10499	} else  {
105100		decompressUp  =  t .SetProgress 
106101	}
107- 	t .status  =  "walking and decompressing" 
102+ 	t .Status  =  "walking and decompressing" 
108103	dir , err  :=  os .MkdirTemp (conf .Conf .TempDir , "dir-*" )
109104	if  err  !=  nil  {
110105		return  nil , err 
@@ -117,13 +112,14 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
117112	uploadTask  :=  & ArchiveContentUploadTask {
118113		TaskExtension : task.TaskExtension {
119114			Creator : t .GetCreator (),
115+ 			ApiUrl :  t .ApiUrl ,
120116		},
121- 		ObjName :      baseName ,
122- 		InPlace :      ! t .PutIntoNewDir ,
123- 		FilePath :     dir ,
124- 		DstDirPath :    t . DstDirPath ,
125- 		dstStorage :   t . dstStorage ,
126- 		DstStorageMp : t .DstStorageMp ,
117+ 		ObjName :        baseName ,
118+ 		InPlace :        ! t .PutIntoNewDir ,
119+ 		FilePath :       dir ,
120+ 		DstActualPath :  t . DstActualPath ,
121+ 		dstStorage :     t . DstStorage ,
122+ 		DstStorageMp :   t .DstStorageMp ,
127123	}
128124	return  uploadTask , nil 
129125}
@@ -132,18 +128,19 @@ var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
132128
133129type  ArchiveContentUploadTask  struct  {
134130	task.TaskExtension 
135- 	status        string 
136- 	ObjName       string 
137- 	InPlace       bool 
138- 	FilePath      string 
139- 	DstDirPath    string 
140- 	dstStorage    driver.Driver 
141- 	DstStorageMp  string 
142- 	finalized     bool 
131+ 	status         string 
132+ 	ObjName        string 
133+ 	InPlace        bool 
134+ 	FilePath       string 
135+ 	DstActualPath  string 
136+ 	dstStorage     driver.Driver 
137+ 	DstStorageMp   string 
138+ 	finalized      bool 
139+ 	groupID        string 
143140}
144141
145142func  (t  * ArchiveContentUploadTask ) GetName () string  {
146- 	return  fmt .Sprintf ("upload %s to [%s](%s)" , t .ObjName , t .DstStorageMp , t .DstDirPath )
143+ 	return  fmt .Sprintf ("upload %s to [%s](%s)" , t .ObjName , t .DstStorageMp , t .DstActualPath )
147144}
148145
149146func  (t  * ArchiveContentUploadTask ) GetStatus () string  {
@@ -163,21 +160,42 @@ func (t *ArchiveContentUploadTask) Run() error {
163160	})
164161}
165162
166- func  (t  * ArchiveContentUploadTask ) RunWithNextTaskCallback (f  func (nextTsk  * ArchiveContentUploadTask ) error ) error  {
163+ func  (t  * ArchiveContentUploadTask ) OnSucceeded () {
164+ 	task_group .TransferCoordinator .Done (t .groupID , true )
165+ }
166+ 
167+ func  (t  * ArchiveContentUploadTask ) OnFailed () {
168+ 	task_group .TransferCoordinator .Done (t .groupID , false )
169+ }
170+ 
171+ func  (t  * ArchiveContentUploadTask ) SetRetry (retry  int , maxRetry  int ) {
172+ 	t .TaskExtension .SetRetry (retry , maxRetry )
173+ 	if  retry  ==  0  && 
174+ 		(len (t .groupID ) ==  0  ||  // 重启恢复 
175+ 			(t .GetErr () ==  nil  &&  t .GetState () !=  tache .StatePending )) { // 手动重试 
176+ 		t .groupID  =  stdpath .Join (t .DstStorageMp , t .DstActualPath )
177+ 		task_group .TransferCoordinator .AddTask (t .groupID , nil )
178+ 	}
179+ }
180+ 
181+ func  (t  * ArchiveContentUploadTask ) RunWithNextTaskCallback (f  func (nextTask  * ArchiveContentUploadTask ) error ) error  {
167182	var  err  error 
168183	if  t .dstStorage  ==  nil  {
169184		t .dstStorage , err  =  op .GetStorageByMountPath (t .DstStorageMp )
185+ 		if  err  !=  nil  {
186+ 			return  err 
187+ 		}
170188	}
171189	info , err  :=  os .Stat (t .FilePath )
172190	if  err  !=  nil  {
173191		return  err 
174192	}
175193	if  info .IsDir () {
176194		t .status  =  "src object is dir, listing objs" 
177- 		nextDstPath  :=  t .DstDirPath 
195+ 		nextDstActualPath  :=  t .DstActualPath 
178196		if  ! t .InPlace  {
179- 			nextDstPath  =  stdpath .Join (nextDstPath , t .ObjName )
180- 			err  =  op .MakeDir (t .Ctx (), t .dstStorage , nextDstPath )
197+ 			nextDstActualPath  =  stdpath .Join (nextDstActualPath , t .ObjName )
198+ 			err  =  op .MakeDir (t .Ctx (), t .dstStorage , nextDstActualPath )
181199			if  err  !=  nil  {
182200				return  err 
183201			}
@@ -186,6 +204,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
186204		if  err  !=  nil  {
187205			return  err 
188206		}
207+ 		if  ! t .InPlace  &&  len (t .groupID ) >  0  {
208+ 			task_group .TransferCoordinator .AppendPayload (t .groupID , task_group .DstPathToRefresh (nextDstActualPath ))
209+ 		}
189210		var  es  error 
190211		for  _ , entry  :=  range  entries  {
191212			var  nextFilePath  string 
@@ -198,16 +219,21 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
198219				es  =  stderrors .Join (es , err )
199220				continue 
200221			}
222+ 			if  len (t .groupID ) >  0  {
223+ 				task_group .TransferCoordinator .AddTask (t .groupID , nil )
224+ 			}
201225			err  =  f (& ArchiveContentUploadTask {
202226				TaskExtension : task.TaskExtension {
203227					Creator : t .GetCreator (),
228+ 					ApiUrl :  t .ApiUrl ,
204229				},
205- 				ObjName :      entry .Name (),
206- 				InPlace :      false ,
207- 				FilePath :     nextFilePath ,
208- 				DstDirPath :   nextDstPath ,
209- 				dstStorage :   t .dstStorage ,
210- 				DstStorageMp : t .DstStorageMp ,
230+ 				ObjName :       entry .Name (),
231+ 				InPlace :       false ,
232+ 				FilePath :      nextFilePath ,
233+ 				DstActualPath : nextDstActualPath ,
234+ 				dstStorage :    t .dstStorage ,
235+ 				DstStorageMp :  t .DstStorageMp ,
236+ 				groupID :       t .groupID ,
211237			})
212238			if  err  !=  nil  {
213239				es  =  stderrors .Join (es , err )
@@ -228,13 +254,13 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
228254				Size :     info .Size (),
229255				Modified : time .Now (),
230256			},
231- 			Mimetype :     mime . TypeByExtension ( filepath .Ext (t .ObjName )),
257+ 			Mimetype :     utils . GetMimeType ( stdpath .Ext (t .ObjName )),
232258			WebPutAsTask : true ,
233259			Reader :       file ,
234260		}
235261		fs .Closers .Add (file )
236262		t .status  =  "uploading" 
237- 		err  =  op .Put (t .Ctx (), t .dstStorage , t .DstDirPath , fs , t .SetProgress , true )
263+ 		err  =  op .Put (t .Ctx (), t .dstStorage , t .DstActualPath , fs , t .SetProgress , true )
238264		if  err  !=  nil  {
239265			return  err 
240266		}
@@ -271,8 +297,9 @@ func moveToTempPath(path, prefix string) (string, error) {
271297
272298func  genTempFileName (prefix  string ) (string , error ) {
273299	retry  :=  0 
300+ 	t  :=  time .Now ().UnixMilli ()
274301	for  retry  <  10000  {
275- 		newPath  :=  stdpath .Join (conf .Conf .TempDir , prefix + strconv . FormatUint ( uint64 ( rand .Uint32 ()),  10 ))
302+ 		newPath  :=  filepath .Join (conf .Conf .TempDir , prefix + fmt . Sprintf ( "%x-%x" ,  t ,  rand .Uint32 ()))
276303		if  _ , err  :=  os .Stat (newPath ); err  !=  nil  {
277304			if  os .IsNotExist (err ) {
278305				return  newPath , nil 
@@ -354,16 +381,19 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
354381	}
355382	taskCreator , _  :=  ctx .Value (conf .UserKey ).(* model.User )
356383	tsk  :=  & ArchiveDownloadTask {
357- 		TaskExtension : task.TaskExtension {
358- 			Creator : taskCreator ,
384+ 		TaskData : TaskData {
385+ 			TaskExtension : task.TaskExtension {
386+ 				Creator : taskCreator ,
387+ 				ApiUrl :  common .GetApiUrl (ctx ),
388+ 			},
389+ 			SrcStorage :    srcStorage ,
390+ 			DstStorage :    dstStorage ,
391+ 			SrcActualPath : srcObjActualPath ,
392+ 			DstActualPath : dstDirActualPath ,
393+ 			SrcStorageMp :  srcStorage .GetStorage ().MountPath ,
394+ 			DstStorageMp :  dstStorage .GetStorage ().MountPath ,
359395		},
360396		ArchiveDecompressArgs : args ,
361- 		srcStorage :            srcStorage ,
362- 		dstStorage :            dstStorage ,
363- 		SrcObjPath :            srcObjActualPath ,
364- 		DstDirPath :            dstDirActualPath ,
365- 		SrcStorageMp :          srcStorage .GetStorage ().MountPath ,
366- 		DstStorageMp :          dstStorage .GetStorage ().MountPath ,
367397	}
368398	if  ctx .Value (conf .NoTaskKey ) !=  nil  {
369399		uploadTask , err  :=  tsk .RunWithoutPushUploadTask ()
0 commit comments