-
Notifications
You must be signed in to change notification settings - Fork 939
Make notifyLocalViewChanges idempotent #3044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1070,8 +1070,6 @@ function clientMetadataStore( | |
|
||
/** Provides LRU functionality for IndexedDB persistence. */ | ||
export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate { | ||
private inMemoryPins: ReferenceSet | null = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the |
||
|
||
readonly garbageCollector: LruGarbageCollector; | ||
|
||
constructor(private readonly db: IndexedDbPersistence, params: LruParams) { | ||
|
@@ -1081,14 +1079,14 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate { | |
getSequenceNumberCount( | ||
txn: PersistenceTransaction | ||
): PersistencePromise<number> { | ||
const docCountPromise = this.orphanedDocmentCount(txn); | ||
const docCountPromise = this.orphanedDocumentCount(txn); | ||
const targetCountPromise = this.db.getTargetCache().getTargetCount(txn); | ||
return targetCountPromise.next(targetCount => | ||
docCountPromise.next(docCount => targetCount + docCount) | ||
); | ||
} | ||
|
||
private orphanedDocmentCount( | ||
private orphanedDocumentCount( | ||
txn: PersistenceTransaction | ||
): PersistencePromise<number> { | ||
let orphanedCount = 0; | ||
|
@@ -1113,19 +1111,17 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate { | |
); | ||
} | ||
|
||
setInMemoryPins(inMemoryPins: ReferenceSet): void { | ||
this.inMemoryPins = inMemoryPins; | ||
} | ||
|
||
addReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
return writeSentinelKey(txn, key); | ||
} | ||
|
||
removeReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
return writeSentinelKey(txn, key); | ||
|
@@ -1141,7 +1137,7 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate { | |
.removeTargets(txn, upperBound, activeTargetIds); | ||
} | ||
|
||
removeMutationReference( | ||
markPotentiallyOrphaned( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed this method as I need to use it to trigger |
||
txn: PersistenceTransaction, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
|
@@ -1158,11 +1154,7 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate { | |
txn: PersistenceTransaction, | ||
docKey: DocumentKey | ||
): PersistencePromise<boolean> { | ||
if (this.inMemoryPins!.containsKey(docKey)) { | ||
return PersistencePromise.resolve<boolean>(true); | ||
} else { | ||
return mutationQueuesContainKey(txn, docKey); | ||
} | ||
return mutationQueuesContainKey(txn, docKey); | ||
} | ||
|
||
removeOrphanedDocuments( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -165,11 +165,6 @@ export class LocalStore { | |
*/ | ||
protected localDocuments: LocalDocumentsView; | ||
|
||
/** | ||
* The set of document references maintained by any local views. | ||
*/ | ||
private localViewReferences = new ReferenceSet(); | ||
|
||
/** Maps a target to its `TargetData`. */ | ||
protected targetCache: TargetCache; | ||
|
||
|
@@ -206,9 +201,6 @@ export class LocalStore { | |
persistence.started, | ||
'LocalStore was passed an unstarted persistence implementation' | ||
); | ||
this.persistence.referenceDelegate.setInMemoryPins( | ||
this.localViewReferences | ||
); | ||
this.mutationQueue = persistence.getMutationQueue(initialUser); | ||
this.remoteDocuments = persistence.getRemoteDocumentCache(); | ||
this.targetCache = persistence.getTargetCache(); | ||
|
@@ -704,49 +696,56 @@ export class LocalStore { | |
* Notify local store of the changed views to locally pin documents. | ||
*/ | ||
notifyLocalViewChanges(viewChanges: LocalViewChanges[]): Promise<void> { | ||
for (const viewChange of viewChanges) { | ||
const targetId = viewChange.targetId; | ||
|
||
this.localViewReferences.addReferences(viewChange.addedKeys, targetId); | ||
this.localViewReferences.removeReferences( | ||
viewChange.removedKeys, | ||
targetId | ||
); | ||
|
||
if (!viewChange.fromCache) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this part of the loop is just moved to below the transaction. The goal is that if the transaction fails, this code doesn't get executed. |
||
const targetData = this.targetDataByTarget.get(targetId); | ||
debugAssert( | ||
targetData !== null, | ||
`Can't set limbo-free snapshot version for unknown target: ${targetId}` | ||
); | ||
|
||
// Advance the last limbo free snapshot version | ||
const lastLimboFreeSnapshotVersion = targetData.snapshotVersion; | ||
const updatedTargetData = targetData.withLastLimboFreeSnapshotVersion( | ||
lastLimboFreeSnapshotVersion | ||
); | ||
this.targetDataByTarget = this.targetDataByTarget.insert( | ||
targetId, | ||
updatedTargetData | ||
); | ||
} | ||
} | ||
return this.persistence.runTransaction( | ||
'notifyLocalViewChanges', | ||
'readwrite', | ||
txn => { | ||
return this.persistence | ||
.runTransaction('notifyLocalViewChanges', 'readwrite', txn => { | ||
return PersistencePromise.forEach( | ||
viewChanges, | ||
(viewChange: LocalViewChanges) => { | ||
return PersistencePromise.forEach( | ||
viewChange.removedKeys, | ||
viewChange.addedKeys, | ||
(key: DocumentKey) => | ||
this.persistence.referenceDelegate.removeReference(txn, key) | ||
this.persistence.referenceDelegate.addReference( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the call that makes it possible to remove the local pins. |
||
txn, | ||
viewChange.targetId, | ||
key | ||
) | ||
).next(() => | ||
PersistencePromise.forEach( | ||
viewChange.removedKeys, | ||
(key: DocumentKey) => | ||
this.persistence.referenceDelegate.removeReference( | ||
txn, | ||
viewChange.targetId, | ||
key | ||
) | ||
) | ||
); | ||
} | ||
); | ||
} | ||
); | ||
}) | ||
.then(() => { | ||
for (const viewChange of viewChanges) { | ||
const targetId = viewChange.targetId; | ||
|
||
if (!viewChange.fromCache) { | ||
const targetData = this.targetDataByTarget.get(targetId); | ||
debugAssert( | ||
targetData !== null, | ||
`Can't set limbo-free snapshot version for unknown target: ${targetId}` | ||
); | ||
|
||
// Advance the last limbo free snapshot version | ||
const lastLimboFreeSnapshotVersion = targetData.snapshotVersion; | ||
const updatedTargetData = targetData.withLastLimboFreeSnapshotVersion( | ||
lastLimboFreeSnapshotVersion | ||
); | ||
this.targetDataByTarget = this.targetDataByTarget.insert( | ||
targetId, | ||
updatedTargetData | ||
); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -869,26 +868,11 @@ export class LocalStore { | |
const mode = keepPersistedTargetData ? 'readwrite' : 'readwrite-primary'; | ||
return this.persistence | ||
.runTransaction('Release target', mode, txn => { | ||
// References for documents sent via Watch are automatically removed | ||
// when we delete a target's data from the reference delegate. | ||
// Since this does not remove references for locally mutated documents, | ||
// we have to remove the target associations for these documents | ||
// manually. | ||
// This operation needs to be run inside the transaction since EagerGC | ||
// uses the local view references during the transaction's commit. | ||
// Fortunately, the operation is safe to be re-run in case the | ||
// transaction fails since there are no side effects if the target has | ||
// already been removed. | ||
const removed = this.localViewReferences.removeReferencesForId( | ||
targetId | ||
); | ||
|
||
if (!keepPersistedTargetData) { | ||
return PersistencePromise.forEach(removed, (key: DocumentKey) => | ||
this.persistence.referenceDelegate.removeReference(txn, key) | ||
).next(() => { | ||
this.persistence.referenceDelegate.removeTarget(txn, targetData!); | ||
}); | ||
return this.persistence.referenceDelegate.removeTarget( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reference delegate now knows internally what documents are orphaned in the view. |
||
txn, | ||
targetData! | ||
); | ||
} else { | ||
return PersistencePromise.resolve(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ import { | |
LruParams | ||
} from './lru_garbage_collector'; | ||
import { ListenSequence } from '../core/listen_sequence'; | ||
import { ListenSequenceNumber } from '../core/types'; | ||
import { ListenSequenceNumber, TargetId } from '../core/types'; | ||
import { estimateByteSize } from '../model/values'; | ||
import { MemoryIndexManager } from './memory_index_manager'; | ||
import { MemoryMutationQueue } from './memory_mutation_queue'; | ||
|
@@ -184,7 +184,9 @@ export interface MemoryReferenceDelegate extends ReferenceDelegate { | |
} | ||
|
||
export class MemoryEagerDelegate implements MemoryReferenceDelegate { | ||
private inMemoryPins: ReferenceSet | null = null; | ||
/** Manages all documents that are active in Query views. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I always find that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I hope this is not a general stance on management :) Updated to use "track". |
||
private localViewReferences: ReferenceSet = new ReferenceSet(); | ||
/** The list of documents that are potentially GCed after each transaction. */ | ||
private _orphanedDocuments: Set<DocumentKey> | null = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: why is this variable prefixed with an underscore, while There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, we don't use underscore prefixes for classes that are not part of the private API. This is an exception as |
||
|
||
private constructor(private readonly persistence: MemoryPersistence) {} | ||
|
@@ -201,27 +203,27 @@ export class MemoryEagerDelegate implements MemoryReferenceDelegate { | |
} | ||
} | ||
|
||
setInMemoryPins(inMemoryPins: ReferenceSet): void { | ||
this.inMemoryPins = inMemoryPins; | ||
} | ||
|
||
addReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
this.localViewReferences.addReference(key, targetId); | ||
this.orphanedDocuments.delete(key); | ||
return PersistencePromise.resolve(); | ||
} | ||
|
||
removeReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
this.localViewReferences.removeReference(key, targetId); | ||
this.orphanedDocuments.add(key); | ||
return PersistencePromise.resolve(); | ||
} | ||
|
||
removeMutationReference( | ||
markPotentiallyOrphaned( | ||
txn: PersistenceTransaction, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
|
@@ -233,6 +235,10 @@ export class MemoryEagerDelegate implements MemoryReferenceDelegate { | |
txn: PersistenceTransaction, | ||
targetData: TargetData | ||
): PersistencePromise<void> { | ||
const orphaned = this.localViewReferences.removeReferencesForId( | ||
targetData.targetId | ||
); | ||
orphaned.forEach(key => this.orphanedDocuments.add(key)); | ||
const cache = this.persistence.getTargetCache(); | ||
return cache | ||
.getMatchingKeysForTargetId(txn, targetData.targetId) | ||
|
@@ -290,15 +296,15 @@ export class MemoryEagerDelegate implements MemoryReferenceDelegate { | |
key: DocumentKey | ||
): PersistencePromise<boolean> { | ||
return PersistencePromise.or([ | ||
() => | ||
PersistencePromise.resolve(this.localViewReferences.containsKey(key)), | ||
() => this.persistence.getTargetCache().containsKey(txn, key), | ||
() => this.persistence.mutationQueuesContainKey(txn, key), | ||
() => PersistencePromise.resolve(this.inMemoryPins!.containsKey(key)) | ||
() => this.persistence.mutationQueuesContainKey(txn, key) | ||
]); | ||
} | ||
} | ||
|
||
export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | ||
private inMemoryPins: ReferenceSet | null = null; | ||
private orphanedSequenceNumbers: ObjectMap< | ||
DocumentKey, | ||
ListenSequenceNumber | ||
|
@@ -371,10 +377,6 @@ export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | |
); | ||
} | ||
|
||
setInMemoryPins(inMemoryPins: ReferenceSet): void { | ||
this.inMemoryPins = inMemoryPins; | ||
} | ||
|
||
removeTargets( | ||
txn: PersistenceTransaction, | ||
upperBound: ListenSequenceNumber, | ||
|
@@ -403,7 +405,7 @@ export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | |
return p.next(() => changeBuffer.apply(txn)).next(() => count); | ||
} | ||
|
||
removeMutationReference( | ||
markPotentiallyOrphaned( | ||
txn: PersistenceTransaction, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
|
@@ -421,6 +423,7 @@ export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | |
|
||
addReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber); | ||
|
@@ -429,6 +432,7 @@ export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | |
|
||
removeReference( | ||
txn: PersistenceTransaction, | ||
targetId: TargetId, | ||
key: DocumentKey | ||
): PersistencePromise<void> { | ||
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber); | ||
|
@@ -458,7 +462,6 @@ export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate { | |
): PersistencePromise<boolean> { | ||
return PersistencePromise.or([ | ||
() => this.persistence.mutationQueuesContainKey(txn, key), | ||
() => PersistencePromise.resolve(this.inMemoryPins!.containsKey(key)), | ||
() => this.persistence.getTargetCache().containsKey(txn, key), | ||
() => { | ||
const orphanedAt = this.orphanedSequenceNumbers.get(key); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combined these lines since
removeReferencesForId
already returns the references for the target.