Skip to content

Commit

Permalink
Tests for partial sync operations
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Feb 5, 2025
1 parent 7ed6945 commit 793e01e
Show file tree
Hide file tree
Showing 15 changed files with 591 additions and 118 deletions.
67 changes: 55 additions & 12 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';

import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
Expand Down Expand Up @@ -121,17 +122,56 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {

Future<void> _updateHasSynced() async {
// Query the database to see if any data has been synced.
final result =
await database.get('SELECT powersync_last_synced_at() as synced_at');
final timestamp = result['synced_at'] as String?;
final hasSynced = timestamp != null;

if (hasSynced != currentStatus.hasSynced) {
final lastSyncedAt =
timestamp == null ? null : DateTime.parse('${timestamp}Z').toLocal();
final status =
SyncStatus(hasSynced: hasSynced, lastSyncedAt: lastSyncedAt);
setStatus(status);
final result = await database.get('''
SELECT CASE
WHEN EXISTS (SELECT 1 FROM sqlite_master WHERE name = 'ps_sync_state')
THEN (SELECT json_group_array(
json_object('prio', priority, 'last_sync', last_synced_at)
) FROM ps_sync_state ORDER BY priority)
ELSE powersync_last_synced_at()
END AS r;
''');
final value = result['r'] as String?;
final hasData = value != null;

DateTime parseDateTime(String sql) {
return DateTime.parse('${sql}Z').toLocal();
}

if (hasData) {
DateTime? lastCompleteSync;
final priorityStatus = <SyncPriorityStatus>[];
var hasSynced = false;

if (value.startsWith('[')) {
for (final entry in jsonDecode(value) as List) {
final priority = entry['prio'] as int;
final lastSyncedAt = parseDateTime(entry['last_sync'] as String);

if (priority == -1) {
hasSynced = true;
lastCompleteSync = lastSyncedAt;
} else {
priorityStatus.add((
hasSynced: true,
lastSyncedAt: lastSyncedAt,
priority: BucketPriority(priority)
));
}
}
} else {
hasSynced = true;
lastCompleteSync = parseDateTime(value);
}

if (hasSynced != currentStatus.hasSynced) {
final status = SyncStatus(
hasSynced: hasSynced,
lastSyncedAt: lastCompleteSync,
statusInPriority: priorityStatus,
);
setStatus(status);
}
}
}

Expand Down Expand Up @@ -201,7 +241,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
await disconnect();
// Now we can close the database
await database.close();
await statusStreamController.close();

// If there are paused subscriptionso n the status stream, don't delay
// closing the database because of that.
unawaited(statusStreamController.close());
}

/// Connect to the PowerSync service, and keep the databases in sync.
Expand Down
87 changes: 47 additions & 40 deletions packages/powersync_core/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class StreamingSyncImplementation implements StreamingSync {

// Now close the client in all cases not covered above
_client.close();
_statusStreamController.close();
}

bool get aborted {
Expand Down Expand Up @@ -281,7 +282,7 @@ class StreamingSyncImplementation implements StreamingSync {
for (final (i, priority) in existingPriorityState.indexed) {
switch (
BucketPriority.comparator(priority.priority, completed.priority)) {
case < 0:
case > 0:
// Entries from here on have a higher priority than the one that was
// just completed
final copy = existingPriorityState.toList();
Expand All @@ -293,7 +294,7 @@ class StreamingSyncImplementation implements StreamingSync {
copy[i] = completed;
_updateStatus(statusInPriority: copy);
return;
case > 0:
case < 0:
continue;
}
}
Expand Down Expand Up @@ -537,49 +538,55 @@ class StreamingSyncImplementation implements StreamingSync {
return true;
}

Stream<StreamingSyncLine?> streamingSyncRequest(
StreamingSyncRequest data) async* {
final credentials = await credentialsCallback();
if (credentials == null) {
throw CredentialsException('Not logged in');
}
final uri = credentials.endpointUri('sync/stream');

final request = http.Request('POST', uri);
request.headers['Content-Type'] = 'application/json';
request.headers['Authorization'] = "Token ${credentials.token}";
request.headers.addAll(_userAgentHeaders);

request.body = convert.jsonEncode(data);

http.StreamedResponse res;
try {
// Do not close the client during the request phase - this causes uncaught errors.
_safeToClose = false;
res = await _client.send(request);
} finally {
_safeToClose = true;
}
if (aborted) {
return;
}
Stream<StreamingSyncLine?> streamingSyncRequest(StreamingSyncRequest data) {
Future<http.ByteStream?> setup() async {
final credentials = await credentialsCallback();
if (credentials == null) {
throw CredentialsException('Not logged in');
}
final uri = credentials.endpointUri('sync/stream');

final request = http.Request('POST', uri);
request.headers['Content-Type'] = 'application/json';
request.headers['Authorization'] = "Token ${credentials.token}";
request.headers.addAll(_userAgentHeaders);

request.body = convert.jsonEncode(data);

http.StreamedResponse res;
try {
// Do not close the client during the request phase - this causes uncaught errors.
_safeToClose = false;
res = await _client.send(request);
} finally {
_safeToClose = true;
}
if (aborted) {
return null;
}

if (res.statusCode == 401) {
if (invalidCredentialsCallback != null) {
await invalidCredentialsCallback!();
if (res.statusCode == 401) {
if (invalidCredentialsCallback != null) {
await invalidCredentialsCallback!();
}
}
}
if (res.statusCode != 200) {
throw await SyncResponseException.fromStreamedResponse(res);
if (res.statusCode != 200) {
throw await SyncResponseException.fromStreamedResponse(res);
}

return res.stream;
}

// Note: The response stream is automatically closed when this loop errors
await for (var line in ndjson(res.stream)) {
if (aborted) {
break;
return Stream.fromFuture(setup()).asyncExpand((stream) {
if (stream == null || aborted) {
return const Stream.empty();
} else {
return ndjson(stream)
.map((line) =>
StreamingSyncLine.fromJson(line as Map<String, dynamic>))
.takeWhile((_) => !aborted);
}
yield StreamingSyncLine.fromJson(line as Map<String, dynamic>);
}
});
}

/// Delays the standard `retryDelay` Duration, but exits early if
Expand Down
21 changes: 17 additions & 4 deletions packages/powersync_core/lib/src/sync_status.dart
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,15 @@ final class SyncStatus {

/// Returns [lastSyncedAt] and [hasSynced] information for a partial sync
/// operation, or `null` if the status for that priority is unknown.
SyncPriorityStatus? statusForPriority(BucketPriority priority) {
///
/// The information returned may be more generic than requested. For instance,
/// a completed sync operation (as expressed by [lastSyncedAt]) also
/// guarantees that every bucket priority was synchronized before that.
/// Similarly, requesting the sync status for priority `1` may return
/// information extracted from the lower priority `2` since each partial sync
/// in priority `2` necessarily includes a consistent view over data in
/// priority `1`.
SyncPriorityStatus statusForPriority(BucketPriority priority) {
assert(statusInPriority.isSortedByCompare(
(e) => e.priority, BucketPriority.comparator));

Expand All @@ -112,7 +120,12 @@ final class SyncStatus {
}
}

return null;
// If we have a complete sync, that necessarily includes all priorities.
return (
priority: priority,
hasSynced: hasSynced,
lastSyncedAt: lastSyncedAt
);
}

@override
Expand Down Expand Up @@ -154,8 +167,8 @@ extension type const BucketPriority._(int priorityNumber) {
/// priority.
typedef SyncPriorityStatus = ({
BucketPriority priority,
DateTime lastSyncedAt,
bool hasSynced,
DateTime? lastSyncedAt,
bool? hasSynced,
});

/// Stats of the local upload queue.
Expand Down
6 changes: 5 additions & 1 deletion packages/powersync_core/lib/src/sync_types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ final class Checkpoint extends StreamingSyncLine {
'write_checkpoint': writeCheckpoint,
'buckets': checksums
.where((c) => priority == null || c.priority <= priority)
.map((c) => {'bucket': c.bucket, 'checksum': c.checksum})
.map((c) => {
'bucket': c.bucket,
'checksum': c.checksum,
'priority': c.priority,
})
.toList(growable: false)
};
}
Expand Down
1 change: 0 additions & 1 deletion packages/powersync_core/test/connected_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import 'package:powersync_core/powersync_core.dart';
import 'package:test/test.dart';

import 'server/sync_server/mock_sync_server.dart';
import 'streaming_sync_test.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/test/disconnect_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import 'package:powersync_core/powersync_core.dart';
import 'package:powersync_core/sqlite_async.dart';
import 'package:test/test.dart';
import 'streaming_sync_test.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';
import 'watch_test.dart';

Expand Down
Loading

0 comments on commit 793e01e

Please sign in to comment.