Skip to content

Commit

Permalink
Add status for each priority
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Feb 4, 2025
1 parent f423ffd commit c99c043
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 44 deletions.
22 changes: 18 additions & 4 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
}
}

/// Returns a [Future] which will resolve once the first full sync has completed.
Future<void> waitForFirstSync() async {
if (currentStatus.hasSynced ?? false) {
/// Returns a [Future] which will resolve once a synchronization operation has
/// completed.
///
/// When [priority] is null (the default), this method waits for a full sync
/// operation to complete. When set to a [BucketPriority] however, it also
/// completes once a partial sync operation containing that priority has
/// completed.
Future<void> waitForFirstSync({BucketPriority? priority}) async {
bool matches(SyncStatus status) {
if (priority == null) {
return status.hasSynced == true;
} else {
return status.statusForPriority(priority)?.hasSynced == true;
}
}

if (matches(currentStatus)) {
return;
}
await for (final result in statusStream) {
if (result.hasSynced ?? false) {
if (matches(result)) {
break;
}
}
Expand Down
106 changes: 76 additions & 30 deletions packages/powersync_core/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert' as convert;

import 'package:collection/collection.dart';
import 'package:http/http.dart' as http;
import 'package:powersync_core/src/abort_controller.dart';
import 'package:powersync_core/src/exceptions.dart';
Expand Down Expand Up @@ -273,31 +274,62 @@ class StreamingSyncImplementation implements StreamingSync {
return body['data']['write_checkpoint'] as String;
}

void _updateStatusForPriority(SyncPriorityStatus completed) {
// Note: statusInPriority is sorted by priorities (ascending)
final existingPriorityState = lastStatus.statusInPriority;

for (final (i, priority) in existingPriorityState.indexed) {
switch (
BucketPriority.comparator(priority.priority, completed.priority)) {
case < 0:
// Entries from here on have a higher priority than the one that was
// just completed
final copy = existingPriorityState.toList();
copy.insert(i, completed);
_updateStatus(statusInPriority: copy);
return;
case 0:
final copy = existingPriorityState.toList();
copy[i] = completed;
_updateStatus(statusInPriority: copy);
return;
case > 0:
continue;
}
}

_updateStatus(statusInPriority: [...existingPriorityState, completed]);
}

/// Update sync status based on any non-null parameters.
/// To clear errors, use [_noError] instead of null.
void _updateStatus(
{DateTime? lastSyncedAt,
bool? hasSynced,
bool? connected,
bool? connecting,
bool? downloading,
bool? uploading,
Object? uploadError,
Object? downloadError}) {
void _updateStatus({
DateTime? lastSyncedAt,
bool? hasSynced,
bool? connected,
bool? connecting,
bool? downloading,
bool? uploading,
Object? uploadError,
Object? downloadError,
List<SyncPriorityStatus>? statusInPriority,
}) {
final c = connected ?? lastStatus.connected;
var newStatus = SyncStatus(
connected: c,
connecting: !c && (connecting ?? lastStatus.connecting),
lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt,
hasSynced: hasSynced ?? lastStatus.hasSynced,
downloading: downloading ?? lastStatus.downloading,
uploading: uploading ?? lastStatus.uploading,
uploadError: uploadError == _noError
? null
: (uploadError ?? lastStatus.uploadError),
downloadError: downloadError == _noError
? null
: (downloadError ?? lastStatus.downloadError));
connected: c,
connecting: !c && (connecting ?? lastStatus.connecting),
lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt,
hasSynced: hasSynced ?? lastStatus.hasSynced,
downloading: downloading ?? lastStatus.downloading,
uploading: uploading ?? lastStatus.uploading,
uploadError: uploadError == _noError
? null
: (uploadError ?? lastStatus.uploadError),
downloadError: downloadError == _noError
? null
: (downloadError ?? lastStatus.downloadError),
statusInPriority: statusInPriority ?? lastStatus.statusInPriority,
);
lastStatus = newStatus;
_statusStreamController.add(newStatus);
}
Expand Down Expand Up @@ -371,10 +403,25 @@ class StreamingSyncImplementation implements StreamingSync {
} else {
appliedCheckpoint = targetCheckpoint;

final now = DateTime.now();
_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
downloading: false,
downloadError: _noError,
lastSyncedAt: now,
statusInPriority: [
if (appliedCheckpoint.checksums.isNotEmpty)
(
hasSynced: true,
lastSyncedAt: now,
priority: maxBy(
appliedCheckpoint.checksums
.map((cs) => BucketPriority(cs.priority)),
(priority) => priority,
compare: BucketPriority.comparator,
)!,
)
],
);
}

validatedCheckpoint = targetCheckpoint;
Expand All @@ -390,12 +437,11 @@ class StreamingSyncImplementation implements StreamingSync {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
appliedCheckpoint = targetCheckpoint;

_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
_updateStatusForPriority((
priority: BucketPriority(bucketPriority),
lastSyncedAt: DateTime.now(),
hasSynced: true,
));
}

validatedCheckpoint = targetCheckpoint;
Expand Down
51 changes: 41 additions & 10 deletions packages/powersync_core/lib/src/sync_status.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class SyncStatus {
import 'package:collection/collection.dart';

final class SyncStatus {
/// true if currently connected.
///
/// This means the PowerSync connection is ready to download, and
Expand All @@ -22,11 +24,11 @@ class SyncStatus {
/// Time that a last sync has fully completed, if any.
///
/// This is null while loading the database.
DateTime? get lastSyncedAt => statusInPriority.lastOrNull?.lastSyncedAt;
final DateTime? lastSyncedAt;

/// Indicates whether there has been at least one full sync, if any.
/// Is null when unknown, for example when state is still being loaded from the database.
bool? get hasSynced => statusInPriority.lastOrNull?.hasSynced;
final bool? hasSynced;

/// Error during uploading.
///
Expand Down Expand Up @@ -62,7 +64,8 @@ class SyncStatus {
other.downloadError == downloadError &&
other.uploadError == uploadError &&
other.lastSyncedAt == lastSyncedAt &&
other.hasSynced == hasSynced);
other.hasSynced == hasSynced &&
_statusEquality.equals(other.statusInPriority, statusInPriority));
}

SyncStatus copyWith({
Expand All @@ -74,6 +77,7 @@ class SyncStatus {
Object? downloadError,
DateTime? lastSyncedAt,
bool? hasSynced,
List<SyncPriorityStatus>? statusInPriority,
}) {
return SyncStatus(
connected: connected ?? this.connected,
Expand All @@ -84,6 +88,7 @@ class SyncStatus {
downloadError: downloadError ?? this.downloadError,
lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt,
hasSynced: hasSynced ?? this.hasSynced,
statusInPriority: statusInPriority ?? this.statusInPriority,
);
}

Expand All @@ -92,31 +97,57 @@ class SyncStatus {
return downloadError ?? uploadError;
}

/// Returns [lastSyncedAt] and [hasSynced] information for a partial sync
/// operation, or `null` if the status for that priority is unknown.
SyncPriorityStatus? statusForPriority(BucketPriority priority) {
assert(statusInPriority.isSortedByCompare(
(e) => e.priority, BucketPriority.comparator));

for (final known in statusInPriority) {
// Lower-priority buckets are synchronized after higher-priority buckets,
// and since statusInPriority is sorted we look for the first entry that
// doesn't have a higher priority.
if (BucketPriority.comparator(known.priority, priority) <= 0) {
return known;
}
}

return null;
}

@override
int get hashCode {
return Object.hash(connected, downloading, uploading, connecting,
uploadError, downloadError, lastSyncedAt);
return Object.hash(
connected,
downloading,
uploading,
connecting,
uploadError,
downloadError,
lastSyncedAt,
_statusEquality.hash(statusInPriority));
}

@override
String toString() {
return "SyncStatus<connected: $connected connecting: $connecting downloading: $downloading uploading: $uploading lastSyncedAt: $lastSyncedAt, hasSynced: $hasSynced, error: $anyError>";
}

static const _statusEquality = ListEquality<SyncPriorityStatus>();
}

/// The priority of a PowerSync bucket.
extension type const BucketPriority._(int priorityNumber) {
static const _highest = 0;
static const _lowests = 3;

factory BucketPriority(int i) {
assert(i >= _highest && i <= _lowests);
assert(i >= _highest);
return BucketPriority._(i);
}

/// A [Comparator] instance suitable for comparing [BucketPriority] values.
static Comparator<BucketPriority> comparator =
(a, b) => -a.priorityNumber.compareTo(b.priorityNumber);
static int comparator(BucketPriority a, BucketPriority b) =>
-a.priorityNumber.compareTo(b.priorityNumber);
}

/// Partial information about the synchronization status for buckets within a
Expand Down
22 changes: 22 additions & 0 deletions packages/powersync_core/lib/src/web/sync_worker_protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
required bool? hasSyned,
required String? uploadError,
required String? downloadError,
required JSArray? statusInPriority,
});

factory SerializedSyncStatus.from(SyncStatus status) {
Expand All @@ -169,6 +170,14 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
hasSyned: status.hasSynced,
uploadError: status.uploadError?.toString(),
downloadError: status.downloadError?.toString(),
statusInPriority: <JSArray?>[
for (final entry in status.statusInPriority)
[
entry.priority.priorityNumber.toJS,
entry.lastSyncedAt.microsecondsSinceEpoch.toJS,
entry.hasSynced.toJS,
].toJS
].toJS,
);
}

Expand All @@ -180,6 +189,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
external bool? hasSynced;
external String? uploadError;
external String? downloadError;
external JSArray? statusInPriority;

SyncStatus asSyncStatus() {
return SyncStatus(
Expand All @@ -193,6 +203,18 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
hasSynced: hasSynced,
uploadError: uploadError,
downloadError: downloadError,
statusInPriority: statusInPriority?.toDart.map((e) {
final [rawPriority, rawSynced, rawHasSynced, ...] =
(e as JSArray).toDart;

return (
priority: BucketPriority((rawPriority as JSNumber).toDartInt),
lastSyncedAt: DateTime.fromMicrosecondsSinceEpoch(
(rawSynced as JSNumber).toDartInt),
hasSynced: (rawHasSynced as JSBoolean).toDart,
);
}).toList() ??
const [],
);
}
}
Expand Down

0 comments on commit c99c043

Please sign in to comment.