Skip to content

Commit 6d66392

Browse files
authored
Allow remote updates from watch to heal a cache with synthesized deletes in it (#1015)
* Write a spec test for the busted cache * Modify spec test to demonstrate deletedDoc issue. (#1017) * Allow updates for targets where the document is modified
1 parent 3d1a15c commit 6d66392

File tree

2 files changed

+107
-5
lines changed

2 files changed

+107
-5
lines changed

packages/firestore/src/local/local_store.ts

+23-4
Original file line numberDiff line numberDiff line change
@@ -425,13 +425,30 @@ export class LocalStore {
425425
const documentBuffer = new RemoteDocumentChangeBuffer(this.remoteDocuments);
426426
return this.persistence.runTransaction('Apply remote event', txn => {
427427
const promises = [] as Array<PersistencePromise<void>>;
428+
let authoritativeUpdates = documentKeySet();
428429
objUtils.forEachNumber(
429430
remoteEvent.targetChanges,
430431
(targetId: TargetId, change: TargetChange) => {
431432
// Do not ref/unref unassigned targetIds - it may lead to leaks.
432433
let queryData = this.targetIds[targetId];
433434
if (!queryData) return;
434435

436+
// When a global snapshot contains updates (either add or modify) we
437+
// can completely trust these updates as authoritative and blindly
438+
// apply them to our cache (as a defensive measure to promote
439+
// self-healing in the unfortunate case that our cache is ever somehow
440+
// corrupted / out-of-sync).
441+
//
442+
// If the document is only updated while removing it from a target
443+
// then watch isn't obligated to send the absolute latest version: it
444+
// can send the first version that caused the document not to match.
445+
change.addedDocuments.forEach(key => {
446+
authoritativeUpdates = authoritativeUpdates.add(key);
447+
});
448+
change.modifiedDocuments.forEach(key => {
449+
authoritativeUpdates = authoritativeUpdates.add(key);
450+
});
451+
435452
promises.push(
436453
this.queryCache
437454
.removeMatchingKeys(txn, change.removedDocuments, targetId)
@@ -463,13 +480,15 @@ export class LocalStore {
463480
changedDocKeys = changedDocKeys.add(key);
464481
promises.push(
465482
documentBuffer.getEntry(txn, key).next(existingDoc => {
466-
// Make sure we don't apply an old document version to the remote
467-
// cache, though we make an exception for SnapshotVersion.MIN which
468-
// can happen for manufactured events (e.g. in the case of a limbo
469-
// document resolution failing).
483+
// If a document update isn't authoritative, make sure we don't
484+
// apply an old document version to the remote cache. We make an
485+
// exception for SnapshotVersion.MIN which can happen for
486+
// manufactured events (e.g. in the case of a limbo document
487+
// resolution failing).
470488
if (
471489
existingDoc == null ||
472490
doc.version.isEqual(SnapshotVersion.MIN) ||
491+
authoritativeUpdates.has(doc.key) ||
473492
doc.version.compareTo(existingDoc.version) >= 0
474493
) {
475494
documentBuffer.addEntry(doc);

packages/firestore/test/unit/specs/listen_spec.test.ts

+84-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,10 @@ describeSpec('Listens:', [], () => {
271271
// (the document falls out) and send us a snapshot that's ahead of
272272
// docAv3 (which is already in our cache).
273273
.userListens(visibleQuery, 'resume-token-1000')
274-
.watchAcksFull(visibleQuery, 5000, docAv2)
274+
.watchAcks(visibleQuery)
275+
.watchSends({ removed: [visibleQuery] }, docAv2)
276+
.watchCurrents(visibleQuery, 'resume-token-5000')
277+
.watchSnapshots(5000)
275278
.expectEvents(visibleQuery, { fromCache: false })
276279
.userUnlistens(visibleQuery)
277280
.watchRemoves(visibleQuery)
@@ -283,6 +286,86 @@ describeSpec('Listens:', [], () => {
283286
);
284287
});
285288

289+
specTest('Individual (deleted) documents cannot revert', [], () => {
290+
const allQuery = Query.atPath(path('collection'));
291+
const visibleQuery = Query.atPath(path('collection')).addFilter(
292+
filter('visible', '==', true)
293+
);
294+
const docAv1 = doc('collection/a', 1000, { visible: true, v: 'v1000' });
295+
const docAv2 = doc('collection/a', 2000, { visible: false, v: 'v2000' });
296+
const docAv3 = deletedDoc('collection/a', 3000);
297+
298+
return (
299+
spec()
300+
// Disable GC so the cache persists across listens.
301+
.withGCEnabled(false)
302+
.userListens(visibleQuery)
303+
.watchAcksFull(visibleQuery, 1000, docAv1)
304+
.expectEvents(visibleQuery, { added: [docAv1] })
305+
.userUnlistens(visibleQuery)
306+
.watchRemoves(visibleQuery)
307+
.userListens(allQuery)
308+
.expectEvents(allQuery, { added: [docAv1], fromCache: true })
309+
.watchAcks(allQuery)
310+
.watchSends({ removed: [allQuery] }, docAv3)
311+
.watchCurrents(allQuery, 'resume-token-4000')
312+
.watchSnapshots(4000)
313+
.expectEvents(allQuery, { removed: [docAv1], fromCache: false })
314+
.userUnlistens(allQuery)
315+
.watchRemoves(allQuery)
316+
// Supposing we sent a resume token for visibleQuery, watch could catch
317+
// us up to docAV2 since that's the last relevant change to the query
318+
// (the document falls out) and send us a snapshot that's ahead of
319+
// docAv3 (which is already in our cache).
320+
.userListens(visibleQuery, 'resume-token-1000')
321+
.watchAcks(visibleQuery)
322+
.watchSends({ removed: [visibleQuery] }, docAv2)
323+
.watchCurrents(visibleQuery, 'resume-token-5000')
324+
.watchSnapshots(5000)
325+
.expectEvents(visibleQuery, { fromCache: false })
326+
.userUnlistens(visibleQuery)
327+
.watchRemoves(visibleQuery)
328+
// Listen to allQuery again and make sure we still get no docs.
329+
.userListens(allQuery, 'resume-token-4000')
330+
.watchAcksFull(allQuery, 6000)
331+
.expectEvents(allQuery, { fromCache: false })
332+
);
333+
});
334+
335+
specTest('Deleted documents in cache are fixed', [], () => {
336+
const allQuery = Query.atPath(path('collection'));
337+
const setupQuery = allQuery.addFilter(filter('key', '==', 'a'));
338+
339+
const docAv1 = doc('collection/a', 1000, { key: 'a' });
340+
const docDeleted = deletedDoc('collection/a', 2000);
341+
342+
return (
343+
spec()
344+
// Presuppose an initial state where the remote document cache has a
345+
// broken synthesized delete at a timestamp later than the true version
346+
// of the document. This requires both adding and later removing the
347+
// document in order to force the watch change aggregator to propagate
348+
// the deletion.
349+
.withGCEnabled(false)
350+
.userListens(setupQuery)
351+
.watchAcksFull(setupQuery, 1000, docAv1)
352+
.expectEvents(setupQuery, { added: [docAv1], fromCache: false })
353+
.watchSends({ removed: [setupQuery] }, docDeleted)
354+
.watchSnapshots(2000, [setupQuery], 'resume-token-2000')
355+
.watchSnapshots(2000)
356+
.expectEvents(setupQuery, { removed: [docAv1], fromCache: false })
357+
.userUnlistens(setupQuery)
358+
.watchRemoves(setupQuery)
359+
360+
// Now when the client listens expect the cached NoDocument to be
361+
// discarded because the global snapshot version exceeds what came
362+
// before.
363+
.userListens(allQuery)
364+
.watchAcksFull(allQuery, 3000, docAv1)
365+
.expectEvents(allQuery, { added: [docAv1], fromCache: false })
366+
);
367+
});
368+
286369
specTest('Listens are reestablished after network disconnect', [], () => {
287370
const expectRequestCount = requestCounts =>
288371
requestCounts.addTarget + requestCounts.removeTarget;

0 commit comments

Comments
 (0)