Skip to content

Make getNewDocumentChanges() idempotent #2255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Oct 11, 2019
2 changes: 2 additions & 0 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ export class FirestoreClient {
// TODO(index-free): Use IndexFreeQueryEngine/IndexedQueryEngine as appropriate.
const queryEngine = new SimpleQueryEngine();
this.localStore = new LocalStore(this.persistence, queryEngine, user);
await this.localStore.start();

if (maybeLruGc) {
// We're running LRU Garbage collection. Set up the scheduler.
this.lruScheduler = new LruScheduler(
Expand Down
25 changes: 14 additions & 11 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import { MutationQueue } from './mutation_queue';
import {
Persistence,
PersistenceTransaction,
PersistenceTransactionMode,
PrimaryStateListener,
ReferenceDelegate
} from './persistence';
Expand Down Expand Up @@ -316,9 +317,6 @@ export class IndexedDbPersistence implements Persistence {

this.scheduleClientMetadataAndPrimaryLeaseRefreshes();

return this.startRemoteDocumentCache();
})
.then(() => {
return this.simpleDb.runTransaction(
'readonly',
[DbTargetGlobal.store],
Expand All @@ -343,12 +341,6 @@ export class IndexedDbPersistence implements Persistence {
});
}

private startRemoteDocumentCache(): Promise<void> {
return this.simpleDb.runTransaction('readonly', ALL_STORES, txn =>
this.remoteDocumentCache.start(txn)
);
}

setPrimaryStateListener(
primaryStateListener: PrimaryStateListener
): Promise<void> {
Expand Down Expand Up @@ -742,17 +734,28 @@ export class IndexedDbPersistence implements Persistence {

runTransaction<T>(
action: string,
mode: 'readonly' | 'readwrite' | 'readwrite-primary',
mode: PersistenceTransactionMode,
transactionOperation: (
transaction: PersistenceTransaction
) => PersistencePromise<T>
): Promise<T> {
log.debug(LOG_TAG, 'Starting transaction:', action);

// TODO(schmidt-sebastian): Simplify once all transactions are idempotent.
const idempotent = mode.endsWith('idempotent');
const readonly = mode.startsWith('readonly');
const simpleDbMode = readonly
? idempotent
? 'readonly-idempotent'
: 'readonly'
: idempotent
? 'readwrite-idempotent'
: 'readwrite';

// Do all transactions as readwrite against all object stores, since we
// are the only reader/writer.
return this.simpleDb.runTransaction(
mode === 'readonly' ? 'readonly' : 'readwrite',
simpleDbMode,
ALL_STORES,
simpleDbTxn => {
if (mode === 'readwrite-primary') {
Expand Down
104 changes: 52 additions & 52 deletions packages/firestore/src/local/indexeddb_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ import {
import { ObjectMap } from '../util/obj_map';

export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
/** The read time of the last entry consumed by `getNewDocumentChanges()`. */
private lastProcessedReadTime = SnapshotVersion.MIN;

/**
* @param {LocalSerializer} serializer The document serializer.
* @param {IndexManager} indexManager The query indexes that need to be maintained.
Expand All @@ -67,18 +64,6 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
private readonly indexManager: IndexManager
) {}

/**
* Starts up the remote document cache.
*
* Reads the ID of the last document change from the documentChanges store.
* Existing changes will not be returned as part of
* `getNewDocumentChanges()`.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
return this.synchronizeLastProcessedReadTime(transaction);
}

/**
* Adds the supplied entries to the cache.
*
Expand Down Expand Up @@ -313,59 +298,74 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
.next(() => results);
}

getNewDocumentChanges(
transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap> {
/**
* Returns the set of documents that have been updated since the specified read
* time.
*/
// PORTING NOTE: This is only used for multi-tab synchronization.
getDocumentChanges(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, the getNewDocumentChanges still makes sense while explicitly passing the readTime. That name helps convey that it's a diff of changes.

I don't oppose renaming it to getDocumentChanges if that's your preference, but I don't think it needed to change.

transaction: PersistenceTransaction,
sinceReadTime: SnapshotVersion
): PersistencePromise<{
changedDocs: MaybeDocumentMap;
readTime: SnapshotVersion;
}> {
let changedDocs = maybeDocumentMap();

const lastReadTime = this.serializer.toDbTimestampKey(
this.lastProcessedReadTime
);
let lastReadTime = this.serializer.toDbTimestampKey(sinceReadTime);

const documentsStore = remoteDocumentsStore(transaction);
const range = IDBKeyRange.lowerBound(lastReadTime, true);
return documentsStore
.iterate(
{ index: DbRemoteDocument.readTimeIndex, range },
(_, dbRemoteDoc) => {
// Unlike `getEntry()` and others, `getNewDocumentChanges()` parses
// Unlike `getEntry()` and others, `getDocumentChanges()` parses
// the documents directly since we want to keep sentinel deletes.
const doc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
changedDocs = changedDocs.insert(doc.key, doc);
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
dbRemoteDoc.readTime!
);
lastReadTime = dbRemoteDoc.readTime!;
}
)
.next(() => changedDocs);
.next(() => {
return {
changedDocs,
readTime: this.serializer.fromDbTimestampKey(lastReadTime)
};
});
}

/**
* Sets the last processed read time to the maximum read time of the backing
* object store, allowing calls to getNewDocumentChanges() to return subsequent
* changes.
* Returns the last document that has changed, as well as the read time of the
* last change. If no document has changed, returns SnapshotVersion.MIN.
*/
private synchronizeLastProcessedReadTime(
transaction: SimpleDbTransaction
): PersistencePromise<void> {
const documentsStore = SimpleDb.getStore<
DbRemoteDocumentKey,
DbRemoteDocument
>(transaction, DbRemoteDocument.store);

// If there are no existing entries, we set `lastProcessedReadTime` to 0.
this.lastProcessedReadTime = SnapshotVersion.forDeletedDoc();
return documentsStore.iterate(
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
(key, value, control) => {
if (value.readTime) {
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
value.readTime
);
// PORTING NOTE: This is only used for multi-tab synchronization.
getLastDocumentChange(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in the synchronizeLastProcessedReadTime version of LocalStore. The changedDoc is never read, but I opted to return it to make the API similar to getDocumentChanges above.

transaction: PersistenceTransaction
): PersistencePromise<{
changedDoc: MaybeDocument | undefined;
readTime: SnapshotVersion;
}> {
const documentsStore = remoteDocumentsStore(transaction);

// If there are no existing entries, we return SnapshotVersion.MIN.
let readTime = SnapshotVersion.MIN;
let changedDoc: MaybeDocument | undefined;

return documentsStore
.iterate(
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
(key, dbRemoteDoc, control) => {
changedDoc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
if (dbRemoteDoc.readTime) {
readTime = this.serializer.fromDbTimestampKey(dbRemoteDoc.readTime);
}
control.done();
}
control.done();
}
);
)
.next(() => {
return { changedDoc, readTime };
});
}

newChangeBuffer(options?: {
Expand Down Expand Up @@ -413,7 +413,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
doc.version.isEqual(SnapshotVersion.forDeletedDoc())
) {
// The document is a sentinel removal and should only be used in the
// `getNewDocumentChanges()`.
// `getDocumentChanges()`.
return null;
}

Expand All @@ -438,7 +438,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
/**
* @param documentCache The IndexedDbRemoteDocumentCache to apply the changes to.
* @param trackRemovals Whether to create sentinel deletes that can be tracked by
* `getNewDocumentChanges()`.
* `getDocumentChanges()`.
*/
constructor(
private readonly documentCache: IndexedDbRemoteDocumentCache,
Expand Down Expand Up @@ -478,7 +478,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
// In order to track removals, we store a "sentinel delete" in the
// RemoteDocumentCache. This entry is represented by a NoDocument
// with a version of 0 and ignored by `maybeDecodeDocument()` but
// preserved in `getNewDocumentChanges()`.
// preserved in `getDocumentChanges()`.
const deletedDoc = this.documentCache.serializer.toDbRemoteDocument(
new NoDocument(key, SnapshotVersion.forDeletedDoc()),
this.readTime
Expand Down
Loading