Skip to content

Make getNewDocumentChanges() idempotent #2255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Oct 11, 2019
2 changes: 2 additions & 0 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ export class FirestoreClient {
// TODO(index-free): Use IndexFreeQueryEngine/IndexedQueryEngine as appropriate.
const queryEngine = new SimpleQueryEngine();
this.localStore = new LocalStore(this.persistence, queryEngine, user);
await this.localStore.start();

if (maybeLruGc) {
// We're running LRU Garbage collection. Set up the scheduler.
this.lruScheduler = new LruScheduler(
Expand Down
15 changes: 3 additions & 12 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,12 @@ export class IndexedDbPersistence implements Persistence {

this.scheduleClientMetadataAndPrimaryLeaseRefreshes();

return this.startRemoteDocumentCache();
})
.then(() =>
this.simpleDb.runTransaction(
return this.simpleDb.runTransaction(
'readonly-idempotent',
[DbTargetGlobal.store],
txn => getHighestListenSequenceNumber(txn)
)
)
);
})
.then(highestListenSequenceNumber => {
this.listenSequence = new ListenSequence(
highestListenSequenceNumber,
Expand All @@ -341,12 +338,6 @@ export class IndexedDbPersistence implements Persistence {
});
}

private startRemoteDocumentCache(): Promise<void> {
return this.simpleDb.runTransaction('readonly', ALL_STORES, txn =>
this.remoteDocumentCache.start(txn)
);
}

setPrimaryStateListener(
primaryStateListener: PrimaryStateListener
): Promise<void> {
Expand Down
101 changes: 48 additions & 53 deletions packages/firestore/src/local/indexeddb_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,10 @@ import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { RemoteDocumentCache } from './remote_document_cache';
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
import {
IterateOptions,
SimpleDb,
SimpleDbStore,
SimpleDbTransaction
} from './simple_db';
import { IterateOptions, SimpleDbStore } from './simple_db';
import { ObjectMap } from '../util/obj_map';

export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
/** The read time of the last entry consumed by `getNewDocumentChanges()`. */
private lastProcessedReadTime = SnapshotVersion.MIN;

/**
* @param {LocalSerializer} serializer The document serializer.
* @param {IndexManager} indexManager The query indexes that need to be maintained.
Expand All @@ -67,18 +59,6 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
private readonly indexManager: IndexManager
) {}

/**
* Starts up the remote document cache.
*
* Reads the ID of the last document change from the documentChanges store.
* Existing changes will not be returned as part of
* `getNewDocumentChanges()`.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
return this.synchronizeLastProcessedReadTime(transaction);
}

/**
* Adds the supplied entries to the cache.
*
Expand Down Expand Up @@ -313,14 +293,21 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
.next(() => results);
}

/**
* Returns the set of documents that have been updated since the specified read
* time.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
getNewDocumentChanges(
transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap> {
transaction: PersistenceTransaction,
sinceReadTime: SnapshotVersion
): PersistencePromise<{
changedDocs: MaybeDocumentMap;
readTime: SnapshotVersion;
}> {
let changedDocs = maybeDocumentMap();

const lastReadTime = this.serializer.toDbTimestampKey(
this.lastProcessedReadTime
);
let lastReadTime = this.serializer.toDbTimestampKey(sinceReadTime);

const documentsStore = remoteDocumentsStore(transaction);
const range = IDBKeyRange.lowerBound(lastReadTime, true);
Expand All @@ -332,40 +319,48 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
// the documents directly since we want to keep sentinel deletes.
const doc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
changedDocs = changedDocs.insert(doc.key, doc);
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
dbRemoteDoc.readTime!
);
lastReadTime = dbRemoteDoc.readTime!;
}
)
.next(() => changedDocs);
.next(() => {
return {
changedDocs,
readTime: this.serializer.fromDbTimestampKey(lastReadTime)
};
});
}

/**
* Sets the last processed read time to the maximum read time of the backing
* object store, allowing calls to getNewDocumentChanges() to return subsequent
* changes.
* Returns the last document that has changed, as well as the read time of the
* last change. If no document has changed, returns SnapshotVersion.MIN.
*/
private synchronizeLastProcessedReadTime(
transaction: SimpleDbTransaction
): PersistencePromise<void> {
const documentsStore = SimpleDb.getStore<
DbRemoteDocumentKey,
DbRemoteDocument
>(transaction, DbRemoteDocument.store);

// If there are no existing entries, we set `lastProcessedReadTime` to 0.
this.lastProcessedReadTime = SnapshotVersion.forDeletedDoc();
return documentsStore.iterate(
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
(key, value, control) => {
if (value.readTime) {
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
value.readTime
);
// PORTING NOTE: This is only used for multi-tab synchronization.
getLastDocumentChange(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in the synchronizeLastProcessedReadTime version of LocalStore. The changedDoc is never read, but I opted to return it to make the API similar to getDocumentChanges above.

transaction: PersistenceTransaction
): PersistencePromise<{
changedDoc: MaybeDocument | undefined;
readTime: SnapshotVersion;
}> {
const documentsStore = remoteDocumentsStore(transaction);

// If there are no existing entries, we return SnapshotVersion.MIN.
let readTime = SnapshotVersion.MIN;
let changedDoc: MaybeDocument | undefined;

return documentsStore
.iterate(
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
(key, dbRemoteDoc, control) => {
changedDoc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
if (dbRemoteDoc.readTime) {
readTime = this.serializer.fromDbTimestampKey(dbRemoteDoc.readTime);
}
control.done();
}
control.done();
}
);
)
.next(() => {
return { changedDoc, readTime };
});
}

newChangeBuffer(options?: {
Expand Down
58 changes: 51 additions & 7 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { ObjectMap } from '../util/obj_map';
import { LocalDocumentsView } from './local_documents_view';
import { LocalViewChanges } from './local_view_changes';
import { LruGarbageCollector, LruResults } from './lru_garbage_collector';
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
import { MutationQueue } from './mutation_queue';
import { Persistence, PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
Expand Down Expand Up @@ -168,6 +169,13 @@ export class LocalStore {
q.canonicalId()
);

/**
* The read time of the last entry processed by `getNewDocumentChanges()`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDocumentChanges?

Oh! I see: this refers to getNewDocumentChanges in LocalStore and not the one in the RemoteDocumentCache. This is slightly confusing and I have a slight preference for keeping the names the same, though up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your confusion:) I tried to be to smart about it and reverted back to using getNewDocumentChanges() everywhere, which still makes sense even when a read time is passed (at least I see it that way now).

*
* PORTING NOTE: This is only used for multi-tab synchronization.
*/
private lastDocumentChangeReadTime = SnapshotVersion.MIN;

constructor(
/** Manages our in-memory or durable persistence. */
private persistence: Persistence,
Expand All @@ -192,6 +200,11 @@ export class LocalStore {
this.queryEngine.setLocalDocumentsView(this.localDocuments);
}

/** Starts the LocalStore. */
start(): Promise<void> {
return this.synchronizeLastDocumentChangeReadTime();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is equivalent to the previous functionality in RemoteDocumentStore.start().

}

/**
* Tells the LocalStore that the currently authenticated user has changed.
*
Expand Down Expand Up @@ -1006,14 +1019,45 @@ export class LocalStore {
}
}

/**
* Returns the set of documents that have been updated since the last call.
* If this is the first call, returns the set of changes since client
* initialization. Further invocations will return document changes since
* the point of rejection.
*/
// PORTING NOTE: Multi-tab only.
getNewDocumentChanges(): Promise<MaybeDocumentMap> {
return this.persistence.runTransaction(
'Get new document changes',
'readonly',
txn => {
return this.remoteDocuments.getNewDocumentChanges(txn);
}
);
return this.persistence
.runTransaction('Get new document changes', 'readonly-idempotent', txn =>
this.remoteDocuments.getNewDocumentChanges(
txn,
this.lastDocumentChangeReadTime
)
)
.then(({ changedDocs, readTime }) => {
this.lastDocumentChangeReadTime = readTime;
return changedDocs;
});
}

/**
* Reads the newest document change from persistence and forwards the internal
* synchronization marker so that calls to `getNewDocumentChanges()`
* only return changes that happened after client initialization.
*/
// PORTING NOTE: Multi-tab only.
async synchronizeLastDocumentChangeReadTime(): Promise<void> {
if (this.remoteDocuments instanceof IndexedDbRemoteDocumentCache) {
const remoteDocumentCache = this.remoteDocuments;
return this.persistence
.runTransaction(
'Synchronize last document change read time',
'readonly-idempotent',
txn => remoteDocumentCache.getLastDocumentChange(txn)
)
.then(({ readTime }) => {
this.lastDocumentChangeReadTime = readTime;
});
}
}
}
33 changes: 10 additions & 23 deletions packages/firestore/src/local/memory_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
import { Query } from '../core/query';
import {
DocumentKeySet,
documentKeySet,
DocumentMap,
documentMap,
DocumentSizeEntry,
MaybeDocumentMap,
maybeDocumentMap,
NullableMaybeDocumentMap,
nullableMaybeDocumentMap
} from '../model/collections';
import { Document, MaybeDocument, NoDocument } from '../model/document';
import { Document, MaybeDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';

import { SnapshotVersion } from '../core/snapshot_version';
Expand Down Expand Up @@ -57,9 +55,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
/** Underlying cache of documents and their read times. */
private docs = documentEntryMap();

/** Set of documents changed since last call to `getNewDocumentChanges()`. */
private newDocumentChanges = documentKeySet();

/** Size of all cached documents. */
private size = 0;

Expand Down Expand Up @@ -99,7 +94,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
readTime
});

this.newDocumentChanges = this.newDocumentChanges.add(key);
this.size += currentSize - previousSize;

return this.indexManager.addToCollectionParentIndex(
Expand All @@ -117,7 +111,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
private removeEntry(documentKey: DocumentKey): void {
const entry = this.docs.get(documentKey);
if (entry) {
this.newDocumentChanges = this.newDocumentChanges.add(documentKey);
this.docs = this.docs.remove(documentKey);
this.size -= entry.size;
}
Expand Down Expand Up @@ -184,21 +177,15 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
}

getNewDocumentChanges(
transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap> {
let changedDocs = maybeDocumentMap();

this.newDocumentChanges.forEach(key => {
const entry = this.docs.get(key);
const changedDoc = entry
? entry.maybeDocument
: new NoDocument(key, SnapshotVersion.forDeletedDoc());
changedDocs = changedDocs.insert(key, changedDoc);
});

this.newDocumentChanges = documentKeySet();

return PersistencePromise.resolve(changedDocs);
transaction: PersistenceTransaction,
sinceReadTime: SnapshotVersion
): PersistencePromise<{
changedDocs: MaybeDocumentMap;
readTime: SnapshotVersion;
}> {
throw new Error(
'getNewDocumentChanges() is not supported with MemoryPersistence'
);
}

newChangeBuffer(options?: {
Expand Down
14 changes: 8 additions & 6 deletions packages/firestore/src/local/remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ export interface RemoteDocumentCache {
): PersistencePromise<DocumentMap>;

/**
* Returns the set of documents that have been updated since the last call.
* If this is the first call, returns the set of changes since client
* initialization. Further invocations will return document changes since
* the point of rejection.
* Returns the set of documents that have changed since the specified read
* time.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
getNewDocumentChanges(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to making LocalStore aware that it's using IndexedDB persistence would be to leave this method in the interface and allow persistence implementations to throw some kind of unsupported operation error if they're ever called.

This would have the effect of making the LocalStore code cleaner. Both would fail if MemoryPersistence was ever used with multi-tab. As is we fail the assertion; the proposed change would throw unsupported operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the method back to the interface and am throwing an exception now. I debated between the two approaches, but it probably makes sense to reduce this clutter in LocalStore.

I did leave getLastDocumentChange() IndexdedDb only. This is a new method and there are no asserts for this in LocalStore (just an if-statement that makes the intent clear).

transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap>;
transaction: PersistenceTransaction,
sinceReadTime: SnapshotVersion
): PersistencePromise<{
changedDocs: MaybeDocumentMap;
readTime: SnapshotVersion;
}>;

/**
* Provides access to add or update the contents of the cache. The buffer
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/test/unit/local/local_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ function genericLocalStoreTests(
countingQueryEngine,
User.UNAUTHENTICATED
);
await localStore.start();
});

afterEach(async () => {
Expand Down
Loading