Skip to content

Implement Firestore IndexBackfiller #6261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
104964e
Integrate Document Overlay with the SDK (#6123)
ehsannas Apr 6, 2022
bdd3eae
Overlay migration (#6131)
ehsannas May 2, 2022
2aabc42
Implement IndexBackfiller
tom-andersen May 10, 2022
1ac2934
Format
tom-andersen May 10, 2022
235564b
Fix unbound method
tom-andersen May 11, 2022
736385e
Update overlay migration code to use DbMutationBatchStore (#6268)
ehsannas May 12, 2022
737284a
Implement IndexBackfiller
tom-andersen May 10, 2022
4a319ee
Format
tom-andersen May 10, 2022
d999f11
Fix unbound method
tom-andersen May 11, 2022
5890b8c
Add Index Backfiller test
tom-andersen May 12, 2022
0c7b979
Add more Index Backfiller tests
tom-andersen May 13, 2022
1023247
Merge remote-tracking branch 'origin/tomandersen/index-backfiller' in…
tom-andersen May 13, 2022
1c1c814
Add more Index Backfiller tests
tom-andersen May 16, 2022
e0a06e7
Changes by andy1
tom-andersen May 16, 2022
f613c3a
Make sqlite3 as dev dependency
tom-andersen May 16, 2022
1a12a6e
Remove sqlite3 as dev dependency
tom-andersen May 16, 2022
e370c7b
Prettier
tom-andersen May 16, 2022
776d95a
Revert
tom-andersen May 24, 2022
4028824
Merge remote-tracking branch 'origin/master' into tomandersen/index-b…
tom-andersen May 24, 2022
00dd2f7
Fix after merge. Make tests pass.
tom-andersen May 30, 2022
f24b0f6
Merge branch 'master' of https://github.com/firebase/firebase-js-sdk …
tom-andersen May 30, 2022
05522e2
Fix according to PR comments
tom-andersen May 30, 2022
665d71f
Keep INDEXING_ENABLED flag since Web implementation is still incomplete.
tom-andersen May 31, 2022
6c5576f
Refactor getMinOffsetFromFieldIndexes to follow Android implementatio…
tom-andersen Jun 2, 2022
90c4227
Disable IndexBackfiller and tests
tom-andersen Jun 2, 2022
fa1238f
Disable IndexBackfiller and tests
tom-andersen Jun 2, 2022
c4f87be
Lint
tom-andersen Jun 2, 2022
7b8398d
Refactor with type OverlayedDocumentMap. Add Comments. Fix nit.
tom-andersen Jun 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions packages/firestore/src/core/component_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import { CredentialsProvider } from '../api/credentials';
import { User } from '../auth/user';
import {
IndexBackfiller,
IndexBackfillerScheduler
} from '../local/index_backfiller';
import {
indexedDbStoragePrefix,
IndexedDbPersistence
Expand All @@ -29,7 +33,7 @@ import {
MemoryEagerDelegate,
MemoryPersistence
} from '../local/memory_persistence';
import { GarbageCollectionScheduler, Persistence } from '../local/persistence';
import { Scheduler, Persistence } from '../local/persistence';
import { QueryEngine } from '../local/query_engine';
import {
ClientId,
Expand Down Expand Up @@ -87,7 +91,8 @@ export interface OfflineComponentProvider {
persistence: Persistence;
sharedClientState: SharedClientState;
localStore: LocalStore;
gcScheduler: GarbageCollectionScheduler | null;
gcScheduler: Scheduler | null;
indexBackfillerScheduler: Scheduler | null;
synchronizeTabs: boolean;

initialize(cfg: ComponentConfiguration): Promise<void>;
Expand All @@ -105,7 +110,8 @@ export class MemoryOfflineComponentProvider
persistence!: Persistence;
sharedClientState!: SharedClientState;
localStore!: LocalStore;
gcScheduler!: GarbageCollectionScheduler | null;
gcScheduler!: Scheduler | null;
indexBackfillerScheduler!: Scheduler | null;
synchronizeTabs = false;

serializer!: JsonProtoSerializer;
Expand All @@ -115,13 +121,28 @@ export class MemoryOfflineComponentProvider
this.sharedClientState = this.createSharedClientState(cfg);
this.persistence = this.createPersistence(cfg);
await this.persistence.start();
this.gcScheduler = this.createGarbageCollectionScheduler(cfg);
this.localStore = this.createLocalStore(cfg);
this.gcScheduler = this.createGarbageCollectionScheduler(
cfg,
this.localStore
);
this.indexBackfillerScheduler = this.createIndexBackfillerScheduler(
cfg,
this.localStore
);
}

createGarbageCollectionScheduler(
cfg: ComponentConfiguration
): GarbageCollectionScheduler | null {
cfg: ComponentConfiguration,
localStore: LocalStore
): Scheduler | null {
return null;
}

createIndexBackfillerScheduler(
cfg: ComponentConfiguration,
localStore: LocalStore
): Scheduler | null {
return null;
}

Expand Down Expand Up @@ -158,7 +179,8 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
persistence!: IndexedDbPersistence;
sharedClientState!: SharedClientState;
localStore!: LocalStore;
gcScheduler!: GarbageCollectionScheduler | null;
gcScheduler!: Scheduler | null;
indexBackfillerScheduler!: Scheduler | null;
synchronizeTabs = false;

constructor(
Expand All @@ -184,7 +206,13 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
// set it after localStore / remoteStore are started.
await this.persistence.setPrimaryStateListener(() => {
if (this.gcScheduler && !this.gcScheduler.started) {
this.gcScheduler.start(this.localStore);
this.gcScheduler.start();
}
if (
this.indexBackfillerScheduler &&
!this.indexBackfillerScheduler.started
) {
this.indexBackfillerScheduler.start();
}
return Promise.resolve();
});
Expand All @@ -200,11 +228,20 @@ export class IndexedDbOfflineComponentProvider extends MemoryOfflineComponentPro
}

createGarbageCollectionScheduler(
cfg: ComponentConfiguration
): GarbageCollectionScheduler | null {
cfg: ComponentConfiguration,
localStore: LocalStore
): Scheduler | null {
const garbageCollector =
this.persistence.referenceDelegate.garbageCollector;
return new LruScheduler(garbageCollector, cfg.asyncQueue);
return new LruScheduler(garbageCollector, cfg.asyncQueue, localStore);
}

createIndexBackfillerScheduler(
cfg: ComponentConfiguration,
localStore: LocalStore
): Scheduler | null {
const indexBackfiller = new IndexBackfiller(localStore, this.persistence);
return new IndexBackfillerScheduler(cfg.asyncQueue, indexBackfiller);
}

createPersistence(cfg: ComponentConfiguration): IndexedDbPersistence {
Expand Down Expand Up @@ -283,11 +320,18 @@ export class MultiTabOfflineComponentProvider extends IndexedDbOfflineComponentP
);
if (this.gcScheduler) {
if (isPrimary && !this.gcScheduler.started) {
this.gcScheduler.start(this.localStore);
this.gcScheduler.start();
} else if (!isPrimary) {
this.gcScheduler.stop();
}
}
if (this.indexBackfillerScheduler) {
if (isPrimary && !this.indexBackfillerScheduler.started) {
this.indexBackfillerScheduler.start();
} else if (!isPrimary) {
this.indexBackfillerScheduler.stop();
}
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/src/local/document_overlay_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface DocumentOverlayCache {
*/
getOverlays(
transaction: PersistenceTransaction,
keys: DocumentKeySet
keys: DocumentKey[]
): PersistencePromise<OverlayMap>;

/**
Expand Down
223 changes: 223 additions & 0 deletions packages/firestore/src/local/index_backfiller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/**
* @license
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { DocumentMap } from '../model/collections';
import {
IndexOffset,
indexOffsetComparator,
newIndexOffsetFromDocument
} from '../model/field_index';
import { debugAssert } from '../util/assert';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { logDebug } from '../util/log';

import { INDEXING_ENABLED } from './indexeddb_schema';
import { ignoreIfPrimaryLeaseLoss, LocalStore } from './local_store';
import { LocalWriteResult } from './local_store_impl';
import { Persistence, Scheduler } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { PersistenceTransaction } from './persistence_transaction';
import { isIndexedDbTransactionError } from './simple_db';

const LOG_TAG = 'IndexBackiller';

/** How long we wait to try running index backfill after SDK initialization. */
const INITIAL_BACKFILL_DELAY_MS = 15;

/** Minimum amount of time between backfill checks, after the first one. */
const REGULAR_BACKFILL_DELAY_MS = 1;

/** The maximum number of documents to process each time backfill() is called. */
const MAX_DOCUMENTS_TO_PROCESS = 50;

/** This class is responsible for the scheduling of Index Backfiller. */
export class IndexBackfillerScheduler implements Scheduler {
private task: DelayedOperation<void> | null;

constructor(
private readonly asyncQueue: AsyncQueue,
private readonly backfiller: IndexBackfiller
) {
this.task = null;
}

start(): void {
debugAssert(
this.task === null,
'Cannot start an already started IndexBackfillerScheduler'
);
if (INDEXING_ENABLED) {
this.schedule(INITIAL_BACKFILL_DELAY_MS);
}
}

stop(): void {
if (this.task) {
this.task.cancel();
this.task = null;
}
}

get started(): boolean {
return this.task !== null;
}

private schedule(delay: number): void {
debugAssert(
this.task === null,
'Cannot schedule IndexBackiller while a task is pending'
);
logDebug(LOG_TAG, `Scheduled in ${delay}ms`);
this.task = this.asyncQueue.enqueueAfterDelay(
TimerId.IndexBackfill,
delay,
async () => {
this.task = null;
try {
const documentsProcessed = await this.backfiller.backfill();
logDebug(LOG_TAG, `Documents written: ${documentsProcessed}`);
} catch (e) {
if (isIndexedDbTransactionError(e)) {
logDebug(
LOG_TAG,
'Ignoring IndexedDB error during index backfill: ',
e
);
} else {
await ignoreIfPrimaryLeaseLoss(e);
}
}
await this.schedule(REGULAR_BACKFILL_DELAY_MS);
}
);
}
}

/** Implements the steps for backfilling indexes. */
export class IndexBackfiller {
constructor(
/**
* LocalStore provides access to IndexManager and LocalDocumentView.
* These properties will update when the user changes. Consequently,
* making a local copy of IndexManager and LocalDocumentView will require
* updates over time. The simpler solution is to rely on LocalStore to have
* an up-to-date references to IndexManager and LocalDocumentStore.
*/
private readonly localStore: LocalStore,
private readonly persistence: Persistence
) {}

async backfill(
maxDocumentsToProcess: number = MAX_DOCUMENTS_TO_PROCESS
): Promise<number> {
return this.persistence.runTransaction(
'Backfill Indexes',
'readwrite-primary',
txn => this.writeIndexEntries(txn, maxDocumentsToProcess)
);
}

/** Writes index entries until the cap is reached. Returns the number of documents processed. */
private writeIndexEntries(
transation: PersistenceTransaction,
maxDocumentsToProcess: number
): PersistencePromise<number> {
const processedCollectionGroups = new Set<string>();
let documentsRemaining = maxDocumentsToProcess;
let continueLoop = true;
return PersistencePromise.doWhile(
() => continueLoop === true && documentsRemaining > 0,
() => {
return this.localStore.indexManager
.getNextCollectionGroupToUpdate(transation)
.next((collectionGroup: string | null) => {
if (
collectionGroup === null ||
processedCollectionGroups.has(collectionGroup)
) {
continueLoop = false;
} else {
logDebug(LOG_TAG, `Processing collection: ${collectionGroup}`);
return this.writeEntriesForCollectionGroup(
transation,
collectionGroup,
documentsRemaining
).next(documentsProcessed => {
documentsRemaining -= documentsProcessed;
processedCollectionGroups.add(collectionGroup);
});
}
});
}
).next(() => maxDocumentsToProcess - documentsRemaining);
}

/**
* Writes entries for the provided collection group. Returns the number of documents processed.
*/
private writeEntriesForCollectionGroup(
transaction: PersistenceTransaction,
collectionGroup: string,
documentsRemainingUnderCap: number
): PersistencePromise<number> {
// Use the earliest offset of all field indexes to query the local cache.
return this.localStore.indexManager
.getMinOffsetFromCollectionGroup(transaction, collectionGroup)
.next(existingOffset =>
this.localStore.localDocuments
.getNextDocuments(
transaction,
collectionGroup,
existingOffset,
documentsRemainingUnderCap
)
.next(nextBatch => {
const docs: DocumentMap = nextBatch.changes;
return this.localStore.indexManager
.updateIndexEntries(transaction, docs)
.next(() => this.getNewOffset(existingOffset, nextBatch))
.next(newOffset => {
logDebug(LOG_TAG, `Updating offset: ${newOffset}`);
return this.localStore.indexManager.updateCollectionGroup(
transaction,
collectionGroup,
newOffset
);
})
.next(() => docs.size);
})
);
}

/** Returns the next offset based on the provided documents. */
private getNewOffset(
existingOffset: IndexOffset,
lookupResult: LocalWriteResult
): IndexOffset {
let maxOffset: IndexOffset = existingOffset;
lookupResult.changes.forEach((key, document) => {
const newOffset: IndexOffset = newIndexOffsetFromDocument(document);
if (indexOffsetComparator(newOffset, maxOffset) > 0) {
maxOffset = newOffset;
}
});
return new IndexOffset(
maxOffset.readTime,
maxOffset.documentKey,
Math.max(lookupResult.batchId, existingOffset.largestBatchId)
);
}
}
6 changes: 6 additions & 0 deletions packages/firestore/src/local/index_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,10 @@ export interface IndexManager {
transaction: PersistenceTransaction,
target: Target
): PersistencePromise<IndexOffset>;

/** Returns the minimum offset for the given collection group. */
getMinOffsetFromCollectionGroup(
transaction: PersistenceTransaction,
collectionGroup: string
): PersistencePromise<IndexOffset>;
}
Loading