diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b18bf36a..44eff3543 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### ✅ Added - Add support for pre-built XCFrameworks +- Fast reconnection ### 🔄 Changed - You can now focus on a desired point in the local video stream. [#221](https://github.com/GetStream/stream-video-swift/pull/221) diff --git a/README.md b/README.md index b862af2d8..117f9d471 100644 --- a/README.md +++ b/README.md @@ -193,11 +193,11 @@ Video roadmap and changelog is available [here](https://github.com/GetStream/pro ### 0.5.0 milestone +- [x] Tap to focus +- [x] Complete reconnection flows - [ ] Video UIKit tutorial - [ ] Improve logging / Sentry integration - [ ] Camera controls -- [x] Tap to focus -- [ ] Complete reconnection flows - [ ] Complete Livestreaming APIs and Tutorials for hosting & watching - [ ] Dynascale 2.0 - [ ] Buttons to simulate ice-restart and SFU switching diff --git a/Sources/StreamVideo/WebRTC/PeerConnection.swift b/Sources/StreamVideo/WebRTC/PeerConnection.swift index 4ee73766e..a2df12018 100644 --- a/Sources/StreamVideo/WebRTC/PeerConnection.swift +++ b/Sources/StreamVideo/WebRTC/PeerConnection.swift @@ -32,6 +32,10 @@ class PeerConnection: NSObject, RTCPeerConnectionDelegate, @unchecked Sendable { var onStreamRemoved: ((RTCMediaStream) -> Void)? var paused = false + + var connectionState: RTCPeerConnectionState { + pc.connectionState + } init( sessionId: String, @@ -167,6 +171,10 @@ class PeerConnection: NSObject, RTCPeerConnectionDelegate, @unchecked Sendable { } try await add(candidate: iceCandidate) } + + func restartIce() { + pc.restartIce() + } func close() { pc.close() diff --git a/Sources/StreamVideo/WebRTC/SfuMiddleware.swift b/Sources/StreamVideo/WebRTC/SfuMiddleware.swift index a02a77062..001431e0b 100644 --- a/Sources/StreamVideo/WebRTC/SfuMiddleware.swift +++ b/Sources/StreamVideo/WebRTC/SfuMiddleware.swift @@ -14,7 +14,7 @@ class SfuMiddleware: EventMiddleware { var signalService: Stream_Video_Sfu_Signal_SignalServer private var subscriber: PeerConnection? private var publisher: PeerConnection? - var onSocketConnected: (() -> Void)? + var onSocketConnected: ((Bool) -> Void)? var onParticipantCountUpdated: ((UInt32) -> ())? var onSessionMigrationEvent: (() -> Void)? var onPinsChanged: (([Stream_Video_Sfu_Models_Pin]) -> ())? @@ -71,7 +71,7 @@ class SfuMiddleware: EventMiddleware { case .dominantSpeakerChanged(let event): await handleDominantSpeakerChanged(event) case .joinResponse(let event): - onSocketConnected?() + onSocketConnected?(event.reconnected) await loadParticipants(from: event) case .healthCheckResponse(let event): onParticipantCountUpdated?(event.participantCount.total) diff --git a/Sources/StreamVideo/WebRTC/WebRTCClient.swift b/Sources/StreamVideo/WebRTC/WebRTCClient.swift index 38e562577..4eac98638 100644 --- a/Sources/StreamVideo/WebRTC/WebRTCClient.swift +++ b/Sources/StreamVideo/WebRTC/WebRTCClient.swift @@ -12,6 +12,9 @@ class WebRTCClient: NSObject, @unchecked Sendable { static let screenshareTrackType = "TRACK_TYPE_SCREEN_SHARE" static let videoTrackType = "TRACK_TYPE_VIDEO" static let audioTrackType = "TRACK_TYPE_AUDIO" + static let timeoutInterval: TimeInterval = 15 + static let participantsThreshold = 10 + static let fastReconnectTimeout: TimeInterval = 4.0 } actor State: ObservableObject { @@ -148,7 +151,6 @@ class WebRTCClient: NSObject, @unchecked Sendable { private(set) var sessionID: String private var token: String - private let timeoutInterval: TimeInterval = 15 private(set) var localVideoTrack: RTCVideoTrack? private(set) var localAudioTrack: RTCAudioTrack? @@ -158,7 +160,6 @@ class WebRTCClient: NSObject, @unchecked Sendable { private let user: User private let callCid: String private let audioSession = AudioSession() - private let participantsThreshold = 10 private var connectOptions: ConnectOptions? internal var ownCapabilities: [OwnCapability] private let videoConfig: VideoConfig @@ -174,7 +175,9 @@ class WebRTCClient: NSObject, @unchecked Sendable { private var fromSfuName: String? private var tempSubscriber: PeerConnection? private var currentScreenhsareType: ScreensharingType? - + private var isFastReconnecting = false + private var disconnectTime: Date? + @Injected(\.thermalStateObserver) private var thermalStateObserver var onParticipantsUpdated: (([String: CallParticipant]) -> Void)? @@ -202,7 +205,7 @@ class WebRTCClient: NSObject, @unchecked Sendable { signalService: signalService, subscriber: subscriber, publisher: publisher, - participantThreshold: participantsThreshold + participantThreshold: Constants.participantsThreshold ) init( @@ -245,16 +248,19 @@ class WebRTCClient: NSObject, @unchecked Sendable { } addOnParticipantsChangeHandler() subscribeToAppLifecycleChanges() + subscribeToInternetConnectionUpdates() } func connect( callSettings: CallSettings, videoOptions: VideoOptions, connectOptions: ConnectOptions, - migrating: Bool = false + migrating: Bool = false, + fastReconnect: Bool = false ) async throws { let connectionStatus = await state.connectionState - if (connectionStatus == .connected || connectionStatus == .connecting) && !migrating { + let isReconnection = migrating || fastReconnect + if (connectionStatus == .connected || connectionStatus == .connecting) && !isReconnection { log.debug("Skipping connection, already connected or connecting", subsystems: .webRTC) return } @@ -264,16 +270,21 @@ class WebRTCClient: NSObject, @unchecked Sendable { log.debug("Connecting to SFU", subsystems: .webRTC) await state.update(connectionState: .connecting) log.debug("Setting user media", subsystems: .webRTC) - if !migrating { + if !isReconnection { + isFastReconnecting = false await setupUserMedia(callSettings: callSettings) log.debug("Connecting WS channel", subsystems: .webRTC) signalChannel?.connect() sfuMiddleware.onSocketConnected = handleOnSocketConnected - } else { + } else if migrating { log.debug("Performing session migration", subsystems: .webRTC) migratingWSClient?.connect() publisher?.update(configuration: connectOptions.rtcConfiguration) sfuMiddleware.onSocketConnected = handleOnMigrationJoinResponse + } else if fastReconnect { + log.debug("Performing fast reconnect", subsystems: .webRTC) + signalChannel?.connect() + sfuMiddleware.onSocketConnected = handleOnSocketConnected } sfuMiddleware.onParticipantCountUpdated = { [weak self] participantCount in self?.onParticipantCountUpdated?(participantCount) @@ -572,10 +583,17 @@ class WebRTCClient: NSObject, @unchecked Sendable { // MARK: - private - private func handleOnSocketConnected() { + private func handleOnSocketConnected(reconnected: Bool) { Task { do { - try await self.setupPeerConnections() + if !reconnected { + try await self.setupPeerConnections() + } else { + log.debug("reconnected - restarting publisher ice") + publisher?.restartIce() + await state.update(connectionState: .connected) + signalChannel?.engine?.send(message: Stream_Video_Sfu_Event_HealthCheckRequest()) + } } catch { log.error("Error setting up peer connections", subsystems: .webRTC, error: error) await self.state.update(connectionState: .disconnected()) @@ -583,7 +601,7 @@ class WebRTCClient: NSObject, @unchecked Sendable { } } - private func handleOnMigrationJoinResponse() { + private func handleOnMigrationJoinResponse(reconnected: Bool) { signalChannel?.connectionStateDelegate = nil signalChannel?.onWSConnectionEstablished = nil signalChannel?.disconnect {} @@ -637,6 +655,13 @@ class WebRTCClient: NSObject, @unchecked Sendable { subscriber?.onStreamAdded = handleStreamAdded subscriber?.onStreamRemoved = handleStreamRemoved + subscriber?.onDisconnect = { [weak self] _ in + log.debug("subscriber disconnected") + if self?.isFastReconnecting == false { + log.debug("notifying of subscriber disconnection") + self?.onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) + } + } log.debug("Updating connection status to connected", subsystems: .webRTC) await state.update(connectionState: .connected) @@ -657,7 +682,11 @@ class WebRTCClient: NSObject, @unchecked Sendable { ) publisher?.onNegotiationNeeded = handleNegotiationNeeded() publisher?.onDisconnect = { [weak self] _ in - self?.onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) + log.debug("publisher disconnected") + if self?.isFastReconnecting == false { + log.debug("notifying of publisher disconnection") + self?.onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) + } } } else { publisher?.signalService = signalService @@ -845,12 +874,14 @@ class WebRTCClient: NSObject, @unchecked Sendable { private func makeJoinRequest( subscriberSdp: String, - migrating: Bool = false + migrating: Bool = false, + fastReconnect: Bool = false ) async -> Stream_Video_Sfu_Event_JoinRequest { log.debug("Executing join request", subsystems: .webRTC) var joinRequest = Stream_Video_Sfu_Event_JoinRequest() joinRequest.sessionID = sessionID joinRequest.subscriberSdp = subscriberSdp + joinRequest.fastReconnect = fastReconnect if migrating { joinRequest.token = migratingToken ?? token var migration = Stream_Video_Sfu_Event_Migration() @@ -867,7 +898,8 @@ class WebRTCClient: NSObject, @unchecked Sendable { private func makeWebSocketClient( url: URL, apiKey: APIKey, - isMigrating: Bool = false + isMigrating: Bool = false, + isFastReconnect: Bool = false ) -> WebSocketClient { let config = URLSessionConfiguration.default config.waitsForConnectivity = false @@ -891,7 +923,7 @@ class WebRTCClient: NSObject, @unchecked Sendable { if isMigrating { await self.sendMigrationJoinRequest() } else { - try await self.handleSocketConnected() + try await self.handleSocketConnected(fastReconnect: isFastReconnect) } } } @@ -899,9 +931,15 @@ class WebRTCClient: NSObject, @unchecked Sendable { return webSocketClient } - private func handleSocketConnected() async throws { - let sdp = try await tempOfferSdp() - await sendJoinRequest(with: sdp) + private func handleSocketConnected(fastReconnect: Bool = false) async throws { + let sdp: String + if fastReconnect, let subscriber { + let offer = try await subscriber.createOffer() + sdp = offer.sdp + } else { + sdp = try await tempOfferSdp() + } + await sendJoinRequest(with: sdp, fastReconnect: fastReconnect) } private func tempOfferSdp() async throws -> String { @@ -939,8 +977,16 @@ class WebRTCClient: NSObject, @unchecked Sendable { return offer.sdp } - private func sendJoinRequest(with sdp: String, migrating: Bool = false) async { - let payload = await makeJoinRequest(subscriberSdp: sdp, migrating: migrating) + private func sendJoinRequest( + with sdp: String, + migrating: Bool = false, + fastReconnect: Bool = false + ) async { + let payload = await makeJoinRequest( + subscriberSdp: sdp, + migrating: migrating, + fastReconnect: fastReconnect + ) var event = Stream_Video_Sfu_Event_SfuRequest() event.requestPayload = .joinRequest(payload) if migrating { @@ -1163,10 +1209,119 @@ class WebRTCClient: NSObject, @unchecked Sendable { ) } + private func subscribeToInternetConnectionUpdates() { + NotificationCenter.default.addObserver( + self, + selector: #selector(handleConnectionStateChange), + name: .internetConnectionStatusDidChange, + object: nil + ) + } + + @objc private func handleConnectionStateChange(_ notification: NSNotification) { + guard let status = notification.userInfo?[Notification.internetConnectionStatusUserInfoKey] as? InternetConnection.Status else { + return + } + + handleConnectionState(isAvailable: status.isAvailable) + } + + private func handleConnectionState(isAvailable: Bool) { + if !isAvailable { + disconnectTime = Date() + return + } + + guard isAvailable, !isFastReconnecting else { return } + + if let disconnectTime { + let offlineInterval = Date().timeIntervalSince(disconnectTime) + log.debug("offline interval is \(offlineInterval) seconds") + if offlineInterval <= Constants.fastReconnectTimeout { + isFastReconnecting = true + } + } + + disconnectTime = nil + + if isFastReconnecting, let url = signalChannel?.connectURL { + signalChannel = makeWebSocketClient( + url: url, + apiKey: .init(apiKey), + isFastReconnect: true + ) + Task { + try await connect( + callSettings: callSettings, + videoOptions: videoOptions, + connectOptions: connectOptions!, + fastReconnect: true + ) + checkFastReconnectionStatus() + } + } + } + + private func checkFastReconnectionStatus(retries: Int = 0) { + DispatchQueue.main.asyncAfter(deadline: .now() + Constants.fastReconnectTimeout) { [weak self] in + guard let self else { return } + if (isPeerConnectionConnecting(publisher, otherNotDisconnected: subscriber) + || isPeerConnectionConnecting(subscriber, otherNotDisconnected: publisher)) + && retries == 0 { + log.debug("Still connecting, check again after the interval") + checkFastReconnectionStatus(retries: 1) + return + } + self.isFastReconnecting = false + let reconnectPublisher = isPeerConnectionDisconnected(publisher) + let reconnectSubscriber = isPeerConnectionDisconnected(subscriber) + let shouldFullyReconnect = reconnectPublisher || reconnectSubscriber + if shouldFullyReconnect { + log.debug("Fast reconnect failed, doing full reconnect") + onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) + } else { + log.debug("Fast reconnect successfull") + } + } + } + + private func isPeerConnectionDisconnected(_ peerConnection: PeerConnection?) -> Bool { + guard let peerConnection else { + return false + } + + switch peerConnection.connectionState { + case .disconnected, .failed: + return true + default: + return false + } + } + + private func isPeerConnectionConnecting( + _ peerConnection: PeerConnection?, + otherNotDisconnected other: PeerConnection? + ) -> Bool { + let otherState = other?.connectionState + if peerConnection?.connectionState == .connecting + && (otherState != .disconnected && otherState != .failed && otherState != .closed) { + return true + } + return false + } } extension WebRTCClient: ConnectionStateDelegate { func webSocketClient(_ client: WebSocketClient, didUpdateConnectionState state: WebSocketConnectionState) { - onSignalConnectionStateChange?(state) + log.debug("WS connection state changed to \(state)") + switch state { + case .disconnected(source: _), .disconnecting(source: _): + handleConnectionState(isAvailable: false) + if !isFastReconnecting && disconnectTime == nil { + onSignalConnectionStateChange?(state) + } + default: + onSignalConnectionStateChange?(state) + } } } diff --git a/StreamVideoTests/Controllers/CallController_Tests.swift b/StreamVideoTests/Controllers/CallController_Tests.swift index 28fcd10dd..4d7c0a084 100644 --- a/StreamVideoTests/Controllers/CallController_Tests.swift +++ b/StreamVideoTests/Controllers/CallController_Tests.swift @@ -62,6 +62,8 @@ final class CallController_Tests: ControllerTestCase { engine.simulateConnectionSuccess() try await waitForCallEvent() engine.simulateDisconnect() + try await waitForCallEvent(nanoseconds: 5_000_000_000) + webRTCClient?.onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) try await waitForCallEvent() // Then @@ -151,6 +153,8 @@ final class CallController_Tests: ControllerTestCase { engine.simulateConnectionSuccess() try await waitForCallEvent() engine.simulateDisconnect() + try await waitForCallEvent(nanoseconds: 5_000_000_000) + webRTCClient?.onSignalConnectionStateChange?(.disconnected(source: .noPongReceived)) try await waitForCallEvent() // Then