diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 30e77853f4a..a0b1fefc7c9 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -42,6 +42,7 @@ import { SortedMap } from '../util/sorted_map'; import { isNullOrUndefined } from '../util/types'; import { isPrimaryLeaseLostError } from '../local/indexeddb_persistence'; +import { isDocumentChangeMissingError } from '../local/indexeddb_remote_document_cache'; import { ClientId, SharedClientState } from '../local/shared_client_state'; import { QueryTargetState, @@ -1001,14 +1002,32 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { switch (state) { case 'current': case 'not-current': { - const changes = await this.localStore.getNewDocumentChanges(); - const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange( - targetId, - state === 'current' - ); - return this.emitNewSnapsAndNotifyLocalStore( - changes, - synthesizedRemoteEvent + return this.localStore.getNewDocumentChanges().then( + async changes => { + // tslint and prettier disagree about their preferred line length. + // tslint:disable-next-line:max-line-length + const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange( + targetId, + state === 'current' + ); + await this.emitNewSnapsAndNotifyLocalStore( + changes, + synthesizedRemoteEvent + ); + }, + async err => { + if (isDocumentChangeMissingError(err)) { + const activeTargets: TargetId[] = []; + objUtils.forEachNumber(this.queryViewsByTarget, target => + activeTargets.push(target) + ); + await this.synchronizeQueryViewsAndRaiseSnapshots( + activeTargets + ); + } else { + throw err; + } + } ); } case 'rejected': { diff --git a/packages/firestore/src/local/indexeddb_remote_document_cache.ts b/packages/firestore/src/local/indexeddb_remote_document_cache.ts index afd83ec859e..ba10a899bf0 100644 --- a/packages/firestore/src/local/indexeddb_remote_document_cache.ts +++ b/packages/firestore/src/local/indexeddb_remote_document_cache.ts @@ -27,6 +27,7 @@ import { DocumentKey } from '../model/document_key'; import { SnapshotVersion } from '../core/snapshot_version'; import { assert } from '../util/assert'; +import { Code, FirestoreError } from '../util/error'; import { IndexedDbPersistence } from './indexeddb_persistence'; import { DbRemoteDocument, @@ -40,6 +41,10 @@ import { PersistencePromise } from './persistence_promise'; import { RemoteDocumentCache } from './remote_document_cache'; import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db'; +const REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG = + 'The remote document changelog no longer contains all changes for all ' + + 'local query views. It may be necessary to rebuild these views.'; + export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { /** The last id read by `getNewDocumentChanges()`. */ private _lastProcessedDocumentChangeId = 0; @@ -69,21 +74,11 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { */ // PORTING NOTE: This is only used for multi-tab synchronization. start(transaction: SimpleDbTransaction): PersistencePromise { - // If there are no existing changes, we set `lastProcessedDocumentChangeId` - // to 0 since IndexedDb's auto-generated keys start at 1. - this._lastProcessedDocumentChangeId = 0; - const store = SimpleDb.getStore< DbRemoteDocumentChangesKey, DbRemoteDocumentChanges >(transaction, DbRemoteDocumentChanges.store); - return store.iterate( - { keysOnly: true, reverse: true }, - (key, value, control) => { - this._lastProcessedDocumentChangeId = key; - control.done(); - } - ); + return this.synchronizeLastDocumentChangeId(store); } addEntries( @@ -172,18 +167,33 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { let changedDocs = maybeDocumentMap(); const range = IDBKeyRange.lowerBound( - this._lastProcessedDocumentChangeId, - /*lowerOpen=*/ true + this._lastProcessedDocumentChangeId + 1 ); + let firstIteration = true; - // TODO(b/114228464): Another client may have garbage collected the remote - // document changelog if our client was throttled for more than 30 minutes. - // We can detect this if the `lastProcessedDocumentChangeId` entry is no - // longer in the changelog. It is possible to recover from this state, - // either by replaying the entire remote document cache or by re-executing - // all queries against the local store. - return documentChangesStore(transaction) + const changesStore = documentChangesStore(transaction); + return changesStore .iterate({ range }, (_, documentChange) => { + if (firstIteration) { + firstIteration = false; + + // If our client was throttled for more than 30 minutes, another + // client may have garbage collected the remote document changelog. + if (this._lastProcessedDocumentChangeId + 1 !== documentChange.id) { + // Reset the `lastProcessedDocumentChangeId` to allow further + // invocations to successfully return the changes after this + // rejection. + return this.synchronizeLastDocumentChangeId(changesStore).next(() => + PersistencePromise.reject( + new FirestoreError( + Code.DATA_LOSS, + REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG + ) + ) + ); + } + } + changedKeys = changedKeys.unionWith( this.serializer.fromDbResourcePaths(documentChange.changes) ); @@ -217,6 +227,31 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { const range = IDBKeyRange.upperBound(changeId); return documentChangesStore(transaction).delete(range); } + + private synchronizeLastDocumentChangeId( + documentChangesStore: SimpleDbStore< + DbRemoteDocumentChangesKey, + DbRemoteDocumentChanges + > + ): PersistencePromise { + // If there are no existing changes, we set `lastProcessedDocumentChangeId` + // to 0 since IndexedDb's auto-generated keys start at 1. + this._lastProcessedDocumentChangeId = 0; + return documentChangesStore.iterate( + { keysOnly: true, reverse: true }, + (key, value, control) => { + this._lastProcessedDocumentChangeId = key; + control.done(); + } + ); + } +} + +export function isDocumentChangeMissingError(err: FirestoreError): boolean { + return ( + err.code === Code.DATA_LOSS && + err.message === REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG + ); } /** diff --git a/packages/firestore/src/local/remote_document_cache.ts b/packages/firestore/src/local/remote_document_cache.ts index ae030a0169b..17b87059e2d 100644 --- a/packages/firestore/src/local/remote_document_cache.ts +++ b/packages/firestore/src/local/remote_document_cache.ts @@ -89,6 +89,10 @@ export interface RemoteDocumentCache { * Returns the set of documents that have been updated since the last call. * If this is the first call, returns the set of changes since client * initialization. + * + * If the changelog was garbage collected and can no longer be replayed, + * `getNewDocumentChanges` will reject the returned Promise. Further + * invocations will return document changes since the point of rejection. */ // PORTING NOTE: This is only used for multi-tab synchronization. getNewDocumentChanges( diff --git a/packages/firestore/src/local/shared_client_state.ts b/packages/firestore/src/local/shared_client_state.ts index 3937ec598aa..b794d83e520 100644 --- a/packages/firestore/src/local/shared_client_state.ts +++ b/packages/firestore/src/local/shared_client_state.ts @@ -526,7 +526,6 @@ export class WebStorageSharedClientState implements SharedClientState { private readonly sequenceNumberKey: string; private readonly activeClients: { [key: string]: ClientState } = {}; private readonly storageListener = this.handleWebStorageEvent.bind(this); - private readonly escapedPersistenceKey: string; private readonly onlineStateKey: string; private readonly clientStateKeyRe: RegExp; private readonly mutationBatchKeyRe: RegExp; @@ -543,7 +542,7 @@ export class WebStorageSharedClientState implements SharedClientState { constructor( private readonly queue: AsyncQueue, private readonly platform: Platform, - persistenceKey: string, + private readonly persistenceKey: string, private readonly localClientId: ClientId, initialUser: User ) { @@ -555,7 +554,7 @@ export class WebStorageSharedClientState implements SharedClientState { } // Escape the special characters mentioned here: // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions - this.escapedPersistenceKey = persistenceKey.replace( + const escapedPersistenceKey = persistenceKey.replace( /[.*+?^${}()|[\]\\]/g, '\\$&' ); @@ -565,26 +564,20 @@ export class WebStorageSharedClientState implements SharedClientState { this.localClientStorageKey = this.toWebStorageClientStateKey( this.localClientId ); - this.sequenceNumberKey = `${SEQUENCE_NUMBER_KEY_PREFIX}_${ - this.escapedPersistenceKey - }`; + this.sequenceNumberKey = `${SEQUENCE_NUMBER_KEY_PREFIX}_${persistenceKey}`; this.activeClients[this.localClientId] = new LocalClientState(); this.clientStateKeyRe = new RegExp( - `^${CLIENT_STATE_KEY_PREFIX}_${this.escapedPersistenceKey}_([^_]*)$` + `^${CLIENT_STATE_KEY_PREFIX}_${escapedPersistenceKey}_([^_]*)$` ); this.mutationBatchKeyRe = new RegExp( - `^${MUTATION_BATCH_KEY_PREFIX}_${ - this.escapedPersistenceKey - }_(\\d+)(?:_(.*))?$` + `^${MUTATION_BATCH_KEY_PREFIX}_${escapedPersistenceKey}_(\\d+)(?:_(.*))?$` ); this.queryTargetKeyRe = new RegExp( - `^${QUERY_TARGET_KEY_PREFIX}_${this.escapedPersistenceKey}_(\\d+)$` + `^${QUERY_TARGET_KEY_PREFIX}_${escapedPersistenceKey}_(\\d+)$` ); - this.onlineStateKey = `${ONLINE_STATE_KEY_PREFIX}_${ - this.escapedPersistenceKey - }`; + this.onlineStateKey = `${ONLINE_STATE_KEY_PREFIX}_${persistenceKey}`; // Rather than adding the storage observer during start(), we add the // storage observer during initialization. This ensures that we collect @@ -926,22 +919,18 @@ export class WebStorageSharedClientState implements SharedClientState { `Client key cannot contain '_', but was '${clientId}'` ); - return `${CLIENT_STATE_KEY_PREFIX}_${ - this.escapedPersistenceKey - }_${clientId}`; + return `${CLIENT_STATE_KEY_PREFIX}_${this.persistenceKey}_${clientId}`; } /** Assembles the key for a query state in WebStorage */ private toWebStorageQueryTargetMetadataKey(targetId: TargetId): string { - return `${QUERY_TARGET_KEY_PREFIX}_${ - this.escapedPersistenceKey - }_${targetId}`; + return `${QUERY_TARGET_KEY_PREFIX}_${this.persistenceKey}_${targetId}`; } /** Assembles the key for a mutation batch in WebStorage */ private toWebStorageMutationBatchKey(batchId: BatchId): string { let mutationKey = `${MUTATION_BATCH_KEY_PREFIX}_${ - this.escapedPersistenceKey + this.persistenceKey }_${batchId}`; if (this.currentUser.isAuthenticated()) { diff --git a/packages/firestore/test/unit/local/persistence_test_helpers.ts b/packages/firestore/test/unit/local/persistence_test_helpers.ts index 6a0eccbb4ac..a52d7105720 100644 --- a/packages/firestore/test/unit/local/persistence_test_helpers.ts +++ b/packages/firestore/test/unit/local/persistence_test_helpers.ts @@ -56,7 +56,7 @@ export const INDEXEDDB_TEST_DATABASE_ID = new DatabaseId('test-project'); /** The DatabaseInfo used by most tests that access IndexedDb. */ const INDEXEDDB_TEST_DATABASE_INFO = new DatabaseInfo( INDEXEDDB_TEST_DATABASE_ID, - 'PersistenceTestHelpers', + '[PersistenceTestHelpers]', 'host', /*ssl=*/ false ); diff --git a/packages/firestore/test/unit/local/remote_document_cache.test.ts b/packages/firestore/test/unit/local/remote_document_cache.test.ts index b9606b61fc8..f42fa33b615 100644 --- a/packages/firestore/test/unit/local/remote_document_cache.test.ts +++ b/packages/firestore/test/unit/local/remote_document_cache.test.ts @@ -28,13 +28,18 @@ import { removedDoc } from '../../util/helpers'; -import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache'; +import { + IndexedDbRemoteDocumentCache, + isDocumentChangeMissingError +} from '../../../src/local/indexeddb_remote_document_cache'; import { DbRemoteDocumentChanges, DbRemoteDocumentChangesKey } from '../../../src/local/indexeddb_schema'; import { MaybeDocumentMap } from '../../../src/model/collections'; +import { fail } from '../../../src/util/assert'; import * as persistenceHelpers from './persistence_test_helpers'; +import { INDEXEDDB_TEST_SERIALIZER } from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; // Helpers for use throughout tests. @@ -115,10 +120,51 @@ describe('IndexedDbRemoteDocumentCache', () => { persistence, persistence.getRemoteDocumentCache() ); - const changedDocs = await cache.getNextDocumentChanges(); + const changedDocs = await cache.getNewDocumentChanges(); assertMatches([], changedDocs); }); + it('can recover from garbage collected change log', async () => { + // This test is meant to simulate the recovery from a garbage collected + // document change log. + // The tests adds four changes (via the `writer`). After the first change is + // processed by the reader, the writer garbage collects the first and second + // change. When reader then reads the new changes, it notices that a change + // is missing. The test then uses `resetLastProcessedDocumentChange` to + // simulate a successful recovery. + + const writerCache = new TestRemoteDocumentCache( + persistence, + persistence.getRemoteDocumentCache() + ); + const readerCache = new TestRemoteDocumentCache( + persistence, + new IndexedDbRemoteDocumentCache(INDEXEDDB_TEST_SERIALIZER, true) + ); + + await writerCache.addEntries([doc('a/1', 1, DOC_DATA)]); + let changedDocs = await readerCache.getNewDocumentChanges(); + assertMatches([doc('a/1', 1, DOC_DATA)], changedDocs); + + await writerCache.addEntries([doc('a/2', 2, DOC_DATA)]); + await writerCache.addEntries([doc('a/3', 3, DOC_DATA)]); + // Garbage collect change 1 and 2, but not change 3. + await writerCache.removeDocumentChangesThroughChangeId(2); + + await readerCache + .getNewDocumentChanges() + .then( + () => fail('Missing expected error'), + err => expect(isDocumentChangeMissingError(err)).to.be.ok + ); + + // Ensure that we can retrieve future changes after the we processed the + // error + await writerCache.addEntries([doc('a/4', 4, DOC_DATA)]); + changedDocs = await readerCache.getNewDocumentChanges(); + assertMatches([doc('a/4', 4, DOC_DATA)], changedDocs); + }); + genericRemoteDocumentCacheTests(); }); @@ -217,7 +263,7 @@ function genericRemoteDocumentCacheTests(): void { doc('a/1', 3, DOC_DATA) ]); - let changedDocs = await cache.getNextDocumentChanges(); + let changedDocs = await cache.getNewDocumentChanges(); assertMatches( [ doc('a/1', 3, DOC_DATA), @@ -228,12 +274,12 @@ function genericRemoteDocumentCacheTests(): void { ); await cache.addEntries([doc('c/1', 3, DOC_DATA)]); - changedDocs = await cache.getNextDocumentChanges(); + changedDocs = await cache.getNewDocumentChanges(); assertMatches([doc('c/1', 3, DOC_DATA)], changedDocs); }); it('can get empty changes', async () => { - const changedDocs = await cache.getNextDocumentChanges(); + const changedDocs = await cache.getNewDocumentChanges(); assertMatches([], changedDocs); }); @@ -245,7 +291,7 @@ function genericRemoteDocumentCacheTests(): void { ]); await cache.removeEntry(key('a/2')); - const changedDocs = await cache.getNextDocumentChanges(); + const changedDocs = await cache.getNewDocumentChanges(); assertMatches( [doc('a/1', 1, DOC_DATA), removedDoc('a/2'), doc('a/3', 3, DOC_DATA)], changedDocs diff --git a/packages/firestore/test/unit/local/test_remote_document_cache.ts b/packages/firestore/test/unit/local/test_remote_document_cache.ts index 531f69e3301..7f70399df3b 100644 --- a/packages/firestore/test/unit/local/test_remote_document_cache.ts +++ b/packages/firestore/test/unit/local/test_remote_document_cache.ts @@ -15,6 +15,7 @@ */ import { Query } from '../../../src/core/query'; +import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache'; import { Persistence } from '../../../src/local/persistence'; import { RemoteDocumentCache } from '../../../src/local/remote_document_cache'; import { DocumentMap, MaybeDocumentMap } from '../../../src/model/collections'; @@ -67,13 +68,28 @@ export class TestRemoteDocumentCache { ); } - getNextDocumentChanges(): Promise { + getNewDocumentChanges(): Promise { return this.persistence.runTransaction( - 'getNextDocumentChanges', + 'getNewDocumentChanges', 'readonly', txn => { return this.cache.getNewDocumentChanges(txn); } ); } + + removeDocumentChangesThroughChangeId(changeId: number): Promise { + return this.persistence.runTransaction( + 'removeDocumentChangesThroughChangeId', + 'readwrite-primary', + txn => { + if (!(this.cache instanceof IndexedDbRemoteDocumentCache)) { + throw new Error( + 'Can only removeDocumentChangesThroughChangeId() in IndexedDb' + ); + } + return this.cache.removeDocumentChangesThroughChangeId(txn, changeId); + } + ); + } } diff --git a/packages/firestore/tslint.json b/packages/firestore/tslint.json index ecd6ae6b247..c27f046465d 100644 --- a/packages/firestore/tslint.json +++ b/packages/firestore/tslint.json @@ -25,7 +25,7 @@ "interface-name": [true, "never-prefix"], "jsdoc-format": true, "label-position": true, - "max-line-length": [true, {"limit": 100, "ignore-pattern": "https?://"}], + "max-line-length": [true, {"limit": 100, "ignore-pattern": "^import|https?://"}], "member-access": [true, "no-public"], "new-parens": true, "no-angle-bracket-type-assertion": true,