Skip to content

Commit c40ff09

Browse files
Recover from unexpected primary lease loss
1 parent 7b318b4 commit c40ff09

File tree

8 files changed

+339
-52
lines changed

8 files changed

+339
-52
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { RemoteEvent, TargetChange } from '../remote/remote_event';
3333
import { RemoteStore } from '../remote/remote_store';
3434
import { RemoteSyncer } from '../remote/remote_syncer';
3535
import { assert, fail } from '../util/assert';
36-
import { FirestoreError } from '../util/error';
36+
import { Code, FirestoreError } from '../util/error';
3737
import * as log from '../util/log';
3838
import { AnyJs, primitiveComparator } from '../util/misc';
3939
import { ObjectMap } from '../util/obj_map';
@@ -66,6 +66,7 @@ import {
6666
} from '../local/shared_client_state_syncer';
6767
import { ClientId, SharedClientState } from '../local/shared_client_state';
6868
import { SortedSet } from '../util/sorted_set';
69+
import * as objUtils from '../util/obj';
6970

7071
const LOG_TAG = 'SyncEngine';
7172

@@ -277,11 +278,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
277278
if (!targetRemainsActive) {
278279
this.remoteStore.unlisten(queryView.targetId);
279280
await this.removeAndCleanupQuery(queryView);
280-
await this.localStore.releaseQuery(
281-
query,
282-
/*keepPersistedQueryData=*/ false
283-
);
284-
await this.localStore.collectGarbage();
281+
return this.localStore
282+
.releaseQuery(query, /*keepPersistedQueryData=*/ false)
283+
.then(() => this.localStore.collectGarbage())
284+
.catch(err => this.tryRecoverClient(err));
285285
}
286286
} else {
287287
await this.removeAndCleanupQuery(queryView);
@@ -382,9 +382,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
382382
applyRemoteEvent(remoteEvent: RemoteEvent): Promise<void> {
383383
this.assertSubscribed('applyRemoteEvent()');
384384

385-
return this.localStore.applyRemoteEvent(remoteEvent).then(changes => {
386-
return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent);
387-
});
385+
return this.localStore
386+
.applyRemoteEvent(remoteEvent)
387+
.then(changes => {
388+
return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent);
389+
})
390+
.catch(err => this.tryRecoverClient(err));
388391
}
389392

390393
/**
@@ -490,6 +493,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
490493
// connection is disabled.
491494
await this.remoteStore.fillWritePipeline();
492495
} else if (batchState === 'acknowledged' || batchState === 'rejected') {
496+
// If we receive a notification of an `acknowledged` or `rejected` batch
497+
// via Web Storage, we are either already secondary or another tab has
498+
// taken the primary lease.
499+
await this.applyPrimaryState(false);
500+
493501
// NOTE: Both these methods are no-ops for batches that originated from
494502
// other clients.
495503
this.sharedClientState.removeLocalPendingMutation(batchId);
@@ -521,7 +529,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
521529
.then(changes => {
522530
this.sharedClientState.removeLocalPendingMutation(batchId);
523531
return this.emitNewSnapsAndNotifyLocalStore(changes);
524-
});
532+
})
533+
.catch(err => this.tryRecoverClient(err));
525534
}
526535

527536
rejectFailedWrite(batchId: BatchId, error: FirestoreError): Promise<void> {
@@ -533,11 +542,14 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
533542
// listen events.
534543
this.processUserCallback(batchId, error);
535544

536-
return this.localStore.rejectBatch(batchId).then(changes => {
537-
this.sharedClientState.trackMutationResult(batchId, 'rejected', error);
538-
this.sharedClientState.removeLocalPendingMutation(batchId);
539-
return this.emitNewSnapsAndNotifyLocalStore(changes);
540-
});
545+
return this.localStore
546+
.rejectBatch(batchId)
547+
.then(changes => {
548+
this.sharedClientState.trackMutationResult(batchId, 'rejected', error);
549+
this.sharedClientState.removeLocalPendingMutation(batchId);
550+
return this.emitNewSnapsAndNotifyLocalStore(changes);
551+
})
552+
.catch(err => this.tryRecoverClient(err));
541553
}
542554

543555
private addMutationCallback(
@@ -712,7 +724,27 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
712724
await this.localStore.notifyLocalViewChanges(docChangesInAllViews);
713725
// TODO(multitab): Multitab garbage collection
714726
if (this.isPrimary) {
715-
await this.localStore.collectGarbage();
727+
await this.localStore
728+
.collectGarbage()
729+
.catch(err => this.tryRecoverClient(err));
730+
}
731+
}
732+
733+
/**
734+
* Marks the client as secondary if an IndexedDb operation fails because the
735+
* primary lease has been taken by another client. This can happen when the
736+
* client is temporarily CPU throttled and fails to renew its lease in time,
737+
* in which case we treat the current client as secondary. We can always revert
738+
* back to primary status via the lease refresh in our persistence layer.
739+
*
740+
* @param err An error returned by an IndexedDb operation.
741+
* @return A Promise that resolves once we have recovered, or rejects with the original error.
742+
*/
743+
private async tryRecoverClient(err: FirestoreError): Promise<void> {
744+
if (err.code === Code.FAILED_PRECONDITION) {
745+
return this.applyPrimaryState(false);
746+
} else {
747+
throw err;
716748
}
717749
}
718750

@@ -784,9 +816,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
784816
}
785817
} else if (isPrimary === false && this.isPrimary !== false) {
786818
this.isPrimary = false;
787-
788-
// TODO: Unlisten from all active targets
789819
await this.remoteStore.disableNetwork();
820+
objUtils.forEachNumber(this.queryViewsByTarget, targetId => {
821+
this.remoteStore.unlisten(targetId);
822+
});
790823
}
791824
}
792825

@@ -801,6 +834,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
801834
state: QueryTargetState,
802835
error?: FirestoreError
803836
): Promise<void> {
837+
if (this.isPrimary) {
838+
// If we receive a target state notification via Web Storage, we are
839+
// either already secondary or another tab has taken the primary lease.
840+
await this.applyPrimaryState(false);
841+
}
842+
804843
if (this.queryViewsByTarget[targetId]) {
805844
switch (state) {
806845
case 'current':

packages/firestore/src/local/indexeddb_persistence.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,10 @@ export class IndexedDbPersistence implements Persistence {
203203
.put(new DbClientMetadata(this.clientId, Date.now(), this.inForeground))
204204
.next(() => this.canActAsPrimary(txn))
205205
.next(canActAsPrimary => {
206-
if (canActAsPrimary !== this.isPrimary) {
207-
this.isPrimary = canActAsPrimary;
208-
this.queue.enqueue(() => this.primaryStateListener(this.isPrimary));
209-
}
206+
this.isPrimary = canActAsPrimary;
207+
// Always call the primary state listener, since SyncEngine may have
208+
// changed the primary state to 'false'.
209+
this.queue.enqueue(() => this.primaryStateListener(this.isPrimary));
210210

211211
if (this.isPrimary) {
212212
return this.acquireOrExtendPrimaryLease(txn);

packages/firestore/src/local/local_store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ export class LocalStore {
669669
* found - used for testing.
670670
*/
671671
readDocument(key: DocumentKey): Promise<MaybeDocument | null> {
672-
return this.persistence.runTransaction('read document', true, txn => {
672+
return this.persistence.runTransaction('read document', false, txn => {
673673
return this.localDocuments.getDocument(txn, key);
674674
});
675675
}

packages/firestore/src/remote/remote_store.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -558,12 +558,23 @@ export class RemoteStore implements TargetMetadataProvider {
558558
// Record the stream token.
559559
return this.localStore
560560
.setLastStreamToken(this.writeStream.lastStreamToken)
561-
.then(() => {
562-
// Send the write pipeline now that the stream is established.
563-
for (const batch of this.writePipeline) {
564-
this.writeStream.writeMutations(batch.mutations);
561+
.then(
562+
() => {
563+
// Send the write pipeline now that the stream is established.
564+
for (const batch of this.writePipeline) {
565+
this.writeStream.writeMutations(batch.mutations);
566+
}
567+
},
568+
err => {
569+
if (err.code === Code.FAILED_PRECONDITION) {
570+
// We have temporarily lost our primary lease. If we recover,
571+
// Sync Engine will re-enable the network.
572+
return this.disableNetwork();
573+
} else {
574+
return Promise.reject(err);
575+
}
565576
}
566-
});
577+
);
567578
}
568579

569580
private onMutationResult(
@@ -633,7 +644,17 @@ export class RemoteStore implements TargetMetadataProvider {
633644
);
634645
this.writeStream.lastStreamToken = emptyByteString();
635646

636-
return this.localStore.setLastStreamToken(emptyByteString());
647+
return this.localStore
648+
.setLastStreamToken(emptyByteString())
649+
.catch(err => {
650+
if (err.code === Code.FAILED_PRECONDITION) {
651+
// We have temporarily lost our primary lease. If we recover,
652+
// Sync Engine will re-enable the network.
653+
return this.disableNetwork();
654+
} else {
655+
return Promise.reject(err);
656+
}
657+
});
637658
} else {
638659
// Some other error, don't reset stream token. Our stream logic will
639660
// just retry with exponential backoff.

packages/firestore/test/unit/specs/listen_spec.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,4 +785,62 @@ describeSpec('Listens:', [], () => {
785785
.client(2)
786786
.expectEvents(query, { added: [docB] });
787787
});
788+
789+
specTest('Query recovers after primary takeover', ['multi-client'], () => {
790+
const query = Query.atPath(path('collection'));
791+
const docA = doc('collection/a', 1000, { key: 'a' });
792+
const docB = doc('collection/b', 2000, { key: 'b' });
793+
794+
return (
795+
client(0)
796+
.expectPrimaryState(true)
797+
.userListens(query)
798+
.watchAcksFull(query, 1000, docA)
799+
.expectEvents(query, { added: [docA] })
800+
.client(1)
801+
.userListens(query)
802+
.expectEvents(query, { added: [docA] })
803+
.stealPrimaryLease()
804+
.expectListen(query, 'resume-token-1000')
805+
.watchAcksFull(query, 2000, docB)
806+
.expectEvents(query, { added: [docB] })
807+
.client(0)
808+
.expectPrimaryState(false)
809+
// TODO(multitab): Suppress the additional `fromCache`, which is raised
810+
// locally because we disable the network when we lose the primary
811+
// lease.
812+
.expectEvents(query, { fromCache: true })
813+
.expectEvents(query, { added: [docB] })
814+
);
815+
});
816+
817+
specTest(
818+
'Unresponsive primary ignores watch update',
819+
['multi-client'],
820+
() => {
821+
const query = Query.atPath(path('collection'));
822+
const docA = doc('collection/a', 1000, { key: 'a' });
823+
824+
return (
825+
client(0)
826+
.expectPrimaryState(true)
827+
.client(1)
828+
.userListens(query)
829+
.expectEvents(query, { fromCache: true })
830+
.client(0)
831+
.expectListen(query)
832+
.client(1)
833+
.stealPrimaryLease()
834+
.client(0)
835+
// Send a watch update to client 0, who is no longer primary (but doesn't
836+
// know it yet). The watch update gets ignored.
837+
.watchAcksFull(query, 1000, docA)
838+
.expectPrimaryState(false)
839+
.client(1)
840+
.expectListen(query)
841+
.watchAcksFull(query, 1000, docA)
842+
.expectEvents(query, { added: [docA] })
843+
);
844+
}
845+
);
788846
});

packages/firestore/test/unit/specs/spec_builder.ts

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ import {
2424
} from '../../../src/model/document';
2525
import { DocumentKey } from '../../../src/model/document_key';
2626
import { JsonObject } from '../../../src/model/field_value';
27-
import { mapRpcCodeFromCode } from '../../../src/remote/rpc_error';
27+
import {
28+
isPermanentError,
29+
mapCodeFromRpcCode,
30+
mapRpcCodeFromCode
31+
} from '../../../src/remote/rpc_error';
2832
import { assert } from '../../../src/util/assert';
2933
import { fail } from '../../../src/util/assert';
3034
import { Code } from '../../../src/util/error';
@@ -457,12 +461,16 @@ export class SpecBuilder {
457461
writeAcks(
458462
doc: string,
459463
version: TestSnapshotVersion,
460-
options?: { expectUserCallback: boolean }
464+
options?: { expectUserCallback?: boolean; keepInQueue?: boolean }
461465
): this {
462466
this.nextStep();
463-
this.currentStep = { writeAck: { version } };
467+
options = options || {};
468+
469+
this.currentStep = {
470+
writeAck: { version, keepInQueue: !!options.keepInQueue }
471+
};
464472

465-
if (!options || options.expectUserCallback) {
473+
if (options.expectUserCallback) {
466474
return this.expectUserCallbacks({ acknowledged: [doc] });
467475
} else {
468476
return this;
@@ -476,13 +484,23 @@ export class SpecBuilder {
476484
*/
477485
failWrite(
478486
doc: string,
479-
err: RpcError,
480-
options?: { expectUserCallback: boolean }
487+
error: RpcError,
488+
options?: { expectUserCallback?: boolean; keepInQueue?: boolean }
481489
): this {
482490
this.nextStep();
483-
this.currentStep = { failWrite: { error: err } };
491+
options = options || {};
484492

485-
if (!options || options.expectUserCallback) {
493+
// If this is a permanent error, the write is not expected to be sent
494+
// again.
495+
const isPermanentFailure = isPermanentError(mapCodeFromRpcCode(error.code));
496+
const keepInQueue =
497+
options.keepInQueue !== undefined
498+
? options.keepInQueue
499+
: !isPermanentFailure;
500+
501+
this.currentStep = { failWrite: { error, keepInQueue } };
502+
503+
if (options.expectUserCallback) {
486504
return this.expectUserCallbacks({ rejected: [doc] });
487505
} else {
488506
return this;
@@ -899,6 +917,23 @@ export class MultiClientSpecBuilder extends SpecBuilder {
899917
return this;
900918
}
901919

920+
/**
921+
* Take the primary lease, even if another client has already obtained the
922+
* lease.
923+
*/
924+
stealPrimaryLease(): this {
925+
this.nextStep();
926+
this.currentStep = {
927+
applyClientState: {
928+
primary: true
929+
},
930+
stateExpect: {
931+
isPrimary: true
932+
}
933+
};
934+
return this;
935+
}
936+
902937
protected nextStep(): void {
903938
if (this.currentStep !== null) {
904939
this.currentStep.clientIndex = this.activeClientIndex;

0 commit comments

Comments
 (0)