@@ -107,15 +107,16 @@ impl<'a> SyncOperation<'a> {
107107 while statement. step ( ) . into_db_result ( self . db ) ? == ResultCode :: ROW {
108108 let type_name = statement. column_text ( 0 ) ?;
109109 let id = statement. column_text ( 1 ) ?;
110- let buckets = statement. column_int ( 3 ) ;
111110 let data = statement. column_text ( 2 ) ;
112111
113112 let table_name = internal_table_name ( type_name) ;
114113
115114 if self . data_tables . contains ( & table_name) {
116115 let quoted = quote_internal_name ( type_name, false ) ;
117116
118- if buckets == 0 {
117+ // is_err() is essentially a NULL check here.
118+ // NULL data means no PUT operations found, so we delete the row.
119+ if data. is_err ( ) {
119120 // DELETE
120121 let delete_statement = self
121122 . db
@@ -134,7 +135,7 @@ impl<'a> SyncOperation<'a> {
134135 insert_statement. exec ( ) ?;
135136 }
136137 } else {
137- if buckets == 0 {
138+ if data . is_err ( ) {
138139 // DELETE
139140 // language=SQLite
140141 let delete_statement = self
@@ -185,32 +186,29 @@ impl<'a> SyncOperation<'a> {
185186 Ok ( match & self . partial {
186187 None => {
187188 // Complete sync
189+ // See dart/test/sync_local_performance_test.dart for an annotated version of this query.
188190 self . db
189191 . prepare_v2 (
190192 "\
191- -- 1. Filter oplog by the ops added but not applied yet (oplog b).
192- -- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
193193 WITH updated_rows AS (
194- SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
195- CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
196- AND (b.op_id > buckets.last_applied_op)
197- UNION SELECT row_type, row_id FROM ps_updated_rows
194+ SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
195+ CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
196+ AND (b.op_id > buckets.last_applied_op)
197+ UNION ALL SELECT row_type, row_id FROM ps_updated_rows
198198)
199199
200- -- 3. Group the objects from different buckets together into a single one (ops).
201- SELECT b.row_type as type,
202- b.row_id as id,
203- r.data as data,
204- count(r.bucket) as buckets,
205- /* max() affects which row is used for 'data' */
206- max(r.op_id) as op_id
207- -- 2. Find *all* current ops over different buckets for those objects (oplog r).
208- FROM updated_rows b
209- LEFT OUTER JOIN ps_oplog AS r
210- ON r.row_type = b.row_type
211- AND r.row_id = b.row_id
212- -- Group for (3)
213- GROUP BY b.row_type, b.row_id" ,
200+ SELECT
201+ b.row_type,
202+ b.row_id,
203+ (
204+ SELECT iif(max(r.op_id), r.data, null)
205+ FROM ps_oplog r
206+ WHERE r.row_type = b.row_type
207+ AND r.row_id = b.row_id
208+
209+ ) as data
210+ FROM updated_rows b
211+ GROUP BY b.row_type, b.row_id;" ,
214212 )
215213 . into_db_result ( self . db ) ?
216214 }
@@ -220,33 +218,38 @@ GROUP BY b.row_type, b.row_id",
220218 . prepare_v2 (
221219 "\
222220 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
223- -- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
221+ -- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
222+ -- We filter out duplicates using the GROUP BY below.
224223WITH
225224 involved_buckets (id) AS MATERIALIZED (
226225 SELECT id FROM ps_buckets WHERE ?1 IS NULL
227226 OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
228227 ),
229228 updated_rows AS (
230- SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
231- CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
232- WHERE buckets.id IN (SELECT id FROM involved_buckets)
229+ SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
230+ CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
231+ AND (b.op_id > buckets.last_applied_op)
232+ WHERE buckets.id IN (SELECT id FROM involved_buckets)
233233 )
234234
235- -- 3. Group the objects from different buckets together into a single one (ops).
236- SELECT b.row_type as type,
237- b.row_id as id,
238- r.data as data,
239- count(r.bucket) as buckets,
240- /* max() affects which row is used for 'data' */
241- max(r.op_id) as op_id
242235-- 2. Find *all* current ops over different buckets for those objects (oplog r).
243- FROM updated_rows b
244- LEFT OUTER JOIN ps_oplog AS r
245- ON r.row_type = b.row_type
246- AND r.row_id = b.row_id
247- AND r.bucket IN (SELECT id FROM involved_buckets)
248- -- Group for (3)
249- GROUP BY b.row_type, b.row_id" ,
236+ SELECT
237+ b.row_type,
238+ b.row_id,
239+ (
240+ -- 3. For each unique row, select the data from the latest oplog entry.
241+ -- The max(r.op_id) clause is used to select the latest oplog entry.
242+ -- The iif is to avoid the max(r.op_id) column ending up in the results.
243+ SELECT iif(max(r.op_id), r.data, null)
244+ FROM ps_oplog r
245+ WHERE r.row_type = b.row_type
246+ AND r.row_id = b.row_id
247+ AND r.bucket IN (SELECT id FROM involved_buckets)
248+
249+ ) as data
250+ FROM updated_rows b
251+ -- Group for (2)
252+ GROUP BY b.row_type, b.row_id;" ,
250253 )
251254 . into_db_result ( self . db ) ?;
252255 stmt. bind_text ( 1 , partial. args , Destructor :: STATIC ) ?;
0 commit comments