diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 3a33ce4e3be..35ae8bb9b9a 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -58,6 +58,7 @@ import { LimboDocumentChange, RemovedLimboDocument, View, + ViewChange, ViewDocumentChanges } from './view'; import { ViewSnapshot } from './view_snapshot'; @@ -68,6 +69,7 @@ import { import { ClientId, SharedClientState } from '../local/shared_client_state'; import { SortedSet } from '../util/sorted_set'; import * as objUtils from '../util/obj'; +import { isPrimaryLeaseLostError } from '../local/indexeddb_persistence'; const LOG_TAG = 'SyncEngine'; @@ -224,7 +226,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { queryData.targetId ); targetId = queryData.targetId; - viewSnapshot = await this.initializeViewAndComputeInitialSnapshot( + viewSnapshot = await this.initializeViewAndComputeSnapshot( queryData, status === 'current' ); @@ -237,7 +239,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { return targetId; } - private initializeViewAndComputeInitialSnapshot( + /** + * Registers a view for a previously unknown query and computes its initial + * snapshot. + */ + private initializeViewAndComputeSnapshot( queryData: QueryData, current: boolean ): Promise { @@ -281,18 +287,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { } /** - * Reconcile the list of synced documents in the local views with those from - * persistence. + * Reconcile the list of synced documents in an existing view with those + * from persistence. */ // PORTING NOTE: Multi-tab only. - private async synchronizeLocalView(targetId: TargetId): Promise { - return this.localStore - .remoteDocumentKeys(targetId) - .then(async remoteKeys => { - const queryView = this.queryViewsByTarget[targetId]; - assert(!!queryView, 'Expected queryView to be defined'); - queryView.view.synchronizeWithRemoteKeys(remoteKeys); - }); + private synchronizeViewAndComputeSnapshot( + queryView: QueryView + ): Promise { + return this.localStore.executeQuery(queryView.query).then(docs => { + return this.localStore + .remoteDocumentKeys(queryView.targetId) + .then(async remoteKeys => { + const viewSnapshot = queryView.view.synchronizeWithPersistedState( + docs, + remoteKeys + ); + if (this.isPrimary) { + await this.updateTrackedLimbos( + queryView.targetId, + viewSnapshot.limboChanges + ); + } + return viewSnapshot; + }); + }); } /** Stops listening to the query. */ @@ -312,12 +330,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { if (!targetRemainsActive) { this.remoteStore.unlisten(queryView.targetId); - await this.removeAndCleanupQuery(queryView); - await this.localStore.releaseQuery( - query, - /*keepPersistedQueryData=*/ false - ); - await this.localStore.collectGarbage(); + await this.localStore + .releaseQuery(query, /*keepPersistedQueryData=*/ false) + .then(() => this.removeAndCleanupQuery(queryView)) + .then(() => this.localStore.collectGarbage()) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } } else { await this.removeAndCleanupQuery(queryView); @@ -418,41 +435,46 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { applyRemoteEvent(remoteEvent: RemoteEvent): Promise { this.assertSubscribed('applyRemoteEvent()'); - // Update `receivedDocument` as appropriate for any limbo targets. - objUtils.forEach(remoteEvent.targetChanges, (targetId, targetChange) => { - const limboResolution = this.limboResolutionsByTarget[targetId]; - if (limboResolution) { - // Since this is a limbo resolution lookup, it's for a single document - // and it could be added, modified, or removed, but not a combination. - assert( - targetChange.addedDocuments.size + - targetChange.modifiedDocuments.size + - targetChange.removedDocuments.size <= - 1, - 'Limbo resolution for single document contains multiple changes.' + return this.localStore + .applyRemoteEvent(remoteEvent) + .then(changes => { + // Update `receivedDocument` as appropriate for any limbo targets. + objUtils.forEach( + remoteEvent.targetChanges, + (targetId, targetChange) => { + const limboResolution = this.limboResolutionsByTarget[targetId]; + if (limboResolution) { + // Since this is a limbo resolution lookup, it's for a single document + // and it could be added, modified, or removed, but not a combination. + assert( + targetChange.addedDocuments.size + + targetChange.modifiedDocuments.size + + targetChange.removedDocuments.size <= + 1, + 'Limbo resolution for single document contains multiple changes.' + ); + if (targetChange.addedDocuments.size > 0) { + limboResolution.receivedDocument = true; + } else if (targetChange.modifiedDocuments.size > 0) { + assert( + limboResolution.receivedDocument, + 'Received change for limbo target document without add.' + ); + } else if (targetChange.removedDocuments.size > 0) { + assert( + limboResolution.receivedDocument, + 'Received remove for limbo target document without add.' + ); + limboResolution.receivedDocument = false; + } else { + // This was probably just a CURRENT targetChange or similar. + } + } + } ); - if (targetChange.addedDocuments.size > 0) { - limboResolution.receivedDocument = true; - } else if (targetChange.modifiedDocuments.size > 0) { - assert( - limboResolution.receivedDocument, - 'Received change for limbo target document without add.' - ); - } else if (targetChange.removedDocuments.size > 0) { - assert( - limboResolution.receivedDocument, - 'Received remove for limbo target document without add.' - ); - limboResolution.receivedDocument = false; - } else { - // This was probably just a CURRENT targetChange or similar. - } - } - }); - - return this.localStore.applyRemoteEvent(remoteEvent).then(changes => { - return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent); - }); + return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent); + }) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } /** @@ -533,11 +555,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { } else { const queryView = this.queryViewsByTarget[targetId]; assert(!!queryView, 'Unknown targetId: ' + targetId); - await this.removeAndCleanupQuery(queryView); - await this.localStore.releaseQuery( - queryView.query, - /* keepPersistedQueryData */ false - ); + await this.localStore + .releaseQuery(queryView.query, /* keepPersistedQueryData */ false) + .then(() => this.removeAndCleanupQuery(queryView)) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); this.syncEngineListener!.onWatchError(queryView.query, err); } } @@ -600,7 +621,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { .then(changes => { this.sharedClientState.removeLocalPendingMutation(batchId); return this.emitNewSnapsAndNotifyLocalStore(changes); - }); + }) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } rejectFailedWrite(batchId: BatchId, error: FirestoreError): Promise { @@ -612,11 +634,14 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // listen events. this.processUserCallback(batchId, error); - return this.localStore.rejectBatch(batchId).then(changes => { - this.sharedClientState.trackMutationResult(batchId, 'rejected', error); - this.sharedClientState.removeLocalPendingMutation(batchId); - return this.emitNewSnapsAndNotifyLocalStore(changes); - }); + return this.localStore + .rejectBatch(batchId) + .then(changes => { + this.sharedClientState.trackMutationResult(batchId, 'rejected', error); + this.sharedClientState.removeLocalPendingMutation(batchId); + return this.emitNewSnapsAndNotifyLocalStore(changes); + }) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } private addMutationCallback( @@ -791,7 +816,27 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { await this.localStore.notifyLocalViewChanges(docChangesInAllViews); // TODO(multitab): Multitab garbage collection if (this.isPrimary) { - await this.localStore.collectGarbage(); + await this.localStore + .collectGarbage() + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); + } + } + + /** + * Verifies the error thrown by an LocalStore operation. If a LocalStore + * operation fails because the primary lease has been taken by another client, + * we ignore the error (the persistence layer will immediately call + * `applyPrimaryLease` to propagate the primary state change). All other + * errors are re-thrown. + * + * @param err An error returned by a LocalStore operation. + * @return A Promise that resolves after we recovered, or the original error. + */ + private async ignoreIfPrimaryLeaseLoss(err: FirestoreError): Promise { + if (isPrimaryLeaseLostError(err)) { + log.debug(LOG_TAG, 'Unexpectedly lost primary lease'); + } else { + throw err; } } @@ -823,7 +868,6 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { async applyPrimaryState(isPrimary: boolean): Promise { if (isPrimary === true && this.isPrimary !== true) { this.isPrimary = true; - await this.remoteStore.applyPrimaryState(true); // Secondary tabs only maintain Views for their local listeners and the @@ -832,45 +876,93 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // server considers to be in the target). So when a secondary becomes // primary, we need to need to make sure that all views for all targets // match the state on disk. - let p = Promise.resolve(); const activeTargets = this.sharedClientState.getAllActiveQueryTargets(); - activeTargets.forEach(targetId => { - p = p.then(async () => { - let queryData; - const query = await this.localStore.getQueryForTarget(targetId); - if (this.queryViewsByTarget[targetId] === undefined) { - // For queries that never executed on this client, we need to - // allocate the query in LocalStore and initialize a new View. - queryData = await this.localStore.allocateQuery(query); - await this.initializeViewAndComputeInitialSnapshot( - queryData, - false - ); - } else { - // For queries that have a local View, we need to update their state - // in LocalStore (as the resume token and the snapshot version - // might have changed) and reconcile their views with the persisted - // state (the list of syncedDocuments may have gotten out of sync). - await this.localStore.releaseQuery(query, true); - queryData = await this.localStore.allocateQuery(query); - await this.synchronizeLocalView(targetId); - } - this.remoteStore.listen(queryData); - }); - }); - await p; + const activeQueries = await this.synchronizeQueryViewsAndRaiseSnapshots( + activeTargets.toArray() + ); + for (const queryData of activeQueries) { + this.remoteStore.listen(queryData); + } } else if (isPrimary === false && this.isPrimary !== false) { this.isPrimary = false; - - objUtils.forEachNumber(this.queryViewsByTarget, targetId => { + const activeQueries = await this.synchronizeQueryViewsAndRaiseSnapshots( + objUtils.indices(this.queryViewsByTarget) + ); + this.resetLimboDocuments(); + for (const queryData of activeQueries) { // TODO(multitab): Remove query views for non-local queries. - this.remoteStore.unlisten(targetId); - }); - + this.remoteStore.unlisten(queryData.targetId); + } await this.remoteStore.applyPrimaryState(false); } } + // PORTING NOTE: Multi-tab only. + private resetLimboDocuments(): void { + objUtils.forEachNumber(this.limboResolutionsByTarget, targetId => { + this.remoteStore.unlisten(targetId); + }); + this.limboResolutionsByTarget = []; + this.limboTargetsByKey = new SortedMap( + DocumentKey.comparator + ); + } + + /** + * Reconcile the query views of the provided query targets with the state from + * persistence. Raises snapshots for any changes that affect the local + * client and returns the updated state of all target's query data. + */ + // PORTING NOTE: Multi-tab only. + private synchronizeQueryViewsAndRaiseSnapshots( + targets: TargetId[] + ): Promise { + let p = Promise.resolve(); + const activeQueries: QueryData[] = []; + const newViewSnapshots: ViewSnapshot[] = []; + for (const targetId of targets) { + p = p.then(async () => { + let queryData: QueryData; + const queryView = this.queryViewsByTarget[targetId]; + if (queryView) { + // For queries that have a local View, we need to update their state + // in LocalStore (as the resume token and the snapshot version + // might have changed) and reconcile their views with the persisted + // state (the list of syncedDocuments may have gotten out of sync). + await this.localStore.releaseQuery( + queryView.query, + /*keepPersistedQueryData=*/ true + ); + queryData = await this.localStore.allocateQuery(queryView.query); + const viewChange = await this.synchronizeViewAndComputeSnapshot( + queryView + ); + if (viewChange.snapshot) { + newViewSnapshots.push(viewChange.snapshot); + } + } else { + assert( + this.isPrimary, + 'A secondary tab should never have an active query without an active view.' + ); + // For queries that never executed on this client, we need to + // allocate the query in LocalStore and initialize a new View. + const query = await this.localStore.getQueryForTarget(targetId); + queryData = await this.localStore.allocateQuery(query); + await this.initializeViewAndComputeSnapshot( + queryData, + /*current=*/ false + ); + } + activeQueries.push(queryData); + }); + } + return p.then(() => { + this.syncEngineListener!.onWatchChange(newViewSnapshots); + return activeQueries; + }); + } + // PORTING NOTE: Multi-tab only getActiveClients(): Promise { return this.localStore.getActiveClients(); @@ -882,6 +974,13 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { state: QueryTargetState, error?: FirestoreError ): Promise { + if (this.isPrimary) { + // If we receive a target state notification via WebStorage, we are + // either already secondary or another tab has taken the primary lease. + log.debug(LOG_TAG, 'Ignoring unexpected query state notification.'); + return; + } + if (this.queryViewsByTarget[targetId]) { switch (state) { case 'current': @@ -929,7 +1028,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { const query = await this.localStore.getQueryForTarget(targetId); assert(!!query, `Query data for active target ${targetId} not found`); const queryData = await this.localStore.allocateQuery(query); - await this.initializeViewAndComputeInitialSnapshot( + await this.initializeViewAndComputeSnapshot( queryData, /*current=*/ false ); @@ -942,11 +1041,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // removed if it has been rejected by the backend. if (queryView) { this.remoteStore.unlisten(targetId); - await this.removeAndCleanupQuery(queryView); - await this.localStore.releaseQuery( - queryView.query, - /*keepPersistedQueryData=*/ false - ); + await this.localStore + .releaseQuery(queryView.query, /*keepPersistedQueryData=*/ false) + .then(() => this.removeAndCleanupQuery(queryView)) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } } } diff --git a/packages/firestore/src/core/view.ts b/packages/firestore/src/core/view.ts index e3c94ef47c3..307ca2877f6 100644 --- a/packages/firestore/src/core/view.ts +++ b/packages/firestore/src/core/view.ts @@ -383,9 +383,34 @@ export class View { return changes; } + /** + * Update the in-memory state of the current view with the state read from + * persistence. + * + * We update the query view whenever a client's primary status changes: + * - When a client transitions from primary to secondary, it can miss + * LocalStorage updates and its query views may temporarily not be + * synchronized with the state on disk. + * - For secondary to primary transitions, the client needs to update the list + * of `syncedDocuments` since secondary clients update their query views + * based purely on synthesized RemoteEvents. + * + * @param localDocs - The documents that match the query according to the + * LocalStore. + * @param remoteKeys - The keys of the documents that match the query + * according to the backend. + * + * @return The ViewChange that resulted from this synchronization. + */ // PORTING NOTE: Multi-tab only. - synchronizeWithRemoteKeys(remoteKeys: DocumentKeySet): void { + synchronizeWithPersistedState( + localDocs: MaybeDocumentMap, + remoteKeys: DocumentKeySet + ): ViewChange { this._syncedDocuments = remoteKeys; + this.limboDocuments = documentKeySet(); + const docChanges = this.computeDocChanges(localDocs); + return this.applyChanges(docChanges, /*updateLimboDocuments=*/ true); } /** diff --git a/packages/firestore/src/local/indexeddb_persistence.ts b/packages/firestore/src/local/indexeddb_persistence.ts index d1017e49242..064e1da058e 100644 --- a/packages/firestore/src/local/indexeddb_persistence.ts +++ b/packages/firestore/src/local/indexeddb_persistence.ts @@ -478,6 +478,8 @@ export class IndexedDbPersistence implements Persistence { log.error( `Failed to obtain primary lease for action '${action}'.` ); + this.isPrimary = false; + this.queue.enqueue(() => this.primaryStateListener(false)); throw new FirestoreError( Code.FAILED_PRECONDITION, PRIMARY_LEASE_LOST_ERROR_MSG @@ -734,6 +736,12 @@ export class IndexedDbPersistence implements Persistence { } } +export function isPrimaryLeaseLostError(err: FirestoreError): boolean { + return ( + err.code === Code.FAILED_PRECONDITION && + err.message === PRIMARY_LEASE_LOST_ERROR_MSG + ); +} /** * Helper to get a typed SimpleDbStore for the owner object store. */ diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index b4f6e83a0f7..685e650a035 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -687,7 +687,7 @@ export class LocalStore { * found - used for testing. */ readDocument(key: DocumentKey): Promise { - return this.persistence.runTransaction('read document', true, txn => { + return this.persistence.runTransaction('read document', false, txn => { return this.localDocuments.getDocument(txn, key); }); } diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 30dde86a6e8..4c0dbe9bc46 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -51,6 +51,7 @@ import { import { OnlineStateTracker } from './online_state_tracker'; import { AsyncQueue } from '../util/async_queue'; import { DocumentKeySet } from '../model/collections'; +import { isPrimaryLeaseLostError } from '../local/indexeddb_persistence'; const LOG_TAG = 'RemoteStore'; @@ -580,7 +581,24 @@ export class RemoteStore implements TargetMetadataProvider { for (const batch of this.writePipeline) { this.writeStream.writeMutations(batch.mutations); } - }); + }) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); + } + + /** + * Verifies the error thrown by an LocalStore operation. If a LocalStore + * operation fails because the primary lease has been taken by another client, + * we ignore the error. All other errors are re-thrown. + * + * @param err An error returned by a LocalStore operation. + * @return A Promise that resolves after we recovered, or the original error. + */ + private ignoreIfPrimaryLeaseLoss(err: FirestoreError): void { + if (isPrimaryLeaseLostError(err)) { + log.debug(LOG_TAG, 'Unexpectedly lost primary lease'); + } else { + throw err; + } } private onMutationResult( @@ -650,7 +668,9 @@ export class RemoteStore implements TargetMetadataProvider { ); this.writeStream.lastStreamToken = emptyByteString(); - return this.localStore.setLastStreamToken(emptyByteString()); + return this.localStore + .setLastStreamToken(emptyByteString()) + .catch(err => this.ignoreIfPrimaryLeaseLoss(err)); } else { // Some other error, don't reset stream token. Our stream logic will // just retry with exponential backoff. diff --git a/packages/firestore/src/util/obj.ts b/packages/firestore/src/util/obj.ts index c56ded1e85e..cc99ce18e56 100644 --- a/packages/firestore/src/util/obj.ts +++ b/packages/firestore/src/util/obj.ts @@ -39,6 +39,13 @@ export function size(obj: Dict): number { return count; } +/** Extracts the numeric indices from a dictionary. */ +export function indices(obj: { [numberKey: number]: V }): number[] { + return Object.keys(obj).map(key => { + return Number(key); + }); +} + /** Returns the given value if it's defined or the defaultValue otherwise. */ export function defaulted(value: V | undefined, defaultValue: V): V { return value !== undefined ? value : defaultValue; diff --git a/packages/firestore/test/unit/specs/limbo_spec.test.ts b/packages/firestore/test/unit/specs/limbo_spec.test.ts index 4c8ffe745e8..90f53cc6a6a 100644 --- a/packages/firestore/test/unit/specs/limbo_spec.test.ts +++ b/packages/firestore/test/unit/specs/limbo_spec.test.ts @@ -393,7 +393,52 @@ describeSpec('Limbo Documents:', [], () => { } ); - // TODO(multitab): We need a test case that verifies that a primary client - // that loses its primary lease while documents are in limbo correctly handles - // these documents even when it picks up its lease again. + specTest( + 'Limbo documents survive primary state transitions', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 1001, { key: 'b' }); + const docC = doc('collection/c', 1002, { key: 'c' }); + const deletedDocB = deletedDoc('collection/b', 1006); + const deletedDocC = deletedDoc('collection/c', 1008); + + return ( + client(0, false) + .expectPrimaryState(true) + .userListens(query) + .watchAcksFull(query, 1002, docA, docB, docC) + .expectEvents(query, { added: [docA, docB, docC] }) + .watchRemovesDoc(docB.key, query) + .watchRemovesDoc(docC.key, query) + .watchSnapshots(1003) + .expectEvents(query, { fromCache: true }) + .expectLimboDocs(docB.key, docC.key) + .client(1) + .stealPrimaryLease() + .client(0) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(false) + .expectLimboDocs() + .client(1) + // TODO(37254270): This should be 'resume-token-1003' from the last + // global snapshot. + .expectListen(query, 'resume-token-1002') + .watchAcksFull(query, 1005) + .expectLimboDocs(docB.key, docC.key) + .ackLimbo(1006, deletedDocB) + .expectLimboDocs(docC.key) + .client(0) + .expectEvents(query, { removed: [docB], fromCache: true }) + .stealPrimaryLease() + .expectListen(query, 'resume-token-1005') + .watchAcksFull(query, 1007) + .expectLimboDocs(docC.key) + .ackLimbo(1007, deletedDocC) + .expectLimboDocs() + .expectEvents(query, { removed: [docC] }) + ); + } + ); }); diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts index fcb3474f656..4f12ac1940c 100644 --- a/packages/firestore/test/unit/specs/listen_spec.test.ts +++ b/packages/firestore/test/unit/specs/listen_spec.test.ts @@ -939,6 +939,66 @@ describeSpec('Listens:', [], () => { .expectEvents(query, { added: [docB] }); }); + specTest('Query recovers after primary takeover', ['multi-client'], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 2000, { key: 'b' }); + const docC = doc('collection/c', 3000, { key: 'c' }); + + return ( + client(0) + .expectPrimaryState(true) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + .client(1) + .userListens(query) + .expectEvents(query, { added: [docA] }) + .stealPrimaryLease() + .expectListen(query, 'resume-token-1000') + .watchAcksFull(query, 2000, docB) + .expectEvents(query, { added: [docB] }) + .client(0) + // Client 0 ignores all events until it transitions to secondary + .client(1) + .watchSends({ affects: [query] }, docC) + .watchSnapshots(3000) + .expectEvents(query, { added: [docC] }) + .client(0) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(false) + .expectEvents(query, { added: [docB, docC] }) + ); + }); + + specTest( + 'Unresponsive primary ignores watch update', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + client(0) + .expectPrimaryState(true) + .client(1) + .userListens(query) + .client(0) + .expectListen(query) + .client(1) + .stealPrimaryLease() + .client(0) + // Send a watch update to client 0, who is longer primary (but doesn't + // know it yet). The watch update gets ignored. + .watchAcksFull(query, 1000, docA) + .client(1) + .expectListen(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + ); + } + ); + specTest( 'Listen is established in newly started primary', ['multi-client'], diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index a10388fb81c..923e498a0a6 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -24,7 +24,11 @@ import { } from '../../../src/model/document'; import { DocumentKey } from '../../../src/model/document_key'; import { JsonObject } from '../../../src/model/field_value'; -import { mapRpcCodeFromCode } from '../../../src/remote/rpc_error'; +import { + isPermanentError, + mapCodeFromRpcCode, + mapRpcCodeFromCode +} from '../../../src/remote/rpc_error'; import { assert } from '../../../src/util/assert'; import { fail } from '../../../src/util/assert'; import { Code } from '../../../src/util/error'; @@ -76,12 +80,20 @@ export class ClientMemoryState { this.reset(); } + /** Reset all internal memory state (as done during a client restart). */ reset(): void { this.queryMapping = {}; this.limboMapping = {}; this.activeTargets = {}; this.limboIdGenerator = TargetIdGenerator.forSyncEngine(); } + + /** + * Reset the internal limbo mapping (as done during a primary lease failover). + */ + resetLimboMapping(): void { + this.limboMapping = {}; + } } /** @@ -457,12 +469,16 @@ export class SpecBuilder { writeAcks( doc: string, version: TestSnapshotVersion, - options?: { expectUserCallback: boolean } + options?: { expectUserCallback?: boolean; keepInQueue?: boolean } ): this { this.nextStep(); - this.currentStep = { writeAck: { version } }; + options = options || {}; - if (!options || options.expectUserCallback) { + this.currentStep = { + writeAck: { version, keepInQueue: !!options.keepInQueue } + }; + + if (options.expectUserCallback) { return this.expectUserCallbacks({ acknowledged: [doc] }); } else { return this; @@ -476,13 +492,23 @@ export class SpecBuilder { */ failWrite( doc: string, - err: RpcError, - options?: { expectUserCallback: boolean } + error: RpcError, + options?: { expectUserCallback?: boolean; keepInQueue?: boolean } ): this { this.nextStep(); - this.currentStep = { failWrite: { error: err } }; + options = options || {}; - if (!options || options.expectUserCallback) { + // If this is a permanent error, the write is not expected to be sent + // again. + const isPermanentFailure = isPermanentError(mapCodeFromRpcCode(error.code)); + const keepInQueue = + options.keepInQueue !== undefined + ? options.keepInQueue + : !isPermanentFailure; + + this.currentStep = { failWrite: { error, keepInQueue } }; + + if (options.expectUserCallback) { return this.expectUserCallbacks({ rejected: [doc] }); } else { return this; @@ -904,6 +930,31 @@ export class MultiClientSpecBuilder extends SpecBuilder { return this; } + /** + * Take the primary lease, even if another client has already obtained the + * lease. + */ + stealPrimaryLease(): this { + this.nextStep(); + this.currentStep = { + applyClientState: { + primary: true + }, + stateExpect: { + isPrimary: true + } + }; + + // HACK: SyncEngine resets its limbo mapping when it gains the primary + // lease. The SpecTests need to also clear their mapping, but when we parse + // the spec tests, we don't know when the primary lease transition happens. + // It is likely going to happen right after `stealPrimaryLease`, so we are + // clearing the limbo mapping here. + this.clientState.resetLimboMapping(); + + return this; + } + protected nextStep(): void { if (this.currentStep !== null) { this.currentStep.clientIndex = this.activeClientIndex; diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index d54c9df4d98..3da11d8afb8 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -60,10 +60,7 @@ import { Datastore } from '../../../src/remote/datastore'; import { ExistenceFilter } from '../../../src/remote/existence_filter'; import { WriteRequest } from '../../../src/remote/persistent_stream'; import { RemoteStore } from '../../../src/remote/remote_store'; -import { - isPermanentError, - mapCodeFromRpcCode -} from '../../../src/remote/rpc_error'; +import { mapCodeFromRpcCode } from '../../../src/remote/rpc_error'; import { JsonProtoSerializer } from '../../../src/remote/serializer'; import { StreamBridge } from '../../../src/remote/stream_bridge'; import { @@ -99,6 +96,12 @@ import { SharedClientState, WebStorageSharedClientState } from '../../../src/local/shared_client_state'; +import { + createOrUpgradeDb, + DbOwner, + DbOwnerKey, + SCHEMA_VERSION +} from '../../../src/local/indexeddb_schema'; import { TestPlatform, SharedFakeWebStorage } from '../../util/test_platform'; class MockConnection implements Connection { @@ -811,7 +814,9 @@ abstract class TestRunner { private doWriteAck(writeAck: SpecWriteAck): Promise { const updateTime = this.serializer.toVersion(version(writeAck.version)); - const nextMutation = this.sharedWrites.shift(); + const nextMutation = writeAck.keepInQueue + ? this.sharedWrites.peek() + : this.sharedWrites.shift(); return this.validateNextWriteRequest(nextMutation).then(() => { this.connection.ackWrite(updateTime, [{ updateTime }]); }); @@ -823,14 +828,10 @@ abstract class TestRunner { mapCodeFromRpcCode(specError.code), specError.message ); - const nextMutation = this.sharedWrites.peek(); + const nextMutation = writeFailure.keepInQueue + ? this.sharedWrites.peek() + : this.sharedWrites.shift(); return this.validateNextWriteRequest(nextMutation).then(() => { - // If this is a permanent error, the write is not expected to be sent - // again. - if (isPermanentError(error.code)) { - this.sharedWrites.shift(); - } - this.connection.failWrite(error); }); } @@ -888,10 +889,16 @@ abstract class TestRunner { }); } - private doApplyClientState(state: SpecClientState): Promise { + private async doApplyClientState(state: SpecClientState): Promise { if (state.visibility) { this.platform.raiseVisibilityEvent(state.visibility!); } + + if (state.primary) { + await writeOwnerToIndexedDb(this.clientId); + await this.queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh); + } + return Promise.resolve(); } @@ -951,7 +958,10 @@ abstract class TestRunner { this.expectedActiveTargets = expectation.activeTargets!; } if ('isPrimary' in expectation) { - expect(this.syncEngine.isPrimaryClient).to.eq(expectation.isPrimary!); + expect(this.syncEngine.isPrimaryClient).to.eq( + expectation.isPrimary!, + 'isPrimary' + ); } if ('userCallbacks' in expectation) { expect(this.acknowledgedDocs).to.have.members( @@ -1394,11 +1404,24 @@ export type SpecWatchStreamClose = { export type SpecWriteAck = { /** The version the backend uses to ack the write. */ version: TestSnapshotVersion; + /** + * Whether we should keep the write in our internal queue. This should only + * be set to 'true' if the client ignores the write (e.g. a secondary client + * which ignores write acknowledgments). + */ + // PORTING NOTE: Multi-Tab only. + keepInQueue: boolean; }; export type SpecWriteFailure = { /** The error the backend uses to fail the write. */ error: SpecError; + /** + * Whether we should keep the write in our internal queue. This should be set + * to 'true' for transient errors or if the client ignores the failure + * (e.g. a secondary client which ignores write rejections). + */ + keepInQueue: boolean; }; export interface SpecWatchEntity { @@ -1418,6 +1441,8 @@ export interface SpecWatchEntity { export type SpecClientState = { /** The visibility state of the browser tab running the client. */ visibility?: VisibilityState; + /** Whether this tab should try to forcefully become primary. */ + primary?: true; }; /** @@ -1504,3 +1529,19 @@ export interface StateExpectation { rejectedDocs: string[]; }; } + +async function writeOwnerToIndexedDb(clientId: ClientId): Promise { + const db = await SimpleDb.openOrCreate( + IndexedDbTestRunner.TEST_DB_NAME + IndexedDbPersistence.MAIN_DATABASE, + SCHEMA_VERSION, + createOrUpgradeDb + ); + await db.runTransaction('readwrite', ['owner'], txn => { + const owner = txn.store(DbOwner.store); + return owner.put( + 'owner', + new DbOwner(clientId, /* allowTabSynchronization=*/ true, Date.now()) + ); + }); + db.close(); +} diff --git a/packages/firestore/test/unit/specs/write_spec.test.ts b/packages/firestore/test/unit/specs/write_spec.test.ts index 9fce5ebdca4..11db44ba686 100644 --- a/packages/firestore/test/unit/specs/write_spec.test.ts +++ b/packages/firestore/test/unit/specs/write_spec.test.ts @@ -1128,6 +1128,36 @@ describeSpec('Writes:', [], () => { }); }); + specTest('Mutation recovers after primary takeover', ['multi-client'], () => { + const query = Query.atPath(path('collection')); + const docALocal = doc( + 'collection/a', + 0, + { k: 'a' }, + { hasLocalMutations: true } + ); + const docA = doc('collection/a', 1000, { k: 'a' }); + + return client(0) + .expectPrimaryState(true) + .userSets('collection/a', { k: 'a' }) + .client(1) + .userListens(query) + .expectEvents(query, { + added: [docALocal], + hasPendingWrites: true, + fromCache: true + }) + .stealPrimaryLease() + .writeAcks('collection/a', 1000, { expectUserCallback: false }) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { metadata: [docA] }) + .client(0) + .expectUserCallbacks({ + acknowledged: ['collection/a'] + }); + }); + specTest('Write is sent by newly started primary', ['multi-client'], () => { return client(0) .expectPrimaryState(true) @@ -1145,4 +1175,74 @@ describeSpec('Writes:', [], () => { acknowledged: ['collection/a'] }); }); + + specTest( + 'Unresponsive primary ignores acknowledged write', + ['multi-client'], + () => { + return ( + client(0) + .expectPrimaryState(true) + // Send initial write to open the write stream + .userSets('collection/a', { k: 'a' }) + .writeAcks('collection/a', 1000) + .client(1) + .userSets('collection/b', { k: 'b' }) + .client(2) + .stealPrimaryLease() + .client(0) + // Client 2 is now the primary client, and client 0 ignores the write + // acknowledgement. + .writeAcks('collection/b', 2000, { + expectUserCallback: false, + keepInQueue: true + }) + .client(2) + .writeAcks('collection/b', 2000, { expectUserCallback: false }) + .client(1) + .expectUserCallbacks({ + acknowledged: ['collection/b'] + }) + ); + } + ); + + specTest( + 'Unresponsive primary ignores rejected write', + ['multi-client'], + () => { + return ( + client(0) + .expectPrimaryState(true) + // Send initial write to open the write stream + .userSets('collection/a', { k: 'a' }) + .writeAcks('collection/a', 1000) + .client(1) + .userSets('collection/b', { k: 'b' }) + .client(2) + .stealPrimaryLease() + .client(0) + // Client 2 is now the primary client, and client 0 ignores the rejected + // write. + .failWrite( + 'collection/b', + new RpcError(Code.FAILED_PRECONDITION, 'Write error'), + { + expectUserCallback: false, + keepInQueue: true + } + ) + .client(2) + .failWrite( + 'collection/b', + new RpcError(Code.FAILED_PRECONDITION, 'Write error'), + { expectUserCallback: false } + ) + .client(1) + .expectUserCallbacks({ + rejected: ['collection/b'] + }) + ); + } + ); });