@@ -349,10 +349,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
349
349
. then ( result => {
350
350
this . sharedClientState . addPendingMutation ( result . batchId ) ;
351
351
this . addMutationCallback ( result . batchId , userCallback ) ;
352
- return this . emitNewSnapsAndNotifyLocalStore (
353
- result . changes ,
354
- SnapshotVersion . MIN
355
- ) ;
352
+ return this . emitNewSnapsAndNotifyLocalStore ( result . changes ) ;
356
353
} )
357
354
. then ( ( ) => {
358
355
return this . remoteStore . fillWritePipeline ( ) ;
@@ -462,11 +459,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
462
459
}
463
460
}
464
461
) ;
465
- return this . emitNewSnapsAndNotifyLocalStore (
466
- changes ,
467
- remoteEvent . snapshotVersion ,
468
- remoteEvent
469
- ) ;
462
+ return this . emitNewSnapsAndNotifyLocalStore ( changes , remoteEvent ) ;
470
463
} )
471
464
. catch ( err => this . ignoreIfPrimaryLeaseLoss ( err ) ) ;
472
465
}
@@ -583,22 +576,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
583
576
return ;
584
577
}
585
578
579
+ let remoteEvent ;
580
+
586
581
if ( batchState === 'pending' ) {
587
582
// If we are the primary client, we need to send this write to the
588
583
// backend. Secondary clients will ignore these writes since their remote
589
584
// connection is disabled.
590
585
await this . remoteStore . fillWritePipeline ( ) ;
591
- } else if ( batchState === 'acknowledged' || batchState === 'rejected' ) {
586
+ } else if ( batchState === 'acknowledged' ) {
587
+ remoteEvent = RemoteEvent . createSynthesizedRemoteEventForSuccessfulWrite (
588
+ snapshotVersion
589
+ ) ;
592
590
// NOTE: Both these methods are no-ops for batches that originated from
593
591
// other clients.
594
- this . processUserCallback ( batchId , error ? error : null ) ;
595
-
592
+ this . processUserCallback ( batchId , null ) ;
593
+ this . localStore . removeCachedMutationBatchMetadata ( batchId ) ;
594
+ } else if ( batchState === 'rejected' ) {
595
+ assert ( error !== null , 'Error not set for rejected mutation' ) ;
596
+ this . processUserCallback ( batchId , error ! ) ;
596
597
this . localStore . removeCachedMutationBatchMetadata ( batchId ) ;
597
598
} else {
598
599
fail ( `Unknown batchState: ${ batchState } ` ) ;
599
600
}
600
601
601
- await this . emitNewSnapsAndNotifyLocalStore ( documents , snapshotVersion ) ;
602
+ await this . emitNewSnapsAndNotifyLocalStore ( documents , remoteEvent ) ;
602
603
}
603
604
604
605
applySuccessfulWrite (
@@ -617,9 +618,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
617
618
return this . localStore
618
619
. acknowledgeBatch ( mutationBatchResult )
619
620
. then ( changes => {
621
+ const synthesizedRemoteEvent = RemoteEvent . createSynthesizedRemoteEventForSuccessfulWrite (
622
+ mutationBatchResult . commitVersion
623
+ ) ;
620
624
return this . emitNewSnapsAndNotifyLocalStore (
621
625
changes ,
622
- mutationBatchResult . commitVersion
626
+ synthesizedRemoteEvent
623
627
) ;
624
628
} )
625
629
. catch ( err => this . ignoreIfPrimaryLeaseLoss ( err ) ) ;
@@ -643,10 +647,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
643
647
'rejected' ,
644
648
error
645
649
) ;
646
- return this . emitNewSnapsAndNotifyLocalStore (
647
- changes ,
648
- SnapshotVersion . MIN
649
- ) ;
650
+ return this . emitNewSnapsAndNotifyLocalStore ( changes ) ;
650
651
} )
651
652
. catch ( err => this . ignoreIfPrimaryLeaseLoss ( err ) ) ;
652
653
}
@@ -779,13 +780,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
779
780
780
781
private async emitNewSnapsAndNotifyLocalStore (
781
782
changes : MaybeDocumentMap ,
782
- snapshotVersion : SnapshotVersion ,
783
783
remoteEvent ?: RemoteEvent
784
784
) : Promise < void > {
785
785
const newSnaps : ViewSnapshot [ ] = [ ] ;
786
786
const docChangesInAllViews : LocalViewChanges [ ] = [ ] ;
787
787
const queriesProcessed : Array < Promise < void > > = [ ] ;
788
788
789
+ const snapshotVersion = remoteEvent
790
+ ? remoteEvent . snapshotVersion
791
+ : SnapshotVersion . MIN ;
792
+
789
793
this . queryViewsByQuery . forEach ( ( _ , queryView ) => {
790
794
queriesProcessed . push (
791
795
Promise . resolve ( )
@@ -882,10 +886,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
882
886
result . removedBatchIds ,
883
887
result . addedBatchIds
884
888
) ;
885
- await this . emitNewSnapsAndNotifyLocalStore (
886
- result . affectedDocuments ,
887
- SnapshotVersion . MIN
888
- ) ;
889
+ await this . emitNewSnapsAndNotifyLocalStore ( result . affectedDocuments ) ;
889
890
}
890
891
891
892
await this . remoteStore . handleCredentialChange ( ) ;
@@ -1015,12 +1016,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
1015
1016
case 'not-current' : {
1016
1017
const changes = await this . localStore . getNewDocumentChanges ( ) ;
1017
1018
const synthesizedRemoteEvent = RemoteEvent . createSynthesizedRemoteEventForCurrentChange (
1019
+ snapshotVersion ,
1018
1020
targetId ,
1019
1021
state === 'current'
1020
1022
) ;
1021
1023
return this . emitNewSnapsAndNotifyLocalStore (
1022
1024
changes ,
1023
- snapshotVersion ,
1024
1025
synthesizedRemoteEvent
1025
1026
) ;
1026
1027
}
0 commit comments