Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create SyncOrchestrator #4176

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import io.element.android.libraries.matrix.api.core.RoomIdOrAlias
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.core.toRoomIdOrAlias
import io.element.android.libraries.matrix.api.permalink.PermalinkData
import io.element.android.libraries.matrix.api.sync.SyncOrchestratorProvider
import io.element.android.libraries.matrix.api.verification.SessionVerificationRequestDetails
import io.element.android.libraries.matrix.api.verification.SessionVerificationServiceListener
import io.element.android.libraries.preferences.api.store.EnableNativeSlidingSyncUseCase
Expand Down Expand Up @@ -106,7 +107,7 @@ class LoggedInFlowNode @AssistedInject constructor(
private val logoutEntryPoint: LogoutEntryPoint,
private val incomingVerificationEntryPoint: IncomingVerificationEntryPoint,
private val enableNativeSlidingSyncUseCase: EnableNativeSlidingSyncUseCase,
private val syncOrchestrator: SyncOrchestrator,
private val syncOrchestratorProvider: SyncOrchestratorProvider,
snackbarDispatcher: SnackbarDispatcher,
) : BaseFlowNode<LoggedInFlowNode.NavTarget>(
backstack = BackStack(
Expand All @@ -124,7 +125,6 @@ class LoggedInFlowNode @AssistedInject constructor(
fun onOpenBugReport()
}

private val syncService = matrixClient.syncService()
private val loggedInFlowProcessor = LoggedInEventProcessor(
snackbarDispatcher,
matrixClient.roomMembershipObserver(),
Expand All @@ -139,7 +139,7 @@ class LoggedInFlowNode @AssistedInject constructor(
override fun onBuilt() {
super.onBuilt()

syncOrchestrator.start()
syncOrchestratorProvider.get(sessionId = matrixClient.sessionId)?.start()

lifecycle.subscribe(
onCreate = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
* Please see LICENSE files in the repository root for full details.
*/

package io.element.android.appnav
package io.element.android.appnav.di

import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.di.SessionScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.di.annotations.SessionCoroutineScope
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.sync.SyncOrchestrator
import io.element.android.libraries.matrix.api.sync.SyncState
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.CoroutineName
Expand All @@ -28,21 +29,22 @@
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import timber.log.Timber
import javax.inject.Inject
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

/**
* Observes the app state and network state to start/stop the sync service.
*/
@SingleIn(SessionScope::class)
class SyncOrchestrator @Inject constructor(
matrixClient: MatrixClient,
class DefaultSyncOrchestrator @AssistedInject constructor(
@Assisted matrixClient: MatrixClient,
private val baseCoroutineScope: CoroutineScope = matrixClient.sessionCoroutineScope,
private val appForegroundStateService: AppForegroundStateService,
private val networkMonitor: NetworkMonitor,
@SessionCoroutineScope private val sessionCoroutineScope: CoroutineScope,
private val dispatchers: CoroutineDispatchers,
) {
) : SyncOrchestrator {
@AssistedFactory
interface Factory {
fun create(matrixClient: MatrixClient): DefaultSyncOrchestrator
}

private val syncService = matrixClient.syncService()

private val initialSyncMutex = Mutex()
Expand All @@ -51,18 +53,25 @@

private val tag = "SyncOrchestrator"

private val started = AtomicBoolean(false)

/**
* Starting observing the app state and network state to start/stop the sync service.
*
* Before observing the state, a first attempt at starting the sync service will happen if it's not already running.
*/
@OptIn(FlowPreview::class)
fun start() {
override fun start() {
if (!started.compareAndSet(false, true)) {
Timber.tag(tag).d("already started, exiting early")

Check warning on line 66 in appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestrator.kt

View check run for this annotation

Codecov / codecov/patch

appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestrator.kt#L66

Added line #L66 was not covered by tests
return
}

Timber.tag(tag).d("start observing the app and network state")

if (syncService.syncState.value != SyncState.Running) {
Timber.tag(tag).d("initial startSync")
sessionCoroutineScope.launch(dispatchers.io) {
baseCoroutineScope.launch(dispatchers.io) {
try {
initialSyncMutex.lock()
syncService.startSync()
Expand All @@ -75,7 +84,7 @@
}
}

coroutineScope = CoroutineScope(sessionCoroutineScope.coroutineContext + CoroutineName(tag) + dispatchers.io)
coroutineScope = CoroutineScope(baseCoroutineScope.coroutineContext + CoroutineName(tag) + dispatchers.io)

coroutineScope?.launch {
// Wait until the initial sync is done, either successfully or failing
Expand All @@ -96,23 +105,23 @@
if (syncState == SyncState.Running && (!isAppActive || !isNetworkAvailable)) {
// Don't stop the sync immediately, wait a bit to avoid starting/stopping the sync too often
delay(3.seconds)
SyncObserverAction.StopSync
SyncStateAction.StopSync
} else if (syncState != SyncState.Running && isAppActive && isNetworkAvailable) {
SyncObserverAction.StartSync
SyncStateAction.StartSync
} else {
SyncObserverAction.NoOp
SyncStateAction.NoOp
}
}
.distinctUntilChanged()
.collect { action ->
when (action) {
SyncObserverAction.StartSync -> {
SyncStateAction.StartSync -> {
syncService.startSync()
}
SyncObserverAction.StopSync -> {
SyncStateAction.StopSync -> {
syncService.stopSync()
}
SyncObserverAction.NoOp -> Unit
SyncStateAction.NoOp -> Unit
}
}
}
Expand All @@ -121,14 +130,18 @@
/**
* Stop observing the app state and network state.
*/
fun stop() {
override fun stop() {
if (!started.compareAndSet(true, false)) {
Timber.tag(tag).d("already stopped, exiting early")

Check warning on line 135 in appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestrator.kt

View check run for this annotation

Codecov / codecov/patch

appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestrator.kt#L135

Added line #L135 was not covered by tests
return
}
Timber.tag(tag).d("stop observing the app and network state")
coroutineScope?.cancel()
coroutineScope = null
}
}

private enum class SyncObserverAction {
private enum class SyncStateAction {
StartSync,
StopSync,
NoOp,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/

package io.element.android.appnav.di

import com.squareup.anvil.annotations.ContributesBinding
import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.sync.SyncOrchestrator
import io.element.android.libraries.matrix.api.sync.SyncOrchestratorProvider
import javax.inject.Inject

@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
class DefaultSyncOrchestratorProvider @Inject constructor(
private val matrixClientsHolder: MatrixClientsHolder,

Check warning on line 21 in appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestratorProvider.kt

View check run for this annotation

Codecov / codecov/patch

appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestratorProvider.kt#L20-L21

Added lines #L20 - L21 were not covered by tests
) : SyncOrchestratorProvider {
override fun get(sessionId: SessionId): SyncOrchestrator? {
return matrixClientsHolder.getSyncOrchestratorOrNull(sessionId)

Check warning on line 24 in appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestratorProvider.kt

View check run for this annotation

Codecov / codecov/patch

appnav/src/main/kotlin/io/element/android/appnav/di/DefaultSyncOrchestratorProvider.kt#L24

Added line #L24 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.matrix.api.auth.MatrixAuthenticationService
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.sync.SyncOrchestrator
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand All @@ -29,13 +30,14 @@ private const val SAVE_INSTANCE_KEY = "io.element.android.x.di.MatrixClientsHold
@ContributesBinding(AppScope::class)
class MatrixClientsHolder @Inject constructor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename this class so it's a bit clearer it's not just about MatrixClients?
Also we might want to create a class instead of using Pair?

I don't think we need the DefaultSyncOrchestratorProvider, we should let MatrixClientsHolder (with a new name :P) implements the SyncOrchestratorProvider

private val authenticationService: MatrixAuthenticationService,
private val syncOrchestratorFactory: DefaultSyncOrchestrator.Factory,
) : MatrixClientProvider {
private val sessionIdsToMatrixClient = ConcurrentHashMap<SessionId, MatrixClient>()
private val sessionIdsToMatrixClient = ConcurrentHashMap<SessionId, Pair<MatrixClient, SyncOrchestrator>>()
private val restoreMutex = Mutex()

init {
authenticationService.listenToNewMatrixClients { matrixClient ->
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient to syncOrchestratorFactory.create(matrixClient)
}
}

Expand All @@ -48,18 +50,22 @@ class MatrixClientsHolder @Inject constructor(
}

override fun getOrNull(sessionId: SessionId): MatrixClient? {
return sessionIdsToMatrixClient[sessionId]
return sessionIdsToMatrixClient[sessionId]?.first
}

override suspend fun getOrRestore(sessionId: SessionId): Result<MatrixClient> {
return restoreMutex.withLock {
when (val matrixClient = getOrNull(sessionId)) {
when (val cached = getOrNull(sessionId)) {
null -> restore(sessionId)
else -> Result.success(matrixClient)
else -> Result.success(cached)
}
}
}

internal fun getSyncOrchestratorOrNull(sessionId: SessionId): SyncOrchestrator? {
return sessionIdsToMatrixClient[sessionId]?.second
}

@Suppress("UNCHECKED_CAST")
fun restoreWithSavedState(state: SavedStateMap?) {
Timber.d("Restore state")
Expand Down Expand Up @@ -88,7 +94,7 @@ class MatrixClientsHolder @Inject constructor(
Timber.d("Restore matrix session: $sessionId")
return authenticationService.restoreSession(sessionId)
.onSuccess { matrixClient ->
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient to syncOrchestratorFactory.create(matrixClient)
}
.onFailure {
Timber.e(it, "Fail to restore session")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package io.element.android.appnav

import io.element.android.appnav.di.DefaultSyncOrchestrator
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
import io.element.android.libraries.matrix.api.sync.SyncState
Expand All @@ -29,7 +30,7 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalCoroutinesApi::class)
class SyncOrchestratorTest {
class DefaultSyncOrchestratorTest {
@get:Rule
val warmUpRule = WarmUpRule()

Expand Down Expand Up @@ -322,11 +323,11 @@ class SyncOrchestratorTest {
syncService: FakeSyncService = FakeSyncService(),
networkMonitor: FakeNetworkMonitor = FakeNetworkMonitor(),
appForegroundStateService: FakeAppForegroundStateService = FakeAppForegroundStateService(),
) = SyncOrchestrator(
) = DefaultSyncOrchestrator(
matrixClient = FakeMatrixClient(syncService = syncService),
networkMonitor = networkMonitor,
appForegroundStateService = appForegroundStateService,
sessionCoroutineScope = CoroutineScope(coroutineContext + SupervisorJob()),
baseCoroutineScope = CoroutineScope(coroutineContext + SupervisorJob()),
dispatchers = testCoroutineDispatchers(),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,47 @@ package io.element.android.appnav.di

import com.bumble.appyx.core.state.MutableSavedStateMapImpl
import com.google.common.truth.Truth.assertThat
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.matrix.test.FakeMatrixClient
import io.element.android.libraries.matrix.test.auth.FakeMatrixAuthenticationService
import io.element.android.services.appnavstate.test.FakeAppForegroundStateService
import io.element.android.tests.testutils.testCoroutineDispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.junit.Test

class MatrixClientsHolderTest {
@Test
fun `test getOrNull`() {
fun `test getOrNull`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
assertThat(matrixClientsHolder.getOrNull(A_SESSION_ID)).isNull()
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `test getSyncOrchestratorOrNull`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())

// With no matrix client there is no sync orchestrator
assertThat(matrixClientsHolder.getOrNull(A_SESSION_ID)).isNull()
assertThat(matrixClientsHolder.getSyncOrchestratorOrNull(A_SESSION_ID)).isNull()

// But as soon as we receive a client, we can get the sync orchestrator
val fakeMatrixClient = FakeMatrixClient()
fakeAuthenticationService.givenMatrixClient(fakeMatrixClient)
assertThat(matrixClientsHolder.getOrRestore(A_SESSION_ID).getOrNull()).isEqualTo(fakeMatrixClient)
assertThat(matrixClientsHolder.getSyncOrchestratorOrNull(A_SESSION_ID)).isNotNull()
}

@Test
fun `test getOrRestore`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
val fakeMatrixClient = FakeMatrixClient()
fakeAuthenticationService.givenMatrixClient(fakeMatrixClient)
assertThat(matrixClientsHolder.getOrNull(A_SESSION_ID)).isNull()
Expand All @@ -39,7 +62,7 @@ class MatrixClientsHolderTest {
@Test
fun `test remove`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
val fakeMatrixClient = FakeMatrixClient()
fakeAuthenticationService.givenMatrixClient(fakeMatrixClient)
assertThat(matrixClientsHolder.getOrRestore(A_SESSION_ID).getOrNull()).isEqualTo(fakeMatrixClient)
Expand All @@ -52,7 +75,7 @@ class MatrixClientsHolderTest {
@Test
fun `test remove all`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
val fakeMatrixClient = FakeMatrixClient()
fakeAuthenticationService.givenMatrixClient(fakeMatrixClient)
assertThat(matrixClientsHolder.getOrRestore(A_SESSION_ID).getOrNull()).isEqualTo(fakeMatrixClient)
Expand All @@ -65,7 +88,7 @@ class MatrixClientsHolderTest {
@Test
fun `test save and restore`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
val fakeMatrixClient = FakeMatrixClient()
fakeAuthenticationService.givenMatrixClient(fakeMatrixClient)
matrixClientsHolder.getOrRestore(A_SESSION_ID)
Expand All @@ -85,7 +108,7 @@ class MatrixClientsHolderTest {
@Test
fun `test AuthenticationService listenToNewMatrixClients emits a Client value and we save it`() = runTest {
val fakeAuthenticationService = FakeMatrixAuthenticationService()
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService)
val matrixClientsHolder = MatrixClientsHolder(fakeAuthenticationService, createSyncOrchestratorFactory())
assertThat(matrixClientsHolder.getOrNull(A_SESSION_ID)).isNull()

fakeAuthenticationService.givenMatrixClient(FakeMatrixClient(sessionId = A_SESSION_ID))
Expand All @@ -94,4 +117,16 @@ class MatrixClientsHolderTest {
assertThat(loginSucceeded.isSuccess).isTrue()
assertThat(matrixClientsHolder.getOrNull(A_SESSION_ID)).isNotNull()
}

private fun TestScope.createSyncOrchestratorFactory() = object : DefaultSyncOrchestrator.Factory {
override fun create(matrixClient: MatrixClient): DefaultSyncOrchestrator {
return DefaultSyncOrchestrator(
matrixClient,
baseCoroutineScope = this@createSyncOrchestratorFactory,
appForegroundStateService = FakeAppForegroundStateService(),
networkMonitor = FakeNetworkMonitor(),
dispatchers = testCoroutineDispatchers(),
)
}
}
}
Loading
Loading