Skip to content

[Multi-Tab] Detect and recover from GCed Remote Document Changelog #1272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { SortedMap } from '../util/sorted_map';
import { isNullOrUndefined } from '../util/types';

import { isPrimaryLeaseLostError } from '../local/indexeddb_persistence';
import { isRemoteDocumentChangesGarbageCollectedError } from '../local/indexeddb_remote_document_cache';
import { ClientId, SharedClientState } from '../local/shared_client_state';
import {
QueryTargetState,
Expand Down Expand Up @@ -1001,14 +1002,33 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
switch (state) {
case 'current':
case 'not-current': {
const changes = await this.localStore.getNewDocumentChanges();
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
targetId,
state === 'current'
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
synthesizedRemoteEvent
return this.localStore.getNewDocumentChanges().then(
async changes => {
// tslint and prettier disagree about their preferred line length.
// tslint:disable-next-line:max-line-length
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
targetId,
state === 'current'
);
await this.emitNewSnapsAndNotifyLocalStore(
changes,
synthesizedRemoteEvent
);
},
async err => {
if (isRemoteDocumentChangesGarbageCollectedError(err)) {
const activeTargets: TargetId[] = [];
objUtils.forEachNumber(this.queryViewsByTarget, target =>
activeTargets.push(target)
);
await this.synchronizeQueryViewsAndRaiseSnapshots(
activeTargets
);
await this.localStore.resetLastDocumentChangeId();
} else {
throw err;
}
}
);
}
case 'rejected': {
Expand Down
75 changes: 56 additions & 19 deletions packages/firestore/src/local/indexeddb_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { DocumentKey } from '../model/document_key';

import { SnapshotVersion } from '../core/snapshot_version';
import { assert } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import { IndexedDbPersistence } from './indexeddb_persistence';
import {
DbRemoteDocument,
Expand All @@ -40,6 +41,10 @@ import { PersistencePromise } from './persistence_promise';
import { RemoteDocumentCache } from './remote_document_cache';
import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db';

const REMOTE_DOCUMENT_CHANGELOG_GARBAGE_COLLECTED_ERR_MSG =
'The remote document changelog no longer contains all changes for all ' +
'local query views. It may be necessary to rebuild these views.';

export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
/** The last id read by `getNewDocumentChanges()`. */
private _lastProcessedDocumentChangeId = 0;
Expand Down Expand Up @@ -69,21 +74,11 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
// If there are no existing changes, we set `lastProcessedDocumentChangeId`
// to 0 since IndexedDb's auto-generated keys start at 1.
this._lastProcessedDocumentChangeId = 0;

const store = SimpleDb.getStore<
DbRemoteDocumentChangesKey,
DbRemoteDocumentChanges
>(transaction, DbRemoteDocumentChanges.store);
return store.iterate(
{ keysOnly: true, reverse: true },
(key, value, control) => {
this._lastProcessedDocumentChangeId = key;
control.done();
}
);
return this.synchronizeLastDocumentChangeId(store);
}

addEntries(
Expand Down Expand Up @@ -172,18 +167,26 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
let changedDocs = maybeDocumentMap();

const range = IDBKeyRange.lowerBound(
this._lastProcessedDocumentChangeId,
/*lowerOpen=*/ true
this._lastProcessedDocumentChangeId + 1
);
let firstIteration = true;

// TODO(b/114228464): Another client may have garbage collected the remote
// document changelog if our client was throttled for more than 30 minutes.
// We can detect this if the `lastProcessedDocumentChangeId` entry is no
// longer in the changelog. It is possible to recover from this state,
// either by replaying the entire remote document cache or by re-executing
// all queries against the local store.
return documentChangesStore(transaction)
.iterate({ range }, (_, documentChange) => {
if (firstIteration) {
// If our client was throttled for more than 30 minutes, another
// client may have garbage collected the remote document changelog.
if (this._lastProcessedDocumentChangeId + 1 !== documentChange.id) {
return PersistencePromise.reject(
new FirestoreError(
Code.DATA_LOSS,
REMOTE_DOCUMENT_CHANGELOG_GARBAGE_COLLECTED_ERR_MSG
)
);
}
firstIteration = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my comment elsewhere, perhaps rework this so that getNewDocumentChanges() always advances past all document changes, even if it ultimately returns an error... e.g. by changing firstIteration to minChangeId and checking it in the next() block...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider it done.

}

changedKeys = changedKeys.unionWith(
this.serializer.fromDbResourcePaths(documentChange.changes)
);
Expand Down Expand Up @@ -217,6 +220,40 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
const range = IDBKeyRange.upperBound(changeId);
return documentChangesStore(transaction).delete(range);
}

resetLastProcessedDocumentChange(
transaction: PersistenceTransaction
): PersistencePromise<void> {
const store = documentChangesStore(transaction);
return this.synchronizeLastDocumentChangeId(store);
}

private synchronizeLastDocumentChangeId(
documentChangesStore: SimpleDbStore<
DbRemoteDocumentChangesKey,
DbRemoteDocumentChanges
>
): PersistencePromise<void> {
// If there are no existing changes, we set `lastProcessedDocumentChangeId`
// to 0 since IndexedDb's auto-generated keys start at 1.
this._lastProcessedDocumentChangeId = 0;
return documentChangesStore.iterate(
{ keysOnly: true, reverse: true },
(key, value, control) => {
this._lastProcessedDocumentChangeId = key;
control.done();
}
);
}
}

export function isRemoteDocumentChangesGarbageCollectedError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like verbosity, but "is...Error" gets hard to parse when there are 5 words in between. How about isDocumentChangeMissingError()?

I was also wondering if you could just:

export const DOCUMENT_CHANGE_MISSING_ERROR = new Error(...);

But I am not sure how JS works. Would that mess up the stack trace in the error or something?

It's also a little dubious to be using errors for control flow. We could change getNewDocumentChanges() to return null if it encountered a missing change or something, but I think I'm not that thrilled about the idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the name to isDocumentChangeMissingError.

In general, I would like to shy away from statically allocating errors as it does mess up the stacktraces (even in JS).

I'm gonna use the current fire alarm in SFO as an excuse to not comment on the API contract. I can't really think of a much better way to convey this information.

err: FirestoreError
): boolean {
return (
err.code === Code.DATA_LOSS &&
err.message === REMOTE_DOCUMENT_CHANGELOG_GARBAGE_COLLECTED_ERR_MSG
);
}

/**
Expand Down
10 changes: 10 additions & 0 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -869,4 +869,14 @@ export class LocalStore {
}
);
}

resetLastDocumentChangeId(): Promise<void> {
return this.persistence.runTransaction(
'Reset last document change ID',
'readonly',
txn => {
return this.remoteDocuments.resetLastProcessedDocumentChange(txn);
}
);
}
}
7 changes: 7 additions & 0 deletions packages/firestore/src/local/memory_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,11 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {

return PersistencePromise.resolve(changedDocs);
}

resetLastProcessedDocumentChange(
transaction: PersistenceTransaction
): PersistencePromise<void> {
this.newDocumentChanges = documentKeySet();
return PersistencePromise.resolve();
}
}
11 changes: 11 additions & 0 deletions packages/firestore/src/local/remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,15 @@ export interface RemoteDocumentCache {
getNewDocumentChanges(
transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap>;

/**
* Skips all existing change log entries in IndexedDb and moves the change log
* cursor past the last existing change. This method should only be called if
* `getNewDocumentChanges()`can no longer replay all changes and all views
* have already been manually synchronized.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
resetLastProcessedDocumentChange(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"lastProcessedDocumentChange" is using implementation terminology and should instead use the same terminology as the rest of this interface, which probably means something like "resetDocumenChanges()" (to match "getNewDocumentChanges()"). The comment should also be updated to be more implementation-agnostic (including not mentioning IndexedDb).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, can we get rid of this method and just have getNewDocumentChanges() maintain its guarantee that it'll only return new changes since the last call, even if it hits the error? That is, have it still "consume" all of the new changes, updating lastProcessedChangeId_, even if there was an error?

That seems sensible and simplifies the interface (and maybe our implementation?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the method and inlined the call that resets the change log pointer.

transaction: PersistenceTransaction
): PersistencePromise<void>;
}
61 changes: 55 additions & 6 deletions packages/firestore/test/unit/local/remote_document_cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ import {
removedDoc
} from '../../util/helpers';

import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache';
import {
IndexedDbRemoteDocumentCache,
isRemoteDocumentChangesGarbageCollectedError
} from '../../../src/local/indexeddb_remote_document_cache';
import {
DbRemoteDocumentChanges,
DbRemoteDocumentChangesKey
} from '../../../src/local/indexeddb_schema';
import { MaybeDocumentMap } from '../../../src/model/collections';
import { fail } from '../../../src/util/assert';
import * as persistenceHelpers from './persistence_test_helpers';
import { INDEXEDDB_TEST_SERIALIZER } from './persistence_test_helpers';
import { TestRemoteDocumentCache } from './test_remote_document_cache';

// Helpers for use throughout tests.
Expand Down Expand Up @@ -115,10 +120,54 @@ describe('IndexedDbRemoteDocumentCache', () => {
persistence,
persistence.getRemoteDocumentCache()
);
const changedDocs = await cache.getNextDocumentChanges();
const changedDocs = await cache.getNewDocumentChanges();
assertMatches([], changedDocs);
});

it('can recover from garbage collected change log', async () => {
// This test is meant to simulate the recovery from a garbage collected
// document change log.
// The tests adds four changes (via the `writer`). After the first change is
// processed by the reader, the writer garbage collects the first and second
// change. When reader then reads the new changes, it notices that a change
// is missing. The test then uses `resetLastProcessedDocumentChange` to
// simulate a successful recovery.

const writerCache = new TestRemoteDocumentCache(
persistence,
persistence.getRemoteDocumentCache()
);
const readerCache = new TestRemoteDocumentCache(
persistence,
new IndexedDbRemoteDocumentCache(INDEXEDDB_TEST_SERIALIZER, true)
);

await writerCache.addEntries([doc('a/1', 1, DOC_DATA)]);
let changedDocs = await readerCache.getNewDocumentChanges();
assertMatches([doc('a/1', 1, DOC_DATA)], changedDocs);

await writerCache.addEntries([doc('a/2', 2, DOC_DATA)]);
await writerCache.addEntries([doc('a/3', 3, DOC_DATA)]);
// Garbage collect change 1 and 2, but not change 3.
await writerCache.removeDocumentChangesThroughChangeId(2);

await readerCache
.getNewDocumentChanges()
.then(
() => fail('Missing expected error'),
err =>
expect(isRemoteDocumentChangesGarbageCollectedError(err)).to.be.ok
);

// Recover by moving moving the change log cursor to the last existing
// entry. We assume that as part of the recovery, all relevant changes have
// been processed manually (see `SyncEngine.applyTargetState`).
await readerCache.resetLastProcessDocumentChange();
await writerCache.addEntries([doc('a/4', 4, DOC_DATA)]);
changedDocs = await readerCache.getNewDocumentChanges();
assertMatches([doc('a/4', 4, DOC_DATA)], changedDocs);
});

genericRemoteDocumentCacheTests();
});

Expand Down Expand Up @@ -217,7 +266,7 @@ function genericRemoteDocumentCacheTests(): void {
doc('a/1', 3, DOC_DATA)
]);

let changedDocs = await cache.getNextDocumentChanges();
let changedDocs = await cache.getNewDocumentChanges();
assertMatches(
[
doc('a/1', 3, DOC_DATA),
Expand All @@ -228,12 +277,12 @@ function genericRemoteDocumentCacheTests(): void {
);

await cache.addEntries([doc('c/1', 3, DOC_DATA)]);
changedDocs = await cache.getNextDocumentChanges();
changedDocs = await cache.getNewDocumentChanges();
assertMatches([doc('c/1', 3, DOC_DATA)], changedDocs);
});

it('can get empty changes', async () => {
const changedDocs = await cache.getNextDocumentChanges();
const changedDocs = await cache.getNewDocumentChanges();
assertMatches([], changedDocs);
});

Expand All @@ -245,7 +294,7 @@ function genericRemoteDocumentCacheTests(): void {
]);
await cache.removeEntry(key('a/2'));

const changedDocs = await cache.getNextDocumentChanges();
const changedDocs = await cache.getNewDocumentChanges();
assertMatches(
[doc('a/1', 1, DOC_DATA), removedDoc('a/2'), doc('a/3', 3, DOC_DATA)],
changedDocs
Expand Down
34 changes: 32 additions & 2 deletions packages/firestore/test/unit/local/test_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import { Query } from '../../../src/core/query';
import { IndexedDbRemoteDocumentCache } from '../../../src/local/indexeddb_remote_document_cache';
import { Persistence } from '../../../src/local/persistence';
import { RemoteDocumentCache } from '../../../src/local/remote_document_cache';
import { DocumentMap, MaybeDocumentMap } from '../../../src/model/collections';
Expand Down Expand Up @@ -67,13 +68,42 @@ export class TestRemoteDocumentCache {
);
}

getNextDocumentChanges(): Promise<MaybeDocumentMap> {
getNewDocumentChanges(): Promise<MaybeDocumentMap> {
return this.persistence.runTransaction(
'getNextDocumentChanges',
'getNewDocumentChanges',
'readonly',
txn => {
return this.cache.getNewDocumentChanges(txn);
}
);
}

resetLastProcessDocumentChange(): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hoping this method goes away, but if not you're missing 'ed' here and below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it's gone!

return this.persistence.runTransaction(
'resetLastProcessDocumentChange',
'readonly',
txn => {
return this.cache.resetLastProcessedDocumentChange(txn);
}
);
}

removeDocumentChangesThroughChangeId(changeId: number): Promise<void> {
if (!(this.cache instanceof IndexedDbRemoteDocumentCache)) {
throw new Error(
'Can only removeDocumentChangesThroughChangeId() in IndexedDb'
);
}
return this.persistence.runTransaction(
'removeDocumentChangesThroughChangeId',
'readwrite-primary',
txn => {
return (this
.cache as IndexedDbRemoteDocumentCache).removeDocumentChangesThroughChangeId(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cast necessary? If you moved your if check above into this closure it probably wouldn't be...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I managed to get rid of it by moving the assert.

txn,
changeId
);
}
);
}
}
2 changes: 1 addition & 1 deletion packages/firestore/tslint.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"interface-name": [true, "never-prefix"],
"jsdoc-format": true,
"label-position": true,
"max-line-length": [true, {"limit": 100, "ignore-pattern": "https?://"}],
"max-line-length": [true, {"limit": 100, "ignore-pattern": "^import|https?://"}],
"member-access": [true, "no-public"],
"new-parens": true,
"no-angle-bracket-type-assertion": true,
Expand Down