@@ -70,10 +70,18 @@ impl SyncClient {
7070 ) ) ;
7171 } ;
7272
73- let done = handle. run ( & mut active) ?;
74- if done {
75- * state = ClientState :: Idle ;
76- }
73+ match handle. run ( & mut active) {
74+ Err ( e) => {
75+ * state = ClientState :: Idle ;
76+ return Err ( e) ;
77+ }
78+ Ok ( done) => {
79+ if done {
80+ active. instructions . push ( Instruction :: CloseSyncStream ) ;
81+ * state = ClientState :: Idle ;
82+ }
83+ }
84+ } ;
7785
7886 Ok ( active. instructions )
7987 }
@@ -236,12 +244,12 @@ impl StreamingSyncIteration {
236244 }
237245 SyncLine :: CheckpointDiff ( diff) => {
238246 let Some ( target) = target. target_checkpoint_mut ( ) else {
239- event . instructions . push ( Instruction :: LogLine {
240- severity : LogSeverity :: WARNING ,
241- line : "Received checkpoint_diff without previous checkpoint"
242- . to_string ( ) ,
243- } ) ;
244- break ;
247+ return Err ( SQLiteError (
248+ ResultCode :: ABORT ,
249+ Some (
250+ "Received checkpoint_diff without previous checkpoint" . to_string ( ) ,
251+ ) ,
252+ ) ) ;
245253 } ;
246254
247255 target. apply_diff ( & diff) ;
@@ -255,9 +263,15 @@ impl StreamingSyncIteration {
255263 ) ;
256264 }
257265 SyncLine :: CheckpointComplete ( checkpoint_complete) => {
258- let target = target
259- . target_checkpoint ( )
260- . expect ( "should have target checkpoint" ) ;
266+ let Some ( target) = target. target_checkpoint_mut ( ) else {
267+ return Err ( SQLiteError (
268+ ResultCode :: ABORT ,
269+ Some (
270+ "Received checkpoint complete without previous checkpoint"
271+ . to_string ( ) ,
272+ ) ,
273+ ) ) ;
274+ } ;
261275 let result = self . adapter . sync_local ( target, None ) ?;
262276
263277 match result {
@@ -272,7 +286,11 @@ impl StreamingSyncIteration {
272286 break ;
273287 }
274288 SyncLocalResult :: PendingLocalChanges => {
275- todo ! ( "Await pending uploads and try again" )
289+ event. instructions . push ( Instruction :: LogLine {
290+ severity : LogSeverity :: WARNING ,
291+ line : format ! ( "TODO: Await pending uploads and try again" ) ,
292+ } ) ;
293+ break ;
276294 }
277295 SyncLocalResult :: ChangesApplied => {
278296 event. instructions . push ( Instruction :: LogLine {
@@ -290,9 +308,15 @@ impl StreamingSyncIteration {
290308 }
291309 SyncLine :: CheckpointPartiallyComplete ( complete) => {
292310 let priority = complete. priority ;
293- let target = target
294- . target_checkpoint ( )
295- . expect ( "should have target checkpoint" ) ;
311+ let Some ( target) = target. target_checkpoint_mut ( ) else {
312+ return Err ( SQLiteError (
313+ ResultCode :: ABORT ,
314+ Some (
315+ "Received checkpoint complete without previous checkpoint"
316+ . to_string ( ) ,
317+ ) ,
318+ ) ) ;
319+ } ;
296320 let result = self . adapter . sync_local ( target, Some ( priority) ) ?;
297321
298322 match result {
@@ -356,7 +380,7 @@ impl StreamingSyncIteration {
356380 . update ( |s| s. start_connecting ( ) , & mut event. instructions ) ;
357381
358382 let requests = self . adapter . collect_bucket_requests ( ) ?;
359- let local_bucket_names: Vec < String > = requests. iter ( ) . map ( |s| s. after . clone ( ) ) . collect ( ) ;
383+ let local_bucket_names: Vec < String > = requests. iter ( ) . map ( |s| s. name . clone ( ) ) . collect ( ) ;
360384 let request = StreamingSyncRequest {
361385 buckets : requests,
362386 include_checksum : true ,
@@ -372,6 +396,7 @@ impl StreamingSyncIteration {
372396 }
373397}
374398
399+ #[ derive( Debug ) ]
375400enum SyncTarget {
376401 /// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is
377402 /// updated for subsequent checkpoint or checkpoint_diff lines.
@@ -413,6 +438,7 @@ impl SyncTarget {
413438 }
414439}
415440
441+ #[ derive( Debug ) ]
416442pub struct OwnedCheckpoint {
417443 pub last_op_id : i64 ,
418444 pub write_checkpoint : Option < i64 > ,
@@ -446,6 +472,7 @@ impl OwnedCheckpoint {
446472 }
447473}
448474
475+ #[ derive( Debug ) ]
449476pub struct OwnedBucketChecksum {
450477 pub bucket : String ,
451478 pub checksum : i32 ,
@@ -461,13 +488,6 @@ impl OwnedBucketChecksum {
461488 Some ( prio) => self . priority >= prio,
462489 }
463490 }
464-
465- fn description ( & self ) -> BucketDescription {
466- BucketDescription {
467- priority : self . priority ,
468- name : self . bucket . clone ( ) ,
469- }
470- }
471491}
472492
473493impl From < & ' _ BucketChecksum < ' _ > > for OwnedBucketChecksum {
0 commit comments