Skip to content

Commit 112b299

Browse files
WIP
1 parent 9b018db commit 112b299

File tree

10 files changed

+1081
-1062
lines changed

10 files changed

+1081
-1062
lines changed

packages/firestore/exp/dependencies.json

+217-434
Large diffs are not rendered by default.

packages/firestore/exp/src/api/database.ts

+13-5
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ import {
5555
setOfflineComponentProvider,
5656
setOnlineComponentProvider
5757
} from './components';
58-
5958
import { DEFAULT_HOST, DEFAULT_SSL } from '../../../lite/src/api/components';
6059
import { DatabaseInfo } from '../../../src/core/database_info';
6160
import { AutoId } from '../../../src/util/misc';
6261
import { User } from '../../../src/auth/user';
6362
import { CredentialChangeListener } from '../../../src/api/credentials';
6463
import { logDebug } from '../../../src/util/log';
64+
import { fillWritePipeline } from '../../../src/remote/remote_store';
6565

6666
const LOG_TAG = 'Firestore';
6767

@@ -198,6 +198,8 @@ export function enableIndexedDbPersistence(
198198
// `getOnlineComponentProvider()`
199199
const settings = firestoreImpl._getSettings();
200200

201+
const onlineComponentProvider = new OnlineComponentProvider();
202+
201203
// TODO(firestoreexp): Add forceOwningTab
202204
return setOfflineComponentProvider(
203205
firestoreImpl,
@@ -209,7 +211,11 @@ export function enableIndexedDbPersistence(
209211
forceOwningTab: false
210212
},
211213
new IndexedDbOfflineComponentProvider()
212-
);
214+
)
215+
.then(() =>
216+
setOnlineComponentProvider(firestoreImpl, new OnlineComponentProvider())
217+
)
218+
.then(() => fillWritePipeline(onlineComponentProvider.remoteStore));
213219
}
214220

215221
export function enableMultiTabIndexedDbPersistence(
@@ -238,9 +244,11 @@ export function enableMultiTabIndexedDbPersistence(
238244
forceOwningTab: false
239245
},
240246
offlineComponentProvider
241-
).then(() =>
242-
setOnlineComponentProvider(firestoreImpl, onlineComponentProvider)
243-
);
247+
)
248+
.then(() =>
249+
setOnlineComponentProvider(firestoreImpl, onlineComponentProvider)
250+
)
251+
.then(() => fillWritePipeline(onlineComponentProvider.remoteStore));
244252
}
245253

246254
export function clearIndexedDbPersistence(

packages/firestore/src/core/component_provider.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import {
3737
newSyncEngine,
3838
SyncEngine
3939
} from './sync_engine';
40-
import { RemoteStore } from '../remote/remote_store';
40+
import { newRemoteStore, RemoteStore } from '../remote/remote_store';
4141
import { EventManager } from './event_manager';
4242
import { AsyncQueue } from '../util/async_queue';
4343
import { DatabaseId, DatabaseInfo } from './database_info';
@@ -361,7 +361,7 @@ export class OnlineComponentProvider {
361361
}
362362

363363
createRemoteStore(cfg: ComponentConfiguration): RemoteStore {
364-
return new RemoteStore(
364+
return newRemoteStore(
365365
this.localStore,
366366
this.datastore,
367367
cfg.asyncQueue,

packages/firestore/src/core/firestore_client.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import { GarbageCollectionScheduler, Persistence } from '../local/persistence';
2828
import { Document, NoDocument } from '../model/document';
2929
import { DocumentKey } from '../model/document_key';
3030
import { Mutation } from '../model/mutation';
31-
import { RemoteStore } from '../remote/remote_store';
31+
import { fillWritePipeline, RemoteStore } from '../remote/remote_store';
3232
import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue';
3333
import { Code, FirestoreError } from '../util/error';
3434
import { logDebug } from '../util/log';
@@ -289,6 +289,8 @@ export class FirestoreClient {
289289
await this.terminate();
290290
});
291291

292+
await fillWritePipeline(this.remoteStore);
293+
292294
persistenceResult.resolve();
293295
} catch (error) {
294296
// Regardless of whether or not the retry succeeds, from an user

packages/firestore/src/core/sync_engine.ts

+28-20
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,14 @@ import { DocumentKey } from '../model/document_key';
4848
import { Mutation } from '../model/mutation';
4949
import { BATCHID_UNKNOWN, MutationBatchResult } from '../model/mutation_batch';
5050
import { RemoteEvent, TargetChange } from '../remote/remote_event';
51-
import { RemoteStore } from '../remote/remote_store';
51+
import {
52+
canUseNetwork,
53+
fillWritePipeline,
54+
RemoteStore,
55+
remoteStoreApplyPrimaryState,
56+
remoteStoreListen,
57+
remoteStoreUnlisten
58+
} from '../remote/remote_store';
5259
import { debugAssert, debugCast, fail, hardAssert } from '../util/assert';
5360
import { Code, FirestoreError } from '../util/error';
5461
import { logDebug } from '../util/log';
@@ -338,7 +345,7 @@ export async function syncEngineListen(
338345
status === 'current'
339346
);
340347
if (syncEngineImpl.isPrimaryClient) {
341-
syncEngineImpl.remoteStore.listen(targetData);
348+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
342349
}
343350
}
344351

@@ -439,7 +446,7 @@ export async function syncEngineUnlisten(
439446
)
440447
.then(() => {
441448
syncEngineImpl.sharedClientState.clearQueryState(queryView.targetId);
442-
syncEngineImpl.remoteStore.unlisten(queryView.targetId);
449+
remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId);
443450
removeAndCleanupTarget(syncEngineImpl, queryView.targetId);
444451
})
445452
.catch(ignoreIfPrimaryLeaseLoss);
@@ -477,7 +484,7 @@ export async function syncEngineWrite(
477484
syncEngineImpl.sharedClientState.addPendingMutation(result.batchId);
478485
addMutationCallback(syncEngineImpl, result.batchId, userCallback);
479486
await emitNewSnapsAndNotifyLocalStore(syncEngineImpl, result.changes);
480-
await syncEngineImpl.remoteStore.fillWritePipeline();
487+
await fillWritePipeline(syncEngineImpl.remoteStore);
481488
} catch (e) {
482489
// If we can't persist the mutation, we reject the user callback and
483490
// don't send the mutation. The user can then retry the write.
@@ -725,7 +732,7 @@ export async function registerPendingWritesCallback(
725732
callback: Deferred<void>
726733
): Promise<void> {
727734
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
728-
if (!syncEngineImpl.remoteStore.canUseNetwork()) {
735+
if (!canUseNetwork(syncEngineImpl.remoteStore)) {
729736
logDebug(
730737
LOG_TAG,
731738
'The network is disabled. The task returned by ' +
@@ -888,7 +895,7 @@ function removeLimboTarget(
888895
return;
889896
}
890897

891-
syncEngineImpl.remoteStore.unlisten(limboTargetId);
898+
remoteStoreUnlisten(syncEngineImpl.remoteStore, limboTargetId);
892899
syncEngineImpl.activeLimboTargetsByKey = syncEngineImpl.activeLimboTargetsByKey.remove(
893900
key
894901
);
@@ -960,7 +967,8 @@ function pumpEnqueuedLimboResolutions(syncEngineImpl: SyncEngineImpl): void {
960967
key,
961968
limboTargetId
962969
);
963-
syncEngineImpl.remoteStore.listen(
970+
remoteStoreListen(
971+
syncEngineImpl.remoteStore,
964972
new TargetData(
965973
queryToTarget(newQueryForPath(key.path)),
966974
limboTargetId,
@@ -1181,7 +1189,7 @@ export async function applyBatchState(
11811189
// If we are the primary client, we need to send this write to the
11821190
// backend. Secondary clients will ignore these writes since their remote
11831191
// connection is disabled.
1184-
await syncEngineImpl.remoteStore.fillWritePipeline();
1192+
await fillWritePipeline(syncEngineImpl.remoteStore);
11851193
} else if (batchState === 'acknowledged' || batchState === 'rejected') {
11861194
// NOTE: Both these methods are no-ops for batches that originated from
11871195
// other clients.
@@ -1217,9 +1225,9 @@ export async function applyPrimaryState(
12171225
/*transitionToPrimary=*/ true
12181226
);
12191227
syncEngineImpl._isPrimaryClient = true;
1220-
await syncEngineImpl.remoteStore.applyPrimaryState(true);
1228+
await remoteStoreApplyPrimaryState(syncEngineImpl.remoteStore, true);
12211229
for (const targetData of activeQueries) {
1222-
syncEngineImpl.remoteStore.listen(targetData);
1230+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
12231231
}
12241232
} else if (isPrimary === false && syncEngineImpl._isPrimaryClient !== false) {
12251233
const activeTargets: TargetId[] = [];
@@ -1238,7 +1246,7 @@ export async function applyPrimaryState(
12381246
);
12391247
});
12401248
}
1241-
syncEngineImpl.remoteStore.unlisten(targetId);
1249+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
12421250
});
12431251
await p;
12441252

@@ -1249,15 +1257,15 @@ export async function applyPrimaryState(
12491257
);
12501258
resetLimboDocuments(syncEngineImpl);
12511259
syncEngineImpl._isPrimaryClient = false;
1252-
await syncEngineImpl.remoteStore.applyPrimaryState(false);
1260+
await remoteStoreApplyPrimaryState(syncEngineImpl.remoteStore, false);
12531261
}
12541262
}
12551263

12561264
// PORTING NOTE: Multi-Tab only.
12571265
function resetLimboDocuments(syncEngine: SyncEngine): void {
12581266
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
12591267
syncEngineImpl.activeLimboResolutionsByTarget.forEach((_, targetId) => {
1260-
syncEngineImpl.remoteStore.unlisten(targetId);
1268+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
12611269
});
12621270
syncEngineImpl.limboDocumentRefs.removeAllReferences();
12631271
syncEngineImpl.activeLimboResolutionsByTarget = new Map<
@@ -1447,7 +1455,7 @@ export async function applyActiveTargetsChange(
14471455
targetData.targetId,
14481456
/*current=*/ false
14491457
);
1450-
syncEngineImpl.remoteStore.listen(targetData);
1458+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
14511459
}
14521460

14531461
for (const targetId of removed) {
@@ -1464,7 +1472,7 @@ export async function applyActiveTargetsChange(
14641472
/* keepPersistedTargetData */ false
14651473
)
14661474
.then(() => {
1467-
syncEngineImpl.remoteStore.unlisten(targetId);
1475+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
14681476
removeAndCleanupTarget(syncEngineImpl, targetId);
14691477
})
14701478
.catch(ignoreIfPrimaryLeaseLoss);
@@ -1473,15 +1481,15 @@ export async function applyActiveTargetsChange(
14731481

14741482
function ensureWatchCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14751483
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
1476-
syncEngineImpl.remoteStore.remoteSyncer.applyRemoteEvent = applyRemoteEvent.bind(
1484+
syncEngineImpl.remoteStore.syncEngine.applyRemoteEvent = applyRemoteEvent.bind(
14771485
null,
14781486
syncEngineImpl
14791487
);
1480-
syncEngineImpl.remoteStore.remoteSyncer.getRemoteKeysForTarget = getRemoteKeysForTarget.bind(
1488+
syncEngineImpl.remoteStore.syncEngine.getRemoteKeysForTarget = getRemoteKeysForTarget.bind(
14811489
null,
14821490
syncEngineImpl
14831491
);
1484-
syncEngineImpl.remoteStore.remoteSyncer.rejectListen = rejectListen.bind(
1492+
syncEngineImpl.remoteStore.syncEngine.rejectListen = rejectListen.bind(
14851493
null,
14861494
syncEngineImpl
14871495
);
@@ -1490,11 +1498,11 @@ function ensureWatchCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14901498

14911499
function ensureWriteCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14921500
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
1493-
syncEngineImpl.remoteStore.remoteSyncer.applySuccessfulWrite = applySuccessfulWrite.bind(
1501+
syncEngineImpl.remoteStore.syncEngine.applySuccessfulWrite = applySuccessfulWrite.bind(
14941502
null,
14951503
syncEngineImpl
14961504
);
1497-
syncEngineImpl.remoteStore.remoteSyncer.rejectFailedWrite = rejectFailedWrite.bind(
1505+
syncEngineImpl.remoteStore.syncEngine.rejectFailedWrite = rejectFailedWrite.bind(
14981506
null,
14991507
syncEngineImpl
15001508
);

0 commit comments

Comments
 (0)