Skip to content

Commit 48ec537

Browse files
author
DominicGBauer
committed
chore: merge main
2 parents 6833294 + 55cf059 commit 48ec537

File tree

7 files changed

+59
-26
lines changed

7 files changed

+59
-26
lines changed

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public interface PowerSyncDatabase : Queries {
2323
*/
2424
public val currentStatus: SyncStatus
2525

26+
/**
27+
* Suspend function that resolves when the first sync has occurred
28+
*/
29+
public suspend fun waitForFirstSync()
30+
2631
/**
2732
* Connect to the PowerSync service, and keep the databases in sync.
2833
*

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ internal class BucketStorage(
8989

9090
logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" }
9191

92-
return db.readTransaction {
92+
return db.writeTransaction {
9393
if (hasCrud()) {
9494
logger.w { "[updateLocalTarget] ps crud is not empty" }
95-
return@readTransaction false
95+
return@writeTransaction false
9696
}
9797

9898
val seqAfter =
@@ -103,15 +103,17 @@ internal class BucketStorage(
103103
throw AssertionError("Sqlite Sequence should not be empty")
104104

105105
if (seqAfter != seqBefore) {
106+
logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore")
106107
// New crud data may have been uploaded since we got the checkpoint. Abort.
107-
return@readTransaction false
108+
return@writeTransaction false
108109
}
109110

110111
db.execute(
111-
"UPDATE ${InternalTable.BUCKETS} SET target_op = ? WHERE name='\$local'",
112+
"UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
112113
listOf(opId)
113114
)
114-
return@readTransaction true
115+
116+
return@writeTransaction true
115117
}
116118
}
117119

@@ -128,7 +130,7 @@ internal class BucketStorage(
128130

129131
suspend fun getBucketStates(): List<BucketState> {
130132
return db.getAll(
131-
"SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
133+
"SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
132134
mapper = { cursor ->
133135
BucketState(
134136
bucket = cursor.getString(0)!!,
@@ -153,6 +155,8 @@ internal class BucketStorage(
153155
)
154156
}
155157

158+
Logger.d("[deleteBucket] Done deleting")
159+
156160
this.pendingBucketDeletes.value = true
157161
}
158162

@@ -221,7 +225,7 @@ internal class BucketStorage(
221225

222226
private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult {
223227
val res = db.getOptional(
224-
"SELECT powersync_validate_checkpoint(?) as result",
228+
"SELECT powersync_validate_checkpoint(?) AS result",
225229
parameters = listOf(JsonUtil.json.encodeToString(checkpoint)),
226230
mapper = { cursor ->
227231
cursor.getString(0)!!
@@ -242,11 +246,16 @@ internal class BucketStorage(
242246
*/
243247
private suspend fun updateObjectsFromBuckets(): Boolean {
244248
return db.writeTransaction { tx ->
245-
val res = tx.execute(
249+
250+
tx.execute(
246251
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
247252
listOf("sync_local", "")
248253
)
249254

255+
val res = tx.get("select last_insert_rowid()") { cursor ->
256+
cursor.getLong(0)!!
257+
}
258+
250259
return@writeTransaction res == 1L
251260
}
252261
}
@@ -275,12 +284,9 @@ internal class BucketStorage(
275284

276285
db.writeTransaction { tx ->
277286
tx.execute(
278-
"DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)",
287+
"INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","")
279288
)
280289

281-
tx.execute(
282-
"DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op",
283-
)
284290
// Executed once after start-up, and again when there are pending deletes.
285291
pendingBucketDeletes.value = false
286292
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import kotlinx.coroutines.Job
2929
import kotlinx.coroutines.cancelAndJoin
3030
import kotlinx.coroutines.flow.Flow
3131
import kotlinx.coroutines.flow.debounce
32+
import kotlinx.coroutines.flow.first
3233
import kotlinx.coroutines.launch
3334
import kotlinx.coroutines.runBlocking
3435
import kotlinx.datetime.Instant
@@ -191,7 +192,8 @@ internal class PowerSyncDatabaseImpl(
191192
}
192193

193194
return@readTransaction CrudTransaction(
194-
crud = entries, transactionId = txId,
195+
crud = entries,
196+
transactionId = txId,
195197
complete = { writeCheckpoint ->
196198
logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" }
197199
handleWriteCheckpoint(entries.last().clientId, writeCheckpoint)
@@ -259,12 +261,12 @@ internal class PowerSyncDatabaseImpl(
259261

260262
if (writeCheckpoint != null && bucketStorage.hasCrud()) {
261263
tx.execute(
262-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
264+
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
263265
listOf(writeCheckpoint),
264266
)
265267
} else {
266268
tx.execute(
267-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
269+
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
268270
listOf(bucketStorage.getMaxOpId()),
269271
)
270272
}
@@ -319,6 +321,16 @@ internal class PowerSyncDatabaseImpl(
319321
}
320322
}
321323

324+
override suspend fun waitForFirstSync() {
325+
if (currentStatus.hasSynced == true) {
326+
return
327+
}
328+
329+
currentStatus.asFlow().first { status ->
330+
status.hasSynced == true
331+
}
332+
}
333+
322334
/**
323335
* Check that a supported version of the powersync extension is loaded.
324336
*/

core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public data class SyncStatus internal constructor(
150150
get() = data.downloadError
151151

152152
override fun toString(): String {
153-
return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced: $hasSynced, error=$anyError)"
153+
return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced=$hasSynced, error=$anyError)"
154154
}
155155

156156
public companion object {

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,21 @@ internal class SyncStream(
171171
status.update(uploading = false)
172172
}
173173

174+
private suspend fun uploadCrudBatch(): Boolean {
175+
if (bucketStorage.hasCrud()) {
176+
status.update(uploading = true)
177+
uploadCrud()
178+
return false
179+
} else {
180+
// This isolate is the only one triggering
181+
return bucketStorage.updateLocalTarget { getWriteCheckpoint() }
182+
}
183+
}
184+
174185
private suspend fun getWriteCheckpoint(): String {
175186
val credentials = connector.getCredentialsCached()
176187
require(credentials != null) { "Not logged in" }
177-
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'")
188+
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId")
178189

179190
val response = httpClient.get(uri) {
180191
contentType(ContentType.Application.Json)
@@ -267,9 +278,7 @@ internal class SyncStream(
267278
jsonString: String,
268279
state: SyncStreamState
269280
): SyncStreamState {
270-
logger.i { "[handleInstruction] Received Instruction: $jsonString" }
271281
val obj = JsonUtil.json.parseToJsonElement(jsonString).jsonObject
272-
273282
// TODO: Clean up
274283
when {
275284
isStreamingSyncCheckpoint(obj) -> return handleStreamingSyncCheckpoint(obj, state)
@@ -297,7 +306,6 @@ internal class SyncStream(
297306
): SyncStreamState {
298307
val checkpoint =
299308
JsonUtil.json.decodeFromJsonElement<Checkpoint>(jsonObj["checkpoint"] as JsonElement)
300-
301309
state.targetCheckpoint = checkpoint
302310
val bucketsToDelete = state.bucketSet!!.toMutableList()
303311
val newBuckets = mutableSetOf<String>()
@@ -392,8 +400,6 @@ internal class SyncStream(
392400
jsonObj: JsonObject,
393401
state: SyncStreamState
394402
): SyncStreamState {
395-
396-
397403
val syncBuckets =
398404
listOf<SyncDataBucket>(JsonUtil.json.decodeFromJsonElement(jsonObj["data"] as JsonElement))
399405

demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/List.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package com.powersync.demos.powersync
22

33
import androidx.lifecycle.ViewModel
4+
import androidx.lifecycle.viewModelScope
45
import com.powersync.PowerSyncDatabase
56
import kotlinx.coroutines.flow.Flow
67
import kotlinx.coroutines.flow.MutableStateFlow
78
import kotlinx.coroutines.flow.StateFlow
9+
import kotlinx.coroutines.launch
810
import kotlinx.coroutines.runBlocking
911

1012
internal class ListContent(
@@ -37,7 +39,7 @@ internal class ListContent(
3739
}
3840

3941
fun onItemDeleteClicked(item: ListItem) {
40-
runBlocking {
42+
viewModelScope.launch {
4143
db.writeTransaction { tx ->
4244
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
4345
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
@@ -48,7 +50,7 @@ internal class ListContent(
4850
fun onAddItemClicked() {
4951
if (_inputText.value.isBlank()) return
5052

51-
runBlocking {
53+
viewModelScope.launch {
5254
db.writeTransaction { tx ->
5355
tx.execute(
5456
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",

demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/powersync/List.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package com.powersync.demos.powersync
22

33
import androidx.lifecycle.ViewModel
4+
import androidx.lifecycle.viewModelScope
45
import com.powersync.PowerSyncDatabase
56
import kotlinx.coroutines.flow.Flow
67
import kotlinx.coroutines.flow.MutableStateFlow
78
import kotlinx.coroutines.flow.StateFlow
9+
import kotlinx.coroutines.launch
810
import kotlinx.coroutines.runBlocking
911

1012
internal class ListContent(
@@ -37,7 +39,7 @@ internal class ListContent(
3739
}
3840

3941
fun onItemDeleteClicked(item: ListItem) {
40-
runBlocking {
42+
viewModelScope.launch {
4143
db.writeTransaction { tx ->
4244
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
4345
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
@@ -48,7 +50,7 @@ internal class ListContent(
4850
fun onAddItemClicked() {
4951
if (_inputText.value.isBlank()) return
5052

51-
runBlocking {
53+
viewModelScope.launch {
5254
db.writeTransaction { tx ->
5355
tx.execute(
5456
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",

0 commit comments

Comments
 (0)