-
Notifications
You must be signed in to change notification settings - Fork 938
Implement bundle loading. #3201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 41 commits
9602712
c5e783e
5e7fb89
1ee1615
18f0be1
aa455bf
78248cd
83160a1
24e10cb
9d6edc5
4cbe608
4313e51
296cfc4
fff3d36
fb762de
1ec4182
cd3ab7a
d991c75
af097c5
e735e23
17ba434
f808d8d
db1d864
979ffd9
f7ff495
556a007
adf1504
b364ab0
b62e6ef
bc2021b
d5efcdf
17ab921
21d4d7c
bf085ce
8fbdd3e
985b205
685624a
5576963
85e3ac6
2f639ae
57a1c63
b6b261b
6094153
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,111 @@ | ||||
/** | ||||
* @license | ||||
* Copyright 2020 Google LLC | ||||
* | ||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||
* you may not use this file except in compliance with the License. | ||||
* You may obtain a copy of the License at | ||||
* | ||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||
* | ||||
* Unless required by applicable law or agreed to in writing, software | ||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
* See the License for the specific language governing permissions and | ||||
* limitations under the License. | ||||
*/ | ||||
|
||||
import * as firestore from '@firebase/firestore-types'; | ||||
import { Deferred } from '../util/promise'; | ||||
import { PartialObserver } from './observer'; | ||||
import { debugAssert } from '../util/assert'; | ||||
|
||||
export class LoadBundleTask | ||||
implements | ||||
firestore.LoadBundleTask, | ||||
PromiseLike<firestore.LoadBundleTaskProgress> { | ||||
private _progressObserver?: PartialObserver<firestore.LoadBundleTaskProgress>; | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider initializing this to an empty observer: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||
private _taskCompletionResolver = new Deferred< | ||||
firestore.LoadBundleTaskProgress | ||||
>(); | ||||
|
||||
private _lastProgress: firestore.LoadBundleTaskProgress = { | ||||
taskState: 'Running', | ||||
totalBytes: 0, | ||||
totalDocuments: 0, | ||||
bytesLoaded: 0, | ||||
documentsLoaded: 0 | ||||
}; | ||||
|
||||
onProgress( | ||||
next?: (progress: firestore.LoadBundleTaskProgress) => unknown, | ||||
error?: (err: Error) => unknown, | ||||
complete?: () => void | ||||
): void { | ||||
this._progressObserver = { | ||||
next, | ||||
error, | ||||
complete | ||||
}; | ||||
} | ||||
|
||||
catch<R>( | ||||
onRejected: (a: Error) => R | PromiseLike<R> | ||||
): Promise<R | firestore.LoadBundleTaskProgress> { | ||||
return this._taskCompletionResolver.promise.catch(onRejected); | ||||
} | ||||
|
||||
then<T, R>( | ||||
onFulfilled?: (a: firestore.LoadBundleTaskProgress) => T | PromiseLike<T>, | ||||
onRejected?: (a: Error) => R | PromiseLike<R> | ||||
): Promise<T | R> { | ||||
return this._taskCompletionResolver.promise.then(onFulfilled, onRejected); | ||||
} | ||||
|
||||
/** | ||||
* Notifies all observers that bundle loading has completed, with a provided | ||||
* `LoadBundleTaskProgress` object. | ||||
*/ | ||||
_completeWith(progress: firestore.LoadBundleTaskProgress): void { | ||||
this._updateProgress(progress); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not need to set the taskState here (like you did below)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The call sites set this up. I added an assert instead. |
||||
if (this._progressObserver && this._progressObserver.complete) { | ||||
this._progressObserver.complete(); | ||||
} | ||||
|
||||
this._taskCompletionResolver.resolve(progress); | ||||
} | ||||
|
||||
/** | ||||
* Notifies all observers that bundle loading has failed, with a provided | ||||
* `Error` as the reason. | ||||
*/ | ||||
_failWith(error: Error): void { | ||||
this._lastProgress.taskState = 'Error'; | ||||
|
||||
if (this._progressObserver && this._progressObserver.next) { | ||||
this._progressObserver.next(this._lastProgress); | ||||
} | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please try to match this behavior: firebase-js-sdk/packages/storage/src/task.ts Line 626 in 0131e1f
Basically, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed, we would keep current behaviour because these progress updates can be considered part of the progress update series. |
||||
|
||||
if (this._progressObserver && this._progressObserver.error) { | ||||
this._progressObserver.error(error); | ||||
} | ||||
|
||||
this._taskCompletionResolver.reject(error); | ||||
} | ||||
|
||||
/** | ||||
* Notifies a progress update of loading a bundle. | ||||
* @param progress The new progress. | ||||
*/ | ||||
_updateProgress(progress: firestore.LoadBundleTaskProgress): void { | ||||
debugAssert( | ||||
this._lastProgress.taskState !== 'Error', | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may want to add 'complete' to this check as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||
'Cannot update progress on a failed task' | ||||
); | ||||
|
||||
this._lastProgress = progress; | ||||
if (this._progressObserver && this._progressObserver.next) { | ||||
this._progressObserver.next(progress); | ||||
} | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,6 +15,7 @@ | |||||||||||||||||||||||||||||||
* limitations under the License. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
import * as firestore from '@firebase/firestore-types'; | ||||||||||||||||||||||||||||||||
import { Query } from './query'; | ||||||||||||||||||||||||||||||||
import { SnapshotVersion } from './snapshot_version'; | ||||||||||||||||||||||||||||||||
import { | ||||||||||||||||||||||||||||||||
|
@@ -28,6 +29,14 @@ import * as api from '../protos/firestore_proto_api'; | |||||||||||||||||||||||||||||||
import { DocumentKey } from '../model/document_key'; | ||||||||||||||||||||||||||||||||
import { MaybeDocument, NoDocument } from '../model/document'; | ||||||||||||||||||||||||||||||||
import { debugAssert } from '../util/assert'; | ||||||||||||||||||||||||||||||||
import { | ||||||||||||||||||||||||||||||||
applyBundleDocuments, | ||||||||||||||||||||||||||||||||
LocalStore, | ||||||||||||||||||||||||||||||||
saveNamedQuery | ||||||||||||||||||||||||||||||||
} from '../local/local_store'; | ||||||||||||||||||||||||||||||||
import { SizedBundleElement } from '../util/bundle_reader'; | ||||||||||||||||||||||||||||||||
import { MaybeDocumentMap } from '../model/collections'; | ||||||||||||||||||||||||||||||||
import { BundleMetadata } from '../protos/firestore_bundle_proto'; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Represents a Firestore bundle saved by the SDK in its local storage. | ||||||||||||||||||||||||||||||||
|
@@ -58,7 +67,7 @@ export interface NamedQuery { | |||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
interface BundledDocument { | ||||||||||||||||||||||||||||||||
metadata: bundleProto.BundledDocumentMetadata; | ||||||||||||||||||||||||||||||||
document: api.Document | undefined; | ||||||||||||||||||||||||||||||||
document?: api.Document; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
|
@@ -98,3 +107,132 @@ export class BundleConverter { | |||||||||||||||||||||||||||||||
return fromVersion(time); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Returns a `LoadBundleTaskProgress` representing the initial progress of | ||||||||||||||||||||||||||||||||
* loading a bundle. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
export function bundleInitialProgress( | ||||||||||||||||||||||||||||||||
metadata: BundleMetadata | ||||||||||||||||||||||||||||||||
): firestore.LoadBundleTaskProgress { | ||||||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||||||
taskState: 'Running', | ||||||||||||||||||||||||||||||||
documentsLoaded: 0, | ||||||||||||||||||||||||||||||||
bytesLoaded: 0, | ||||||||||||||||||||||||||||||||
totalDocuments: metadata.totalDocuments!, | ||||||||||||||||||||||||||||||||
totalBytes: metadata.totalBytes! | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Returns a `LoadBundleTaskProgress` representing the progress that the loading | ||||||||||||||||||||||||||||||||
* has succeeded. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
export function bundleSuccessProgress( | ||||||||||||||||||||||||||||||||
metadata: BundleMetadata | ||||||||||||||||||||||||||||||||
): firestore.LoadBundleTaskProgress { | ||||||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||||||
taskState: 'Success', | ||||||||||||||||||||||||||||||||
documentsLoaded: metadata.totalDocuments!, | ||||||||||||||||||||||||||||||||
bytesLoaded: metadata.totalBytes!, | ||||||||||||||||||||||||||||||||
totalDocuments: metadata.totalDocuments!, | ||||||||||||||||||||||||||||||||
totalBytes: metadata.totalBytes! | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
export class LoadResult { | ||||||||||||||||||||||||||||||||
constructor( | ||||||||||||||||||||||||||||||||
readonly progress: firestore.LoadBundleTaskProgress, | ||||||||||||||||||||||||||||||||
readonly changedDocs?: MaybeDocumentMap | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't need to be optional anymore, which I think leads to one more simplification. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||||||
) {} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is an exported class, it should have a less generic name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* A class to process the elements from a bundle, load them into local | ||||||||||||||||||||||||||||||||
* storage and provide progress update while loading. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
export class BundleLoader { | ||||||||||||||||||||||||||||||||
/** The current progress of loading */ | ||||||||||||||||||||||||||||||||
private progress: firestore.LoadBundleTaskProgress; | ||||||||||||||||||||||||||||||||
/** Batched queries to be saved into storage */ | ||||||||||||||||||||||||||||||||
private queries: bundleProto.NamedQuery[] = []; | ||||||||||||||||||||||||||||||||
/** Batched documents to be saved into storage */ | ||||||||||||||||||||||||||||||||
private documents: BundledDocuments = []; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
constructor( | ||||||||||||||||||||||||||||||||
private metadata: bundleProto.BundleMetadata, | ||||||||||||||||||||||||||||||||
private localStore: LocalStore | ||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||
this.progress = bundleInitialProgress(metadata); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Adds an element from the bundle to the loader. | ||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||
* Returns a new progress if adding the element leads to a new progress, | ||||||||||||||||||||||||||||||||
* otherwise returns null. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
addSizedElement( | ||||||||||||||||||||||||||||||||
element: SizedBundleElement | ||||||||||||||||||||||||||||||||
): firestore.LoadBundleTaskProgress | null { | ||||||||||||||||||||||||||||||||
debugAssert(!element.isBundleMetadata(), 'Unexpected bundle metadata.'); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
this.progress.bytesLoaded += element.byteLength; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let documentsLoaded = this.progress.documentsLoaded; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if (element.payload.namedQuery) { | ||||||||||||||||||||||||||||||||
this.queries.push(element.payload.namedQuery); | ||||||||||||||||||||||||||||||||
} else if (element.payload.documentMetadata) { | ||||||||||||||||||||||||||||||||
this.documents.push({ metadata: element.payload.documentMetadata }); | ||||||||||||||||||||||||||||||||
if (!element.payload.documentMetadata.exists) { | ||||||||||||||||||||||||||||||||
++documentsLoaded; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} else if (element.payload.document) { | ||||||||||||||||||||||||||||||||
debugAssert( | ||||||||||||||||||||||||||||||||
this.documents.length > 0 && | ||||||||||||||||||||||||||||||||
this.documents[this.documents.length - 1].metadata.name === | ||||||||||||||||||||||||||||||||
element.payload.document.name, | ||||||||||||||||||||||||||||||||
'The document being added does not match the stored metadata.' | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
this.documents[this.documents.length - 1].document = | ||||||||||||||||||||||||||||||||
element.payload.document; | ||||||||||||||||||||||||||||||||
++documentsLoaded; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if (documentsLoaded !== this.progress.documentsLoaded) { | ||||||||||||||||||||||||||||||||
this.progress.documentsLoaded = documentsLoaded; | ||||||||||||||||||||||||||||||||
return { ...this.progress }; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
return null; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Update the progress to 'Success' and return the updated progress. | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
async complete(): Promise<LoadResult> { | ||||||||||||||||||||||||||||||||
const lastDocument = | ||||||||||||||||||||||||||||||||
this.documents.length === 0 | ||||||||||||||||||||||||||||||||
? null | ||||||||||||||||||||||||||||||||
: this.documents[this.documents.length - 1]; | ||||||||||||||||||||||||||||||||
debugAssert( | ||||||||||||||||||||||||||||||||
!!lastDocument || | ||||||||||||||||||||||||||||||||
!lastDocument!.metadata.exists || | ||||||||||||||||||||||||||||||||
(!!lastDocument!.metadata.exists && !!lastDocument!.document), | ||||||||||||||||||||||||||||||||
'Bundled documents ends with a document metadata.' | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
for (const q of this.queries) { | ||||||||||||||||||||||||||||||||
await saveNamedQuery(this.localStore, q); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let changedDocs; | ||||||||||||||||||||||||||||||||
if (this.documents.length > 0) { | ||||||||||||||||||||||||||||||||
changedDocs = await applyBundleDocuments(this.localStore, this.documents); | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How often do you think bundles will be empty? If this is rare, then we could just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
this.progress.taskState = 'Success'; | ||||||||||||||||||||||||||||||||
return new LoadResult({ ...this.progress }, changedDocs); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
* limitations under the License. | ||
*/ | ||
|
||
import * as firestore from '@firebase/firestore-types'; | ||
import { CredentialsProvider } from '../api/credentials'; | ||
import { User } from '../auth/user'; | ||
import { LocalStore } from '../local/local_store'; | ||
|
@@ -26,15 +27,15 @@ import { newDatastore } from '../remote/datastore'; | |
import { RemoteStore } from '../remote/remote_store'; | ||
import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue'; | ||
import { Code, FirestoreError } from '../util/error'; | ||
import { logDebug } from '../util/log'; | ||
import { logDebug, logWarn } from '../util/log'; | ||
import { Deferred } from '../util/promise'; | ||
import { | ||
EventManager, | ||
ListenOptions, | ||
Observer, | ||
QueryListener | ||
} from './event_manager'; | ||
import { SyncEngine } from './sync_engine'; | ||
import { SyncEngine, loadBundle } from './sync_engine'; | ||
import { View } from './view'; | ||
|
||
import { SharedClientState } from '../local/shared_client_state'; | ||
|
@@ -47,8 +48,11 @@ import { | |
ComponentProvider, | ||
MemoryComponentProvider | ||
} from './component_provider'; | ||
import { BundleReader } from '../util/bundle_reader'; | ||
import { LoadBundleTask } from '../api/bundle'; | ||
import { newConnection } from '../platform/connection'; | ||
import { newSerializer } from '../platform/serializer'; | ||
import { toByteStreamReader } from '../platform/byte_stream_reader'; | ||
|
||
const LOG_TAG = 'FirestoreClient'; | ||
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100; | ||
|
@@ -512,4 +516,27 @@ export class FirestoreClient { | |
}); | ||
return deferred.promise; | ||
} | ||
|
||
loadBundle( | ||
data: ReadableStream<Uint8Array> | ArrayBuffer | string | ||
): firestore.LoadBundleTask { | ||
this.verifyNotTerminated(); | ||
|
||
let content: ReadableStream<Uint8Array> | ArrayBuffer; | ||
if (typeof data === 'string') { | ||
content = new TextEncoder().encode(data); | ||
} else { | ||
content = data; | ||
} | ||
const reader = new BundleReader(toByteStreamReader(content)); | ||
const task = new LoadBundleTask(); | ||
this.asyncQueue.enqueueAndForget(async () => { | ||
loadBundle(this.syncEngine, reader, task); | ||
return task.catch(e => { | ||
logWarn(`Loading bundle failed with ${e}`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should add a LOG_TAG (see most other log lines). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
}); | ||
}); | ||
|
||
return task; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, should we include an overload that takes an Observer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's actually in the API doc. I will add it in a followup PR.