Skip to content

Commit cf06f4a

Browse files
committed
Start integrating Rust extension
1 parent 6547d44 commit cf06f4a

File tree

6 files changed

+286
-3
lines changed

6 files changed

+286
-3
lines changed

packages/powersync_core/lib/src/sync/bucket_storage.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
365365
});
366366
}
367367

368+
Future<String> control(String op, [Object? payload]) async {
369+
return await writeTransaction(
370+
(tx) async {
371+
final [row] =
372+
await tx.execute('SELECT powersync_control(?, ?)', [op, payload]);
373+
return row.columnAt(0) as String;
374+
},
375+
// We flush when powersync_control yields an instruction to do so.
376+
flush: false,
377+
);
378+
}
379+
380+
Future<void> flushFileSystem() async {
381+
// Noop outside of web.
382+
}
383+
368384
/// Note: The asynchronous nature of this is due to this needing a global
369385
/// lock. The actual database operations are still synchronous, and it
370386
/// is assumed that multiple functions on this instance won't be called
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import 'sync_status.dart';
2+
3+
/// An internal instruction emitted by the sync client in the core extension in
4+
/// response to the Dart SDK passing sync data into the extension.
5+
sealed class Instruction {
6+
factory Instruction.fromJson(Map<String, Object?> json) {
7+
return switch (json) {
8+
{'LogLine': final logLine} =>
9+
LogLine.fromJson(logLine as Map<String, Object?>),
10+
{'UpdateSyncStatus': final updateStatus} =>
11+
UpdateSyncStatus.fromJson(updateStatus as Map<String, Object?>),
12+
{'EstablishSyncStream': final establish} =>
13+
EstablishSyncStream.fromJson(establish as Map<String, Object?>),
14+
{'FetchCredentials': final creds} =>
15+
FetchCredentials.fromJson(creds as Map<String, Object?>),
16+
{'CloseSyncStream': _} => const CloseSyncStream(),
17+
{'FlushFileSystem': _} => const FlushFileSystem(),
18+
{'DidCompleteSync': _} => const DidCompleteSync(),
19+
_ => UnknownSyncInstruction(json)
20+
};
21+
}
22+
}
23+
24+
final class LogLine implements Instruction {
25+
final String severity;
26+
final String line;
27+
28+
LogLine({required this.severity, required this.line});
29+
30+
factory LogLine.fromJson(Map<String, Object?> json) {
31+
return LogLine(
32+
severity: json['severity'] as String,
33+
line: json['line'] as String,
34+
);
35+
}
36+
}
37+
38+
final class EstablishSyncStream implements Instruction {
39+
final Map<String, Object?> request;
40+
41+
EstablishSyncStream(this.request);
42+
43+
factory EstablishSyncStream.fromJson(Map<String, Object?> json) {
44+
return EstablishSyncStream(json['request'] as Map<String, Object?>);
45+
}
46+
}
47+
48+
final class UpdateSyncStatus implements Instruction {
49+
final CoreSyncStatus status;
50+
51+
UpdateSyncStatus({required this.status});
52+
53+
factory UpdateSyncStatus.fromJson(Map<String, Object?> json) {
54+
return UpdateSyncStatus(
55+
status:
56+
CoreSyncStatus.fromJson(json['status'] as Map<String, Object?>));
57+
}
58+
}
59+
60+
final class CoreSyncStatus {
61+
final bool connected;
62+
final bool connecting;
63+
final List<SyncPriorityStatus> priorityStatus;
64+
final DownloadProgress? downloading;
65+
66+
CoreSyncStatus({
67+
required this.connected,
68+
required this.connecting,
69+
required this.priorityStatus,
70+
required this.downloading,
71+
});
72+
73+
factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
74+
return CoreSyncStatus(
75+
connected: json['connected'] as bool,
76+
connecting: json['connecting'] as bool,
77+
priorityStatus: [
78+
for (final entry in json['priority_status'] as List)
79+
_priorityStatusFromJson(entry as Map<String, Object?>)
80+
],
81+
downloading: switch (json['downloading']) {
82+
null => null,
83+
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
84+
},
85+
);
86+
}
87+
88+
static SyncPriorityStatus _priorityStatusFromJson(Map<String, Object?> json) {
89+
return (
90+
priority: BucketPriority(json['priority'] as int),
91+
hasSynced: json['has_synced'] as bool?,
92+
lastSyncedAt: switch (json['last_synced_at']) {
93+
null => null,
94+
final lastSyncedAt as int =>
95+
DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000),
96+
},
97+
);
98+
}
99+
}
100+
101+
final class DownloadProgress {
102+
final Map<String, BucketProgress> progress;
103+
104+
DownloadProgress(this.progress);
105+
106+
factory DownloadProgress.fromJson(Map<String, Object?> line) {
107+
return DownloadProgress(line.map((k, v) =>
108+
MapEntry(k, _bucketProgressFromJson(v as Map<String, Object?>))));
109+
}
110+
111+
static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {
112+
return (
113+
priority: BucketPriority(json['priority'] as int),
114+
atLast: json['at_last'] as int,
115+
sinceLast: json['since_last'] as int,
116+
targetCount: json['target_count'] as int,
117+
);
118+
}
119+
}
120+
121+
final class FetchCredentials implements Instruction {
122+
final bool didExpire;
123+
124+
FetchCredentials(this.didExpire);
125+
126+
factory FetchCredentials.fromJson(Map<String, Object?> line) {
127+
return FetchCredentials(line['did_expire'] as bool);
128+
}
129+
}
130+
131+
final class CloseSyncStream implements Instruction {
132+
const CloseSyncStream();
133+
}
134+
135+
final class FlushFileSystem implements Instruction {
136+
const FlushFileSystem();
137+
}
138+
139+
final class DidCompleteSync implements Instruction {
140+
const DidCompleteSync();
141+
}
142+
143+
final class UnknownSyncInstruction implements Instruction {
144+
final Map<String, Object?> source;
145+
146+
UnknownSyncInstruction(this.source);
147+
}

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22

33
import 'package:collection/collection.dart';
4+
import 'package:powersync_core/src/sync/instruction.dart';
45

56
import 'sync_status.dart';
67
import 'bucket_storage.dart';
@@ -79,6 +80,20 @@ final class MutableSyncStatus {
7980
}
8081
}
8182

83+
void applyFromCore(CoreSyncStatus status) {
84+
connected = status.connected;
85+
connecting = status.connecting;
86+
downloading = status.downloading != null;
87+
priorityStatusEntries = status.priorityStatus;
88+
downloadProgress = switch (status.downloading) {
89+
null => null,
90+
final downloading => InternalSyncDownloadProgress(downloading.progress),
91+
};
92+
lastSyncedAt = status.priorityStatus
93+
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)
94+
?.lastSyncedAt;
95+
}
96+
8297
SyncStatus immutableSnapshot() {
8398
return SyncStatus(
8499
connected: connected,

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import 'dart:async';
22
import 'dart:convert' as convert;
3+
import 'dart:typed_data';
34

45
import 'package:http/http.dart' as http;
56
import 'package:logging/logging.dart';
7+
import 'package:logging/logging.dart';
68
import 'package:meta/meta.dart';
79
import 'package:powersync_core/src/abort_controller.dart';
810
import 'package:powersync_core/src/exceptions.dart';
@@ -13,7 +15,7 @@ import 'package:sqlite_async/mutex.dart';
1315

1416
import 'bucket_storage.dart';
1517
import '../crud.dart';
16-
18+
import 'instruction.dart';
1719
import 'internal_connector.dart';
1820
import 'mutable_sync_status.dart';
1921
import 'stream_utils.dart';
@@ -140,7 +142,12 @@ class StreamingSyncImplementation implements StreamingSync {
140142
}
141143
// Protect sync iterations with exclusivity (if a valid Mutex is provided)
142144
await syncMutex.lock(() {
143-
return _streamingSyncIteration();
145+
switch (options.source.syncImplementation) {
146+
case SyncClientImplementation.dart:
147+
return _dartStreamingSyncIteration();
148+
case SyncClientImplementation.rust:
149+
return _rustStreamingSyncIteration();
150+
}
144151
}, timeout: _retryDelay);
145152
} catch (e, stacktrace) {
146153
if (aborted && e is http.ClientException) {
@@ -241,6 +248,7 @@ class StreamingSyncImplementation implements StreamingSync {
241248
}
242249

243250
assert(identical(_activeCrudUpload, completer));
251+
_nonLineSyncEvents.add(const UploadCompleted());
244252
_activeCrudUpload = null;
245253
completer.complete();
246254
});
@@ -284,6 +292,10 @@ class StreamingSyncImplementation implements StreamingSync {
284292
});
285293
}
286294

295+
Future<void> _rustStreamingSyncIteration() async {
296+
await _ActiveRustStreamingIteration(this).syncIteration();
297+
}
298+
287299
Future<(List<BucketRequest>, Map<String, BucketDescription?>)>
288300
_collectLocalBucketState() async {
289301
final bucketEntries = await adapter.getBucketStates();
@@ -298,7 +310,7 @@ class StreamingSyncImplementation implements StreamingSync {
298310
return (initialRequests, localDescriptions);
299311
}
300312

301-
Future<void> _streamingSyncIteration() async {
313+
Future<void> _dartStreamingSyncIteration() async {
302314
var (bucketRequests, bucketMap) = await _collectLocalBucketState();
303315
if (aborted) {
304316
return;
@@ -571,6 +583,91 @@ typedef BucketDescription = ({
571583
int priority,
572584
});
573585

586+
final class _ActiveRustStreamingIteration {
587+
final StreamingSyncImplementation sync;
588+
589+
StreamSubscription<void>? _completedUploads;
590+
final Completer<void> _completedStream = Completer();
591+
592+
_ActiveRustStreamingIteration(this.sync);
593+
594+
Future<void> syncIteration() async {
595+
try {
596+
await _control('start', convert.json.encode(sync.options.params));
597+
assert(_completedStream.isCompleted, 'Should have started streaming');
598+
await _completedStream.future;
599+
} finally {
600+
_completedUploads?.cancel();
601+
await _stop();
602+
}
603+
}
604+
605+
Stream<ReceivedLine> _receiveLines(Object? data) {
606+
return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new);
607+
}
608+
609+
Future<void> _handleLines(EstablishSyncStream request) async {
610+
final events = addBroadcast(
611+
_receiveLines(request.request), sync._nonLineSyncEvents.stream);
612+
613+
listen:
614+
await for (final event in events) {
615+
switch (event) {
616+
case ReceivedLine(line: final Uint8List line):
617+
await _control('line_binary', line);
618+
case ReceivedLine(line: final line as String):
619+
await _control('line_text', line);
620+
case UploadCompleted():
621+
await _control('completed_upload');
622+
case TokenRefreshComplete():
623+
await _control('refreshed_token');
624+
case AbortRequested():
625+
break listen;
626+
}
627+
}
628+
}
629+
630+
Future<void> _stop() => _control('stop');
631+
632+
Future<void> _control(String operation, [Object? payload]) async {
633+
final rawResponse = await sync.adapter.control(operation, payload);
634+
final instructions = convert.json.decode(rawResponse) as List;
635+
636+
for (final instruction in instructions) {
637+
await _handleInstruction(
638+
Instruction.fromJson(instruction as Map<String, Object?>));
639+
}
640+
}
641+
642+
Future<void> _handleInstruction(Instruction instruction) async {
643+
switch (instruction) {
644+
case LogLine(:final severity, :final line):
645+
sync.logger.log(
646+
switch (severity) {
647+
'DEBUG' => Level.FINE,
648+
'INFO' => Level.INFO,
649+
_ => Level.WARNING,
650+
},
651+
line);
652+
case EstablishSyncStream():
653+
_completedStream.complete(_handleLines(instruction));
654+
case UpdateSyncStatus(:final status):
655+
sync._state.updateStatus((m) => m.applyFromCore(status));
656+
case FetchCredentials():
657+
// TODO: Handle this case.
658+
throw UnimplementedError();
659+
case CloseSyncStream():
660+
sync._nonLineSyncEvents.add(AbortRequested());
661+
case FlushFileSystem():
662+
await sync.adapter.flushFileSystem();
663+
case DidCompleteSync():
664+
sync._state.updateStatus((m) => m.downloadError = null);
665+
case UnknownSyncInstruction(:final source):
666+
sync.logger.warning('Unknown instruction: $source');
667+
}
668+
}
669+
}
670+
574671
sealed class SyncEvent {}
575672

576673
final class ReceivedLine implements SyncEvent {

packages/powersync_core/lib/src/sync/sync_status.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ extension type const BucketPriority._(int priorityNumber) {
191191
/// A [Comparator] instance suitable for comparing [BucketPriority] values.
192192
static int comparator(BucketPriority a, BucketPriority b) =>
193193
-a.priorityNumber.compareTo(b.priorityNumber);
194+
195+
/// The priority used by PowerSync to indicate that a full sync was completed.
196+
static const fullSyncPriority = BucketPriority._(2147483647);
194197
}
195198

196199
/// Partial information about the synchronization status for buckets within a

packages/powersync_core/lib/src/web/web_bucket_storage.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,9 @@ class WebBucketStorage extends BucketStorage {
1717
return _webDb.writeTransaction(callback,
1818
lockTimeout: lockTimeout, flush: flush);
1919
}
20+
21+
@override
22+
Future<void> flushFileSystem() {
23+
return _webDb.flush();
24+
}
2025
}

0 commit comments

Comments
 (0)