From c99c0431dd4b700ec9dd24bee9a95e7fba11ae3e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 4 Feb 2025 11:45:22 +0100 Subject: [PATCH] Add status for each priority --- .../lib/src/database/powersync_db_mixin.dart | 22 +++- .../lib/src/streaming_sync.dart | 106 +++++++++++++----- .../powersync_core/lib/src/sync_status.dart | 51 +++++++-- .../lib/src/web/sync_worker_protocol.dart | 22 ++++ 4 files changed, 157 insertions(+), 44 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index a5fb93a5..8869c162 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -135,13 +135,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { } } - /// Returns a [Future] which will resolve once the first full sync has completed. - Future waitForFirstSync() async { - if (currentStatus.hasSynced ?? false) { + /// Returns a [Future] which will resolve once a synchronization operation has + /// completed. + /// + /// When [priority] is null (the default), this method waits for a full sync + /// operation to complete. When set to a [BucketPriority] however, it also + /// completes once a partial sync operation containing that priority has + /// completed. + Future waitForFirstSync({BucketPriority? priority}) async { + bool matches(SyncStatus status) { + if (priority == null) { + return status.hasSynced == true; + } else { + return status.statusForPriority(priority)?.hasSynced == true; + } + } + + if (matches(currentStatus)) { return; } await for (final result in statusStream) { - if (result.hasSynced ?? false) { + if (matches(result)) { break; } } diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/streaming_sync.dart index 80aeb10a..b4b6f214 100644 --- a/packages/powersync_core/lib/src/streaming_sync.dart +++ b/packages/powersync_core/lib/src/streaming_sync.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert' as convert; +import 'package:collection/collection.dart'; import 'package:http/http.dart' as http; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -273,31 +274,62 @@ class StreamingSyncImplementation implements StreamingSync { return body['data']['write_checkpoint'] as String; } + void _updateStatusForPriority(SyncPriorityStatus completed) { + // Note: statusInPriority is sorted by priorities (ascending) + final existingPriorityState = lastStatus.statusInPriority; + + for (final (i, priority) in existingPriorityState.indexed) { + switch ( + BucketPriority.comparator(priority.priority, completed.priority)) { + case < 0: + // Entries from here on have a higher priority than the one that was + // just completed + final copy = existingPriorityState.toList(); + copy.insert(i, completed); + _updateStatus(statusInPriority: copy); + return; + case 0: + final copy = existingPriorityState.toList(); + copy[i] = completed; + _updateStatus(statusInPriority: copy); + return; + case > 0: + continue; + } + } + + _updateStatus(statusInPriority: [...existingPriorityState, completed]); + } + /// Update sync status based on any non-null parameters. /// To clear errors, use [_noError] instead of null. - void _updateStatus( - {DateTime? lastSyncedAt, - bool? hasSynced, - bool? connected, - bool? connecting, - bool? downloading, - bool? uploading, - Object? uploadError, - Object? downloadError}) { + void _updateStatus({ + DateTime? lastSyncedAt, + bool? hasSynced, + bool? connected, + bool? connecting, + bool? downloading, + bool? uploading, + Object? uploadError, + Object? downloadError, + List? statusInPriority, + }) { final c = connected ?? lastStatus.connected; var newStatus = SyncStatus( - connected: c, - connecting: !c && (connecting ?? lastStatus.connecting), - lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt, - hasSynced: hasSynced ?? lastStatus.hasSynced, - downloading: downloading ?? lastStatus.downloading, - uploading: uploading ?? lastStatus.uploading, - uploadError: uploadError == _noError - ? null - : (uploadError ?? lastStatus.uploadError), - downloadError: downloadError == _noError - ? null - : (downloadError ?? lastStatus.downloadError)); + connected: c, + connecting: !c && (connecting ?? lastStatus.connecting), + lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt, + hasSynced: hasSynced ?? lastStatus.hasSynced, + downloading: downloading ?? lastStatus.downloading, + uploading: uploading ?? lastStatus.uploading, + uploadError: uploadError == _noError + ? null + : (uploadError ?? lastStatus.uploadError), + downloadError: downloadError == _noError + ? null + : (downloadError ?? lastStatus.downloadError), + statusInPriority: statusInPriority ?? lastStatus.statusInPriority, + ); lastStatus = newStatus; _statusStreamController.add(newStatus); } @@ -371,10 +403,25 @@ class StreamingSyncImplementation implements StreamingSync { } else { appliedCheckpoint = targetCheckpoint; + final now = DateTime.now(); _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + downloading: false, + downloadError: _noError, + lastSyncedAt: now, + statusInPriority: [ + if (appliedCheckpoint.checksums.isNotEmpty) + ( + hasSynced: true, + lastSyncedAt: now, + priority: maxBy( + appliedCheckpoint.checksums + .map((cs) => BucketPriority(cs.priority)), + (priority) => priority, + compare: BucketPriority.comparator, + )!, + ) + ], + ); } validatedCheckpoint = targetCheckpoint; @@ -390,12 +437,11 @@ class StreamingSyncImplementation implements StreamingSync { // Checksums valid, but need more data for a consistent checkpoint. // Continue waiting. } else { - appliedCheckpoint = targetCheckpoint; - - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + _updateStatusForPriority(( + priority: BucketPriority(bucketPriority), + lastSyncedAt: DateTime.now(), + hasSynced: true, + )); } validatedCheckpoint = targetCheckpoint; diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index d9546c4a..799e69bb 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -1,4 +1,6 @@ -class SyncStatus { +import 'package:collection/collection.dart'; + +final class SyncStatus { /// true if currently connected. /// /// This means the PowerSync connection is ready to download, and @@ -22,11 +24,11 @@ class SyncStatus { /// Time that a last sync has fully completed, if any. /// /// This is null while loading the database. - DateTime? get lastSyncedAt => statusInPriority.lastOrNull?.lastSyncedAt; + final DateTime? lastSyncedAt; /// Indicates whether there has been at least one full sync, if any. /// Is null when unknown, for example when state is still being loaded from the database. - bool? get hasSynced => statusInPriority.lastOrNull?.hasSynced; + final bool? hasSynced; /// Error during uploading. /// @@ -62,7 +64,8 @@ class SyncStatus { other.downloadError == downloadError && other.uploadError == uploadError && other.lastSyncedAt == lastSyncedAt && - other.hasSynced == hasSynced); + other.hasSynced == hasSynced && + _statusEquality.equals(other.statusInPriority, statusInPriority)); } SyncStatus copyWith({ @@ -74,6 +77,7 @@ class SyncStatus { Object? downloadError, DateTime? lastSyncedAt, bool? hasSynced, + List? statusInPriority, }) { return SyncStatus( connected: connected ?? this.connected, @@ -84,6 +88,7 @@ class SyncStatus { downloadError: downloadError ?? this.downloadError, lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt, hasSynced: hasSynced ?? this.hasSynced, + statusInPriority: statusInPriority ?? this.statusInPriority, ); } @@ -92,31 +97,57 @@ class SyncStatus { return downloadError ?? uploadError; } + /// Returns [lastSyncedAt] and [hasSynced] information for a partial sync + /// operation, or `null` if the status for that priority is unknown. + SyncPriorityStatus? statusForPriority(BucketPriority priority) { + assert(statusInPriority.isSortedByCompare( + (e) => e.priority, BucketPriority.comparator)); + + for (final known in statusInPriority) { + // Lower-priority buckets are synchronized after higher-priority buckets, + // and since statusInPriority is sorted we look for the first entry that + // doesn't have a higher priority. + if (BucketPriority.comparator(known.priority, priority) <= 0) { + return known; + } + } + + return null; + } + @override int get hashCode { - return Object.hash(connected, downloading, uploading, connecting, - uploadError, downloadError, lastSyncedAt); + return Object.hash( + connected, + downloading, + uploading, + connecting, + uploadError, + downloadError, + lastSyncedAt, + _statusEquality.hash(statusInPriority)); } @override String toString() { return "SyncStatus"; } + + static const _statusEquality = ListEquality(); } /// The priority of a PowerSync bucket. extension type const BucketPriority._(int priorityNumber) { static const _highest = 0; - static const _lowests = 3; factory BucketPriority(int i) { - assert(i >= _highest && i <= _lowests); + assert(i >= _highest); return BucketPriority._(i); } /// A [Comparator] instance suitable for comparing [BucketPriority] values. - static Comparator comparator = - (a, b) => -a.priorityNumber.compareTo(b.priorityNumber); + static int comparator(BucketPriority a, BucketPriority b) => + -a.priorityNumber.compareTo(b.priorityNumber); } /// Partial information about the synchronization status for buckets within a diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 9d8b8484..4713b392 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -157,6 +157,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required bool? hasSyned, required String? uploadError, required String? downloadError, + required JSArray? statusInPriority, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -169,6 +170,14 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { hasSyned: status.hasSynced, uploadError: status.uploadError?.toString(), downloadError: status.downloadError?.toString(), + statusInPriority: [ + for (final entry in status.statusInPriority) + [ + entry.priority.priorityNumber.toJS, + entry.lastSyncedAt.microsecondsSinceEpoch.toJS, + entry.hasSynced.toJS, + ].toJS + ].toJS, ); } @@ -180,6 +189,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external bool? hasSynced; external String? uploadError; external String? downloadError; + external JSArray? statusInPriority; SyncStatus asSyncStatus() { return SyncStatus( @@ -193,6 +203,18 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { hasSynced: hasSynced, uploadError: uploadError, downloadError: downloadError, + statusInPriority: statusInPriority?.toDart.map((e) { + final [rawPriority, rawSynced, rawHasSynced, ...] = + (e as JSArray).toDart; + + return ( + priority: BucketPriority((rawPriority as JSNumber).toDartInt), + lastSyncedAt: DateTime.fromMicrosecondsSinceEpoch( + (rawSynced as JSNumber).toDartInt), + hasSynced: (rawHasSynced as JSBoolean).toDart, + ); + }).toList() ?? + const [], ); } }