11use alloc:: format;
22use alloc:: string:: String ;
3+ use alloc:: vec:: Vec ;
34use num_traits:: Zero ;
5+ use serde:: Deserialize ;
46
57use crate :: error:: { PSResult , SQLiteError } ;
8+ use crate :: sync:: line:: DataLine ;
9+ use crate :: sync:: operations:: insert_bucket_operations;
610use crate :: sync:: Checksum ;
711use sqlite_nostd as sqlite;
812use sqlite_nostd:: { Connection , ResultCode } ;
@@ -11,246 +15,15 @@ use crate::ext::SafeManagedStmt;
1115
1216// Run inside a transaction
1317pub fn insert_operation ( db : * mut sqlite:: sqlite3 , data : & str ) -> Result < ( ) , SQLiteError > {
14- // language=SQLite
15- let statement = db. prepare_v2 (
16- "\
17- SELECT
18- json_extract(e.value, '$.bucket') as bucket,
19- json_extract(e.value, '$.data') as data,
20- json_extract(e.value, '$.has_more') as has_more,
21- json_extract(e.value, '$.after') as after,
22- json_extract(e.value, '$.next_after') as next_after
23- FROM json_each(json_extract(?1, '$.buckets')) e" ,
24- ) ?;
25- statement. bind_text ( 1 , data, sqlite:: Destructor :: STATIC ) ?;
26-
27- while statement. step ( ) ? == ResultCode :: ROW {
28- let bucket = statement. column_text ( 0 ) ?;
29- let data = statement. column_text ( 1 ) ?;
30- // let _has_more = statement.column_int(2)? != 0;
31- // let _after = statement.column_text(3)?;
32- // let _next_after = statement.column_text(4)?;
33-
34- insert_bucket_operations ( db, bucket, data) ?;
35- }
36-
37- Ok ( ( ) )
38- }
39-
40- pub fn insert_bucket_operations (
41- db : * mut sqlite:: sqlite3 ,
42- bucket : & str ,
43- data : & str ,
44- ) -> Result < ( ) , SQLiteError > {
45- // Statement to insert new operations (only for PUT and REMOVE).
46- // language=SQLite
47- let iterate_statement = db. prepare_v2 (
48- "\
49- SELECT
50- json_extract(e.value, '$.op_id') as op_id,
51- json_extract(e.value, '$.op') as op,
52- json_extract(e.value, '$.object_type') as object_type,
53- json_extract(e.value, '$.object_id') as object_id,
54- json_extract(e.value, '$.checksum') as checksum,
55- json_extract(e.value, '$.data') as data,
56- json_extract(e.value, '$.subkey') as subkey
57- FROM json_each(?) e" ,
58- ) ?;
59- iterate_statement. bind_text ( 1 , data, sqlite:: Destructor :: STATIC ) ?;
60-
61- // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
62- // We can consider splitting this into separate SELECT and INSERT statements.
63- // language=SQLite
64- let bucket_statement = db. prepare_v2 (
65- "INSERT INTO ps_buckets(name)
66- VALUES(?)
67- ON CONFLICT DO UPDATE
68- SET last_applied_op = last_applied_op
69- RETURNING id, last_applied_op" ,
70- ) ?;
71- bucket_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
72- bucket_statement. step ( ) ?;
73-
74- let bucket_id = bucket_statement. column_int64 ( 0 ) ;
75-
76- // This is an optimization for initial sync - we can avoid persisting individual REMOVE
77- // operations when last_applied_op = 0.
78- // We do still need to do the "supersede_statement" step for this case, since a REMOVE
79- // operation can supersede another PUT operation we're syncing at the same time.
80- let mut is_empty = bucket_statement. column_int64 ( 1 ) == 0 ;
81-
82- // Statement to supersede (replace) operations with the same key.
83- // language=SQLite
84- let supersede_statement = db. prepare_v2 (
85- "\
86- DELETE FROM ps_oplog
87- WHERE unlikely(ps_oplog.bucket = ?1)
88- AND ps_oplog.key = ?2
89- RETURNING op_id, hash" ,
90- ) ?;
91- supersede_statement. bind_int64 ( 1 , bucket_id) ?;
92-
93- // language=SQLite
94- let insert_statement = db. prepare_v2 ( "\
95- INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)") ?;
96- insert_statement. bind_int64 ( 1 , bucket_id) ?;
97-
98- let updated_row_statement = db. prepare_v2 (
99- "\
100- INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
101- ) ?;
102-
103- bucket_statement. reset ( ) ?;
104-
105- let mut last_op: Option < i64 > = None ;
106- let mut add_checksum = Checksum :: zero ( ) ;
107- let mut op_checksum = Checksum :: zero ( ) ;
108- let mut added_ops: i32 = 0 ;
109-
110- while iterate_statement. step ( ) ? == ResultCode :: ROW {
111- let op_id = iterate_statement. column_int64 ( 0 ) ;
112- let op = iterate_statement. column_text ( 1 ) ?;
113- let object_type = iterate_statement. column_text ( 2 ) ;
114- let object_id = iterate_statement. column_text ( 3 ) ;
115- let checksum = Checksum :: from_i32 ( iterate_statement. column_int ( 4 ) ) ;
116- let op_data = iterate_statement. column_text ( 5 ) ;
117-
118- last_op = Some ( op_id) ;
119- added_ops += 1 ;
120-
121- if op == "PUT" || op == "REMOVE" {
122- let key: String ;
123- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type. as_ref ( ) , object_id. as_ref ( ) ) {
124- let subkey = iterate_statement. column_text ( 6 ) . unwrap_or ( "null" ) ;
125- key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
126- } else {
127- key = String :: from ( "" ) ;
128- }
129-
130- supersede_statement. bind_text ( 2 , & key, sqlite:: Destructor :: STATIC ) ?;
131-
132- let mut superseded = false ;
133-
134- while supersede_statement. step ( ) ? == ResultCode :: ROW {
135- // Superseded (deleted) a previous operation, add the checksum
136- let supersede_checksum = Checksum :: from_i32 ( supersede_statement. column_int ( 1 ) ) ;
137- add_checksum += supersede_checksum;
138- op_checksum -= supersede_checksum;
139-
140- // Superseded an operation, only skip if the bucket was empty
141- // Previously this checked "superseded_op <= last_applied_op".
142- // However, that would not account for a case where a previous
143- // PUT operation superseded the original PUT operation in this
144- // same batch, in which case superseded_op is not accurate for this.
145- if !is_empty {
146- superseded = true ;
147- }
148- }
149- supersede_statement. reset ( ) ?;
150-
151- if op == "REMOVE" {
152- let should_skip_remove = !superseded;
153-
154- add_checksum += checksum;
155-
156- if !should_skip_remove {
157- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
158- updated_row_statement. bind_text (
159- 1 ,
160- object_type,
161- sqlite:: Destructor :: STATIC ,
162- ) ?;
163- updated_row_statement. bind_text (
164- 2 ,
165- object_id,
166- sqlite:: Destructor :: STATIC ,
167- ) ?;
168- updated_row_statement. exec ( ) ?;
169- }
170- }
171-
172- continue ;
173- }
174-
175- insert_statement. bind_int64 ( 2 , op_id) ?;
176- if key != "" {
177- insert_statement. bind_text ( 3 , & key, sqlite:: Destructor :: STATIC ) ?;
178- } else {
179- insert_statement. bind_null ( 3 ) ?;
180- }
181-
182- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
183- insert_statement. bind_text ( 4 , object_type, sqlite:: Destructor :: STATIC ) ?;
184- insert_statement. bind_text ( 5 , object_id, sqlite:: Destructor :: STATIC ) ?;
185- } else {
186- insert_statement. bind_null ( 4 ) ?;
187- insert_statement. bind_null ( 5 ) ?;
188- }
189- if let Ok ( data) = op_data {
190- insert_statement. bind_text ( 6 , data, sqlite:: Destructor :: STATIC ) ?;
191- } else {
192- insert_statement. bind_null ( 6 ) ?;
193- }
194-
195- insert_statement. bind_int ( 7 , checksum. bitcast_i32 ( ) ) ?;
196- insert_statement. exec ( ) ?;
197-
198- op_checksum += checksum;
199- } else if op == "MOVE" {
200- add_checksum += checksum;
201- } else if op == "CLEAR" {
202- // Any remaining PUT operations should get an implicit REMOVE
203- // language=SQLite
204- let clear_statement1 = db
205- . prepare_v2 (
206- "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
207- SELECT row_type, row_id
208- FROM ps_oplog
209- WHERE bucket = ?1" ,
210- )
211- . into_db_result ( db) ?;
212- clear_statement1. bind_int64 ( 1 , bucket_id) ?;
213- clear_statement1. exec ( ) ?;
214-
215- let clear_statement2 = db
216- . prepare_v2 ( "DELETE FROM ps_oplog WHERE bucket = ?1" )
217- . into_db_result ( db) ?;
218- clear_statement2. bind_int64 ( 1 , bucket_id) ?;
219- clear_statement2. exec ( ) ?;
220-
221- // And we need to re-apply all of those.
222- // We also replace the checksum with the checksum of the CLEAR op.
223- // language=SQLite
224- let clear_statement2 = db. prepare_v2 (
225- "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2" ,
226- ) ?;
227- clear_statement2. bind_int64 ( 2 , bucket_id) ?;
228- clear_statement2. bind_int ( 1 , checksum. bitcast_i32 ( ) ) ?;
229- clear_statement2. exec ( ) ?;
230-
231- add_checksum = Checksum :: zero ( ) ;
232- is_empty = true ;
233- op_checksum = Checksum :: zero ( ) ;
234- }
18+ #[ derive( Deserialize ) ]
19+ struct BucketBatch < ' a > {
20+ #[ serde( borrow) ]
21+ buckets : Vec < DataLine < ' a > > ,
23522 }
23623
237- if let Some ( last_op) = & last_op {
238- // language=SQLite
239- let statement = db. prepare_v2 (
240- "UPDATE ps_buckets
241- SET last_op = ?2,
242- add_checksum = (add_checksum + ?3) & 0xffffffff,
243- op_checksum = (op_checksum + ?4) & 0xffffffff,
244- count_since_last = count_since_last + ?5
245- WHERE id = ?1" ,
246- ) ?;
247- statement. bind_int64 ( 1 , bucket_id) ?;
248- statement. bind_int64 ( 2 , * last_op) ?;
249- statement. bind_int ( 3 , add_checksum. bitcast_i32 ( ) ) ?;
250- statement. bind_int ( 4 , op_checksum. bitcast_i32 ( ) ) ?;
251- statement. bind_int ( 5 , added_ops) ?;
252-
253- statement. exec ( ) ?;
24+ let batch: BucketBatch = serde_json:: from_str ( data) ?;
25+ for line in & batch. buckets {
26+ insert_bucket_operations ( db, line) ?;
25427 }
25528
25629 Ok ( ( ) )
0 commit comments