77 "time"
88
99 "github.com/OpenListTeam/OpenList/v4/internal/conf"
10- "github.com/OpenListTeam/OpenList/v4/internal/driver"
1110 "github.com/OpenListTeam/OpenList/v4/internal/errs"
1211 "github.com/OpenListTeam/OpenList/v4/internal/model"
1312 "github.com/OpenListTeam/OpenList/v4/internal/op"
@@ -52,17 +51,16 @@ func (t *FileTransferTask) Run() error {
5251 t .ClearEndTime ()
5352 t .SetStartTime (time .Now ())
5453 defer func () { t .SetEndTime (time .Now ()) }()
55- var err error
56- if t .SrcStorage == nil {
57- t .SrcStorage , err = op .GetStorageByMountPath (t .SrcStorageMp )
58- }
59- if t .DstStorage == nil {
60- t .DstStorage , err = op .GetStorageByMountPath (t .DstStorageMp )
61- }
62- if err != nil {
63- return errors .WithMessage (err , "failed get storage" )
64- }
65- return putBetween2Storages (t , t .SrcStorage , t .DstStorage , t .SrcActualPath , t .DstActualPath )
54+ return t .RunWithNextTaskCallback (func (nextTask * FileTransferTask ) error {
55+ nextTask .groupID = t .groupID
56+ task_group .TransferCoordinator .AddTask (t .groupID , nil )
57+ if t .TaskType == copy {
58+ CopyTaskManager .Add (nextTask )
59+ } else {
60+ MoveTaskManager .Add (nextTask )
61+ }
62+ return nil
63+ })
6664}
6765
6866func (t * FileTransferTask ) OnSucceeded () {
@@ -109,51 +107,11 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
109107 return nil , err
110108 }
111109 }
112- } else if ctx .Value (conf .NoTaskKey ) != nil {
113- return nil , fmt .Errorf ("can't %s files between two storages, please use the front-end " , taskType )
114110 }
115111
116- // if ctx.Value(conf.NoTaskKey) != nil { // webdav
117- // srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
118- // if err != nil {
119- // return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
120- // }
121- // if !srcObj.IsDir() {
122- // // copy file directly
123- // link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{})
124- // if err != nil {
125- // return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath)
126- // }
127- // // any link provided is seekable
128- // ss, err := stream.NewSeekableStream(&stream.FileStream{
129- // Obj: srcObj,
130- // Ctx: ctx,
131- // }, link)
132- // if err != nil {
133- // _ = link.Close()
134- // return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
135- // }
136- // if taskType == move {
137- // defer func() {
138- // task_group.TransferCoordinator.Done(dstDirPath, err == nil)
139- // }()
140- // task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
141- // }
142- // err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, taskType == move)
143- // return nil, err
144- // } else {
145- // return nil, fmt.Errorf("can't %s dir two storages, please use the front-end ", taskType)
146- // }
147- // }
148-
149112 // not in the same storage
150- taskCreator , _ := ctx .Value (conf .UserKey ).(* model.User )
151113 t := & FileTransferTask {
152114 TaskData : TaskData {
153- TaskExtension : task.TaskExtension {
154- Creator : taskCreator ,
155- ApiUrl : common .GetApiUrl (ctx ),
156- },
157115 SrcStorage : srcStorage ,
158116 DstStorage : dstStorage ,
159117 SrcActualPath : srcObjActualPath ,
@@ -162,8 +120,34 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
162120 DstStorageMp : dstStorage .GetStorage ().MountPath ,
163121 },
164122 TaskType : taskType ,
165- groupID : dstDirPath ,
166123 }
124+
125+ if ctx .Value (conf .NoTaskKey ) != nil {
126+ var callback func (nextTask * FileTransferTask ) error
127+ hasSuccess := false
128+ callback = func (nextTask * FileTransferTask ) error {
129+ nextTask .Base .SetCtx (ctx )
130+ err := nextTask .RunWithNextTaskCallback (callback )
131+ if err == nil {
132+ hasSuccess = true
133+ }
134+ return err
135+ }
136+ t .Base .SetCtx (ctx )
137+ err = t .RunWithNextTaskCallback (callback )
138+ if hasSuccess || err == nil {
139+ if taskType == move {
140+ task_group .RefreshAndRemove (dstDirPath , task_group .SrcPathToRemove (srcObjPath ))
141+ } else {
142+ op .DeleteCache (t .DstStorage , dstDirActualPath )
143+ }
144+ }
145+ return nil , err
146+ }
147+
148+ t .Creator , _ = ctx .Value (conf .UserKey ).(* model.User )
149+ t .ApiUrl = common .GetApiUrl (ctx )
150+ t .groupID = dstDirPath
167151 if taskType == copy {
168152 task_group .TransferCoordinator .AddTask (dstDirPath , nil )
169153 CopyTaskManager .Add (t )
@@ -174,76 +158,69 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
174158 return t , nil
175159}
176160
177- func putBetween2Storages (t * FileTransferTask , srcStorage , dstStorage driver. Driver , srcActualPath , dstDirActualPath string ) error {
161+ func (t * FileTransferTask ) RunWithNextTaskCallback ( f func ( nextTask * FileTransferTask ) error ) error {
178162 t .Status = "getting src object"
179- srcObj , err := op .Get (t .Ctx (), srcStorage , srcActualPath )
163+ srcObj , err := op .Get (t .Ctx (), t . SrcStorage , t . SrcActualPath )
180164 if err != nil {
181- return errors .WithMessagef (err , "failed get src [%s] file" , srcActualPath )
165+ return errors .WithMessagef (err , "failed get src [%s] file" , t . SrcActualPath )
182166 }
183167 if srcObj .IsDir () {
184168 t .Status = "src object is dir, listing objs"
185- objs , err := op .List (t .Ctx (), srcStorage , srcActualPath , model.ListArgs {})
169+ objs , err := op .List (t .Ctx (), t . SrcStorage , t . SrcActualPath , model.ListArgs {})
186170 if err != nil {
187- return errors .WithMessagef (err , "failed list src [%s] objs" , srcActualPath )
171+ return errors .WithMessagef (err , "failed list src [%s] objs" , t . SrcActualPath )
188172 }
189- dstActualPath := stdpath .Join (dstDirActualPath , srcObj .GetName ())
173+ dstActualPath := stdpath .Join (t . DstActualPath , srcObj .GetName ())
190174 if t .TaskType == copy {
191- task_group .TransferCoordinator .AppendPayload (t .groupID , task_group .DstPathToRefresh (dstActualPath ))
175+ if t .Ctx ().Value (conf .NoTaskKey ) != nil {
176+ defer op .DeleteCache (t .DstStorage , dstActualPath )
177+ } else {
178+ task_group .TransferCoordinator .AppendPayload (t .groupID , task_group .DstPathToRefresh (dstActualPath ))
179+ }
192180 }
193181 for _ , obj := range objs {
194182 if utils .IsCanceled (t .Ctx ()) {
195183 return nil
196184 }
197- task := & FileTransferTask {
185+ err = f ( & FileTransferTask {
198186 TaskType : t .TaskType ,
199187 TaskData : TaskData {
200188 TaskExtension : task.TaskExtension {
201- Creator : t .GetCreator () ,
189+ Creator : t .Creator ,
202190 ApiUrl : t .ApiUrl ,
203191 },
204- SrcStorage : srcStorage ,
205- DstStorage : dstStorage ,
206- SrcActualPath : stdpath .Join (srcActualPath , obj .GetName ()),
192+ SrcStorage : t . SrcStorage ,
193+ DstStorage : t . DstStorage ,
194+ SrcActualPath : stdpath .Join (t . SrcActualPath , obj .GetName ()),
207195 DstActualPath : dstActualPath ,
208- SrcStorageMp : srcStorage . GetStorage (). MountPath ,
209- DstStorageMp : dstStorage . GetStorage (). MountPath ,
196+ SrcStorageMp : t . SrcStorageMp ,
197+ DstStorageMp : t . DstStorageMp ,
210198 },
211- groupID : t .groupID ,
212- }
213- task_group .TransferCoordinator .AddTask (t .groupID , nil )
214- if t .TaskType == copy {
215- CopyTaskManager .Add (task )
216- } else {
217- MoveTaskManager .Add (task )
199+ })
200+ if err != nil {
201+ return err
218202 }
219203 }
220204 t .Status = fmt .Sprintf ("src object is dir, added all %s tasks of objs" , t .TaskType )
221205 return nil
222206 }
223- return putFileBetween2Storages (t , srcStorage , dstStorage , srcActualPath , dstDirActualPath )
224- }
225207
226- func putFileBetween2Storages (tsk * FileTransferTask , srcStorage , dstStorage driver.Driver , srcActualPath , dstDirActualPath string ) error {
227- srcFile , err := op .Get (tsk .Ctx (), srcStorage , srcActualPath )
228- if err != nil {
229- return errors .WithMessagef (err , "failed get src [%s] file" , srcActualPath )
230- }
231- tsk .SetTotalBytes (srcFile .GetSize ())
232- link , _ , err := op .Link (tsk .Ctx (), srcStorage , srcActualPath , model.LinkArgs {})
208+ link , _ , err := op .Link (t .Ctx (), t .SrcStorage , t .SrcActualPath , model.LinkArgs {})
233209 if err != nil {
234- return errors .WithMessagef (err , "failed get [%s] link" , srcActualPath )
210+ return errors .WithMessagef (err , "failed get [%s] link" , t . SrcActualPath )
235211 }
236212 // any link provided is seekable
237213 ss , err := stream .NewSeekableStream (& stream.FileStream {
238- Obj : srcFile ,
239- Ctx : tsk .Ctx (),
214+ Obj : srcObj ,
215+ Ctx : t .Ctx (),
240216 }, link )
241217 if err != nil {
242218 _ = link .Close ()
243- return errors .WithMessagef (err , "failed get [%s] stream" , srcActualPath )
219+ return errors .WithMessagef (err , "failed get [%s] stream" , t . SrcActualPath )
244220 }
245- tsk .SetTotalBytes (ss .GetSize ())
246- return op .Put (tsk .Ctx (), dstStorage , dstDirActualPath , ss , tsk .SetProgress , true )
221+ t .SetTotalBytes (ss .GetSize ())
222+ t .Status = "uploading"
223+ return op .Put (t .Ctx (), t .DstStorage , t .DstActualPath , ss , t .SetProgress , true )
247224}
248225
249226var (
0 commit comments