-
Notifications
You must be signed in to change notification settings - Fork 938
Dequeue limbo resolutions when their respective queries are stopped #4395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
cddbae3
c5f0747
b0ea581
35e6d3e
9bbd652
03a9c22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
'@firebase/firestore': patch | ||
--- | ||
|
||
Fix a bug where local cache inconsistencies were unnecessarily being resolved. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,7 @@ import { MaybeDocument, NoDocument } from '../model/document'; | |
import { DocumentKey } from '../model/document_key'; | ||
import { Mutation } from '../model/mutation'; | ||
import { MutationBatchResult } from '../model/mutation_batch'; | ||
import { ResourcePath } from '../model/path'; | ||
import { RemoteEvent, TargetChange } from '../remote/remote_event'; | ||
import { | ||
canUseNetwork, | ||
|
@@ -208,9 +209,10 @@ class SyncEngineImpl implements SyncEngine { | |
queriesByTarget = new Map<TargetId, Query[]>(); | ||
/** | ||
* The keys of documents that are in limbo for which we haven't yet started a | ||
* limbo resolution query. | ||
* limbo resolution query. The strings in this set are the result of calling | ||
* `key.path.canonicalString()` where `key` is a `DocumentKey` object. | ||
*/ | ||
enqueuedLimboResolutions: DocumentKey[] = []; | ||
enqueuedLimboResolutions = new Set<string>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should maybe mention that we can use this datastructure in JS because it preserves insertion order. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
/** | ||
* Keeps track of the target ID for each document that is in limbo with an | ||
* active target. | ||
|
@@ -876,6 +878,8 @@ function removeLimboTarget( | |
syncEngineImpl: SyncEngineImpl, | ||
key: DocumentKey | ||
): void { | ||
syncEngineImpl.enqueuedLimboResolutions.delete(key.path.canonicalString()); | ||
|
||
// It's possible that the target already got removed because the query failed. In that case, | ||
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target. | ||
const limboTargetId = syncEngineImpl.activeLimboTargetsByKey.get(key); | ||
|
@@ -925,9 +929,13 @@ function trackLimboChange( | |
limboChange: AddedLimboDocument | ||
): void { | ||
const key = limboChange.key; | ||
if (!syncEngineImpl.activeLimboTargetsByKey.get(key)) { | ||
const keyString = key.path.canonicalString(); | ||
if ( | ||
!syncEngineImpl.activeLimboTargetsByKey.get(key) && | ||
!syncEngineImpl.enqueuedLimboResolutions.has(keyString) | ||
) { | ||
logDebug(LOG_TAG, 'New document in limbo: ' + key); | ||
syncEngineImpl.enqueuedLimboResolutions.push(key); | ||
syncEngineImpl.enqueuedLimboResolutions.add(keyString); | ||
pumpEnqueuedLimboResolutions(syncEngineImpl); | ||
} | ||
} | ||
|
@@ -942,11 +950,14 @@ function trackLimboChange( | |
*/ | ||
function pumpEnqueuedLimboResolutions(syncEngineImpl: SyncEngineImpl): void { | ||
while ( | ||
syncEngineImpl.enqueuedLimboResolutions.length > 0 && | ||
syncEngineImpl.enqueuedLimboResolutions.size > 0 && | ||
syncEngineImpl.activeLimboTargetsByKey.size < | ||
syncEngineImpl.maxConcurrentLimboResolutions | ||
) { | ||
const key = syncEngineImpl.enqueuedLimboResolutions.shift()!; | ||
const keyString = syncEngineImpl.enqueuedLimboResolutions.values().next() | ||
.value; | ||
syncEngineImpl.enqueuedLimboResolutions.delete(keyString); | ||
const key = new DocumentKey(ResourcePath.fromString(keyString)); | ||
const limboTargetId = syncEngineImpl.limboTargetIdGenerator.next(); | ||
syncEngineImpl.activeLimboResolutionsByTarget.set( | ||
limboTargetId, | ||
|
@@ -979,7 +990,7 @@ export function syncEngineGetActiveLimboDocumentResolutions( | |
// Visible for testing | ||
export function syncEngineGetEnqueuedLimboDocumentResolutions( | ||
syncEngine: SyncEngine | ||
): DocumentKey[] { | ||
): Set<string> { | ||
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl); | ||
return syncEngineImpl.enqueuedLimboResolutions; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -922,4 +922,149 @@ describeSpec('Limbo Documents:', [], () => { | |
); | ||
} | ||
); | ||
|
||
specTest( | ||
'A limbo resolution for a document should be removed from the queue when the last query listen stops', | ||
[], | ||
() => { | ||
const doc1 = doc('collection1/doc', 1000, { key: 1 }); | ||
const query1 = query('collection1'); | ||
const filteredQuery1 = query('collection1', filter('key', '==', 1)); | ||
|
||
const doc2 = doc('collection2/doc', 1000, { key: 2 }); | ||
const query2 = query('collection2'); | ||
const filteredQuery2 = query('collection2', filter('key', '==', 2)); | ||
const filteredQuery3 = query('collection2', filter('key', '>=', 2)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: s/filteredQuery2/filteredQuery2a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.withMaxConcurrentLimboResolutions(1) | ||
|
||
// Max out the number of active limbo resolutions. | ||
.userListens(query1) | ||
.watchAcksFull(query1, 1000, doc1) | ||
.expectEvents(query1, { added: [doc1] }) | ||
.userUnlistens(query1) | ||
.userListens(filteredQuery1) | ||
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true }) | ||
.watchAcksFull(filteredQuery1, 1001) | ||
.expectLimboDocs(doc1.key) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you flip the queries around, the test becomes easier to follow. Here I am now asking: "Doesn't it make sense that the filtered query filters out doc1?" If you flip it and run the filteredQuery before the fullQuery (which might be a better name) then the test becomes a bit more obvious. Applies everywhere. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see what you mean. I had actually originally written it that way. But then I added a third filtered query to some of the tests and it seemed to be a detriment to readability to issue a filtered query followed by a full query followed by another filtered query. For consistency between the test cases, I modified all of the tests to issue the full queries first. What are your thoughts on the following options to address this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am going with "one of the above seems ok". I think the argument for consistency makes sense, and the test can generally be followed, as long as the person looking at it also looks at the actual contents of the docs/queries. |
||
|
||
// Enqueue a limbo resolution for doc2. | ||
.userListens(query2) | ||
.watchAcksFull(query2, 1002, doc2) | ||
.expectEvents(query2, { added: [doc2] }) | ||
.userUnlistens(query2) | ||
.userListens(filteredQuery2) | ||
.expectEvents(filteredQuery2, { added: [doc2], fromCache: true }) | ||
.watchAcksFull(filteredQuery2, 1003) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs(doc2.key) | ||
|
||
// Start another query that puts doc2 into limbo again. | ||
.userListens(filteredQuery3) | ||
.expectEvents(filteredQuery3, { added: [doc2], fromCache: true }) | ||
.watchAcksFull(filteredQuery3, 1004) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs(doc2.key) | ||
|
||
// Stop one of the queries that enqueued a limbo resolution for doc2; | ||
// verify that doc2 is not removed from the limbo resolution queue. | ||
.userUnlistens(filteredQuery3) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs(doc2.key) | ||
|
||
// Stop the other query that enqueued a limbo resolution for doc2; | ||
// verify that doc2 *is* removed from the limbo resolution queue. | ||
.userUnlistens(filteredQuery2) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Both of the tests below seem to be subsets of this test. I would therefore move this test under the other tests to first verify the basic implementation before going into the more completed cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
); | ||
} | ||
); | ||
|
||
specTest( | ||
'A limbo resolution for a document should not be started if one is already active', | ||
[], | ||
() => { | ||
const doc1 = doc('collection/doc', 1000, { key: 1 }); | ||
const query1 = query('collection'); | ||
const filteredQuery1 = query('collection', filter('key', '==', 1)); | ||
const filteredQuery2 = query('collection', filter('key', '>=', 1)); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
|
||
// Start a limbo resolution listen for a document (doc1). | ||
.userListens(query1) | ||
.watchAcksFull(query1, 1000, doc1) | ||
.expectEvents(query1, { added: [doc1] }) | ||
.userUnlistens(query1) | ||
.userListens(filteredQuery1) | ||
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true }) | ||
.watchAcksFull(filteredQuery1, 1001) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs() | ||
|
||
// Put doc1 into limbo in a different query; verify that another limbo | ||
// resolution is neither started nor enqueued. | ||
.userListens(filteredQuery2) | ||
.expectEvents(filteredQuery2, { added: [doc1], fromCache: true }) | ||
.watchAcksFull(filteredQuery2, 1002) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs() | ||
schmidt-sebastian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
} | ||
); | ||
|
||
specTest( | ||
'A limbo resolution for a document should not be enqueued if one is already enqueued', | ||
[], | ||
() => { | ||
const doc1 = doc('collection1/doc1', 1000, { key: 1 }); | ||
const query1 = query('collection1'); | ||
const filteredQuery1 = query('collection1', filter('key', '==', 1)); | ||
const doc2 = doc('collection2/doc2', 1000, { key: 2 }); | ||
const query2 = query('collection2'); | ||
const filteredQuery2 = query('collection2', filter('key', '==', 2)); | ||
const filteredQuery3 = query('collection2', filter('key', '>=', 2)); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.withMaxConcurrentLimboResolutions(1) | ||
|
||
// Max out the number of active limbo resolutions. | ||
.userListens(query1) | ||
.watchAcksFull(query1, 1000, doc1) | ||
.expectEvents(query1, { added: [doc1] }) | ||
.userUnlistens(query1) | ||
.userListens(filteredQuery1) | ||
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true }) | ||
.watchAcksFull(filteredQuery1, 1001) | ||
.expectLimboDocs(doc1.key) | ||
|
||
// Start a limbo resolution listen for a different document (doc2). | ||
.userListens(query2) | ||
.watchAcksFull(query2, 1002, doc2) | ||
.expectEvents(query2, { added: [doc2] }) | ||
.userUnlistens(query2) | ||
.userListens(filteredQuery2) | ||
.expectEvents(filteredQuery2, { added: [doc2], fromCache: true }) | ||
.watchAcksFull(filteredQuery2, 1003) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs(doc2.key) | ||
|
||
// Put doc2 into limbo in a different query and verify that it's not | ||
// added to the limbo resolution queue again. | ||
.userListens(filteredQuery3) | ||
.expectEvents(filteredQuery3, { added: [doc2], fromCache: true }) | ||
.watchAcksFull(filteredQuery3, 1004) | ||
.expectLimboDocs(doc1.key) | ||
.expectEnqueuedLimboDocs(doc2.key) | ||
); | ||
} | ||
); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -118,7 +118,6 @@ import { primitiveComparator } from '../../../src/util/misc'; | |
import { forEach, objectSize } from '../../../src/util/obj'; | ||
import { ObjectMap } from '../../../src/util/obj_map'; | ||
import { Deferred, sequence } from '../../../src/util/promise'; | ||
import { SortedSet } from '../../../src/util/sorted_set'; | ||
import { | ||
byteStringFromString, | ||
deletedDoc, | ||
|
@@ -1008,31 +1007,13 @@ abstract class TestRunner { | |
} | ||
|
||
private validateEnqueuedLimboDocs(): void { | ||
let actualLimboDocs = new SortedSet<DocumentKey>(DocumentKey.comparator); | ||
syncEngineGetEnqueuedLimboDocumentResolutions(this.syncEngine).forEach( | ||
key => { | ||
actualLimboDocs = actualLimboDocs.add(key); | ||
} | ||
); | ||
let expectedLimboDocs = new SortedSet<DocumentKey>(DocumentKey.comparator); | ||
this.expectedEnqueuedLimboDocs.forEach(key => { | ||
expectedLimboDocs = expectedLimboDocs.add(key); | ||
}); | ||
actualLimboDocs.forEach(key => { | ||
expect(expectedLimboDocs.has(key)).to.equal( | ||
true, | ||
`Found enqueued limbo doc ${key.toString()}, but it was not in ` + | ||
`the set of expected enqueued limbo documents ` + | ||
`(${expectedLimboDocs.toString()})` | ||
); | ||
}); | ||
expectedLimboDocs.forEach(key => { | ||
expect(actualLimboDocs.has(key)).to.equal( | ||
true, | ||
`Expected doc ${key.toString()} to be enqueued for limbo resolution, ` + | ||
`but it was not in the queue (${actualLimboDocs.toString()})` | ||
); | ||
}); | ||
const actualLimboDocs = Array.from( | ||
syncEngineGetEnqueuedLimboDocumentResolutions(this.syncEngine) | ||
).sort(); | ||
const expectedLimboDocs = Array.from(this.expectedEnqueuedLimboDocs, key => | ||
key.path.canonicalString() | ||
).sort(); | ||
expect(actualLimboDocs).to.deep.equal(expectedLimboDocs); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To keep some of the descriptiveness of the original error, you should add a message argument (to the last function call) that mentions that this codepath validates limbo resolutions. Otherwise, an error will show up, but it will not be all that helpful. Furthermore, you should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
|
||
private async validateActiveTargets(): Promise<void> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixes or Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.