Skip to content

Commit

Permalink
Merge branch 'develop' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ipavlidakis authored Sep 27, 2024
2 parents 1968d05 + 8b8ee29 commit 8eb31fb
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 162 deletions.
6 changes: 4 additions & 2 deletions Sources/StreamVideo/Controllers/CallController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,10 @@ class CallController: @unchecked Sendable {
private func handleParticipantsUpdated() {
webRTCParticipantsObserver = debouncedParticipants?
.$value
.sinkTask { @MainActor [weak self] participants in
self?.call?.state.participantsMap = participants
.sink { [weak self] participants in
Task { @MainActor [weak self] in
self?.call?.state.participantsMap = participants
}
}
}

Expand Down
11 changes: 7 additions & 4 deletions Sources/StreamVideo/Models/CallParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ public struct CallParticipant: Identifiable, Sendable, Hashable {
lhs.hasVideo == rhs.hasVideo &&
lhs.hasAudio == rhs.hasAudio &&
lhs.isScreensharing == rhs.isScreensharing &&
lhs.track?.isEnabled == rhs.track?.isEnabled &&
lhs.track?.trackId == rhs.track?.trackId &&
lhs.trackSize == rhs.trackSize &&
lhs.screenshareTrack === rhs.screenshareTrack &&
lhs.screenshareTrack?.isEnabled == rhs.screenshareTrack?.isEnabled &&
lhs.screenshareTrack?.trackId == rhs.screenshareTrack?.trackId &&
lhs.showTrack == rhs.showTrack &&
lhs.isSpeaking == rhs.isSpeaking &&
lhs.isDominantSpeaker == rhs.isDominantSpeaker &&
Expand All @@ -125,9 +130,7 @@ public struct CallParticipant: Identifiable, Sendable, Hashable {
lhs.joinedAt == rhs.joinedAt &&
lhs.audioLevel == rhs.audioLevel &&
lhs.audioLevels == rhs.audioLevels &&
lhs.pin == rhs.pin &&
lhs.track === rhs.track &&
lhs.screenshareTrack === rhs.screenshareTrack
lhs.pin == rhs.pin
}

public var isPinned: Bool {
Expand All @@ -141,7 +144,7 @@ public struct CallParticipant: Identifiable, Sendable, Hashable {

/// Determines whether the track of the participant should be displayed.
public var shouldDisplayTrack: Bool {
hasVideo && showTrack
hasVideo && showTrack && track?.isEnabled == true
}

public func withUpdated(trackSize: CGSize) -> CallParticipant {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Combine
import Foundation

extension Publisher {
public func sinkTask(
func sinkTask(
storeIn disposableBag: DisposableBag? = nil,
identifier: String = UUID().uuidString,
receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void) = { _ in },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,20 @@ actor ICEAdapter: @unchecked Sendable {
throw ClientError("PeerConnection type:\(peerType) was unable to trickle generated candidate\(candidate).")
}

try Task.checkCancellation()

log.debug(
"""
PeerConnection type:\(peerType) generated candidate while remoteDescription.
PeerConnection type:\(peerType) generated candidate while remoteDescription is \(peerConnection
.remoteDescription == nil ? "nil" : "non-nil").
Candidate: \(candidate)
""",
subsystems: .iceAdapter
)

try Task.checkCancellation()

try await sfuAdapter.iCETrickle(
candidate: jsonString,
peerType: peerType == .publisher ? .publisherUnspecified : .subscriber,
for: sessionID
)

try Task.checkCancellation()

log.debug(
"""
PeerConnection type:\(peerType) will store trickled candidate for future use.
Expand Down Expand Up @@ -183,7 +177,7 @@ actor ICEAdapter: @unchecked Sendable {
await withTaskGroup(of: Void.self) { [weak self] group in
guard let self else { return }
for candidate in await trickledCandidates {
group.addTask { @MainActor [weak self] in
group.addTask { [weak self] in
guard let self else { return }
do {
try Task.checkCancellation()
Expand Down Expand Up @@ -213,7 +207,7 @@ actor ICEAdapter: @unchecked Sendable {
private func task(
for candidate: RTCIceCandidate
) -> Task<Void, Never> {
Task { @MainActor [weak peerConnection] in
Task { [weak peerConnection] in
guard let peerConnection else { return }
do {
try Task.checkCancellation()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ final class LocalAudioMediaAdapter: LocalMediaAdapting {

/// Cleans up resources when the instance is deallocated.
deinit {
Task { @MainActor [sender, localTrack] in
sender?.sender.track = nil
localTrack?.isEnabled = false
}
sender?.sender.track = nil
localTrack?.isEnabled = false
if let localTrack {
log.debug(
"""
Expand Down Expand Up @@ -122,7 +120,7 @@ final class LocalAudioMediaAdapter: LocalMediaAdapting {
)
)
}
audioTrack.isEnabled = settings.audioOn
audioTrack.isEnabled = false

log.debug(
"""
Expand All @@ -149,38 +147,34 @@ final class LocalAudioMediaAdapter: LocalMediaAdapting {

/// Starts publishing the local audio track.
func publish() {
Task { @MainActor in
guard
let localTrack,
localTrack.isEnabled == false || sender?.sender.track == nil
else {
return
}
guard
let localTrack,
localTrack.isEnabled == false || sender?.sender.track == nil
else {
return
}

if sender == nil {
sender = peerConnection.addTransceiver(
with: localTrack,
init: RTCRtpTransceiverInit(
trackType: .audio,
direction: .sendOnly,
streamIds: streamIds
)
if sender == nil {
sender = peerConnection.addTransceiver(
with: localTrack,
init: RTCRtpTransceiverInit(
trackType: .audio,
direction: .sendOnly,
streamIds: streamIds
)
} else {
sender?.sender.track = localTrack
}
localTrack.isEnabled = true
)
} else {
sender?.sender.track = localTrack
}
localTrack.isEnabled = true
}

/// Stops publishing the local audio track.
func unpublish() {
Task { @MainActor in
guard let sender, let localTrack else { return }
localTrack.isEnabled = false
sender.sender.track = nil
log.debug("Local audioTrack trackId:\(localTrack.trackId) is now unpublished.")
}
guard let sender, let localTrack else { return }
localTrack.isEnabled = false
sender.sender.track = nil
log.debug("Local audioTrack trackId:\(localTrack.trackId) is now unpublished.")
}

/// Updates the local audio media based on new call settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ final class LocalVideoMediaAdapter: LocalMediaAdapting, @unchecked Sendable {

/// Cleans up resources when the instance is deallocated.
deinit {
Task { @MainActor [sender, localTrack, capturer] in
try? await capturer?.stopCapture()
localTrack?.isEnabled = false
sender?.sender.track = nil
}
Task { [capturer] in try? await capturer?.stopCapture() }
localTrack?.isEnabled = false
sender?.sender.track = nil
if let localTrack {
log.debug(
"""
Local videoTrack will be deallocated trackId:\(localTrack.trackId) isEnabled:\(localTrack.isEnabled).
Local videoTrack will be deallocated
trackId:\(localTrack.trackId)
isEnabled:\(localTrack.isEnabled)
"""
)
}
Expand Down Expand Up @@ -124,6 +124,7 @@ final class LocalVideoMediaAdapter: LocalMediaAdapting, @unchecked Sendable {
)
}
} else if !hasVideo {
localTrack?.isEnabled = false
Task { [weak self] in
do {
try await self?.capturer?.stopCapture()
Expand All @@ -132,51 +133,40 @@ final class LocalVideoMediaAdapter: LocalMediaAdapting, @unchecked Sendable {
}
}
}

localTrack?.isEnabled = settings.videoOn
}

/// Starts publishing the local video track.
func publish() {
Task { @MainActor [weak self] in
guard
let self,
let localTrack,
localTrack.isEnabled == false || sender == nil
else {
return
}
guard
let localTrack,
localTrack.isEnabled == false || sender == nil
else {
return
}

if sender == nil {
sender = peerConnection.addTransceiver(
with: localTrack,
init: RTCRtpTransceiverInit(
trackType: .video,
direction: .sendOnly,
streamIds: streamIds,
codecs: videoOptions.supportedCodecs
)
if sender == nil {
sender = peerConnection.addTransceiver(
with: localTrack,
init: RTCRtpTransceiverInit(
trackType: .video,
direction: .sendOnly,
streamIds: streamIds,
codecs: videoOptions.supportedCodecs
)
} else {
sender?.sender.track = localTrack
}
localTrack.isEnabled = true
log.debug("Local videoTrack trackId:\(localTrack.trackId) is now published.")
)
} else {
sender?.sender.track = localTrack
}
localTrack.isEnabled = true
log.debug("Local videoTrack trackId:\(localTrack.trackId) is now published.")
}

/// Stops publishing the local video track.
func unpublish() {
Task { @MainActor [weak self] in
guard
let self,
let sender,
let localTrack
else { return }
sender.sender.track = nil
localTrack.isEnabled = false
log.debug("Local videoTrack trackId:\(localTrack.trackId) is now unpublished.")
}
guard let sender, let localTrack else { return }
sender.sender.track = nil
localTrack.isEnabled = false
log.debug("Local videoTrack trackId:\(localTrack.trackId) is now unpublished.")
}

/// Updates the local video media based on new call settings.
Expand Down Expand Up @@ -281,45 +271,39 @@ final class LocalVideoMediaAdapter: LocalMediaAdapting, @unchecked Sendable {
func changePublishQuality(
with activeEncodings: Set<String>
) {
Task { @MainActor [weak self] in
guard
let self,
let sender,
!activeEncodings.isEmpty
else {
return
}
guard let sender, !activeEncodings.isEmpty else {
return
}

var hasChanges = false
let params = sender
.sender
.parameters
var updatedEncodings = [RTCRtpEncodingParameters]()
var hasChanges = false
let params = sender
.sender
.parameters
var updatedEncodings = [RTCRtpEncodingParameters]()

for encoding in params.encodings {
guard let rid = encoding.rid else {
continue
}
let shouldEnable = activeEncodings.contains(rid)

switch (shouldEnable, encoding.isActive) {
case (true, true):
break
case (false, false):
break
default:
hasChanges = true
encoding.isActive = shouldEnable
}
updatedEncodings.append(encoding)
for encoding in params.encodings {
guard let rid = encoding.rid else {
continue
}

guard hasChanges else {
return
let shouldEnable = activeEncodings.contains(rid)

switch (shouldEnable, encoding.isActive) {
case (true, true):
break
case (false, false):
break
default:
hasChanges = true
encoding.isActive = shouldEnable
}
params.encodings = updatedEncodings
sender.sender.parameters = params
updatedEncodings.append(encoding)
}

guard hasChanges else {
return
}
params.encodings = updatedEncodings
sender.sender.parameters = params
}

// MARK: - Private helpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ protocol StreamRTCPeerConnectionProtocol: AnyObject {
func restartIce()

/// Closes the peer connection.
func close() async
func close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
subsystems: subsystem
)
disposableBag.removeAll()
Task { [peerConnection] in await peerConnection.close() }
peerConnection.close()
}

func prepareForClosing() async {
Expand Down Expand Up @@ -366,7 +366,7 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
}

/// Closes the peer connection.
func close() async {
func close() {
log.debug(
"""
Closing PeerConnection
Expand All @@ -378,7 +378,7 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
subsystems: subsystem
)
disposableBag.removeAll()
await peerConnection.close()
peerConnection.close()
}

/// Restarts ICE for the peer connection.
Expand Down
Loading

0 comments on commit 8eb31fb

Please sign in to comment.