Merge pull request #242 from powersync-ja/feat/group-sync-lines
Small streaming sync improvements
simolus3 authored Feb 10, 2025
2 parents 15d1682 + 6c5337b commit 1f9268f
Show file tree
Hide file tree
Showing 18 changed files with 1,002 additions and 331 deletions.
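The branch name (feat/group-sync-lines) and the diffs below carry the core idea: the individual messages of the sync protocol are grouped under a single StreamingSyncLine type, so the connection loop can dispatch on them with one switch instead of a chain of `is` checks. A minimal sketch of that pattern follows; the subtype names match the diffs, but the fields shown and the `sealed` modifier are assumptions for illustration (the real declarations are not part of this diff):

sealed class StreamingSyncLine {}

final class Checkpoint extends StreamingSyncLine {
  Checkpoint(this.lastOpId);
  final String lastOpId;
}

final class StreamingSyncKeepalive extends StreamingSyncLine {
  StreamingSyncKeepalive(this.tokenExpiresIn);
  final int tokenExpiresIn;
}

final class UnknownSyncLine extends StreamingSyncLine {
  UnknownSyncLine(this.rawData);
  final Object? rawData;
}

void handleLine(StreamingSyncLine line) {
  // Because the supertype is sealed, the compiler checks this switch for
  // exhaustiveness: adding a new line type without handling it becomes a
  // compile-time error instead of a silently ignored message.
  switch (line) {
    case Checkpoint(:final lastOpId):
      print('new checkpoint, last op $lastOpId');
    case StreamingSyncKeepalive(:final tokenExpiresIn):
      print('keepalive, token expires in ${tokenExpiresIn}s');
    case UnknownSyncLine(:final rawData):
      print('unrecognized sync line: $rawData');
  }
}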
115 changes: 0 additions & 115 deletions packages/powersync_core/lib/src/bucket_storage.dart
@@ -344,121 +344,6 @@ class BucketState {
   }
 }
 
-class SyncDataBatch {
-  List<SyncBucketData> buckets;
-
-  SyncDataBatch(this.buckets);
-}
-
-class SyncBucketData {
-  final String bucket;
-  final List<OplogEntry> data;
-  final bool hasMore;
-  final String? after;
-  final String? nextAfter;
-
-  const SyncBucketData(
-      {required this.bucket,
-      required this.data,
-      this.hasMore = false,
-      this.after,
-      this.nextAfter});
-
-  SyncBucketData.fromJson(Map<String, dynamic> json)
-      : bucket = json['bucket'] as String,
-        hasMore = json['has_more'] as bool? ?? false,
-        after = json['after'] as String?,
-        nextAfter = json['next_after'] as String?,
-        data = (json['data'] as List)
-            .map((e) => OplogEntry.fromJson(e as Map<String, dynamic>))
-            .toList();
-
-  Map<String, dynamic> toJson() {
-    return {
-      'bucket': bucket,
-      'has_more': hasMore,
-      'after': after,
-      'next_after': nextAfter,
-      'data': data
-    };
-  }
-}
-
-class OplogEntry {
-  final String opId;
-
-  final OpType? op;
-
-  /// rowType + rowId uniquely identifies an entry in the local database.
-  final String? rowType;
-  final String? rowId;
-
-  /// Together with rowType and rowId, this uniquely identifies a source entry
-  /// per bucket in the oplog. There may be multiple source entries for a single
-  /// "rowType + rowId" combination.
-  final String? subkey;
-
-  final String? data;
-  final int checksum;
-
-  const OplogEntry(
-      {required this.opId,
-      required this.op,
-      this.subkey,
-      this.rowType,
-      this.rowId,
-      this.data,
-      required this.checksum});
-
-  OplogEntry.fromJson(Map<String, dynamic> json)
-      : opId = json['op_id'] as String,
-        op = OpType.fromJson(json['op'] as String),
-        rowType = json['object_type'] as String?,
-        rowId = json['object_id'] as String?,
-        checksum = json['checksum'] as int,
-        data = switch (json['data']) {
-          String data => data,
-          var other => jsonEncode(other),
-        },
-        subkey = switch (json['subkey']) {
-          String subkey => subkey,
-          _ => null,
-        };
-
-  Map<String, dynamic>? get parsedData {
-    return switch (data) {
-      final data? => jsonDecode(data) as Map<String, dynamic>,
-      null => null,
-    };
-  }
-
-  /// Key to uniquely represent a source entry in a bucket.
-  /// This is used to supersede old entries.
-  /// Relevant for put and remove ops.
-  String get key {
-    return "$rowType/$rowId/$subkey";
-  }
-
-  Map<String, dynamic> toJson() {
-    return {
-      'op_id': opId,
-      'op': op?.toJson(),
-      'object_type': rowType,
-      'object_id': rowId,
-      'checksum': checksum,
-      'subkey': subkey,
-      'data': data
-    };
-  }
-}
-
-class SqliteOp {
-  String sql;
-  List<dynamic> args;
-
-  SqliteOp(this.sql, this.args);
-}
-
 class SyncLocalDatabaseResult {
   final bool ready;
   final bool checkpointValid;
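The deleted OplogEntry.fromJson normalizes the `data` field with a Dart switch expression: a string passes through unchanged, while any other JSON value is re-encoded to text. A standalone sketch of just that step (normalizeData is a hypothetical name, not part of the package):

import 'dart:convert';

// Mirrors the pattern in the deleted constructor: strings pass through,
// everything else (maps, lists, numbers, even null) is re-encoded as JSON.
String normalizeData(Object? raw) {
  return switch (raw) {
    String data => data,
    var other => jsonEncode(other),
  };
}

void main() {
  print(normalizeData('{"a":1}')); // already a string: unchanged
  print(normalizeData({'a': 1})); // map: re-encoded to {"a":1}
}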
7 changes: 0 additions & 7 deletions packages/powersync_core/lib/src/stream_utils.dart
@@ -68,13 +68,6 @@ Stream<Object?> ndjson(ByteStream input) {
   return jsonInput;
 }
 
-/// Given a raw ByteStream, parse each line as JSON.
-Stream<String> newlines(ByteStream input) {
-  final textInput = input.transform(convert.utf8.decoder);
-  final lineInput = textInput.transform(const convert.LineSplitter());
-  return lineInput;
-}
-
 void pauseAll(List<StreamSubscription<void>> subscriptions) {
   for (var sub in subscriptions) {
     sub.pause();
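The removed `newlines` helper only split a byte stream into text lines; its doc comment, copied from `ndjson`, overstated what it did. The surviving `ndjson` helper keeps the whole pipeline. A self-contained sketch of that pipeline, assuming ByteStream is a Stream<List<int>> (ndjsonSketch is an illustrative name, not the package API):

import 'dart:convert' as convert;

// Decode bytes as UTF-8, split into lines, then parse each line as a JSON
// value: newline-delimited JSON (NDJSON).
Stream<Object?> ndjsonSketch(Stream<List<int>> input) {
  return input
      .transform(convert.utf8.decoder)
      .transform(const convert.LineSplitter())
      .map(convert.jsonDecode);
}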
199 changes: 101 additions & 98 deletions packages/powersync_core/lib/src/streaming_sync.dart
@@ -53,7 +53,7 @@ class StreamingSyncImplementation implements StreamingSync {
 
   late final http.Client _client;
 
-  final StreamController<String?> _localPingController =
+  final StreamController<Null> _localPingController =
       StreamController.broadcast();
 
   final Duration retryDelay;
@@ -340,96 +340,19 @@ class StreamingSyncImplementation implements StreamingSync {
       }
 
       _updateStatus(connected: true, connecting: false);
-      if (line is Checkpoint) {
-        targetCheckpoint = line;
-        final Set<String> bucketsToDelete = {...bucketSet};
-        final Set<String> newBuckets = {};
-        for (final checksum in line.checksums) {
-          newBuckets.add(checksum.bucket);
-          bucketsToDelete.remove(checksum.bucket);
-        }
-        bucketSet = newBuckets;
-        await adapter.removeBuckets([...bucketsToDelete]);
-        _updateStatus(downloading: true);
-      } else if (line is StreamingSyncCheckpointComplete) {
-        final result = await adapter.syncLocalDatabase(targetCheckpoint!);
-        if (!result.checkpointValid) {
-          // This means checksums failed. Start again with a new checkpoint.
-          // TODO: better back-off
-          // await new Promise((resolve) => setTimeout(resolve, 50));
-          return false;
-        } else if (!result.ready) {
-          // Checksums valid, but need more data for a consistent checkpoint.
-          // Continue waiting.
-        } else {
-          appliedCheckpoint = targetCheckpoint;
-
-          _updateStatus(
-              downloading: false,
-              downloadError: _noError,
-              lastSyncedAt: DateTime.now());
-        }
-
-        validatedCheckpoint = targetCheckpoint;
-      } else if (line is StreamingSyncCheckpointDiff) {
-        // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
-        if (targetCheckpoint == null) {
-          throw PowerSyncProtocolException(
-              'Checkpoint diff without previous checkpoint');
-        }
-        _updateStatus(downloading: true);
-        final diff = line;
-        final Map<String, BucketChecksum> newBuckets = {};
-        for (var checksum in targetCheckpoint.checksums) {
-          newBuckets[checksum.bucket] = checksum;
-        }
-        for (var checksum in diff.updatedBuckets) {
-          newBuckets[checksum.bucket] = checksum;
-        }
-        for (var bucket in diff.removedBuckets) {
-          newBuckets.remove(bucket);
-        }
-
-        final newCheckpoint = Checkpoint(
-            lastOpId: diff.lastOpId,
-            checksums: [...newBuckets.values],
-            writeCheckpoint: diff.writeCheckpoint);
-        targetCheckpoint = newCheckpoint;
-
-        bucketSet = Set.from(newBuckets.keys);
-        await adapter.removeBuckets(diff.removedBuckets);
-        adapter.setTargetCheckpoint(targetCheckpoint);
-      } else if (line is SyncBucketData) {
-        _updateStatus(downloading: true);
-        await adapter.saveSyncData(SyncDataBatch([line]));
-      } else if (line is StreamingSyncKeepalive) {
-        if (line.tokenExpiresIn == 0) {
-          // Token expired already - stop the connection immediately
-          invalidCredentialsCallback?.call().ignore();
-          break;
-        } else if (line.tokenExpiresIn <= 30) {
-          // Token expires soon - refresh it in the background
-          if (credentialsInvalidation == null &&
-              invalidCredentialsCallback != null) {
-            credentialsInvalidation = invalidCredentialsCallback!().then((_) {
-              // Token has been refreshed - we should restart the connection.
-              haveInvalidated = true;
-              // trigger next loop iteration ASAP, don't wait for another
-              // message from the server.
-              _localPingController.add(null);
-            }, onError: (_) {
-              // Token refresh failed - retry on next keepalive.
-              credentialsInvalidation = null;
-            });
-          }
-        }
-      } else {
-        if (targetCheckpoint == appliedCheckpoint) {
-          _updateStatus(
-              downloading: false,
-              downloadError: _noError,
-              lastSyncedAt: DateTime.now());
-        } else if (validatedCheckpoint == targetCheckpoint) {
+      switch (line) {
+        case Checkpoint():
+          targetCheckpoint = line;
+          final Set<String> bucketsToDelete = {...bucketSet};
+          final Set<String> newBuckets = {};
+          for (final checksum in line.checksums) {
+            newBuckets.add(checksum.bucket);
+            bucketsToDelete.remove(checksum.bucket);
+          }
+          bucketSet = newBuckets;
+          await adapter.removeBuckets([...bucketsToDelete]);
+          _updateStatus(downloading: true);
+        case StreamingSyncCheckpointComplete():
+          final result = await adapter.syncLocalDatabase(targetCheckpoint!);
+          if (!result.checkpointValid) {
+            // This means checksums failed. Start again with a new checkpoint.
@@ -447,7 +370,88 @@ class StreamingSyncImplementation implements StreamingSync {
                 downloading: false,
                 downloadError: _noError,
                 lastSyncedAt: DateTime.now());
           }
-        }
+
+          validatedCheckpoint = targetCheckpoint;
+        case StreamingSyncCheckpointDiff():
+          // TODO: It may be faster to just keep track of the diff, instead of
+          // the entire checkpoint
+          if (targetCheckpoint == null) {
+            throw PowerSyncProtocolException(
+                'Checkpoint diff without previous checkpoint');
+          }
+          _updateStatus(downloading: true);
+          final diff = line;
+          final Map<String, BucketChecksum> newBuckets = {};
+          for (var checksum in targetCheckpoint.checksums) {
+            newBuckets[checksum.bucket] = checksum;
+          }
+          for (var checksum in diff.updatedBuckets) {
+            newBuckets[checksum.bucket] = checksum;
+          }
+          for (var bucket in diff.removedBuckets) {
+            newBuckets.remove(bucket);
+          }
+
+          final newCheckpoint = Checkpoint(
+              lastOpId: diff.lastOpId,
+              checksums: [...newBuckets.values],
+              writeCheckpoint: diff.writeCheckpoint);
+          targetCheckpoint = newCheckpoint;
+
+          bucketSet = Set.from(newBuckets.keys);
+          await adapter.removeBuckets(diff.removedBuckets);
+          adapter.setTargetCheckpoint(targetCheckpoint);
+        case SyncDataBatch():
+          _updateStatus(downloading: true);
+          await adapter.saveSyncData(line);
+        case StreamingSyncKeepalive(:final tokenExpiresIn):
+          if (tokenExpiresIn == 0) {
+            // Token expired already - stop the connection immediately
+            invalidCredentialsCallback?.call().ignore();
+            break;
+          } else if (tokenExpiresIn <= 30) {
+            // Token expires soon - refresh it in the background
+            if (credentialsInvalidation == null &&
+                invalidCredentialsCallback != null) {
+              credentialsInvalidation = invalidCredentialsCallback!().then((_) {
+                // Token has been refreshed - we should restart the connection.
+                haveInvalidated = true;
+                // trigger next loop iteration ASAP, don't wait for another
+                // message from the server.
+                _localPingController.add(null);
+              }, onError: (_) {
+                // Token refresh failed - retry on next keepalive.
+                credentialsInvalidation = null;
+              });
+            }
+          }
+        case UnknownSyncLine(:final rawData):
+          isolateLogger.fine('Unknown sync line: $rawData');
+        case null: // Local ping
+          if (targetCheckpoint == appliedCheckpoint) {
+            _updateStatus(
+                downloading: false,
+                downloadError: _noError,
+                lastSyncedAt: DateTime.now());
+          } else if (validatedCheckpoint == targetCheckpoint) {
+            final result = await adapter.syncLocalDatabase(targetCheckpoint!);
+            if (!result.checkpointValid) {
+              // This means checksums failed. Start again with a new checkpoint.
+              // TODO: better back-off
+              // await new Promise((resolve) => setTimeout(resolve, 50));
+              return false;
+            } else if (!result.ready) {
+              // Checksums valid, but need more data for a consistent checkpoint.
+              // Continue waiting.
+            } else {
+              appliedCheckpoint = targetCheckpoint;
+
+              _updateStatus(
+                  downloading: false,
+                  downloadError: _noError,
+                  lastSyncedAt: DateTime.now());
+            }
+          }
       }
 
       if (haveInvalidated) {
@@ -458,7 +462,8 @@ class StreamingSyncImplementation implements StreamingSync {
     }
     return true;
   }
 
-  Stream<Object?> streamingSyncRequest(StreamingSyncRequest data) async* {
+  Stream<StreamingSyncLine> streamingSyncRequest(
+      StreamingSyncRequest data) async* {
     final credentials = await credentialsCallback();
     if (credentials == null) {
       throw CredentialsException('Not logged in');
@@ -494,12 +499,10 @@ class StreamingSyncImplementation implements StreamingSync {
     }
 
     // Note: The response stream is automatically closed when this loop errors
-    await for (var line in ndjson(res.stream)) {
-      if (aborted) {
-        break;
-      }
-      yield parseStreamingSyncLine(line as Map<String, dynamic>);
-    }
+    yield* ndjson(res.stream)
+        .cast<Map<String, dynamic>>()
+        .transform(StreamingSyncLine.reader)
+        .takeWhile((_) => !aborted);
   }
 
   /// Delays the standard `retryDelay` Duration, but exits early if
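The final hunk replaces the manual `await for` loop with a declarative pipeline: decoded JSON maps run through the new StreamingSyncLine.reader transformer, and takeWhile((_) => !aborted) closes the stream, cancelling the upstream subscription, as soon as the client is aborted. A runnable sketch of the same shape; exampleReader is a stand-in, since the internals of StreamingSyncLine.reader are not part of this diff:

import 'dart:async';

// Stand-in for StreamingSyncLine.reader: a StreamTransformer turning raw
// JSON maps into a typed representation (here just a descriptive string).
final exampleReader =
    StreamTransformer<Map<String, dynamic>, String>.fromHandlers(
        handleData: (json, sink) =>
            sink.add('sync line with keys: ${json.keys.join(', ')}'));

Future<void> main() async {
  var aborted = false;
  final lines = Stream<Map<String, dynamic>>.fromIterable([
    {'checkpoint': {}},
    {'token_expires_in': 30},
  ])
      .transform(exampleReader)
      // Ends the stream (and cancels the upstream subscription) as soon as
      // the predicate fails, replacing the old `if (aborted) break;` check.
      .takeWhile((_) => !aborted);

  await for (final line in lines) {
    print(line);
  }
}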
[Diffs for the remaining 15 changed files are not shown.]
