Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/Helpers/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import Foundation
/// When this token gets deallocated it cancels the observation it was associated with. Store this token in another object to keep the observation alive.
public final class ObservationToken: @unchecked Sendable, Hashable {
private let _isCancelled = LockIsolated(false)
package var onCancel: @Sendable () -> Void
package var onCancel: () -> Void

public var isCancelled: Bool {
_isCancelled.withValue { $0 }
}

package init(onCancel: @escaping @Sendable () -> Void = {}) {
package init(onCancel: @escaping () -> Void = {}) {
self.onCancel = onCancel
}

Expand Down
96 changes: 31 additions & 65 deletions Sources/Realtime/CallbackManager.swift
Original file line number Diff line number Diff line change
@@ -1,105 +1,75 @@
import ConcurrencyExtras
import Foundation

final class CallbackManager: Sendable {
struct MutableState {
var id = 0
var serverChanges: [PostgresJoinConfig] = []
var callbacks: [RealtimeCallback] = []
}

private let mutableState = LockIsolated(MutableState())

var serverChanges: [PostgresJoinConfig] {
mutableState.serverChanges
}

var callbacks: [RealtimeCallback] {
mutableState.callbacks
}

deinit {
reset()
}
@MainActor
final class CallbackManager {
var id = 0
var serverChanges: [PostgresJoinConfig] = []
var callbacks: [RealtimeCallback] = []

@discardableResult
func addBroadcastCallback(
event: String,
callback: @escaping @Sendable (JSONObject) -> Void
) -> Int {
mutableState.withValue {
$0.id += 1
$0.callbacks.append(
.broadcast(
BroadcastCallback(
id: $0.id,
event: event,
callback: callback
)
self.id += 1
self.callbacks.append(
.broadcast(
BroadcastCallback(
id: self.id,
event: event,
callback: callback
)
)
return $0.id
}
)
return self.id
}

@discardableResult
func addPostgresCallback(
filter: PostgresJoinConfig,
callback: @escaping @Sendable (AnyAction) -> Void
) -> Int {
mutableState.withValue {
$0.id += 1
$0.callbacks.append(
self.id += 1
self.callbacks.append(
.postgres(
PostgresCallback(
id: $0.id,
id: self.id,
filter: filter,
callback: callback
)
)
)
return $0.id
}
return self.id
}

@discardableResult
func addPresenceCallback(callback: @escaping @Sendable (any PresenceAction) -> Void) -> Int {
mutableState.withValue {
$0.id += 1
$0.callbacks.append(.presence(PresenceCallback(id: $0.id, callback: callback)))
return $0.id
}
self.id += 1
self.callbacks.append(.presence(PresenceCallback(id: self.id, callback: callback)))
return self.id
}

@discardableResult
func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int {
mutableState.withValue {
$0.id += 1
$0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback)))
return $0.id
}
self.id += 1
self.callbacks.append(.system(SystemCallback(id: self.id, callback: callback)))
return self.id
}

func setServerChanges(changes: [PostgresJoinConfig]) {
mutableState.withValue {
$0.serverChanges = changes
}
self.serverChanges = changes
}

func removeCallback(id: Int) {
mutableState.withValue {
$0.callbacks.removeAll { $0.id == id }
}
self.callbacks.removeAll { $0.id == id }
}

func triggerPostgresChanges(ids: [Int], data: AnyAction) {
// Read mutableState at start to acquire lock once.
let mutableState = mutableState.value

let filters = mutableState.serverChanges.filter {
let filters = serverChanges.filter {
ids.contains($0.id)
}
let postgresCallbacks = mutableState.callbacks.compactMap {
let postgresCallbacks = callbacks.compactMap {
if case let .postgres(callback) = $0 {
return callback
}
Expand All @@ -118,7 +88,7 @@ final class CallbackManager: Sendable {
}

func triggerBroadcast(event: String, json: JSONObject) {
let broadcastCallbacks = mutableState.callbacks.compactMap {
let broadcastCallbacks = callbacks.compactMap {
if case let .broadcast(callback) = $0 {
return callback
}
Expand All @@ -133,7 +103,7 @@ final class CallbackManager: Sendable {
leaves: [String: PresenceV2],
rawMessage: RealtimeMessageV2
) {
let presenceCallbacks = mutableState.callbacks.compactMap {
let presenceCallbacks = callbacks.compactMap {
if case let .presence(callback) = $0 {
return callback
}
Expand All @@ -151,7 +121,7 @@ final class CallbackManager: Sendable {
}

func triggerSystem(message: RealtimeMessageV2) {
let systemCallbacks = mutableState.callbacks.compactMap {
let systemCallbacks = callbacks.compactMap {
if case .system(let callback) = $0 {
return callback
}
Expand All @@ -162,10 +132,6 @@ final class CallbackManager: Sendable {
systemCallback.callback(message)
}
}

func reset() {
mutableState.setValue(MutableState())
}
}

struct PostgresCallback {
Expand Down
67 changes: 16 additions & 51 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,28 @@ public struct RealtimeChannelConfig: Sendable {
public var isPrivate: Bool
}

protocol RealtimeChannelProtocol: AnyObject, Sendable {
@MainActor var config: RealtimeChannelConfig { get }
@MainActor
protocol RealtimeChannelProtocol: AnyObject {
var config: RealtimeChannelConfig { get }
var topic: String { get }
var logger: (any SupabaseLogger)? { get }

var socket: any RealtimeClientProtocol { get }
}

@MainActor
public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: PushV2] = [:]
}

@MainActor
private var mutableState = MutableState()
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: PushV2] = [:]

let topic: String

@MainActor var config: RealtimeChannelConfig
var config: RealtimeChannelConfig

let logger: (any SupabaseLogger)?
let socket: any RealtimeClientProtocol

@MainActor var joinRef: String? { mutableState.joinRef }

let callbackManager = CallbackManager()
private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)

Expand Down Expand Up @@ -87,10 +82,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
self.socket = socket
}

deinit {
callbackManager.reset()
}

/// Subscribes to the channel.
public func subscribeWithError() async throws {
logger?.debug(
Expand Down Expand Up @@ -162,13 +153,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
throw RealtimeError.maxRetryAttemptsReached
}

/// Subscribes to the channel.
@available(*, deprecated, message: "Use `subscribeWithError` instead")
@MainActor
public func subscribe() async {
try? await subscribeWithError()
}

/// Calculates retry delay with exponential backoff and jitter
private func calculateRetryDelay(for attempt: Int) -> TimeInterval {
let baseDelay: TimeInterval = 1.0
Expand All @@ -186,7 +170,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
}

/// Subscribes to the channel
@MainActor
private func _subscribe() async {
if socket.status != .connected {
if socket.options.connectOnSubscribe != true {
Expand All @@ -205,7 +188,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: mutableState.clientChanges,
postgresChanges: clientChanges,
isPrivate: config.isPrivate
)

Expand All @@ -216,7 +199,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
)

let joinRef = socket.makeRef()
mutableState.joinRef = joinRef
self.joinRef = joinRef

logger?.debug("Subscribing to channel with body: \(joinConfig)")

Expand All @@ -236,20 +219,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
await push(ChannelEvent.leave)
}

@available(
*,
deprecated,
message:
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
)
public func updateAuth(jwt: String?) async {
logger?.debug("Updating auth token for channel \(topic)")
await push(
ChannelEvent.accessToken,
payload: ["access_token": jwt.map { .string($0) } ?? .null]
)
}

/// Sends a broadcast message explicitly via REST API.
///
/// This method always uses the REST API endpoint regardless of WebSocket connection state.
Expand Down Expand Up @@ -295,7 +264,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
}
headers[.authorization] = "Bearer \(accessToken)"

let body = try await JSONEncoder.supabase().encode(
let body = try JSONEncoder.supabase().encode(
BroadcastMessagePayload(
messages: [
BroadcastMessagePayload.Message(
Expand All @@ -317,7 +286,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {

let response = try await withTimeout(interval: timeout ?? socket.options.timeoutInterval) {
[self] in
await Result {
await Result { @Sendable in
try await socket.http.send(request)
}
}.get()
Expand Down Expand Up @@ -475,7 +444,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
throw RealtimeError("Received a reply with unexpected payload: \(message)")
}

await didReceiveReply(ref: ref, status: status)
didReceiveReply(ref: ref, status: status)

if message.payload["response"]?.objectValue?.keys
.contains(ChannelEvent.postgresChanges) == true
Expand Down Expand Up @@ -692,9 +661,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
filter: filter
)

Task { @MainActor in
mutableState.clientChanges.append(config)
}
clientChanges.append(config)

let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return RealtimeSubscription { [weak callbackManager, logger] in
Expand Down Expand Up @@ -733,7 +700,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
self.onSystem { _ in callback() }
}

@MainActor
@discardableResult
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
let message = RealtimeMessageV2(
Expand All @@ -746,15 +712,14 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {

let push = PushV2(channel: self, message: message)
if let ref = message.ref {
mutableState.pushes[ref] = push
pushes[ref] = push
}

return await push.send()
}

@MainActor
private func didReceiveReply(ref: String, status: String) {
let push = mutableState.pushes.removeValue(forKey: ref)
let push = pushes.removeValue(forKey: ref)
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Loading
Loading