Skip to content

Commit 3223572

Browse files
[Multi-Tab] Detect and recover from GCed Remote Document Changelog (#1272)
1 parent 6f08d5d commit 3223572

File tree

6 files changed

+157
-37
lines changed

6 files changed

+157
-37
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { SortedMap } from '../util/sorted_map';
4242
import { isNullOrUndefined } from '../util/types';
4343

4444
import { isPrimaryLeaseLostError } from '../local/indexeddb_persistence';
45+
import { isDocumentChangeMissingError } from '../local/indexeddb_remote_document_cache';
4546
import { ClientId, SharedClientState } from '../local/shared_client_state';
4647
import {
4748
QueryTargetState,
@@ -1001,14 +1002,32 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
10011002
switch (state) {
10021003
case 'current':
10031004
case 'not-current': {
1004-
const changes = await this.localStore.getNewDocumentChanges();
1005-
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
1006-
targetId,
1007-
state === 'current'
1008-
);
1009-
return this.emitNewSnapsAndNotifyLocalStore(
1010-
changes,
1011-
synthesizedRemoteEvent
1005+
return this.localStore.getNewDocumentChanges().then(
1006+
async changes => {
1007+
// tslint and prettier disagree about their preferred line length.
1008+
// tslint:disable-next-line:max-line-length
1009+
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
1010+
targetId,
1011+
state === 'current'
1012+
);
1013+
await this.emitNewSnapsAndNotifyLocalStore(
1014+
changes,
1015+
synthesizedRemoteEvent
1016+
);
1017+
},
1018+
async err => {
1019+
if (isDocumentChangeMissingError(err)) {
1020+
const activeTargets: TargetId[] = [];
1021+
objUtils.forEachNumber(this.queryViewsByTarget, target =>
1022+
activeTargets.push(target)
1023+
);
1024+
await this.synchronizeQueryViewsAndRaiseSnapshots(
1025+
activeTargets
1026+
);
1027+
} else {
1028+
throw err;
1029+
}
1030+
}
10121031
);
10131032
}
10141033
case 'rejected': {

packages/firestore/src/local/indexeddb_remote_document_cache.ts

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { DocumentKey } from '../model/document_key';
2727

2828
import { SnapshotVersion } from '../core/snapshot_version';
2929
import { assert } from '../util/assert';
30+
import { Code, FirestoreError } from '../util/error';
3031
import { IndexedDbPersistence } from './indexeddb_persistence';
3132
import {
3233
DbRemoteDocument,
@@ -40,6 +41,10 @@ import { PersistencePromise } from './persistence_promise';
4041
import { RemoteDocumentCache } from './remote_document_cache';
4142
import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db';
4243

44+
const REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG =
45+
'The remote document changelog no longer contains all changes for all ' +
46+
'local query views. It may be necessary to rebuild these views.';
47+
4348
export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
4449
/** The last id read by `getNewDocumentChanges()`. */
4550
private _lastProcessedDocumentChangeId = 0;
@@ -69,21 +74,11 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
6974
*/
7075
// PORTING NOTE: This is only used for multi-tab synchronization.
7176
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
72-
// If there are no existing changes, we set `lastProcessedDocumentChangeId`
73-
// to 0 since IndexedDb's auto-generated keys start at 1.
74-
this._lastProcessedDocumentChangeId = 0;
75-
7677
const store = SimpleDb.getStore<
7778
DbRemoteDocumentChangesKey,
7879
DbRemoteDocumentChanges
7980
>(transaction, DbRemoteDocumentChanges.store);
80-
return store.iterate(
81-
{ keysOnly: true, reverse: true },
82-
(key, value, control) => {
83-
this._lastProcessedDocumentChangeId = key;
84-
control.done();
85-
}
86-
);
81+
return this.synchronizeLastDocumentChangeId(store);
8782
}
8883

8984
addEntries(
@@ -172,18 +167,33 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
172167
let changedDocs = maybeDocumentMap();
173168

174169
const range = IDBKeyRange.lowerBound(
175-
this._lastProcessedDocumentChangeId,
176-
/*lowerOpen=*/ true
170+
this._lastProcessedDocumentChangeId + 1
177171
);
172+
let firstIteration = true;
178173

179-
// TODO(b/114228464): Another client may have garbage collected the remote
180-
// document changelog if our client was throttled for more than 30 minutes.
181-
// We can detect this if the `lastProcessedDocumentChangeId` entry is no
182-
// longer in the changelog. It is possible to recover from this state,
183-
// either by replaying the entire remote document cache or by re-executing
184-
// all queries against the local store.
185-
return documentChangesStore(transaction)
174+
const changesStore = documentChangesStore(transaction);
175+
return changesStore
186176
.iterate({ range }, (_, documentChange) => {
177+
if (firstIteration) {
178+
firstIteration = false;
179+
180+
// If our client was throttled for more than 30 minutes, another
181+
// client may have garbage collected the remote document changelog.
182+
if (this._lastProcessedDocumentChangeId + 1 !== documentChange.id) {
183+
// Reset the `lastProcessedDocumentChangeId` to allow further
184+
// invocations to successfully return the changes after this
185+
// rejection.
186+
return this.synchronizeLastDocumentChangeId(changesStore).next(() =>
187+
PersistencePromise.reject(
188+
new FirestoreError(
189+
Code.DATA_LOSS,
190+
REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG
191+
)
192+
)
193+
);
194+
}
195+
}
196+
187197
changedKeys = changedKeys.unionWith(
188198
this.serializer.fromDbResourcePaths(documentChange.changes)
189199
);
@@ -217,6 +227,31 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
217227
const range = IDBKeyRange.upperBound(changeId);
218228
return documentChangesStore(transaction).delete(range);
219229
}
230+
231+
private synchronizeLastDocumentChangeId(
232+
documentChangesStore: SimpleDbStore<
233+
DbRemoteDocumentChangesKey,
234+
DbRemoteDocumentChanges
235+
>
236+
): PersistencePromise<void> {
237+
// If there are no existing changes, we set `lastProcessedDocumentChangeId`
238+
// to 0 since IndexedDb's auto-generated keys start at 1.
239+
this._lastProcessedDocumentChangeId = 0;
240+
return documentChangesStore.iterate(
241+
{ keysOnly: true, reverse: true },
242+
(key, value, control) => {
243+
this._lastProcessedDocumentChangeId = key;
244+
control.done();
245+
}
246+
);
247+
}
248+
}
249+
250+
export function isDocumentChangeMissingError(err: FirestoreError): boolean {
251+
return (
252+
err.code === Code.DATA_LOSS &&
253+
err.message === REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG
254+
);
220255
}
221256

222257
/**

packages/firestore/src/local/remote_document_cache.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ export interface RemoteDocumentCache {
8989
* Returns the set of documents that have been updated since the last call.
9090
* If this is the first call, returns the set of changes since client
9191
* initialization.
92+
*
93+
* If the changelog was garbage collected and can no longer be replayed,
94+
* `getNewDocumentChanges` will reject the returned Promise. Further
95+
* invocations will return document changes since the point of rejection.
9296
*/
9397
// PORTING NOTE: This is only used for multi-tab synchronization.
9498
getNewDocumentChanges(

packages/firestore/test/unit/local/remote_document_cache.test.ts

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,18 @@ import {
2828
removedDoc
2929
} from '../../util/helpers';
3030

31-
import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache';
31+
import {
32+
IndexedDbRemoteDocumentCache,
33+
isDocumentChangeMissingError
34+
} from '../../../src/local/indexeddb_remote_document_cache';
3235
import {
3336
DbRemoteDocumentChanges,
3437
DbRemoteDocumentChangesKey
3538
} from '../../../src/local/indexeddb_schema';
3639
import { MaybeDocumentMap } from '../../../src/model/collections';
40+
import { fail } from '../../../src/util/assert';
3741
import * as persistenceHelpers from './persistence_test_helpers';
42+
import { INDEXEDDB_TEST_SERIALIZER } from './persistence_test_helpers';
3843
import { TestRemoteDocumentCache } from './test_remote_document_cache';
3944

4045
// Helpers for use throughout tests.
@@ -115,10 +120,51 @@ describe('IndexedDbRemoteDocumentCache', () => {
115120
persistence,
116121
persistence.getRemoteDocumentCache()
117122
);
118-
const changedDocs = await cache.getNextDocumentChanges();
123+
const changedDocs = await cache.getNewDocumentChanges();
119124
assertMatches([], changedDocs);
120125
});
121126

127+
it('can recover from garbage collected change log', async () => {
128+
// This test is meant to simulate the recovery from a garbage collected
129+
// document change log.
130+
// The tests adds four changes (via the `writer`). After the first change is
131+
// processed by the reader, the writer garbage collects the first and second
132+
// change. When reader then reads the new changes, it notices that a change
133+
// is missing. The test then uses `resetLastProcessedDocumentChange` to
134+
// simulate a successful recovery.
135+
136+
const writerCache = new TestRemoteDocumentCache(
137+
persistence,
138+
persistence.getRemoteDocumentCache()
139+
);
140+
const readerCache = new TestRemoteDocumentCache(
141+
persistence,
142+
new IndexedDbRemoteDocumentCache(INDEXEDDB_TEST_SERIALIZER, true)
143+
);
144+
145+
await writerCache.addEntries([doc('a/1', 1, DOC_DATA)]);
146+
let changedDocs = await readerCache.getNewDocumentChanges();
147+
assertMatches([doc('a/1', 1, DOC_DATA)], changedDocs);
148+
149+
await writerCache.addEntries([doc('a/2', 2, DOC_DATA)]);
150+
await writerCache.addEntries([doc('a/3', 3, DOC_DATA)]);
151+
// Garbage collect change 1 and 2, but not change 3.
152+
await writerCache.removeDocumentChangesThroughChangeId(2);
153+
154+
await readerCache
155+
.getNewDocumentChanges()
156+
.then(
157+
() => fail('Missing expected error'),
158+
err => expect(isDocumentChangeMissingError(err)).to.be.ok
159+
);
160+
161+
// Ensure that we can retrieve future changes after the we processed the
162+
// error
163+
await writerCache.addEntries([doc('a/4', 4, DOC_DATA)]);
164+
changedDocs = await readerCache.getNewDocumentChanges();
165+
assertMatches([doc('a/4', 4, DOC_DATA)], changedDocs);
166+
});
167+
122168
genericRemoteDocumentCacheTests();
123169
});
124170

@@ -217,7 +263,7 @@ function genericRemoteDocumentCacheTests(): void {
217263
doc('a/1', 3, DOC_DATA)
218264
]);
219265

220-
let changedDocs = await cache.getNextDocumentChanges();
266+
let changedDocs = await cache.getNewDocumentChanges();
221267
assertMatches(
222268
[
223269
doc('a/1', 3, DOC_DATA),
@@ -228,12 +274,12 @@ function genericRemoteDocumentCacheTests(): void {
228274
);
229275

230276
await cache.addEntries([doc('c/1', 3, DOC_DATA)]);
231-
changedDocs = await cache.getNextDocumentChanges();
277+
changedDocs = await cache.getNewDocumentChanges();
232278
assertMatches([doc('c/1', 3, DOC_DATA)], changedDocs);
233279
});
234280

235281
it('can get empty changes', async () => {
236-
const changedDocs = await cache.getNextDocumentChanges();
282+
const changedDocs = await cache.getNewDocumentChanges();
237283
assertMatches([], changedDocs);
238284
});
239285

@@ -245,7 +291,7 @@ function genericRemoteDocumentCacheTests(): void {
245291
]);
246292
await cache.removeEntry(key('a/2'));
247293

248-
const changedDocs = await cache.getNextDocumentChanges();
294+
const changedDocs = await cache.getNewDocumentChanges();
249295
assertMatches(
250296
[doc('a/1', 1, DOC_DATA), removedDoc('a/2'), doc('a/3', 3, DOC_DATA)],
251297
changedDocs

packages/firestore/test/unit/local/test_remote_document_cache.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import { Query } from '../../../src/core/query';
18+
import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache';
1819
import { Persistence } from '../../../src/local/persistence';
1920
import { RemoteDocumentCache } from '../../../src/local/remote_document_cache';
2021
import { DocumentMap, MaybeDocumentMap } from '../../../src/model/collections';
@@ -67,13 +68,28 @@ export class TestRemoteDocumentCache {
6768
);
6869
}
6970

70-
getNextDocumentChanges(): Promise<MaybeDocumentMap> {
71+
getNewDocumentChanges(): Promise<MaybeDocumentMap> {
7172
return this.persistence.runTransaction(
72-
'getNextDocumentChanges',
73+
'getNewDocumentChanges',
7374
'readonly',
7475
txn => {
7576
return this.cache.getNewDocumentChanges(txn);
7677
}
7778
);
7879
}
80+
81+
removeDocumentChangesThroughChangeId(changeId: number): Promise<void> {
82+
return this.persistence.runTransaction(
83+
'removeDocumentChangesThroughChangeId',
84+
'readwrite-primary',
85+
txn => {
86+
if (!(this.cache instanceof IndexedDbRemoteDocumentCache)) {
87+
throw new Error(
88+
'Can only removeDocumentChangesThroughChangeId() in IndexedDb'
89+
);
90+
}
91+
return this.cache.removeDocumentChangesThroughChangeId(txn, changeId);
92+
}
93+
);
94+
}
7995
}

packages/firestore/tslint.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"interface-name": [true, "never-prefix"],
2626
"jsdoc-format": true,
2727
"label-position": true,
28-
"max-line-length": [true, {"limit": 100, "ignore-pattern": "https?://"}],
28+
"max-line-length": [true, {"limit": 100, "ignore-pattern": "^import|https?://"}],
2929
"member-access": [true, "no-public"],
3030
"new-parens": true,
3131
"no-angle-bracket-type-assertion": true,

0 commit comments

Comments
 (0)