From 793e01e06037b5cf0d6aa9e99da2cce57c7b1df0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 5 Feb 2025 14:39:03 +0100 Subject: [PATCH] Tests for partial sync operations --- .../lib/src/database/powersync_db_mixin.dart | 67 +++++- .../lib/src/streaming_sync.dart | 87 ++++---- .../powersync_core/lib/src/sync_status.dart | 21 +- .../powersync_core/lib/src/sync_types.dart | 6 +- .../powersync_core/test/connected_test.dart | 1 - .../powersync_core/test/disconnect_test.dart | 2 +- .../test/in_memory_sync_test.dart | 209 ++++++++++++++++++ .../sync_server/in_memory_sync_server.dart | 64 ++++++ .../server/sync_server/mock_sync_server.dart | 37 +--- .../test/streaming_sync_test.dart | 20 +- .../test/utils/abstract_test_utils.dart | 64 +++++- .../test/utils/in_memory_http.dart | 56 +++++ .../test/utils/native_test_utils.dart | 34 ++- .../test/utils/stub_test_utils.dart | 10 + .../test/utils/web_test_utils.dart | 31 ++- 15 files changed, 591 insertions(+), 118 deletions(-) create mode 100644 packages/powersync_core/test/in_memory_sync_test.dart create mode 100644 packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart create mode 100644 packages/powersync_core/test/utils/in_memory_http.dart diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 8869c162..dd857080 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; @@ -121,17 +122,56 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { Future _updateHasSynced() async { // Query the database to see if any data has been synced. - final result = - await database.get('SELECT powersync_last_synced_at() as synced_at'); - final timestamp = result['synced_at'] as String?; - final hasSynced = timestamp != null; - - if (hasSynced != currentStatus.hasSynced) { - final lastSyncedAt = - timestamp == null ? null : DateTime.parse('${timestamp}Z').toLocal(); - final status = - SyncStatus(hasSynced: hasSynced, lastSyncedAt: lastSyncedAt); - setStatus(status); + final result = await database.get(''' + SELECT CASE + WHEN EXISTS (SELECT 1 FROM sqlite_master WHERE name = 'ps_sync_state') + THEN (SELECT json_group_array( + json_object('prio', priority, 'last_sync', last_synced_at) + ) FROM ps_sync_state ORDER BY priority) + ELSE powersync_last_synced_at() + END AS r; + '''); + final value = result['r'] as String?; + final hasData = value != null; + + DateTime parseDateTime(String sql) { + return DateTime.parse('${sql}Z').toLocal(); + } + + if (hasData) { + DateTime? lastCompleteSync; + final priorityStatus = []; + var hasSynced = false; + + if (value.startsWith('[')) { + for (final entry in jsonDecode(value) as List) { + final priority = entry['prio'] as int; + final lastSyncedAt = parseDateTime(entry['last_sync'] as String); + + if (priority == -1) { + hasSynced = true; + lastCompleteSync = lastSyncedAt; + } else { + priorityStatus.add(( + hasSynced: true, + lastSyncedAt: lastSyncedAt, + priority: BucketPriority(priority) + )); + } + } + } else { + hasSynced = true; + lastCompleteSync = parseDateTime(value); + } + + if (hasSynced != currentStatus.hasSynced) { + final status = SyncStatus( + hasSynced: hasSynced, + lastSyncedAt: lastCompleteSync, + statusInPriority: priorityStatus, + ); + setStatus(status); + } } } @@ -201,7 +241,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await disconnect(); // Now we can close the database await database.close(); - await statusStreamController.close(); + + // If there are paused subscriptionso n the status stream, don't delay + // closing the database because of that. + unawaited(statusStreamController.close()); } /// Connect to the PowerSync service, and keep the databases in sync. diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/streaming_sync.dart index b4b6f214..63787c90 100644 --- a/packages/powersync_core/lib/src/streaming_sync.dart +++ b/packages/powersync_core/lib/src/streaming_sync.dart @@ -119,6 +119,7 @@ class StreamingSyncImplementation implements StreamingSync { // Now close the client in all cases not covered above _client.close(); + _statusStreamController.close(); } bool get aborted { @@ -281,7 +282,7 @@ class StreamingSyncImplementation implements StreamingSync { for (final (i, priority) in existingPriorityState.indexed) { switch ( BucketPriority.comparator(priority.priority, completed.priority)) { - case < 0: + case > 0: // Entries from here on have a higher priority than the one that was // just completed final copy = existingPriorityState.toList(); @@ -293,7 +294,7 @@ class StreamingSyncImplementation implements StreamingSync { copy[i] = completed; _updateStatus(statusInPriority: copy); return; - case > 0: + case < 0: continue; } } @@ -537,49 +538,55 @@ class StreamingSyncImplementation implements StreamingSync { return true; } - Stream streamingSyncRequest( - StreamingSyncRequest data) async* { - final credentials = await credentialsCallback(); - if (credentials == null) { - throw CredentialsException('Not logged in'); - } - final uri = credentials.endpointUri('sync/stream'); - - final request = http.Request('POST', uri); - request.headers['Content-Type'] = 'application/json'; - request.headers['Authorization'] = "Token ${credentials.token}"; - request.headers.addAll(_userAgentHeaders); - - request.body = convert.jsonEncode(data); - - http.StreamedResponse res; - try { - // Do not close the client during the request phase - this causes uncaught errors. - _safeToClose = false; - res = await _client.send(request); - } finally { - _safeToClose = true; - } - if (aborted) { - return; - } + Stream streamingSyncRequest(StreamingSyncRequest data) { + Future setup() async { + final credentials = await credentialsCallback(); + if (credentials == null) { + throw CredentialsException('Not logged in'); + } + final uri = credentials.endpointUri('sync/stream'); + + final request = http.Request('POST', uri); + request.headers['Content-Type'] = 'application/json'; + request.headers['Authorization'] = "Token ${credentials.token}"; + request.headers.addAll(_userAgentHeaders); + + request.body = convert.jsonEncode(data); + + http.StreamedResponse res; + try { + // Do not close the client during the request phase - this causes uncaught errors. + _safeToClose = false; + res = await _client.send(request); + } finally { + _safeToClose = true; + } + if (aborted) { + return null; + } - if (res.statusCode == 401) { - if (invalidCredentialsCallback != null) { - await invalidCredentialsCallback!(); + if (res.statusCode == 401) { + if (invalidCredentialsCallback != null) { + await invalidCredentialsCallback!(); + } } - } - if (res.statusCode != 200) { - throw await SyncResponseException.fromStreamedResponse(res); + if (res.statusCode != 200) { + throw await SyncResponseException.fromStreamedResponse(res); + } + + return res.stream; } - // Note: The response stream is automatically closed when this loop errors - await for (var line in ndjson(res.stream)) { - if (aborted) { - break; + return Stream.fromFuture(setup()).asyncExpand((stream) { + if (stream == null || aborted) { + return const Stream.empty(); + } else { + return ndjson(stream) + .map((line) => + StreamingSyncLine.fromJson(line as Map)) + .takeWhile((_) => !aborted); } - yield StreamingSyncLine.fromJson(line as Map); - } + }); } /// Delays the standard `retryDelay` Duration, but exits early if diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index 799e69bb..ff658921 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -99,7 +99,15 @@ final class SyncStatus { /// Returns [lastSyncedAt] and [hasSynced] information for a partial sync /// operation, or `null` if the status for that priority is unknown. - SyncPriorityStatus? statusForPriority(BucketPriority priority) { + /// + /// The information returned may be more generic than requested. For instance, + /// a completed sync operation (as expressed by [lastSyncedAt]) also + /// guarantees that every bucket priority was synchronized before that. + /// Similarly, requesting the sync status for priority `1` may return + /// information extracted from the lower priority `2` since each partial sync + /// in priority `2` necessarily includes a consistent view over data in + /// priority `1`. + SyncPriorityStatus statusForPriority(BucketPriority priority) { assert(statusInPriority.isSortedByCompare( (e) => e.priority, BucketPriority.comparator)); @@ -112,7 +120,12 @@ final class SyncStatus { } } - return null; + // If we have a complete sync, that necessarily includes all priorities. + return ( + priority: priority, + hasSynced: hasSynced, + lastSyncedAt: lastSyncedAt + ); } @override @@ -154,8 +167,8 @@ extension type const BucketPriority._(int priorityNumber) { /// priority. typedef SyncPriorityStatus = ({ BucketPriority priority, - DateTime lastSyncedAt, - bool hasSynced, + DateTime? lastSyncedAt, + bool? hasSynced, }); /// Stats of the local upload queue. diff --git a/packages/powersync_core/lib/src/sync_types.dart b/packages/powersync_core/lib/src/sync_types.dart index 842f44ab..6d02766f 100644 --- a/packages/powersync_core/lib/src/sync_types.dart +++ b/packages/powersync_core/lib/src/sync_types.dart @@ -58,7 +58,11 @@ final class Checkpoint extends StreamingSyncLine { 'write_checkpoint': writeCheckpoint, 'buckets': checksums .where((c) => priority == null || c.priority <= priority) - .map((c) => {'bucket': c.bucket, 'checksum': c.checksum}) + .map((c) => { + 'bucket': c.bucket, + 'checksum': c.checksum, + 'priority': c.priority, + }) .toList(growable: false) }; } diff --git a/packages/powersync_core/test/connected_test.dart b/packages/powersync_core/test/connected_test.dart index ff6a9b0c..ace3ae90 100644 --- a/packages/powersync_core/test/connected_test.dart +++ b/packages/powersync_core/test/connected_test.dart @@ -9,7 +9,6 @@ import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; import 'server/sync_server/mock_sync_server.dart'; -import 'streaming_sync_test.dart'; import 'utils/abstract_test_utils.dart'; import 'utils/test_utils_impl.dart'; diff --git a/packages/powersync_core/test/disconnect_test.dart b/packages/powersync_core/test/disconnect_test.dart index 489928ef..a3706537 100644 --- a/packages/powersync_core/test/disconnect_test.dart +++ b/packages/powersync_core/test/disconnect_test.dart @@ -1,7 +1,7 @@ import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:test/test.dart'; -import 'streaming_sync_test.dart'; +import 'utils/abstract_test_utils.dart'; import 'utils/test_utils_impl.dart'; import 'watch_test.dart'; diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart new file mode 100644 index 00000000..a830a55a --- /dev/null +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -0,0 +1,209 @@ +import 'package:async/async.dart'; +import 'package:logging/logging.dart'; +import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/sqlite3_common.dart'; +import 'package:powersync_core/src/log_internal.dart'; +import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync_types.dart'; +import 'package:test/test.dart'; + +import 'server/sync_server/in_memory_sync_server.dart'; +import 'utils/abstract_test_utils.dart'; +import 'utils/in_memory_http.dart'; +import 'utils/test_utils_impl.dart'; + +void main() { + final testUtils = TestUtils(); + + group('in-memory sync tests', () { + late TestPowerSyncFactory factory; + late CommonDatabase raw; + late PowerSyncDatabase database; + late MockSyncService syncService; + late StreamingSyncImplementation syncClient; + + setUp(() async { + final (client, server) = inMemoryServer(); + syncService = MockSyncService(); + server.mount(syncService.router.call); + + factory = await testUtils.testFactory(); + (raw, database) = await factory.openInMemoryDatabase(); + await database.initialize(); + syncClient = database.connectWithMockService( + client, + TestConnector(() async { + return PowerSyncCredentials( + endpoint: server.url.toString(), + token: 'token not used here', + expiresAt: DateTime.now(), + ); + }), + ); + }); + + tearDown(() async { + await syncClient.abort(); + await database.close(); + await syncService.stop(); + }); + + Future> waitForConnection( + {bool expectNoWarnings = true}) async { + if (expectNoWarnings) { + isolateLogger.onRecord.listen((e) { + if (e.level >= Level.WARNING) { + fail('Unexpected log: $e'); + } + }); + } + syncClient.streamingSync(); + await syncService.waitForListener; + + expect(database.currentStatus.lastSyncedAt, isNull); + expect(database.currentStatus.downloading, isFalse); + final status = StreamQueue(database.statusStream); + addTearDown(status.cancel); + + syncService.addKeepAlive(); + await expectLater( + status, emits(isSyncStatus(connected: true, hasSynced: false))); + return status; + } + + test('persists completed sync information', () async { + final status = await waitForConnection(); + + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: [BucketChecksum(bucket: 'bkt', priority: 1, checksum: 0)], + ) + }); + await expectLater(status, emits(isSyncStatus(downloading: true))); + + syncService.addLine({ + 'checkpoint_complete': {'last_op_id': '0'} + }); + await expectLater( + status, emits(isSyncStatus(downloading: false, hasSynced: true))); + + final independentDb = factory.wrapRaw(raw); + // Even though this database doesn't have a sync client attached to it, + // is should reconstruct hasSynced from the database. + await independentDb.initialize(); + expect(independentDb.currentStatus.hasSynced, isTrue); + // A complete sync also means that all partial syncs have completed + expect( + independentDb.currentStatus + .statusForPriority(BucketPriority(3)) + ?.hasSynced, + isTrue); + }); + + group('partial sync', () { + test('updates sync state incrementally', () async { + final status = await waitForConnection(); + + final checksums = [ + for (var prio = 0; prio <= 3; prio++) + BucketChecksum(bucket: 'prio$prio', priority: prio, checksum: 0) + ]; + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: checksums, + ) + }); + + // Receiving the checkpoint sets the state to downloading + await expectLater( + status, emits(isSyncStatus(downloading: true, hasSynced: false))); + + // Emit partial sync complete for each priority but the last. + for (var prio = 0; prio < 3; prio++) { + syncService.addLine({ + 'partial_checkpoint_complete': { + 'last_op_id': '0', + 'priority': prio, + } + }); + + await expectLater( + status, + emits(isSyncStatus(downloading: true, hasSynced: false).having( + (e) => e.statusForPriority(BucketPriority(0))?.hasSynced, + 'status for $prio', + isTrue, + )), + ); + } + + // Complete the sync + syncService.addLine({ + 'checkpoint_complete': {'last_op_id': '0'} + }); + + await expectLater( + status, emits(isSyncStatus(downloading: false, hasSynced: true))); + }); + + test('remembers last partial sync state', () async { + final status = await waitForConnection(); + + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'bkt', priority: 1, checksum: 0) + ], + ) + }); + await expectLater(status, emits(isSyncStatus(downloading: true))); + + syncService.addLine({ + 'partial_checkpoint_complete': { + 'last_op_id': '0', + 'priority': 1, + } + }); + await database.waitForFirstSync(priority: BucketPriority(1)); + expect(database.currentStatus.hasSynced, isFalse); + + final independentDb = factory.wrapRaw(raw); + await independentDb.initialize(); + expect(independentDb.currentStatus.hasSynced, isFalse); + // Completing a sync for prio 1 implies a completed sync for prio 0 + expect( + independentDb.currentStatus + .statusForPriority(BucketPriority(0)) + ?.hasSynced, + isTrue); + expect( + independentDb.currentStatus + .statusForPriority(BucketPriority(3)) + ?.hasSynced, + isFalse); + }); + }); + }); +} + +TypeMatcher isSyncStatus( + {Object? downloading, Object? connected, Object? hasSynced}) { + var matcher = isA(); + if (downloading != null) { + matcher = matcher.having((e) => e.downloading, 'downloading', downloading); + } + if (connected != null) { + matcher = matcher.having((e) => e.connected, 'connected', connected); + } + if (hasSynced != null) { + matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); + } + + return matcher; +} diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart new file mode 100644 index 00000000..b31dffc2 --- /dev/null +++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart @@ -0,0 +1,64 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:shelf/shelf.dart'; +import 'package:shelf_router/shelf_router.dart'; + +final class MockSyncService { + // Use a queued stream to make tests easier. + StreamController _controller = StreamController(); + Completer _listener = Completer(); + + final router = Router(); + + MockSyncService() { + router + ..post('/sync/stream', (Request request) async { + _listener.complete(); + // Respond immediately with a stream + return Response.ok(_controller.stream.transform(utf8.encoder), + headers: { + 'Content-Type': 'application/x-ndjson', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + context: { + "shelf.io.buffer_output": false + }); + }) + ..get('/write-checkpoint2.json', (request) { + return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: { + 'Content-Type': 'application/json', + }); + }); + } + + Future get waitForListener => _listener.future; + + // Queue events which will be sent to connected clients. + void addRawEvent(String data) { + _controller.add(data); + } + + void addLine(Object? message) { + addRawEvent('${json.encode(message)}\n'); + } + + void addKeepAlive([int tokenExpiresIn = 3600]) { + addLine({'token_expires_in': tokenExpiresIn}); + } + + // Clear events. We rely on a buffered controller here. Create a new controller + // in order to clear the buffer. + Future clearEvents() async { + await _controller.close(); + _listener = Completer(); + _controller = StreamController(); + } + + Future stop() async { + if (_controller.hasListener) { + await _controller.close(); + } + } +} diff --git a/packages/powersync_core/test/server/sync_server/mock_sync_server.dart b/packages/powersync_core/test/server/sync_server/mock_sync_server.dart index 9844692f..e4710f3d 100644 --- a/packages/powersync_core/test/server/sync_server/mock_sync_server.dart +++ b/packages/powersync_core/test/server/sync_server/mock_sync_server.dart @@ -1,58 +1,37 @@ import 'dart:async'; -import 'dart:convert'; import 'dart:io'; -import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as io; -import 'package:shelf_router/shelf_router.dart'; + +import 'in_memory_sync_server.dart'; // A basic Mock PowerSync service server which queues commands // which clients can receive via connecting to the `/sync/stream` route. // This assumes only one client will ever be connected at a time. class TestHttpServerHelper { - // Use a queued stream to make tests easier. - StreamController _controller = StreamController(); + final MockSyncService service = MockSyncService(); late HttpServer _server; + Uri get uri => Uri.parse('http://localhost:${_server.port}'); Future start() async { - final router = Router() - ..post('/sync/stream', (Request request) async { - // Respond immediately with a stream - return Response.ok(_controller.stream.transform(utf8.encoder), - headers: { - 'Content-Type': 'application/x-ndjson', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - }, - context: { - "shelf.io.buffer_output": false - }); - }) - ..get('/write-checkpoint2.json', (request) { - return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: { - 'Content-Type': 'application/json', - }); - }); - - _server = await io.serve(router.call, 'localhost', 0); + _server = await io.serve(service.router.call, 'localhost', 0); print('Test server running at ${_server.address}:${_server.port}'); } // Queue events which will be sent to connected clients. void addEvent(String data) { - _controller.add(data); + service.addRawEvent(data); } // Clear events. We rely on a buffered controller here. Create a new controller // in order to clear the buffer. Future clearEvents() async { - await _controller.close(); - _controller = StreamController(); + await service.clearEvents(); } Future stop() async { - await _controller.close(); + await service.stop(); await _server.close(); } } diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/streaming_sync_test.dart index 5238dd05..b9bbed07 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/streaming_sync_test.dart @@ -9,29 +9,11 @@ import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; import 'test_server.dart'; +import 'utils/abstract_test_utils.dart'; import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); -class TestConnector extends PowerSyncBackendConnector { - final Function _fetchCredentials; - final Future Function(PowerSyncDatabase)? _uploadData; - - TestConnector(this._fetchCredentials, - {Future Function(PowerSyncDatabase)? uploadData}) - : _uploadData = uploadData; - - @override - Future fetchCredentials() { - return _fetchCredentials(); - } - - @override - Future uploadData(PowerSyncDatabase database) async { - await _uploadData?.call(database); - } -} - void main() { group('Streaming Sync Test', () { late String path; diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 561e5c3b..d94b92f2 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,5 +1,8 @@ +import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test_api/src/backend/invoker.dart'; @@ -52,6 +55,23 @@ Logger _makeTestLogger({Level level = Level.ALL, String? name}) { return logger; } +abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory { + Future openRawInMemoryDatabase(); + + Future<(CommonDatabase, PowerSyncDatabase)> openInMemoryDatabase() async { + final raw = await openRawInMemoryDatabase(); + return (raw, wrapRaw(raw)); + } + + PowerSyncDatabase wrapRaw(CommonDatabase raw) { + return PowerSyncDatabase.withDatabase( + schema: schema, + database: SqliteDatabase.singleConnection( + SqliteConnection.synchronousWrapper(raw)), + ); + } +} + abstract class AbstractTestUtils { String get _testName => Invoker.current!.liveTest.test.name; @@ -63,12 +83,10 @@ abstract class AbstractTestUtils { } /// Generates a test open factory - Future testFactory( + Future testFactory( {String? path, String sqlitePath = '', - SqliteOptions options = const SqliteOptions.defaults()}) async { - return PowerSyncOpenFactory(path: path ?? dbPath(), sqliteOptions: options); - } + SqliteOptions options = const SqliteOptions.defaults()}); /// Creates a SqliteDatabaseConnection Future setupPowerSync( @@ -93,3 +111,41 @@ abstract class AbstractTestUtils { /// Deletes any DB data Future cleanDb({required String path}); } + +class TestConnector extends PowerSyncBackendConnector { + final Function _fetchCredentials; + final Future Function(PowerSyncDatabase)? _uploadData; + + TestConnector(this._fetchCredentials, + {Future Function(PowerSyncDatabase)? uploadData}) + : _uploadData = uploadData; + + @override + Future fetchCredentials() { + return _fetchCredentials(); + } + + @override + Future uploadData(PowerSyncDatabase database) async { + await _uploadData?.call(database); + } +} + +extension MockSync on PowerSyncDatabase { + StreamingSyncImplementation connectWithMockService( + Client client, PowerSyncBackendConnector connector) { + final impl = StreamingSyncImplementation( + adapter: BucketStorage(this), + client: client, + retryDelay: const Duration(seconds: 5), + credentialsCallback: connector.getCredentialsCached, + invalidCredentialsCallback: connector.prefetchCredentials, + uploadCrud: () => connector.uploadData(this), + crudUpdateTriggerStream: database + .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), + ); + impl.statusStream.listen(setStatus); + + return impl; + } +} diff --git a/packages/powersync_core/test/utils/in_memory_http.dart b/packages/powersync_core/test/utils/in_memory_http.dart new file mode 100644 index 00000000..61550e3c --- /dev/null +++ b/packages/powersync_core/test/utils/in_memory_http.dart @@ -0,0 +1,56 @@ +import 'package:http/http.dart'; +import 'package:http/testing.dart'; +import 'package:shelf/shelf.dart' as shelf; + +final Uri mockHttpUri = Uri.parse('https://testing.powersync.com/'); + +/// Returns a [Client] that can send HTTP requests to the returned +/// [shelf.Server]. +/// +/// The server can be used to serve shelf routes via [shelf.Server.mount]. +(Client, shelf.Server) inMemoryServer() { + final server = _MockServer(); + final client = MockClient.streaming(server.handleRequest); + + return (client, server); +} + +final class _MockServer implements shelf.Server { + shelf.Handler? _handler; + + @override + void mount(shelf.Handler handler) { + if (_handler != null) { + throw StateError('already has a handler'); + } + + _handler = handler; + } + + @override + Future close() async {} + + @override + Uri get url => mockHttpUri; + + Future handleRequest( + BaseRequest request, ByteStream body) async { + if (_handler case final endpoint?) { + final shelfRequest = shelf.Request( + request.method, + request.url, + headers: request.headers, + body: body, + ); + final shelfResponse = await endpoint(shelfRequest); + + return StreamedResponse( + shelfResponse.read(), + shelfResponse.statusCode, + headers: shelfResponse.headers, + ); + } else { + throw StateError('Request before handler was set on mock server'); + } + } +} diff --git a/packages/powersync_core/test/utils/native_test_utils.dart b/packages/powersync_core/test/utils/native_test_utils.dart index ab8cfd6a..95e55b3a 100644 --- a/packages/powersync_core/test/utils/native_test_utils.dart +++ b/packages/powersync_core/test/utils/native_test_utils.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:ffi'; import 'dart:io'; import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/sqlite3.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite3/open.dart' as sqlite_open; @@ -10,17 +11,28 @@ import 'abstract_test_utils.dart'; const defaultSqlitePath = 'libsqlite3.so.0'; -class TestOpenFactory extends PowerSyncOpenFactory { +class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory { TestOpenFactory({required super.path}); - @override - CommonDatabase open(SqliteOpenOptions options) { + void applyOpenOverride() { sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.linux, () { return DynamicLibrary.open('libsqlite3.so.0'); }); sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { return DynamicLibrary.open('libsqlite3.dylib'); }); + } + + @override + void enableExtension() { + var powersyncLib = getLibraryForPlatform(); + sqlite3.ensureExtensionLoaded(SqliteExtension.inLibrary( + DynamicLibrary.open(powersyncLib), 'sqlite3_powersync_init')); + } + + @override + CommonDatabase open(SqliteOpenOptions options) { + applyOpenOverride(); return super.open(options); } @@ -52,6 +64,22 @@ class TestOpenFactory extends PowerSyncOpenFactory { ); } } + + @override + Future openRawInMemoryDatabase() async { + applyOpenOverride(); + + try { + enableExtension(); + } on PowersyncNotReadyException catch (e) { + autoLogger.severe(e.message); + rethrow; + } + + final db = sqlite3.openInMemory(); + setupFunctions(db); + return db; + } } class TestUtils extends AbstractTestUtils { diff --git a/packages/powersync_core/test/utils/stub_test_utils.dart b/packages/powersync_core/test/utils/stub_test_utils.dart index 3f86512c..d0cdb428 100644 --- a/packages/powersync_core/test/utils/stub_test_utils.dart +++ b/packages/powersync_core/test/utils/stub_test_utils.dart @@ -1,3 +1,5 @@ +import 'package:sqlite_async/src/sqlite_options.dart'; + import 'abstract_test_utils.dart'; class TestUtils extends AbstractTestUtils { @@ -5,4 +7,12 @@ class TestUtils extends AbstractTestUtils { Future cleanDb({required String path}) { throw UnimplementedError(); } + + @override + Future testFactory( + {String? path, + String sqlitePath = '', + SqliteOptions options = const SqliteOptions.defaults()}) { + throw UnimplementedError(); + } } diff --git a/packages/powersync_core/test/utils/web_test_utils.dart b/packages/powersync_core/test/utils/web_test_utils.dart index 71a462a2..3a74a448 100644 --- a/packages/powersync_core/test/utils/web_test_utils.dart +++ b/packages/powersync_core/test/utils/web_test_utils.dart @@ -3,7 +3,7 @@ import 'dart:js_interop'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; -import 'package:sqlite_async/sqlite3_common.dart'; +import 'package:sqlite_async/sqlite3_wasm.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import 'package:web/web.dart' show Blob, BlobPropertyBag; @@ -12,6 +12,29 @@ import 'abstract_test_utils.dart'; @JS('URL.createObjectURL') external String _createObjectURL(Blob blob); +class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory { + TestOpenFactory({required super.path, super.sqliteOptions}); + + @override + Future openRawInMemoryDatabase() async { + final sqlite = await WasmSqlite3.loadFromUrl( + Uri.parse(sqliteOptions.webSqliteOptions.wasmUri)); + sqlite.registerVirtualFileSystem(InMemoryFileSystem(), makeDefault: true); + + final db = sqlite.openInMemory(); + + try { + enableExtension(); + } on PowersyncNotReadyException catch (e) { + autoLogger.severe(e.message); + rethrow; + } + + setupFunctions(db); + return db; + } +} + class TestUtils extends AbstractTestUtils { late Future _isInitialized; late final String sqlite3WASMUri; @@ -39,16 +62,16 @@ class TestUtils extends AbstractTestUtils { Future cleanDb({required String path}) async {} @override - Future testFactory( + Future testFactory( {String? path, - String? sqlitePath, + String sqlitePath = '', SqliteOptions options = const SqliteOptions.defaults()}) async { await _isInitialized; final webOptions = SqliteOptions( webSqliteOptions: WebSqliteOptions(wasmUri: sqlite3WASMUri, workerUri: workerUri)); - return super.testFactory(path: path, options: webOptions); + return TestOpenFactory(path: path ?? '', sqliteOptions: webOptions); } @override