Skip to content

Commit cec9946

Browse files
authored
Implement Firestore IndexBackfiller (#6261)
1 parent 8340f84 commit cec9946

24 files changed

+1345
-160
lines changed

packages/firestore/src/core/component_provider.ts

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
import { CredentialsProvider } from '../api/credentials';
1919
import { User } from '../auth/user';
20+
import {
21+
IndexBackfiller,
22+
IndexBackfillerScheduler
23+
} from '../local/index_backfiller';
2024
import {
2125
indexedDbStoragePrefix,
2226
IndexedDbPersistence
@@ -29,7 +33,7 @@ import {
2933
MemoryEagerDelegate,
3034
MemoryPersistence
3135
} from '../local/memory_persistence';
32-
import { GarbageCollectionScheduler, Persistence } from '../local/persistence';
36+
import { Scheduler, Persistence } from '../local/persistence';
3337
import { QueryEngine } from '../local/query_engine';
3438
import {
3539
ClientId,
@@ -87,7 +91,8 @@ export interface OfflineComponentProvider {
8791
persistence: Persistence;
8892
sharedClientState: SharedClientState;
8993
localStore: LocalStore;
90-
gcScheduler: GarbageCollectionScheduler | null;
94+
gcScheduler: Scheduler | null;
95+
indexBackfillerScheduler: Scheduler | null;
9196
synchronizeTabs: boolean;
9297

9398
initialize(cfg: ComponentConfiguration): Promise<void>;
@@ -105,7 +110,8 @@ export class MemoryOfflineComponentProvider
105110
persistence!: Persistence;
106111
sharedClientState!: SharedClientState;
107112
localStore!: LocalStore;
108-
gcScheduler!: GarbageCollectionScheduler | null;
113+
gcScheduler!: Scheduler | null;
114+
indexBackfillerScheduler!: Scheduler | null;
109115
synchronizeTabs = false;
110116

111117
serializer!: JsonProtoSerializer;
@@ -115,13 +121,28 @@ export class MemoryOfflineComponentProvider
115121
this.sharedClientState = this.createSharedClientState(cfg);
116122
this.persistence = this.createPersistence(cfg);
117123
await this.persistence.start();
118-
this.gcScheduler = this.createGarbageCollectionScheduler(cfg);
119124
this.localStore = this.createLocalStore(cfg);
125+
this.gcScheduler = this.createGarbageCollectionScheduler(
126+
cfg,
127+
this.localStore
128+
);
129+
this.indexBackfillerScheduler = this.createIndexBackfillerScheduler(
130+
cfg,
131+
this.localStore
132+
);
120133
}
121134

122135
createGarbageCollectionScheduler(
123-
cfg: ComponentConfiguration
124-
): GarbageCollectionScheduler | null {
136+
cfg: ComponentConfiguration,
137+
localStore: LocalStore
138+
): Scheduler | null {
139+
return null;
140+
}
141+
142+
createIndexBackfillerScheduler(
143+
cfg: ComponentConfiguration,
144+
localStore: LocalStore
145+
): Scheduler | null {
125146
return null;
126147
}
127148

@@ -158,7 +179,8 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
158179
persistence!: IndexedDbPersistence;
159180
sharedClientState!: SharedClientState;
160181
localStore!: LocalStore;
161-
gcScheduler!: GarbageCollectionScheduler | null;
182+
gcScheduler!: Scheduler | null;
183+
indexBackfillerScheduler!: Scheduler | null;
162184
synchronizeTabs = false;
163185

164186
constructor(
@@ -184,7 +206,13 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
184206
// set it after localStore / remoteStore are started.
185207
await this.persistence.setPrimaryStateListener(() => {
186208
if (this.gcScheduler && !this.gcScheduler.started) {
187-
this.gcScheduler.start(this.localStore);
209+
this.gcScheduler.start();
210+
}
211+
if (
212+
this.indexBackfillerScheduler &&
213+
!this.indexBackfillerScheduler.started
214+
) {
215+
this.indexBackfillerScheduler.start();
188216
}
189217
return Promise.resolve();
190218
});
@@ -200,11 +228,20 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
200228
}
201229

202230
createGarbageCollectionScheduler(
203-
cfg: ComponentConfiguration
204-
): GarbageCollectionScheduler | null {
231+
cfg: ComponentConfiguration,
232+
localStore: LocalStore
233+
): Scheduler | null {
205234
const garbageCollector =
206235
this.persistence.referenceDelegate.garbageCollector;
207-
return new LruScheduler(garbageCollector, cfg.asyncQueue);
236+
return new LruScheduler(garbageCollector, cfg.asyncQueue, localStore);
237+
}
238+
239+
createIndexBackfillerScheduler(
240+
cfg: ComponentConfiguration,
241+
localStore: LocalStore
242+
): Scheduler | null {
243+
const indexBackfiller = new IndexBackfiller(localStore, this.persistence);
244+
return new IndexBackfillerScheduler(cfg.asyncQueue, indexBackfiller);
208245
}
209246

210247
createPersistence(cfg: ComponentConfiguration): IndexedDbPersistence {
@@ -283,11 +320,18 @@ export class MultiTabOfflineComponentProvider extends IndexedDbOfflineComponentP
283320
);
284321
if (this.gcScheduler) {
285322
if (isPrimary && !this.gcScheduler.started) {
286-
this.gcScheduler.start(this.localStore);
323+
this.gcScheduler.start();
287324
} else if (!isPrimary) {
288325
this.gcScheduler.stop();
289326
}
290327
}
328+
if (this.indexBackfillerScheduler) {
329+
if (isPrimary && !this.indexBackfillerScheduler.started) {
330+
this.indexBackfillerScheduler.start();
331+
} else if (!isPrimary) {
332+
this.indexBackfillerScheduler.stop();
333+
}
334+
}
291335
});
292336
}
293337

packages/firestore/src/local/document_overlay_cache.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export interface DocumentOverlayCache {
4949
*/
5050
getOverlays(
5151
transaction: PersistenceTransaction,
52-
keys: DocumentKeySet
52+
keys: DocumentKey[]
5353
): PersistencePromise<OverlayMap>;
5454

5555
/**
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/**
2+
* @license
3+
* Copyright 2022 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
import { DocumentMap } from '../model/collections';
18+
import {
19+
IndexOffset,
20+
indexOffsetComparator,
21+
newIndexOffsetFromDocument
22+
} from '../model/field_index';
23+
import { debugAssert } from '../util/assert';
24+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
25+
import { logDebug } from '../util/log';
26+
27+
import { INDEXING_ENABLED } from './indexeddb_schema';
28+
import { ignoreIfPrimaryLeaseLoss, LocalStore } from './local_store';
29+
import { LocalWriteResult } from './local_store_impl';
30+
import { Persistence, Scheduler } from './persistence';
31+
import { PersistencePromise } from './persistence_promise';
32+
import { PersistenceTransaction } from './persistence_transaction';
33+
import { isIndexedDbTransactionError } from './simple_db';
34+
35+
/** Tag passed to logDebug() for every message emitted by the index backfiller. */
const LOG_TAG = 'IndexBackfiller';

/**
 * How long we wait to try running index backfill after SDK initialization
 * (15 seconds, expressed in milliseconds).
 */
const INITIAL_BACKFILL_DELAY_MS = 15 * 1000;

/**
 * Minimum amount of time between backfill checks, after the first one
 * (1 minute, expressed in milliseconds).
 */
const REGULAR_BACKFILL_DELAY_MS = 60 * 1000;

/** The maximum number of documents to process each time backfill() is called. */
const MAX_DOCUMENTS_TO_PROCESS = 50;
45+
46+
/**
 * Schedules runs of the IndexBackfiller on the AsyncQueue.
 *
 * After `start()`, a first run is enqueued with the initial delay (only when
 * INDEXING_ENABLED is set); every run re-enqueues the next one with the
 * regular delay once it has finished, whether it succeeded or failed.
 */
export class IndexBackfillerScheduler implements Scheduler {
  /** The delayed operation waiting to run, or null when none is queued. */
  private task: DelayedOperation<void> | null;

  constructor(
    private readonly asyncQueue: AsyncQueue,
    private readonly backfiller: IndexBackfiller
  ) {
    this.task = null;
  }

  /** Enqueues the first backfill run. Must not be called while a run is queued. */
  start(): void {
    debugAssert(
      this.task === null,
      'Cannot start an already started IndexBackfillerScheduler'
    );
    if (INDEXING_ENABLED) {
      this.schedule(INITIAL_BACKFILL_DELAY_MS);
    }
  }

  /** Cancels the queued backfill run, if there is one. */
  stop(): void {
    if (this.task) {
      this.task.cancel();
      this.task = null;
    }
  }

  /**
   * Whether a backfill run is currently queued. Note that this is false while
   * a run is executing, because the task is cleared before the backfill
   * starts.
   */
  get started(): boolean {
    return this.task !== null;
  }

  /** Enqueues a single backfill run after `delay` milliseconds. */
  private schedule(delay: number): void {
    debugAssert(
      this.task === null,
      'Cannot schedule IndexBackfiller while a task is pending'
    );
    logDebug(LOG_TAG, `Scheduled in ${delay}ms`);
    this.task = this.asyncQueue.enqueueAfterDelay(
      TimerId.IndexBackfill,
      delay,
      async () => {
        // Clear the handle first so the follow-up schedule() call below
        // passes its "no pending task" assertion.
        this.task = null;
        try {
          const documentsProcessed = await this.backfiller.backfill();
          logDebug(LOG_TAG, `Documents written: ${documentsProcessed}`);
        } catch (e) {
          if (isIndexedDbTransactionError(e)) {
            // Best effort: the next scheduled run retries the backfill.
            logDebug(
              LOG_TAG,
              'Ignoring IndexedDB error during index backfill: ',
              e
            );
          } else {
            await ignoreIfPrimaryLeaseLoss(e);
          }
        }
        // schedule() is synchronous, so there is nothing to await here.
        this.schedule(REGULAR_BACKFILL_DELAY_MS);
      }
    );
  }
}
108+
109+
/**
 * Implements the steps for backfilling indexes: picks the next collection
 * group to update, reads the next batch of documents after that group's
 * offset, writes their index entries and advances the stored offset.
 */
export class IndexBackfiller {
  constructor(
    /**
     * LocalStore provides access to IndexManager and LocalDocumentView.
     * These properties will update when the user changes. Consequently,
     * making a local copy of IndexManager and LocalDocumentView will require
     * updates over time. The simpler solution is to rely on LocalStore to have
     * up-to-date references to IndexManager and LocalDocumentView.
     */
    private readonly localStore: LocalStore,
    private readonly persistence: Persistence
  ) {}

  /**
   * Runs one backfill pass inside a single 'readwrite-primary' transaction.
   *
   * @param maxDocumentsToProcess - Cap on the number of documents indexed in
   * this pass; defaults to MAX_DOCUMENTS_TO_PROCESS.
   * @returns The number of documents processed.
   */
  async backfill(
    maxDocumentsToProcess: number = MAX_DOCUMENTS_TO_PROCESS
  ): Promise<number> {
    return this.persistence.runTransaction(
      'Backfill Indexes',
      'readwrite-primary',
      txn => this.writeIndexEntries(txn, maxDocumentsToProcess)
    );
  }

  /** Writes index entries until the cap is reached. Returns the number of documents processed. */
  private writeIndexEntries(
    transaction: PersistenceTransaction,
    maxDocumentsToProcess: number
  ): PersistencePromise<number> {
    const processedCollectionGroups = new Set<string>();
    let documentsRemaining = maxDocumentsToProcess;
    let continueLoop = true;
    return PersistencePromise.doWhile(
      () => continueLoop && documentsRemaining > 0,
      () => {
        return this.localStore.indexManager
          .getNextCollectionGroupToUpdate(transaction)
          .next((collectionGroup: string | null) => {
            if (
              collectionGroup === null ||
              processedCollectionGroups.has(collectionGroup)
            ) {
              // Nothing left to update, or we have cycled back to a group
              // already handled in this pass: stop iterating.
              continueLoop = false;
            } else {
              logDebug(LOG_TAG, `Processing collection: ${collectionGroup}`);
              return this.writeEntriesForCollectionGroup(
                transaction,
                collectionGroup,
                documentsRemaining
              ).next(documentsProcessed => {
                documentsRemaining -= documentsProcessed;
                processedCollectionGroups.add(collectionGroup);
              });
            }
          });
      }
    ).next(() => maxDocumentsToProcess - documentsRemaining);
  }

  /**
   * Writes entries for the provided collection group. Returns the number of documents processed.
   */
  private writeEntriesForCollectionGroup(
    transaction: PersistenceTransaction,
    collectionGroup: string,
    documentsRemainingUnderCap: number
  ): PersistencePromise<number> {
    // Use the earliest offset of all field indexes to query the local cache.
    return this.localStore.indexManager
      .getMinOffsetFromCollectionGroup(transaction, collectionGroup)
      .next(existingOffset =>
        this.localStore.localDocuments
          .getNextDocuments(
            transaction,
            collectionGroup,
            existingOffset,
            documentsRemainingUnderCap
          )
          .next(nextBatch => {
            const docs: DocumentMap = nextBatch.changes;
            return this.localStore.indexManager
              .updateIndexEntries(transaction, docs)
              .next(() => this.getNewOffset(existingOffset, nextBatch))
              .next(newOffset => {
                // Pass the offset as a separate log argument instead of
                // interpolating it: interpolating an object into a template
                // string relies on its default string conversion.
                logDebug(LOG_TAG, 'Updating offset: ', newOffset);
                return this.localStore.indexManager.updateCollectionGroup(
                  transaction,
                  collectionGroup,
                  newOffset
                );
              })
              .next(() => docs.size);
          })
      );
  }

  /**
   * Returns the next offset based on the provided documents: the largest
   * offset among `existingOffset` and the offsets of the documents in
   * `lookupResult`, combined with the larger of the two batch ids.
   */
  private getNewOffset(
    existingOffset: IndexOffset,
    lookupResult: LocalWriteResult
  ): IndexOffset {
    let maxOffset: IndexOffset = existingOffset;
    lookupResult.changes.forEach((key, document) => {
      const newOffset: IndexOffset = newIndexOffsetFromDocument(document);
      if (indexOffsetComparator(newOffset, maxOffset) > 0) {
        maxOffset = newOffset;
      }
    });
    return new IndexOffset(
      maxOffset.readTime,
      maxOffset.documentKey,
      Math.max(lookupResult.batchId, existingOffset.largestBatchId)
    );
  }
}

packages/firestore/src/local/index_manager.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,10 @@ export interface IndexManager {
164164
transaction: PersistenceTransaction,
165165
target: Target
166166
): PersistencePromise<IndexOffset>;
167+
168+
/** Returns the minimum offset for the given collection group. */
169+
getMinOffsetFromCollectionGroup(
170+
transaction: PersistenceTransaction,
171+
collectionGroup: string
172+
): PersistencePromise<IndexOffset>;
167173
}

0 commit comments

Comments
 (0)