@@ -4,20 +4,21 @@ import Logger, { ILogger } from 'js-logger';
44
55import {
66 BucketRequest ,
7+ StreamingSyncLine ,
8+ StreamingSyncRequest ,
79 isStreamingKeepalive ,
810 isStreamingSyncCheckpoint ,
911 isStreamingSyncCheckpointComplete ,
1012 isStreamingSyncCheckpointDiff ,
11- isStreamingSyncData ,
12- StreamingSyncLine ,
13- StreamingSyncRequest
13+ isStreamingSyncData
1414} from './streaming-sync-types' ;
1515import { AbstractRemote } from './AbstractRemote' ;
1616import ndjsonStream from 'can-ndjson-stream' ;
1717import { BucketChecksum , BucketStorageAdapter , Checkpoint } from '../bucket/BucketStorageAdapter' ;
1818import { SyncStatus , SyncStatusOptions } from '../../../db/crud/SyncStatus' ;
1919import { SyncDataBucket } from '../bucket/SyncDataBucket' ;
2020import { BaseObserver , BaseListener , Disposable } from '../../../utils/BaseObserver' ;
21+ import { AbortOperation } from '../../../utils/AbortOperation' ;
2122
2223export enum LockType {
2324 CRUD = 'crud' ,
@@ -47,6 +48,14 @@ export interface AbstractStreamingSyncImplementationOptions {
4748}
4849
4950export interface StreamingSyncImplementationListener extends BaseListener {
51+ /**
52+ * Triggered whenever a status update has been attempted to be made or
53+ * refreshed.
54+ */
55+ statusUpdated ?: ( ( statusUpdate : SyncStatusOptions ) => void ) | undefined ;
56+ /**
57+ * Triggers whenever the status' members have changed in value
58+ */
5059 statusChanged ?: ( ( status : SyncStatus ) => void ) | undefined ;
5160}
5261
@@ -86,6 +95,7 @@ export abstract class AbstractStreamingSyncImplementation
8695 protected options : AbstractStreamingSyncImplementationOptions ;
8796 protected abortController : AbortController | null ;
8897 protected crudUpdateListener ?: ( ) => void ;
98+ protected streamingSyncPromise ?: Promise < void > ;
8999
90100 syncStatus : SyncStatus ;
91101 triggerCrudUpload : ( ) => void ;
@@ -224,16 +234,53 @@ export abstract class AbstractStreamingSyncImplementation
224234 if ( this . abortController ) {
225235 await this . disconnect ( ) ;
226236 }
237+
227238 this . abortController = new AbortController ( ) ;
228- this . streamingSync ( this . abortController . signal ) ;
229- return this . waitForStatus ( { connected : true } ) ;
239+ this . streamingSyncPromise = this . streamingSync ( this . abortController . signal ) ;
240+
241+ // Return a promise that resolves when the connection status is updated
242+ return new Promise < void > ( ( resolve ) => {
243+ const l = this . registerListener ( {
244+ statusUpdated : ( update ) => {
245+ // This is triggered as soon as a connection is read from
246+ if ( typeof update . connected == 'undefined' ) {
247+ // only concern with connection updates
248+ return ;
249+ }
250+
251+ if ( update . connected == false ) {
252+ /**
253+ * This function does not reject if initial connect attempt failed
254+ */
255+ this . logger . warn ( 'Initial connect attempt did not successfully connect to server' ) ;
256+ }
257+
258+ resolve ( ) ;
259+ l ( ) ;
260+ }
261+ } ) ;
262+ } ) ;
230263 }
231264
232265 async disconnect ( ) : Promise < void > {
233266 if ( ! this . abortController ) {
234- throw new Error ( 'Disconnect not possible' ) ;
267+ return ;
268+ }
269+
270+ // This might be called multiple times
271+ if ( ! this . abortController . signal . aborted ) {
272+ this . abortController . abort ( new AbortOperation ( 'Disconnect has been requested' ) ) ;
235273 }
236- this . abortController . abort ( 'Disconnected' ) ;
274+
275+ // Await any pending operations before completing the disconnect operation
276+ try {
277+ await this . streamingSyncPromise ;
278+ } catch ( ex ) {
279+ // The operation might have failed, all we care about is if it has completed
280+ this . logger . warn ( ex ) ;
281+ }
282+ this . streamingSyncPromise = undefined ;
283+
237284 this . abortController = null ;
238285 this . updateSyncStatus ( { connected : false } ) ;
239286 }
@@ -261,7 +308,11 @@ export abstract class AbstractStreamingSyncImplementation
261308 let nestedAbortController = new AbortController ( ) ;
262309
263310 signal . addEventListener ( 'abort' , ( ) => {
264- nestedAbortController . abort ( ) ;
311+ /**
312+ * A request for disconnect was received upstream. Relay the request
313+ * to the nested abort controller.
314+ */
315+ nestedAbortController . abort ( signal ?. reason ?? new AbortOperation ( 'Received command to disconnect from upstream' ) ) ;
265316 this . crudUpdateListener ?.( ) ;
266317 this . crudUpdateListener = undefined ;
267318 this . updateSyncStatus ( {
@@ -272,34 +323,58 @@ export abstract class AbstractStreamingSyncImplementation
272323 } ) ;
273324 } ) ;
274325
326+ /**
327+ * This loops runs until [retry] is false or the abort signal is set to aborted.
328+ * Aborting the nestedAbortController will:
329+ * - Abort any pending fetch requests
330+ * - Close any sync stream ReadableStreams (which will also close any established network requests)
331+ */
275332 while ( true ) {
276333 try {
277334 if ( signal ?. aborted ) {
278335 break ;
279336 }
280337 const { retry } = await this . streamingSyncIteration ( nestedAbortController . signal ) ;
281338 if ( ! retry ) {
339+ /**
340+ * A sync error ocurred that we cannot recover from here.
341+ * This loop must terminate.
342+ * The nestedAbortController will close any open network requests and streams below.
343+ */
282344 break ;
283345 }
284346 // Continue immediately
285347 } catch ( ex ) {
286- this . logger . error ( ex ) ;
348+ /**
349+ * Either:
350+ * - A network request failed with a failed connection or not OKAY response code.
351+ * - There was a sync processing error.
352+ * This loop will retry.
353+ * The nested abort controller will cleanup any open network requests and streams.
354+ * The WebRemote should only abort pending fetch requests or close active Readable streams.
355+ */
356+ if ( ex instanceof AbortOperation ) {
357+ this . logger . warn ( ex ) ;
358+ } else {
359+ this . logger . error ( ex ) ;
360+ }
361+ await this . delayRetry ( ) ;
362+ } finally {
363+ if ( ! signal . aborted ) {
364+ nestedAbortController . abort ( new AbortOperation ( 'Closing sync stream network requests before retry.' ) ) ;
365+ nestedAbortController = new AbortController ( ) ;
366+ }
367+
287368 this . updateSyncStatus ( {
288369 connected : false
289370 } ) ;
371+
290372 // On error, wait a little before retrying
291- await this . delayRetry ( ) ;
292- } finally {
293- // Abort any open network requests. Create a new nested controller for retry.
294- nestedAbortController . abort ( ) ;
295- nestedAbortController = new AbortController ( ) ;
296373 }
297374 }
298375
299376 // Mark as disconnected if here
300- if ( this . abortController ) {
301- await this . disconnect ( ) ;
302- }
377+ this . updateSyncStatus ( { connected : false } ) ;
303378 }
304379
305380 protected async streamingSyncIteration ( signal : AbortSignal , progress ?: ( ) => void ) : Promise < { retry ?: boolean } > {
@@ -336,15 +411,6 @@ export abstract class AbstractStreamingSyncImplementation
336411 } ,
337412 signal
338413 ) ) {
339- // A connection is active and messages are being received
340- if ( ! this . syncStatus . connected ) {
341- // There is a connection now
342- Promise . resolve ( ) . then ( ( ) => this . triggerCrudUpload ( ) ) ;
343- this . updateSyncStatus ( {
344- connected : true
345- } ) ;
346- }
347-
348414 if ( isStreamingSyncCheckpoint ( line ) ) {
349415 targetCheckpoint = line . checkpoint ;
350416 const bucketsToDelete = new Set < string > ( bucketSet ) ;
@@ -476,6 +542,14 @@ export abstract class AbstractStreamingSyncImplementation
476542 signal ?: AbortSignal
477543 ) : AsyncGenerator < StreamingSyncLine > {
478544 const body = await this . options . remote . postStreaming ( '/sync/stream' , req , { } , signal ) ;
545+
546+ // A connection is active
547+ // There is a connection now
548+ Promise . resolve ( ) . then ( ( ) => this . triggerCrudUpload ( ) ) ;
549+ this . updateSyncStatus ( {
550+ connected : true
551+ } ) ;
552+
479553 const stream = ndjsonStream ( body ) ;
480554 const reader = stream . getReader ( ) ;
481555
@@ -505,8 +579,12 @@ export abstract class AbstractStreamingSyncImplementation
505579
506580 if ( ! this . syncStatus . isEqual ( updatedStatus ) ) {
507581 this . syncStatus = updatedStatus ;
582+ // Only trigger this is there was a change
508583 this . iterateListeners ( ( cb ) => cb . statusChanged ?.( updatedStatus ) ) ;
509584 }
585+
586+ // trigger this for all updates
587+ this . iterateListeners ( ( cb ) => cb . statusUpdated ?.( options ) ) ;
510588 }
511589
512590 private async delayRetry ( ) {
0 commit comments