Skip to content

Implement bundle loading. #3201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Jul 11, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
9602712
Renaming interfaces without leading I
wu-hui May 15, 2020
c5e783e
Initial commit of bundle reading - for web only.
wu-hui May 21, 2020
5e7fb89
Tests only run when it is not Node.
wu-hui May 21, 2020
1ee1615
Fix redundant imports
wu-hui May 21, 2020
18f0be1
Fix missing textencoder
wu-hui May 21, 2020
aa455bf
Remove generator.
wu-hui May 29, 2020
78248cd
Support bundle reader for Node
wu-hui May 23, 2020
83160a1
Fix rebase errors.
wu-hui May 29, 2020
24e10cb
Remote 'only'
wu-hui May 29, 2020
9d6edc5
Merge branch 'wuandy/Bundles' into wuandy/BundleReaderNode
wu-hui Jun 5, 2020
4cbe608
Merge branch 'wuandy/Bundles' into wuandy/BundleReaderNode
wu-hui Jun 5, 2020
4313e51
Added more comments, and more tests for Node.
wu-hui Jun 5, 2020
296cfc4
Implement BundleCache.
wu-hui Jun 5, 2020
fff3d36
Add applyBundleDocuments to local store.
wu-hui Jun 1, 2020
fb762de
Add rest of bundle service to localstore
wu-hui Jun 1, 2020
1ec4182
Simplify change buffer get read time logic.
wu-hui Jun 2, 2020
cd3ab7a
Fix lint errors
wu-hui Jun 2, 2020
d991c75
Add comments.
wu-hui Jun 2, 2020
af097c5
Change localstore to check for newer bundle directly.
wu-hui Jun 2, 2020
e735e23
temp checkin
wu-hui Jun 3, 2020
17ba434
Implement without async/await
wu-hui Jun 4, 2020
f808d8d
Major code complete.
wu-hui Jun 4, 2020
db1d864
Integration tests added.
wu-hui Jun 6, 2020
979ffd9
Added spec tests.
wu-hui Jun 9, 2020
f7ff495
Add comments and move types to d.ts
wu-hui Jun 9, 2020
556a007
Support loading string for real.
wu-hui Jun 9, 2020
adf1504
Makes sure SDK still works after loading bad bundles.
wu-hui Jun 11, 2020
b364ab0
Better spect test.
wu-hui Jun 12, 2020
b62e6ef
Merge branch 'wuandy/Bundles' into wuandy/BundleLoadProgress
wu-hui Jun 26, 2020
bc2021b
Merge branch 'wuandy/Bundles' into wuandy/BundleLoadProgress
wu-hui Jun 29, 2020
d5efcdf
Merge branch 'wuandy/Bundles' into wuandy/BundleLoadProgress
wu-hui Jun 29, 2020
17ab921
Set default bytesPerRead for browser
wu-hui Jun 29, 2020
21d4d7c
Finally ready for initial review.
wu-hui Jun 29, 2020
bf085ce
Address comments batch 1
wu-hui Jul 1, 2020
8fbdd3e
Fix bytesPerRead default
wu-hui Jul 1, 2020
985b205
Snapshots only once in the end.
wu-hui Jul 6, 2020
685624a
Temp addressing.
wu-hui Jul 8, 2020
5576963
Apply bundle.ts patch
wu-hui Jul 8, 2020
85e3ac6
Change how task is passed.
wu-hui Jul 8, 2020
2f639ae
From promise<void> to void
wu-hui Jul 8, 2020
57a1c63
Spec tests comments
wu-hui Jul 9, 2020
b6b261b
More feedbacks.
wu-hui Jul 10, 2020
6094153
Even more feedbacks.
wu-hui Jul 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions packages/firestore-types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,38 @@ export class FirebaseFirestore {

terminate(): Promise<void>;

loadBundle(
bundleData: ArrayBuffer | ReadableStream<ArrayBuffer> | string
): LoadBundleTask;

INTERNAL: { delete: () => Promise<void> };
}

export interface LoadBundleTask {
onProgress(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, should we include an overload that takes an Observer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's actually in the API doc. I will add it in a followup PR.

next?: (progress: LoadBundleTaskProgress) => any,
error?: (error: Error) => any,
complete?: (progress?: LoadBundleTaskProgress) => any
): Promise<any>;

then(
onFulfilled?: (a: LoadBundleTaskProgress) => any,
onRejected?: (a: Error) => any
): Promise<any>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
then(
onFulfilled?: (a: LoadBundleTaskProgress) => any,
onRejected?: (a: Error) => any
): Promise<any>;
then<T,R>(
onFulfilled?: (a: LoadBundleTaskProgress) => T|PromiseLike<T>,
onRejected?: (a: Error) => R|PromiseLike<R>
): Promise<T|R>;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ES5 definition for these types is:

interface Promise<T> {
    then<TResult1 = T, TResult2 = never>(onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null): Promise<TResult1 | TResult2>;
    catch<TResult = never>(onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | undefined | null): Promise<T | TResult>;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks!


catch(onRejected: (a: Error) => any): Promise<any>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
catch(onRejected: (a: Error) => any): Promise<any>;
catch(onRejected: (a: Error) => R|PromiseLike<R>): Promise<R>;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Although it seems it should return a Promise<R|Progress> in this case.

}

export interface LoadBundleTaskProgress {
documentsLoaded: number;
totalDocuments: number;
bytesLoaded: number;
totalBytes: number;
taskState: TaskState;
}

export type TaskState = 'Error' | 'Running' | 'Success';

export class GeoPoint {
constructor(latitude: number, longitude: number);

Expand Down
7 changes: 7 additions & 0 deletions packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,13 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
};
}

loadBundle(
bundleData: ArrayBuffer | ReadableStream<Uint8Array> | string
): firestore.LoadBundleTask {
this.ensureClientConfigured();
return this._firestoreClient!.loadBundle(bundleData);
}

ensureClientConfigured(): FirestoreClient {
if (!this._firestoreClient) {
// Kick off starting the client but don't actually wait for it.
Expand Down
254 changes: 254 additions & 0 deletions packages/firestore/src/core/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import * as firestore from '@firebase/firestore-types';
import { Query } from './query';
import { SnapshotVersion } from './snapshot_version';
import {
Expand All @@ -28,6 +29,15 @@ import * as api from '../protos/firestore_proto_api';
import { DocumentKey } from '../model/document_key';
import { MaybeDocument, NoDocument } from '../model/document';
import { debugAssert } from '../util/assert';
import {
applyBundleDocuments,
LocalStore,
saveNamedQuery
} from '../local/local_store';
import { SizedBundleElement } from '../util/bundle_reader';
import { MaybeDocumentMap } from '../model/collections';
import { Deferred } from '../util/promise';
import { BundleMetadata } from '../protos/firestore_bundle_proto';

/**
* Represents a Firestore bundle saved by the SDK in its local storage.
Expand Down Expand Up @@ -98,3 +108,247 @@ export class BundleConverter {
return fromVersion(time);
}
}

/**
* Returns a `LoadBundleTaskProgress` representing the first progress of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns a `LoadBundleTaskProgress` representing the first progress of
* Returns a `LoadBundleTaskProgress`, representing the initial progress of

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* loading a bundle.
*/
export function initialProgress(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an exported function, can we add "bundle" to the name?

Optional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

state: firestore.TaskState,
metadata: BundleMetadata
): firestore.LoadBundleTaskProgress {
return {
taskState: state,
documentsLoaded: state === 'Success' ? metadata.totalDocuments! : 0,
bytesLoaded: state === 'Success' ? metadata.totalBytes! : 0,
totalDocuments: metadata.totalDocuments!,
totalBytes: metadata.totalBytes!
};
}

/* eslint-disable @typescript-eslint/no-explicit-any */
export class LoadBundleTaskImpl implements firestore.LoadBundleTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple things:

This could probably just be a LoadBundleTask? Our other API classes don't use the "Impl" prefix.

This should also be move to src/api/

All members that are not part of the public API should be prefixed with an underscore.

If possible, all instances of any should be replaced with unknown.

private progressResolver = new Deferred<any>();
private progressNext?: (progress: firestore.LoadBundleTaskProgress) => any;
private progressError?: (err: Error) => any;
private progressComplete?: (
progress?: firestore.LoadBundleTaskProgress
) => any;

private promiseResolver = new Deferred<any>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very generic name. Is this the "completionResolver"?

private promiseFulfilled?: (
progress: firestore.LoadBundleTaskProgress
) => any;
private promiseRejected?: (err: Error) => any;

private lastProgress: firestore.LoadBundleTaskProgress = {
taskState: 'Running',
totalBytes: 0,
totalDocuments: 0,
bytesLoaded: 0,
documentsLoaded: 0
};

onProgress(
next?: (progress: firestore.LoadBundleTaskProgress) => any,
error?: (err: Error) => any,
complete?: (progress?: firestore.LoadBundleTaskProgress) => void
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it customary to make the "progress" argument optional? Every user will have to check for its existence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original API proposal actually has complete() => void, which is more conventional, i'll revert to that.

): Promise<any> {
this.progressNext = next;
this.progressError = error;
this.progressComplete = complete;
return this.progressResolver.promise;
}

catch(onRejected: (a: Error) => any): Promise<any> {
this.promiseRejected = onRejected;
return this.promiseResolver.promise;
}

then(
onFulfilled?: (a: firestore.LoadBundleTaskProgress) => any,
onRejected?: (a: Error) => any
): Promise<any> {
this.promiseFulfilled = onFulfilled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these members? Could we just call return this.promiseResolver.promise.then(onFullfilled, onRejected)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, obviously I cannot wrap my mind around promises.

this.promiseRejected = onRejected;
return this.promiseResolver.promise;
}
/* eslint-enable @typescript-eslint/no-explicit-any */

/**
* Notifies the completion of loading a bundle, with a provided
* `LoadBundleTaskProgress` object.
*/
completeWith(progress: firestore.LoadBundleTaskProgress): void {
let result;
if (this.progressComplete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a boolean state check, but it is far from it. We probably should rename the members that contain the user functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

result = this.progressComplete(progress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems slightly strange to me that we use the callback result and pass it back to the user. Is it not more common to simple ignore the return value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done as the return type changed to void.

}
this.progressResolver.resolve(result);

result = undefined;
if (this.promiseFulfilled) {
result = this.promiseFulfilled(progress);
}
this.promiseResolver.resolve(result);
}

/**
* Notifies a failure of loading a bundle, with a provided `Error`
* as the reason.
*/
failedWith(error: Error): void {
if (this.progressNext) {
this.lastProgress.taskState = 'Error';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should always update taskState, regardless of whether there is a callback handle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

this.progressNext(this.lastProgress);
}

let result;
if (this.progressError) {
result = this.progressError(error);
}
this.progressResolver.reject(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here - should we not pass the original error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


result = undefined;
if (this.promiseRejected) {
this.promiseRejected(error);
}
this.promiseResolver.reject(result);
}

/**
* Notifies a progress update of loading a bundle.
* @param progress The new progress.
*/
updateProgress(progress: firestore.LoadBundleTaskProgress): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we verify the state machine at this point? For example, we don't want to revert back from state "error".

This should also have an underscore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this.lastProgress = progress;
if (this.progressNext) {
this.progressNext(progress);
}
}
}

export class LoadResult {
constructor(
readonly progress: firestore.LoadBundleTaskProgress,
readonly changedDocs?: MaybeDocumentMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be optional anymore, which I think leads to one more simplification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

) {}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an exported class, it should have a less generic name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/**
* A class to process the elements from a bundle, load them into local
* storage and provide progress update while loading.
*/
export class BundleLoader {
/** The current progress of loading */
private progress: firestore.LoadBundleTaskProgress;
/**
* The threshold multiplier used to determine whether enough elements are
* batched to be loaded, and a progress update is needed.
*/
private step = 0.01;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the unit here? Is this a percentile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

/** Batched queries to be saved into storage */
private queries: bundleProto.NamedQuery[] = [];
/** Batched documents to be saved into storage */
private documents: BundledDocuments = [];
/** How many bytes in the bundle are being batched. */
private bytesIncrement = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not immediately obvious to me whether this is the total documents loaded so far or some sort of threshold that we use for our diff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

/** How many documents in the bundle are being batched. */
private documentsIncrement = 0;
/**
* A BundleDocumentMetadata is added to the loader, it is saved here while
* we wait for the actual document.
*/
private unpairedDocumentMetadata: bundleProto.BundledDocumentMetadata | null = null;

constructor(
private metadata: bundleProto.BundleMetadata,
private localStore: LocalStore
) {
this.progress = initialProgress('Running', metadata);
}

/**
* Adds an element from the bundle to the loader.
*
* If adding this element leads to actually saving the batched elements into
* storage, the returned promise will resolve to a `LoadResult`, otherwise
* it will resolve to null.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the documentation seems outdated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

*/
addSizedElement(element: SizedBundleElement): Promise<LoadResult | null> {
debugAssert(!element.isBundleMetadata(), 'Unexpected bundle metadata.');

this.bytesIncrement += element.byteLength;
if (element.payload.namedQuery) {
this.queries.push(element.payload.namedQuery);
}

if (element.payload.documentMetadata) {
if (element.payload.documentMetadata.exists) {
this.unpairedDocumentMetadata = element.payload.documentMetadata;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this push to this.documents as well? We could then add the actual document in the next call. This would remove the need for this.unpairedDocumentMetadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in the next call, we would have to get the last element and make sure it is a metadata and has exists: true though. I am not sure which way is more readable, TBH.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks.

} else {
this.documents.push({
metadata: element.payload.documentMetadata,
document: undefined
});
this.documentsIncrement += 1;
}
}

if (element.payload.document) {
debugAssert(
!!this.unpairedDocumentMetadata,
'Unexpected document when no pairing metadata is found'
);
this.documents.push({
metadata: this.unpairedDocumentMetadata!,
document: element.payload.document
});
this.documentsIncrement += 1;
this.unpairedDocumentMetadata = null;
}

return this.saveAndReportProgress();
}

private async saveAndReportProgress(): Promise<LoadResult | null> {
if (
this.unpairedDocumentMetadata ||
(this.documentsIncrement < this.progress.totalDocuments * this.step &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just blindly report progress when any new document is added? We would have to invoke this twice in addSizedElement - once, when we add metadata for a non-existing document and once when we add the value for an existing document.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. The thing is this way we report before documents actually gets saved, which might leads to confusing behavior in some corner cases when the SDK crashes. We can recover when the SDK is back, but the progress report will not seem consistent.

this.bytesIncrement < this.progress.totalBytes * this.step)
) {
return null;
}

for (const q of this.queries) {
await saveNamedQuery(this.localStore, q);
}

let changedDocs;
if (this.documents.length > 0) {
changedDocs = await applyBundleDocuments(this.localStore, this.documents);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often do you think bundles will be empty? If this is rare, then we could just call applyBundleDocuments, which only has a little bit of overhead but removes two if statements from your code. You already don't call into this code for skipped bundles, so I would think changedDocs would rarely be empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

this.progress.bytesLoaded += this.bytesIncrement;
this.progress.documentsLoaded += this.documentsIncrement;
this.bytesIncrement = 0;
this.documentsIncrement = 0;
this.queries = [];
this.documents = [];

return new LoadResult({ ...this.progress }, changedDocs);
}

/**
* Update the progress to 'Success' and return the updated progress.
*/
complete(): firestore.LoadBundleTaskProgress {
debugAssert(
this.queries.length === 0 && this.documents.length === 0,
'There are more items needs to be saved but complete() is called.'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/is/was/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

);
this.progress.taskState = 'Success';

return this.progress;
}
}
26 changes: 25 additions & 1 deletion packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import { LoadBundleTask } from '@firebase/firestore-types';
import { CredentialsProvider } from '../api/credentials';
import { User } from '../auth/user';
import { LocalStore } from '../local/local_store';
Expand All @@ -34,7 +35,7 @@ import {
Observer,
QueryListener
} from './event_manager';
import { SyncEngine } from './sync_engine';
import { SyncEngine, loadBundle } from './sync_engine';
import { View } from './view';

import { SharedClientState } from '../local/shared_client_state';
Expand All @@ -47,8 +48,11 @@ import {
ComponentProvider,
MemoryComponentProvider
} from './component_provider';
import { BundleReader } from '../util/bundle_reader';
import { LoadBundleTaskImpl } from './bundle';
import { newConnection } from '../platform/connection';
import { newSerializer } from '../platform/serializer';
import { toByteStreamReader } from '../platform/byte_stream_reader';

const LOG_TAG = 'FirestoreClient';
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
Expand Down Expand Up @@ -512,4 +516,24 @@ export class FirestoreClient {
});
return deferred.promise;
}

loadBundle(
data: ReadableStream<Uint8Array> | ArrayBuffer | string
): LoadBundleTask {
this.verifyNotTerminated();

let content: ReadableStream<Uint8Array> | ArrayBuffer;
if (typeof data === 'string') {
content = new TextEncoder().encode(data);
} else {
content = data;
}
const reader = new BundleReader(toByteStreamReader(content));
const task = new LoadBundleTaskImpl();
this.asyncQueue.enqueueAndForget(() => {
return loadBundle(this.syncEngine, reader, task);
});

return task;
}
}
Loading