-
Notifications
You must be signed in to change notification settings - Fork 938
Implement bundle loading. #3201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
9602712
c5e783e
5e7fb89
1ee1615
18f0be1
aa455bf
78248cd
83160a1
24e10cb
9d6edc5
4cbe608
4313e51
296cfc4
fff3d36
fb762de
1ec4182
cd3ab7a
d991c75
af097c5
e735e23
17ba434
f808d8d
db1d864
979ffd9
f7ff495
556a007
adf1504
b364ab0
b62e6ef
bc2021b
d5efcdf
17ab921
21d4d7c
bf085ce
8fbdd3e
985b205
685624a
5576963
85e3ac6
2f639ae
57a1c63
b6b261b
6094153
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -92,9 +92,38 @@ export class FirebaseFirestore { | |||||||||||||||||
|
||||||||||||||||||
terminate(): Promise<void>; | ||||||||||||||||||
|
||||||||||||||||||
loadBundle( | ||||||||||||||||||
bundleData: ArrayBuffer | ReadableStream<ArrayBuffer> | string | ||||||||||||||||||
): LoadBundleTask; | ||||||||||||||||||
|
||||||||||||||||||
INTERNAL: { delete: () => Promise<void> }; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
export interface LoadBundleTask { | ||||||||||||||||||
onProgress( | ||||||||||||||||||
next?: (progress: LoadBundleTaskProgress) => any, | ||||||||||||||||||
error?: (error: Error) => any, | ||||||||||||||||||
complete?: (progress?: LoadBundleTaskProgress) => any | ||||||||||||||||||
): Promise<any>; | ||||||||||||||||||
|
||||||||||||||||||
then( | ||||||||||||||||||
onFulfilled?: (a: LoadBundleTaskProgress) => any, | ||||||||||||||||||
onRejected?: (a: Error) => any | ||||||||||||||||||
): Promise<any>; | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ES5 definition for these types is:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, thanks! |
||||||||||||||||||
|
||||||||||||||||||
catch(onRejected: (a: Error) => any): Promise<any>; | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Although it seems it should return a |
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
export interface LoadBundleTaskProgress { | ||||||||||||||||||
documentsLoaded: number; | ||||||||||||||||||
totalDocuments: number; | ||||||||||||||||||
bytesLoaded: number; | ||||||||||||||||||
totalBytes: number; | ||||||||||||||||||
taskState: TaskState; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
export type TaskState = 'Error' | 'Running' | 'Success'; | ||||||||||||||||||
|
||||||||||||||||||
export class GeoPoint { | ||||||||||||||||||
constructor(latitude: number, longitude: number); | ||||||||||||||||||
|
||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,6 +15,7 @@ | |||||
* limitations under the License. | ||||||
*/ | ||||||
|
||||||
import * as firestore from '@firebase/firestore-types'; | ||||||
import { Query } from './query'; | ||||||
import { SnapshotVersion } from './snapshot_version'; | ||||||
import { | ||||||
|
@@ -28,6 +29,15 @@ import * as api from '../protos/firestore_proto_api'; | |||||
import { DocumentKey } from '../model/document_key'; | ||||||
import { MaybeDocument, NoDocument } from '../model/document'; | ||||||
import { debugAssert } from '../util/assert'; | ||||||
import { | ||||||
applyBundleDocuments, | ||||||
LocalStore, | ||||||
saveNamedQuery | ||||||
} from '../local/local_store'; | ||||||
import { SizedBundleElement } from '../util/bundle_reader'; | ||||||
import { MaybeDocumentMap } from '../model/collections'; | ||||||
import { Deferred } from '../util/promise'; | ||||||
import { BundleMetadata } from '../protos/firestore_bundle_proto'; | ||||||
|
||||||
/** | ||||||
* Represents a Firestore bundle saved by the SDK in its local storage. | ||||||
|
@@ -98,3 +108,247 @@ export class BundleConverter { | |||||
return fromVersion(time); | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Returns a `LoadBundleTaskProgress` representing the first progress of | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
* loading a bundle. | ||||||
*/ | ||||||
export function initialProgress( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is an exported function, can we add "bundle" to the name? Optional. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
state: firestore.TaskState, | ||||||
metadata: BundleMetadata | ||||||
): firestore.LoadBundleTaskProgress { | ||||||
return { | ||||||
taskState: state, | ||||||
documentsLoaded: state === 'Success' ? metadata.totalDocuments! : 0, | ||||||
bytesLoaded: state === 'Success' ? metadata.totalBytes! : 0, | ||||||
totalDocuments: metadata.totalDocuments!, | ||||||
totalBytes: metadata.totalBytes! | ||||||
}; | ||||||
} | ||||||
|
||||||
/* eslint-disable @typescript-eslint/no-explicit-any */ | ||||||
export class LoadBundleTaskImpl implements firestore.LoadBundleTask { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couple things: This could probably just be a This should also be move to All members that are not part of the public API should be prefixed with an underscore. If possible, all instances of |
||||||
private progressResolver = new Deferred<any>(); | ||||||
private progressNext?: (progress: firestore.LoadBundleTaskProgress) => any; | ||||||
private progressError?: (err: Error) => any; | ||||||
private progressComplete?: ( | ||||||
progress?: firestore.LoadBundleTaskProgress | ||||||
) => any; | ||||||
|
||||||
private promiseResolver = new Deferred<any>(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a very generic name. Is this the "completionResolver"? |
||||||
private promiseFulfilled?: ( | ||||||
progress: firestore.LoadBundleTaskProgress | ||||||
) => any; | ||||||
private promiseRejected?: (err: Error) => any; | ||||||
|
||||||
private lastProgress: firestore.LoadBundleTaskProgress = { | ||||||
taskState: 'Running', | ||||||
totalBytes: 0, | ||||||
totalDocuments: 0, | ||||||
bytesLoaded: 0, | ||||||
documentsLoaded: 0 | ||||||
}; | ||||||
|
||||||
onProgress( | ||||||
next?: (progress: firestore.LoadBundleTaskProgress) => any, | ||||||
error?: (err: Error) => any, | ||||||
complete?: (progress?: firestore.LoadBundleTaskProgress) => void | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it customary to make the "progress" argument optional? Every user will have to check for its existence. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Original API proposal actually has |
||||||
): Promise<any> { | ||||||
this.progressNext = next; | ||||||
this.progressError = error; | ||||||
this.progressComplete = complete; | ||||||
return this.progressResolver.promise; | ||||||
} | ||||||
|
||||||
catch(onRejected: (a: Error) => any): Promise<any> { | ||||||
this.promiseRejected = onRejected; | ||||||
return this.promiseResolver.promise; | ||||||
} | ||||||
|
||||||
then( | ||||||
onFulfilled?: (a: firestore.LoadBundleTaskProgress) => any, | ||||||
onRejected?: (a: Error) => any | ||||||
): Promise<any> { | ||||||
this.promiseFulfilled = onFulfilled; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need these members? Could we just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could, obviously I cannot wrap my mind around promises. |
||||||
this.promiseRejected = onRejected; | ||||||
return this.promiseResolver.promise; | ||||||
} | ||||||
/* eslint-enable @typescript-eslint/no-explicit-any */ | ||||||
|
||||||
/** | ||||||
* Notifies the completion of loading a bundle, with a provided | ||||||
* `LoadBundleTaskProgress` object. | ||||||
*/ | ||||||
completeWith(progress: firestore.LoadBundleTaskProgress): void { | ||||||
let result; | ||||||
if (this.progressComplete) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sounds like a boolean state check, but it is far from it. We probably should rename the members that contain the user functions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
result = this.progressComplete(progress); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems slightly strange to me that we use the callback result and pass it back to the user. Is it not more common to simple ignore the return value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done as the return type changed to void. |
||||||
} | ||||||
this.progressResolver.resolve(result); | ||||||
|
||||||
result = undefined; | ||||||
if (this.promiseFulfilled) { | ||||||
result = this.promiseFulfilled(progress); | ||||||
} | ||||||
this.promiseResolver.resolve(result); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Notifies a failure of loading a bundle, with a provided `Error` | ||||||
* as the reason. | ||||||
*/ | ||||||
failedWith(error: Error): void { | ||||||
if (this.progressNext) { | ||||||
this.lastProgress.taskState = 'Error'; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should always update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. |
||||||
this.progressNext(this.lastProgress); | ||||||
} | ||||||
|
||||||
let result; | ||||||
if (this.progressError) { | ||||||
result = this.progressError(error); | ||||||
} | ||||||
this.progressResolver.reject(result); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment here - should we not pass the original error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
|
||||||
result = undefined; | ||||||
if (this.promiseRejected) { | ||||||
this.promiseRejected(error); | ||||||
} | ||||||
this.promiseResolver.reject(result); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Notifies a progress update of loading a bundle. | ||||||
* @param progress The new progress. | ||||||
*/ | ||||||
updateProgress(progress: firestore.LoadBundleTaskProgress): void { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we verify the state machine at this point? For example, we don't want to revert back from state "error". This should also have an underscore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
this.lastProgress = progress; | ||||||
if (this.progressNext) { | ||||||
this.progressNext(progress); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
export class LoadResult { | ||||||
constructor( | ||||||
readonly progress: firestore.LoadBundleTaskProgress, | ||||||
readonly changedDocs?: MaybeDocumentMap | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't need to be optional anymore, which I think leads to one more simplification. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
) {} | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is an exported class, it should have a less generic name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
|
||||||
/** | ||||||
* A class to process the elements from a bundle, load them into local | ||||||
* storage and provide progress update while loading. | ||||||
*/ | ||||||
export class BundleLoader { | ||||||
/** The current progress of loading */ | ||||||
private progress: firestore.LoadBundleTaskProgress; | ||||||
/** | ||||||
* The threshold multiplier used to determine whether enough elements are | ||||||
* batched to be loaded, and a progress update is needed. | ||||||
*/ | ||||||
private step = 0.01; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the unit here? Is this a percentile? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed. |
||||||
/** Batched queries to be saved into storage */ | ||||||
private queries: bundleProto.NamedQuery[] = []; | ||||||
/** Batched documents to be saved into storage */ | ||||||
private documents: BundledDocuments = []; | ||||||
/** How many bytes in the bundle are being batched. */ | ||||||
private bytesIncrement = 0; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not immediately obvious to me whether this is the total documents loaded so far or some sort of threshold that we use for our diff. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed. |
||||||
/** How many documents in the bundle are being batched. */ | ||||||
private documentsIncrement = 0; | ||||||
/** | ||||||
* A BundleDocumentMetadata is added to the loader, it is saved here while | ||||||
* we wait for the actual document. | ||||||
*/ | ||||||
private unpairedDocumentMetadata: bundleProto.BundledDocumentMetadata | null = null; | ||||||
|
||||||
constructor( | ||||||
private metadata: bundleProto.BundleMetadata, | ||||||
private localStore: LocalStore | ||||||
) { | ||||||
this.progress = initialProgress('Running', metadata); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Adds an element from the bundle to the loader. | ||||||
* | ||||||
* If adding this element leads to actually saving the batched elements into | ||||||
* storage, the returned promise will resolve to a `LoadResult`, otherwise | ||||||
* it will resolve to null. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part of the documentation seems outdated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||||||
*/ | ||||||
addSizedElement(element: SizedBundleElement): Promise<LoadResult | null> { | ||||||
debugAssert(!element.isBundleMetadata(), 'Unexpected bundle metadata.'); | ||||||
|
||||||
this.bytesIncrement += element.byteLength; | ||||||
if (element.payload.namedQuery) { | ||||||
this.queries.push(element.payload.namedQuery); | ||||||
} | ||||||
|
||||||
if (element.payload.documentMetadata) { | ||||||
if (element.payload.documentMetadata.exists) { | ||||||
this.unpairedDocumentMetadata = element.payload.documentMetadata; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this push to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, in the next call, we would have to get the last element and make sure it is a metadata and has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, thanks. |
||||||
} else { | ||||||
this.documents.push({ | ||||||
metadata: element.payload.documentMetadata, | ||||||
document: undefined | ||||||
}); | ||||||
this.documentsIncrement += 1; | ||||||
} | ||||||
} | ||||||
|
||||||
if (element.payload.document) { | ||||||
debugAssert( | ||||||
!!this.unpairedDocumentMetadata, | ||||||
'Unexpected document when no pairing metadata is found' | ||||||
); | ||||||
this.documents.push({ | ||||||
metadata: this.unpairedDocumentMetadata!, | ||||||
document: element.payload.document | ||||||
}); | ||||||
this.documentsIncrement += 1; | ||||||
this.unpairedDocumentMetadata = null; | ||||||
} | ||||||
|
||||||
return this.saveAndReportProgress(); | ||||||
} | ||||||
|
||||||
private async saveAndReportProgress(): Promise<LoadResult | null> { | ||||||
if ( | ||||||
this.unpairedDocumentMetadata || | ||||||
(this.documentsIncrement < this.progress.totalDocuments * this.step && | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just blindly report progress when any new document is added? We would have to invoke this twice in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could. The thing is this way we report before documents actually gets saved, which might leads to confusing behavior in some corner cases when the SDK crashes. We can recover when the SDK is back, but the progress report will not seem consistent. |
||||||
this.bytesIncrement < this.progress.totalBytes * this.step) | ||||||
) { | ||||||
return null; | ||||||
} | ||||||
|
||||||
for (const q of this.queries) { | ||||||
await saveNamedQuery(this.localStore, q); | ||||||
} | ||||||
|
||||||
let changedDocs; | ||||||
if (this.documents.length > 0) { | ||||||
changedDocs = await applyBundleDocuments(this.localStore, this.documents); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How often do you think bundles will be empty? If this is rare, then we could just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
} | ||||||
|
||||||
this.progress.bytesLoaded += this.bytesIncrement; | ||||||
this.progress.documentsLoaded += this.documentsIncrement; | ||||||
this.bytesIncrement = 0; | ||||||
this.documentsIncrement = 0; | ||||||
this.queries = []; | ||||||
this.documents = []; | ||||||
|
||||||
return new LoadResult({ ...this.progress }, changedDocs); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Update the progress to 'Success' and return the updated progress. | ||||||
*/ | ||||||
complete(): firestore.LoadBundleTaskProgress { | ||||||
debugAssert( | ||||||
this.queries.length === 0 && this.documents.length === 0, | ||||||
'There are more items needs to be saved but complete() is called.' | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/is/was/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||
); | ||||||
this.progress.taskState = 'Success'; | ||||||
|
||||||
return this.progress; | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, should we include an overload that takes an Observer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's actually in the API doc. I will add it in a followup PR.