Skip to content

Commit 20b436a

Browse files
committed
apply bloomFilter while handling existence filter mismatch
1 parent eaef9da commit 20b436a

File tree

4 files changed

+369
-50
lines changed

4 files changed

+369
-50
lines changed

packages/firestore/src/remote/watch_change.ts

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import {
2727
} from '../model/collections';
2828
import { MutableDocument } from '../model/document';
2929
import { DocumentKey } from '../model/document_key';
30+
import { normalizeByteString } from '../model/normalize';
3031
import { debugAssert, fail, hardAssert } from '../util/assert';
3132
import { ByteString } from '../util/byte_string';
3233
import { FirestoreError } from '../util/error';
3334
import { logDebug } from '../util/log';
3435
import { primitiveComparator } from '../util/misc';
3536
import { SortedMap } from '../util/sorted_map';
3637
import { SortedSet } from '../util/sorted_set';
38+
import { BloomFilter } from './bloom_filter';
3739

3840
import { ExistenceFilter } from './existence_filter';
3941
import { RemoteEvent, TargetChange } from './remote_event';
@@ -409,16 +411,80 @@ export class WatchChangeAggregator {
409411
}
410412
} else {
411413
const currentSize = this.getCurrentDocumentCountForTarget(targetId);
414+
// Existence filter mismatch. Remove documents to limbo, and raise a
415+
// snapshot with `isFromCache:true`.
412416
if (currentSize !== expectedCount) {
413-
// Existence filter mismatch: We reset the mapping and raise a new
414-
// snapshot with `isFromCache:true`.
415-
this.resetTarget(targetId);
416-
this.pendingTargetResets = this.pendingTargetResets.add(targetId);
417+
// Apply bloom filter to identify and move removed documents to limbo.
418+
const bloomFilterApplied = this.applyBloomFilter(
419+
watchChange.existenceFilter,
420+
targetId,
421+
currentSize
422+
);
423+
if (!bloomFilterApplied) {
424+
// If bloom filter application fails, we reset the mapping and trigger
425+
// re-run of the query.
426+
this.resetTarget(targetId);
427+
this.pendingTargetResets = this.pendingTargetResets.add(targetId);
428+
}
417429
}
418430
}
419431
}
420432
}
421433

434+
/** Returns whether a bloom filter removed the deleted documents successfully. */
435+
private applyBloomFilter(
436+
existenceFilter: ExistenceFilter,
437+
targetId: number,
438+
currentCount: number
439+
): boolean {
440+
const unchangedNames = existenceFilter.unchangedNames;
441+
const expectedCount = existenceFilter.count;
442+
443+
if (!unchangedNames || !unchangedNames.bits) {
444+
return false;
445+
}
446+
447+
const {
448+
bits: { bitmap = '', padding = 0 },
449+
hashCount = 0
450+
} = unchangedNames;
451+
const normalizedBitmap = normalizeByteString(bitmap).toUint8Array();
452+
const bloomFilter = new BloomFilter(normalizedBitmap, padding, hashCount);
453+
454+
if (bloomFilter.size === 0) return false;
455+
456+
const removedDocumentCount = this.filterRemovedDocuments(
457+
bloomFilter,
458+
targetId
459+
);
460+
461+
if (currentCount - removedDocumentCount === expectedCount) {
462+
return true;
463+
}
464+
// Bloom filter might falsely remove existing documents, leaving existingDocumentsCount
465+
// smaller than expectedCount.
466+
return false;
467+
}
468+
469+
/**
470+
* Filter out removed documents based on bloom filter membership result and return number
471+
* of documents removed.
472+
*/
473+
private filterRemovedDocuments(
474+
bloomFilter: BloomFilter,
475+
targetId: number
476+
): number {
477+
const existingKeys = this.metadataProvider.getRemoteKeysForTarget(targetId);
478+
let removalCount = 0;
479+
existingKeys.forEach(key => {
480+
if (!bloomFilter.mightContain(key.toString())) {
481+
this.removeDocumentFromTarget(targetId, key, /*updatedDocument=*/ null);
482+
removalCount++;
483+
}
484+
});
485+
return removalCount;
486+
}
487+
422488
/**
423489
* Converts the currently accumulated state into a remote event at the
424490
* provided snapshot version. Resets the accumulated changes before returning.

packages/firestore/test/unit/specs/existence_filter_spec.test.ts

Lines changed: 130 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,6 @@ describeSpec('Existence Filters:', [], () => {
3535
.watchSnapshots(2000);
3636
});
3737

38-
// This test is only to make sure watchFilters can accept bloom filter.
39-
// TODO:(mila) update the tests when bloom filter logic is implemented.
40-
specTest('Existence filter with bloom filter match', [], () => {
41-
const query1 = query('collection');
42-
const doc1 = doc('collection/1', 1000, { v: 1 });
43-
return spec()
44-
.userListens(query1)
45-
.watchAcksFull(query1, 1000, doc1)
46-
.expectEvents(query1, { added: [doc1] })
47-
.watchFilters([query1], [doc1.key], {
48-
bits: { bitmap: 'a', padding: 1 },
49-
hashCount: 1
50-
})
51-
.watchSnapshots(2000);
52-
});
53-
5438
specTest('Existence filter match after pending update', [], () => {
5539
const query1 = query('collection');
5640
const doc1 = doc('collection/1', 2000, { v: 2 });
@@ -128,36 +112,6 @@ describeSpec('Existence Filters:', [], () => {
128112
);
129113
});
130114

131-
// This test is only to make sure watchFilters can accept bloom filter.
132-
// TODO:(mila) update the tests when bloom filter logic is implemented.
133-
specTest('Existence filter mismatch triggers bloom filter', [], () => {
134-
const query1 = query('collection');
135-
const doc1 = doc('collection/1', 1000, { v: 1 });
136-
const doc2 = doc('collection/2', 1000, { v: 2 });
137-
return (
138-
spec()
139-
.userListens(query1)
140-
.watchAcksFull(query1, 1000, doc1, doc2)
141-
.expectEvents(query1, { added: [doc1, doc2] })
142-
.watchFilters([query1], [doc1.key], {
143-
bits: { bitmap: 'a', padding: 1 },
144-
hashCount: 3
145-
}) // in the next sync doc2 was deleted
146-
.watchSnapshots(2000)
147-
// query is now marked as "inconsistent" because of filter mismatch
148-
.expectEvents(query1, { fromCache: true })
149-
.expectActiveTargets({ query: query1, resumeToken: '' })
150-
.watchRemoves(query1) // Acks removal of query
151-
.watchAcksFull(query1, 2000, doc1)
152-
.expectLimboDocs(doc2.key) // doc2 is now in limbo
153-
.ackLimbo(2000, deletedDoc('collection/2', 2000))
154-
.expectLimboDocs() // doc2 is no longer in limbo
155-
.expectEvents(query1, {
156-
removed: [doc2]
157-
})
158-
);
159-
});
160-
161115
specTest('Existence filter mismatch will drop resume token', [], () => {
162116
const query1 = query('collection');
163117
const doc1 = doc('collection/1', 1000, { v: 1 });
@@ -286,4 +240,134 @@ describeSpec('Existence Filters:', [], () => {
286240
);
287241
}
288242
);
243+
244+
/**
245+
* ExistenceFilter with BloomFilter
246+
* This bloomFilter's bitmap size and hashCount are small enough to make the false positive rate
247+
* as high as 0.5.
248+
* {
249+
* bits: {
250+
* bitmap: 'AQ=='
251+
* padding: 6
252+
* },
253+
* hashCount: 1
254+
* }
255+
* When testing mightContain(), 'collection/a', 'collection/c' will return true,
256+
* while mightContain('collection/b') will return false.
257+
*/
258+
259+
specTest(
260+
'Full requery is skipped when bloom filter can identify documents deleted',
261+
[],
262+
() => {
263+
const query1 = query('collection');
264+
const docA = doc('collection/a', 1000, { v: 1 });
265+
const docB = doc('collection/b', 1000, { v: 2 });
266+
return (
267+
spec()
268+
.userListens(query1)
269+
.watchAcksFull(query1, 1000, docA, docB)
270+
.expectEvents(query1, { added: [docA, docB] })
271+
.watchFilters([query1], [docA.key], {
272+
bits: { bitmap: 'AQ==', padding: 6 },
273+
hashCount: 1
274+
})
275+
.watchSnapshots(2000)
276+
// BloomFilter identifies docB is deleted, skip full query and put docB
277+
// into limbo directly.
278+
.expectEvents(query1, { fromCache: true })
279+
.expectLimboDocs(docB.key) // docB is now in limbo
280+
.ackLimbo(2000, deletedDoc('collection/b', 2000))
281+
.expectLimboDocs() // docB is no longer in limbo
282+
.expectEvents(query1, {
283+
removed: [docB]
284+
})
285+
);
286+
}
287+
);
288+
289+
specTest(
290+
'Full requery is triggered when bloom filter can not identify documents deleted',
291+
[],
292+
() => {
293+
const query1 = query('collection');
294+
const docA = doc('collection/a', 1000, { v: 1 });
295+
const docC = doc('collection/c', 1000, { v: 2 });
296+
297+
return (
298+
spec()
299+
.userListens(query1)
300+
.watchAcksFull(query1, 1000, docA, docC)
301+
.expectEvents(query1, { added: [docA, docC] })
302+
.watchFilters([query1], [docA.key], {
303+
bits: { bitmap: 'AQ==', padding: 6 },
304+
hashCount: 1
305+
})
306+
.watchSnapshots(2000)
307+
// BloomFilter yields false positive results, cannot identify docC
308+
// is deleted. Re-run query is triggered.
309+
.expectEvents(query1, { fromCache: true })
310+
.expectActiveTargets({ query: query1, resumeToken: '' })
311+
.watchRemoves(query1) // Acks removal of query
312+
.watchAcksFull(query1, 2000, docA)
313+
.expectLimboDocs(docC.key) // docC is now in limbo
314+
.ackLimbo(2000, deletedDoc('collection/c', 2000))
315+
.expectLimboDocs() // docC is no longer in limbo
316+
.expectEvents(query1, {
317+
removed: [docC]
318+
})
319+
);
320+
}
321+
);
322+
323+
specTest('Bloom filter handled at global snapshot', [], () => {
324+
const query1 = query('collection');
325+
const docA = doc('collection/a', 1000, { v: 1 });
326+
const docB = doc('collection/b', 2000, { v: 2 });
327+
const docC = doc('collection/c', 3000, { v: 3 });
328+
return (
329+
spec()
330+
.userListens(query1)
331+
.watchAcksFull(query1, 1000, docA, docB)
332+
.expectEvents(query1, { added: [docA, docB] })
333+
// Send a mismatching existence filter with one document, but don't
334+
// send a new global snapshot. We should not see an event until we
335+
// receive the snapshot.
336+
.watchFilters([query1], [docA.key], {
337+
bits: { bitmap: 'AQ==', padding: 6 },
338+
hashCount: 1
339+
})
340+
// BloomFilter identifies docB is removed, moves it to limbo.
341+
.watchSends({ affects: [query1] }, docC)
342+
.watchSnapshots(2000)
343+
.expectEvents(query1, { added: [docC], fromCache: true })
344+
// re-run of the query1 is skipped, docB is in limbo.
345+
.expectLimboDocs(docB.key)
346+
);
347+
});
348+
349+
specTest('Bloom filter limbo resolution is denied', [], () => {
350+
const query1 = query('collection');
351+
const docA = doc('collection/a', 1000, { v: 1 });
352+
const docB = doc('collection/b', 1000, { v: 2 });
353+
return spec()
354+
.userListens(query1)
355+
.watchAcksFull(query1, 1000, docA, docB)
356+
.expectEvents(query1, { added: [docA, docB] })
357+
.watchFilters([query1], [docA.key], {
358+
bits: { bitmap: 'AQ==', padding: 6 },
359+
hashCount: 1
360+
})
361+
.watchSnapshots(2000)
362+
.expectEvents(query1, { fromCache: true })
363+
.expectLimboDocs(docB.key) // docB is now in limbo
364+
.watchRemoves(
365+
newQueryForPath(docB.key.path),
366+
new RpcError(Code.PERMISSION_DENIED, 'no')
367+
)
368+
.expectLimboDocs() // docB is no longer in limbo
369+
.expectEvents(query1, {
370+
removed: [docB]
371+
});
372+
});
289373
});

packages/firestore/test/unit/specs/limbo_spec.test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,81 @@ describeSpec('Limbo Documents:', [], () => {
915915
}
916916
);
917917

918+
/**
919+
* BloomFilter:
920+
* {
921+
* bits: {
922+
* bitmap: 'AQ=='
923+
* padding: 6
924+
* },
925+
* hashCount: 1
926+
* }
927+
* When testing mightContain(), 'collection/a', 'collection/c' will return true,
928+
* while mightContain('collection/b') will return false.
929+
*/
930+
specTest(
931+
'Limbo resolution throttling with bloom filter application',
932+
[],
933+
() => {
934+
const query1 = query('collection');
935+
const docA = doc('collection/a', 1000, { key: 'a' });
936+
const docB = doc('collection/b', 1000, { key: 'b' });
937+
const docC = doc('collection/c', 1000, { key: 'c' });
938+
939+
const docBQuery = newQueryForPath(docB.key.path);
940+
941+
// Verify that limbo resolution throttling works correctly with bloom filter
942+
// application on existence filter mismatches.
943+
return (
944+
spec()
945+
.withMaxConcurrentLimboResolutions(2)
946+
.userListens(query1)
947+
.watchAcks(query1)
948+
.watchSends({ affects: [query1] }, docA, docB)
949+
.watchCurrents(query1, 'resume-token-1000')
950+
.watchSnapshots(1000)
951+
.expectEvents(query1, { added: [docA, docB] })
952+
// Simulate that the client loses network connection.
953+
.disableNetwork()
954+
// Limbo document causes query to be "inconsistent"
955+
.expectEvents(query1, { fromCache: true })
956+
.enableNetwork()
957+
.restoreListen(query1, 'resume-token-1000')
958+
.watchAcks(query1)
959+
// While this client was disconnected, another client deleted docB, and
960+
// added docC. If Watch has to re-run the underlying query when this
961+
// client re-listens, Watch won't be able to tell that docB was deleted
962+
// and will only send us existing documents that changed since the resume
963+
// token. This will cause it to just send the docC with an existence filter
964+
// expectedCount of 2.
965+
.watchSends({ affects: [query1] }, docC)
966+
.watchFilters([query1], [docA.key, docC.key], {
967+
bits: { bitmap: 'AQ==', padding: 6 },
968+
hashCount: 1
969+
})
970+
.watchSnapshots(1001)
971+
.expectEvents(query1, {
972+
added: [docC],
973+
fromCache: true
974+
})
975+
// The view now contains the docA, docB and docC (3 documents), but
976+
// the existence filter indicated only 2 should match. There is an existence
977+
// filter mismatch. Bloom filter checks membership of the docs, and filters
978+
// out docB, while docA and docC return true. Number of existing docs matches
979+
// the expected count, so skip the re-run of the query.
980+
.watchCurrents(query1, 'resume-token-1002')
981+
.watchSnapshots(1002)
982+
// The docB is in limbo; the client begins limbo resolution.
983+
.expectLimboDocs(docB.key)
984+
.watchAcks(docBQuery)
985+
.watchCurrents(docBQuery, 'resume-token-1003')
986+
.watchSnapshots(1003)
987+
.expectEvents(query1, { removed: [docB] })
988+
.expectLimboDocs()
989+
);
990+
}
991+
);
992+
918993
specTest(
919994
'A limbo resolution for a document should not be started if one is already active',
920995
[],

0 commit comments

Comments
 (0)