Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create SyncOrchestrator #4176

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.ui.Modifier
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import com.bumble.appyx.core.composable.PermanentChild
import com.bumble.appyx.core.lifecycle.subscribe
import com.bumble.appyx.core.modality.BuildContext
Expand Down Expand Up @@ -52,8 +50,6 @@ import io.element.android.features.ftue.api.FtueEntryPoint
import io.element.android.features.ftue.api.state.FtueService
import io.element.android.features.ftue.api.state.FtueState
import io.element.android.features.logout.api.LogoutEntryPoint
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.features.preferences.api.PreferencesEntryPoint
import io.element.android.features.roomdirectory.api.RoomDescription
import io.element.android.features.roomdirectory.api.RoomDirectoryEntryPoint
Expand All @@ -77,18 +73,13 @@ import io.element.android.libraries.matrix.api.core.RoomIdOrAlias
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.core.toRoomIdOrAlias
import io.element.android.libraries.matrix.api.permalink.PermalinkData
import io.element.android.libraries.matrix.api.sync.SyncState
import io.element.android.libraries.matrix.api.verification.SessionVerificationRequestDetails
import io.element.android.libraries.matrix.api.verification.SessionVerificationServiceListener
import io.element.android.libraries.preferences.api.store.EnableNativeSlidingSyncUseCase
import io.element.android.services.appnavstate.api.AppNavigationStateService
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.parcelize.Parcelize
import timber.log.Timber
Expand All @@ -107,7 +98,6 @@ class LoggedInFlowNode @AssistedInject constructor(
private val userProfileEntryPoint: UserProfileEntryPoint,
private val ftueEntryPoint: FtueEntryPoint,
private val coroutineScope: CoroutineScope,
private val networkMonitor: NetworkMonitor,
private val ftueService: FtueService,
private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint,
private val shareEntryPoint: ShareEntryPoint,
Expand All @@ -116,6 +106,7 @@ class LoggedInFlowNode @AssistedInject constructor(
private val logoutEntryPoint: LogoutEntryPoint,
private val incomingVerificationEntryPoint: IncomingVerificationEntryPoint,
private val enableNativeSlidingSyncUseCase: EnableNativeSlidingSyncUseCase,
private val syncOrchestrator: SyncOrchestrator,
snackbarDispatcher: SnackbarDispatcher,
) : BaseFlowNode<LoggedInFlowNode.NavTarget>(
backstack = BackStack(
Expand Down Expand Up @@ -147,6 +138,9 @@ class LoggedInFlowNode @AssistedInject constructor(

override fun onBuilt() {
super.onBuilt()

syncOrchestrator.start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code will not be run if the app is killed and a Push is received. In this case the sync will not start?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, and I completely overlooked this... This complicates things, as we would have to use SyncOrchestrator in the AppScope and that means holding as many as MatrixClients there are in MatrixClientsHolder, maybe following a similar approach 🫤 .


lifecycle.subscribe(
onCreate = {
appNavigationStateService.onNavigateToSession(id, matrixClient.sessionId)
Expand All @@ -165,52 +159,20 @@ class LoggedInFlowNode @AssistedInject constructor(
}
.launchIn(lifecycleScope)
},
onStop = {
coroutineScope.launch {
// Counterpart startSync is done in observeSyncStateAndNetworkStatus method.
syncService.stopSync()
}
},
onDestroy = {
appNavigationStateService.onLeavingSpace(id)
appNavigationStateService.onLeavingSession(id)
loggedInFlowProcessor.stopObserving()
matrixClient.sessionVerificationService().setListener(null)
}
)
observeSyncStateAndNetworkStatus()
setupSendingQueue()
}

private fun setupSendingQueue() {
sendingQueue.launchIn(lifecycleScope)
}

@OptIn(FlowPreview::class)
private fun observeSyncStateAndNetworkStatus() {
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
combine(
// small debounce to avoid spamming startSync when the state is changing quickly in case of error.
syncService.syncState.debounce(100),
networkMonitor.connectivity
) { syncState, networkStatus ->
Pair(syncState, networkStatus)
}
.onStart {
// Temporary fix to ensure that the sync is started even if the networkStatus is offline.
syncService.startSync()
}
.collect { (syncState, networkStatus) ->
Timber.d("Sync state: $syncState, network status: $networkStatus")
if (syncState != SyncState.Running && networkStatus == NetworkStatus.Online) {
syncService.startSync()
}
}
}
}
}

sealed interface NavTarget : Parcelable {
@Parcelize
data object Placeholder : NavTarget
Expand Down
135 changes: 135 additions & 0 deletions appnav/src/main/kotlin/io/element/android/appnav/SyncOrchestrator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/

package io.element.android.appnav

import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.di.SessionScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.di.annotations.SessionCoroutineScope
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.sync.SyncState
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import timber.log.Timber
import javax.inject.Inject
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

/**
* Observes the app state and network state to start/stop the sync service.
*/
@SingleIn(SessionScope::class)
class SyncOrchestrator @Inject constructor(
matrixClient: MatrixClient,
private val appForegroundStateService: AppForegroundStateService,
private val networkMonitor: NetworkMonitor,
@SessionCoroutineScope private val sessionCoroutineScope: CoroutineScope,
private val dispatchers: CoroutineDispatchers,
) {
private val syncService = matrixClient.syncService()

private val initialSyncMutex = Mutex()

private var coroutineScope: CoroutineScope? = null

private val tag = "SyncOrchestrator"

/**
* Starting observing the app state and network state to start/stop the sync service.
*
* Before observing the state, a first attempt at starting the sync service will happen if it's not already running.
*/
@OptIn(FlowPreview::class)
fun start() {
Timber.tag(tag).d("start observing the app and network state")

if (syncService.syncState.value != SyncState.Running) {
Timber.tag(tag).d("initial startSync")
sessionCoroutineScope.launch(dispatchers.io) {
try {
initialSyncMutex.lock()
syncService.startSync()

// Wait until it's running
syncService.syncState.first { it == SyncState.Running }
} finally {
initialSyncMutex.unlock()
}
}
}

coroutineScope = CoroutineScope(sessionCoroutineScope.coroutineContext + CoroutineName(tag) + dispatchers.io)

coroutineScope?.launch {
// Wait until the initial sync is done, either successfully or failing
initialSyncMutex.lock()

combine(
// small debounce to avoid spamming startSync when the state is changing quickly in case of error.
syncService.syncState.debounce(100.milliseconds),
networkMonitor.connectivity,
appForegroundStateService.isInForeground,
appForegroundStateService.isInCall,
appForegroundStateService.isSyncingNotificationEvent,
) { syncState, networkState, isInForeground, isInCall, isSyncingNotificationEvent ->
val isAppActive = isInForeground || isInCall || isSyncingNotificationEvent
val isNetworkAvailable = networkState == NetworkStatus.Online

Timber.tag(tag).d("isAppActive=$isAppActive, isNetworkAvailable=$isNetworkAvailable")
if (syncState == SyncState.Running && (!isAppActive || !isNetworkAvailable)) {
// Don't stop the sync immediately, wait a bit to avoid starting/stopping the sync too often
delay(3.seconds)
SyncObserverAction.StopSync
} else if (syncState != SyncState.Running && isAppActive && isNetworkAvailable) {
SyncObserverAction.StartSync
} else {
SyncObserverAction.NoOp
}
}
.distinctUntilChanged()
.collect { action ->
when (action) {
SyncObserverAction.StartSync -> {
syncService.startSync()
}
SyncObserverAction.StopSync -> {
syncService.stopSync()
}
SyncObserverAction.NoOp -> Unit
}
}
}
}

/**
* Stop observing the app state and network state.
*/
fun stop() {
Timber.tag(tag).d("stop observing the app and network state")
coroutineScope?.cancel()
coroutineScope = null
}
}

private enum class SyncObserverAction {
StartSync,
StopSync,
NoOp,
}
Loading
Loading