Skip to content

Commit 70f1311

Browse files
Code complete, need to fix Event
anager tests by adding callback interface
1 parent 9b018db commit 70f1311

File tree

11 files changed

+1324
-1288
lines changed

11 files changed

+1324
-1288
lines changed

packages/firestore/exp/dependencies.json

+422-640
Large diffs are not rendered by default.

packages/firestore/exp/src/api/components.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ import {
2323
OfflineComponentProvider,
2424
OnlineComponentProvider
2525
} from '../../../src/core/component_provider';
26-
import { LocalStore } from '../../../src/local/local_store';
26+
import {handleUserChange, LocalStore} from '../../../src/local/local_store';
2727
import { Deferred } from '../../../src/util/promise';
2828
import { logDebug } from '../../../src/util/log';
2929
import { SyncEngine } from '../../../src/core/sync_engine';
30-
import { RemoteStore } from '../../../src/remote/remote_store';
30+
import {
31+
RemoteStore,
32+
remoteStoreHandleCredentialChange
33+
} from '../../../src/remote/remote_store';
3134
import { Persistence } from '../../../src/local/persistence';
3235
import { EventManager } from '../../../src/core/event_manager';
3336
export const LOG_TAG = 'ComponentProvider';
@@ -65,7 +68,7 @@ export async function setOfflineComponentProvider(
6568
firestore._queue.enqueueAndForget(() =>
6669
// TODO(firestorexp): Make sure handleUserChange is a no-op if user
6770
// didn't change
68-
offlineComponentProvider.localStore.handleUserChange(user)
71+
handleUserChange(offlineComponentProvider.localStore, user)
6972
)
7073
);
7174
// When a user calls clearPersistence() in one client, all other clients
@@ -95,7 +98,7 @@ export async function setOnlineComponentProvider(
9598
// precedence over the offline component provider.
9699
firestore._setCredentialChangeListener(user =>
97100
firestore._queue.enqueueAndForget(() =>
98-
onlineComponentProvider.remoteStore.handleCredentialChange(user)
101+
remoteStoreHandleCredentialChange(onlineComponentProvider.remoteStore, user)
99102
)
100103
);
101104
onlineDeferred.resolve(onlineComponentProvider);

packages/firestore/exp/src/api/database.ts

+15-5
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ import {
5555
setOfflineComponentProvider,
5656
setOnlineComponentProvider
5757
} from './components';
58-
5958
import { DEFAULT_HOST, DEFAULT_SSL } from '../../../lite/src/api/components';
6059
import { DatabaseInfo } from '../../../src/core/database_info';
6160
import { AutoId } from '../../../src/util/misc';
6261
import { User } from '../../../src/auth/user';
6362
import { CredentialChangeListener } from '../../../src/api/credentials';
6463
import { logDebug } from '../../../src/util/log';
64+
import { fillWritePipeline } from '../../../src/remote/remote_store';
6565

6666
const LOG_TAG = 'Firestore';
6767

@@ -198,6 +198,8 @@ export function enableIndexedDbPersistence(
198198
// `getOnlineComponentProvider()`
199199
const settings = firestoreImpl._getSettings();
200200

201+
const onlineComponentProvider = new OnlineComponentProvider();
202+
201203
// TODO(firestoreexp): Add forceOwningTab
202204
return setOfflineComponentProvider(
203205
firestoreImpl,
@@ -209,7 +211,12 @@ export function enableIndexedDbPersistence(
209211
forceOwningTab: false
210212
},
211213
new IndexedDbOfflineComponentProvider()
212-
);
214+
)
215+
.then(() =>
216+
setOnlineComponentProvider(firestoreImpl, new OnlineComponentProvider())
217+
)
218+
// Enqueue writes from a previous session
219+
.then(() => fillWritePipeline(onlineComponentProvider.remoteStore));
213220
}
214221

215222
export function enableMultiTabIndexedDbPersistence(
@@ -238,9 +245,12 @@ export function enableMultiTabIndexedDbPersistence(
238245
forceOwningTab: false
239246
},
240247
offlineComponentProvider
241-
).then(() =>
242-
setOnlineComponentProvider(firestoreImpl, onlineComponentProvider)
243-
);
248+
)
249+
.then(() =>
250+
setOnlineComponentProvider(firestoreImpl, onlineComponentProvider)
251+
)
252+
// Enqueue writes from a previous session
253+
.then(() => fillWritePipeline(onlineComponentProvider.remoteStore));
244254
}
245255

246256
export function clearIndexedDbPersistence(

packages/firestore/src/core/component_provider.ts

+10-7
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@ import {
3333
applyPrimaryState,
3434
applyTargetState,
3535
getActiveClients,
36-
handleCredentialChange,
36+
syncEngineHandleCredentialChange,
3737
newSyncEngine,
3838
SyncEngine
3939
} from './sync_engine';
40-
import { RemoteStore } from '../remote/remote_store';
40+
import {
41+
newRemoteStore,
42+
RemoteStore,
43+
remoteStoreApplyPrimaryState, remoteStoreShutdown
44+
} from '../remote/remote_store';
4145
import { EventManager } from './event_manager';
4246
import { AsyncQueue } from '../util/async_queue';
4347
import { DatabaseId, DatabaseInfo } from './database_info';
@@ -341,13 +345,12 @@ export class OnlineComponentProvider {
341345
OnlineStateSource.SharedClientState
342346
);
343347

344-
this.remoteStore.remoteSyncer.handleCredentialChange = handleCredentialChange.bind(
348+
this.remoteStore.syncEngine.handleCredentialChange = syncEngineHandleCredentialChange.bind(
345349
null,
346350
this.syncEngine
347351
);
348352

349-
await this.remoteStore.start();
350-
await this.remoteStore.applyPrimaryState(this.syncEngine.isPrimaryClient);
353+
await remoteStoreApplyPrimaryState(this.remoteStore, this.syncEngine.isPrimaryClient);
351354
}
352355

353356
createEventManager(cfg: ComponentConfiguration): EventManager {
@@ -361,7 +364,7 @@ export class OnlineComponentProvider {
361364
}
362365

363366
createRemoteStore(cfg: ComponentConfiguration): RemoteStore {
364-
return new RemoteStore(
367+
return newRemoteStore(
365368
this.localStore,
366369
this.datastore,
367370
cfg.asyncQueue,
@@ -388,6 +391,6 @@ export class OnlineComponentProvider {
388391
}
389392

390393
terminate(): Promise<void> {
391-
return this.remoteStore.shutdown();
394+
return remoteStoreShutdown(this.remoteStore);
392395
}
393396
}

packages/firestore/src/core/firestore_client.ts

+14-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ import { GarbageCollectionScheduler, Persistence } from '../local/persistence';
2828
import { Document, NoDocument } from '../model/document';
2929
import { DocumentKey } from '../model/document_key';
3030
import { Mutation } from '../model/mutation';
31-
import { RemoteStore } from '../remote/remote_store';
31+
import {
32+
fillWritePipeline,
33+
remoteStoreHandleCredentialChange,
34+
RemoteStore,
35+
enableNetwork,
36+
disableNetwork, remoteStoreShutdown
37+
} from '../remote/remote_store';
3238
import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue';
3339
import { Code, FirestoreError } from '../util/error';
3440
import { logDebug } from '../util/log';
@@ -205,7 +211,7 @@ export class FirestoreClient {
205211
).then(this.initializationDone.resolve, this.initializationDone.reject);
206212
} else {
207213
this.asyncQueue.enqueueRetryable(() =>
208-
this.remoteStore.handleCredentialChange(user)
214+
remoteStoreHandleCredentialChange(this.remoteStore, user)
209215
);
210216
}
211217
});
@@ -224,7 +230,7 @@ export class FirestoreClient {
224230
this.verifyNotTerminated();
225231
return this.asyncQueue.enqueue(() => {
226232
this.persistence.setNetworkEnabled(true);
227-
return this.remoteStore.enableNetwork();
233+
return enableNetwork(this.remoteStore);
228234
});
229235
}
230236

@@ -289,6 +295,8 @@ export class FirestoreClient {
289295
await this.terminate();
290296
});
291297

298+
await fillWritePipeline(this.remoteStore);
299+
292300
persistenceResult.resolve();
293301
} catch (error) {
294302
// Regardless of whether or not the retry succeeds, from an user
@@ -368,7 +376,7 @@ export class FirestoreClient {
368376
this.verifyNotTerminated();
369377
return this.asyncQueue.enqueue(() => {
370378
this.persistence.setNetworkEnabled(false);
371-
return this.remoteStore.disableNetwork();
379+
return disableNetwork(this.remoteStore);
372380
});
373381
}
374382

@@ -382,7 +390,7 @@ export class FirestoreClient {
382390
this.gcScheduler.stop();
383391
}
384392

385-
await this.remoteStore.shutdown();
393+
await remoteStoreShutdown(this.remoteStore);
386394
await this.sharedClientState.shutdown();
387395
await this.persistence.shutdown();
388396

@@ -573,7 +581,7 @@ export function enqueueNetworkEnabled(
573581
): Promise<void> {
574582
return asyncQueue.enqueue(() => {
575583
persistence.setNetworkEnabled(enabled);
576-
return enabled ? remoteStore.enableNetwork() : remoteStore.disableNetwork();
584+
return enabled ? enableNetwork(remoteStore) : disableNetwork(remoteStore);
577585
});
578586
}
579587

packages/firestore/src/core/sync_engine.ts

+29-21
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,14 @@ import { DocumentKey } from '../model/document_key';
4848
import { Mutation } from '../model/mutation';
4949
import { BATCHID_UNKNOWN, MutationBatchResult } from '../model/mutation_batch';
5050
import { RemoteEvent, TargetChange } from '../remote/remote_event';
51-
import { RemoteStore } from '../remote/remote_store';
51+
import {
52+
canUseNetwork,
53+
fillWritePipeline,
54+
RemoteStore,
55+
remoteStoreApplyPrimaryState,
56+
remoteStoreListen,
57+
remoteStoreUnlisten
58+
} from '../remote/remote_store';
5259
import { debugAssert, debugCast, fail, hardAssert } from '../util/assert';
5360
import { Code, FirestoreError } from '../util/error';
5461
import { logDebug } from '../util/log';
@@ -338,7 +345,7 @@ export async function syncEngineListen(
338345
status === 'current'
339346
);
340347
if (syncEngineImpl.isPrimaryClient) {
341-
syncEngineImpl.remoteStore.listen(targetData);
348+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
342349
}
343350
}
344351

@@ -439,7 +446,7 @@ export async function syncEngineUnlisten(
439446
)
440447
.then(() => {
441448
syncEngineImpl.sharedClientState.clearQueryState(queryView.targetId);
442-
syncEngineImpl.remoteStore.unlisten(queryView.targetId);
449+
remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId);
443450
removeAndCleanupTarget(syncEngineImpl, queryView.targetId);
444451
})
445452
.catch(ignoreIfPrimaryLeaseLoss);
@@ -477,7 +484,7 @@ export async function syncEngineWrite(
477484
syncEngineImpl.sharedClientState.addPendingMutation(result.batchId);
478485
addMutationCallback(syncEngineImpl, result.batchId, userCallback);
479486
await emitNewSnapsAndNotifyLocalStore(syncEngineImpl, result.changes);
480-
await syncEngineImpl.remoteStore.fillWritePipeline();
487+
await fillWritePipeline(syncEngineImpl.remoteStore);
481488
} catch (e) {
482489
// If we can't persist the mutation, we reject the user callback and
483490
// don't send the mutation. The user can then retry the write.
@@ -725,7 +732,7 @@ export async function registerPendingWritesCallback(
725732
callback: Deferred<void>
726733
): Promise<void> {
727734
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
728-
if (!syncEngineImpl.remoteStore.canUseNetwork()) {
735+
if (!canUseNetwork(syncEngineImpl.remoteStore)) {
729736
logDebug(
730737
LOG_TAG,
731738
'The network is disabled. The task returned by ' +
@@ -888,7 +895,7 @@ function removeLimboTarget(
888895
return;
889896
}
890897

891-
syncEngineImpl.remoteStore.unlisten(limboTargetId);
898+
remoteStoreUnlisten(syncEngineImpl.remoteStore, limboTargetId);
892899
syncEngineImpl.activeLimboTargetsByKey = syncEngineImpl.activeLimboTargetsByKey.remove(
893900
key
894901
);
@@ -960,7 +967,8 @@ function pumpEnqueuedLimboResolutions(syncEngineImpl: SyncEngineImpl): void {
960967
key,
961968
limboTargetId
962969
);
963-
syncEngineImpl.remoteStore.listen(
970+
remoteStoreListen(
971+
syncEngineImpl.remoteStore,
964972
new TargetData(
965973
queryToTarget(newQueryForPath(key.path)),
966974
limboTargetId,
@@ -1064,7 +1072,7 @@ async function applyDocChanges(
10641072
return viewChange.snapshot;
10651073
}
10661074

1067-
export async function handleCredentialChange(
1075+
export async function syncEngineHandleCredentialChange(
10681076
syncEngine: SyncEngine,
10691077
user: User
10701078
): Promise<void> {
@@ -1181,7 +1189,7 @@ export async function applyBatchState(
11811189
// If we are the primary client, we need to send this write to the
11821190
// backend. Secondary clients will ignore these writes since their remote
11831191
// connection is disabled.
1184-
await syncEngineImpl.remoteStore.fillWritePipeline();
1192+
await fillWritePipeline(syncEngineImpl.remoteStore);
11851193
} else if (batchState === 'acknowledged' || batchState === 'rejected') {
11861194
// NOTE: Both these methods are no-ops for batches that originated from
11871195
// other clients.
@@ -1217,9 +1225,9 @@ export async function applyPrimaryState(
12171225
/*transitionToPrimary=*/ true
12181226
);
12191227
syncEngineImpl._isPrimaryClient = true;
1220-
await syncEngineImpl.remoteStore.applyPrimaryState(true);
1228+
await remoteStoreApplyPrimaryState(syncEngineImpl.remoteStore, true);
12211229
for (const targetData of activeQueries) {
1222-
syncEngineImpl.remoteStore.listen(targetData);
1230+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
12231231
}
12241232
} else if (isPrimary === false && syncEngineImpl._isPrimaryClient !== false) {
12251233
const activeTargets: TargetId[] = [];
@@ -1238,7 +1246,7 @@ export async function applyPrimaryState(
12381246
);
12391247
});
12401248
}
1241-
syncEngineImpl.remoteStore.unlisten(targetId);
1249+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
12421250
});
12431251
await p;
12441252

@@ -1249,15 +1257,15 @@ export async function applyPrimaryState(
12491257
);
12501258
resetLimboDocuments(syncEngineImpl);
12511259
syncEngineImpl._isPrimaryClient = false;
1252-
await syncEngineImpl.remoteStore.applyPrimaryState(false);
1260+
await remoteStoreApplyPrimaryState(syncEngineImpl.remoteStore, false);
12531261
}
12541262
}
12551263

12561264
// PORTING NOTE: Multi-Tab only.
12571265
function resetLimboDocuments(syncEngine: SyncEngine): void {
12581266
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
12591267
syncEngineImpl.activeLimboResolutionsByTarget.forEach((_, targetId) => {
1260-
syncEngineImpl.remoteStore.unlisten(targetId);
1268+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
12611269
});
12621270
syncEngineImpl.limboDocumentRefs.removeAllReferences();
12631271
syncEngineImpl.activeLimboResolutionsByTarget = new Map<
@@ -1447,7 +1455,7 @@ export async function applyActiveTargetsChange(
14471455
targetData.targetId,
14481456
/*current=*/ false
14491457
);
1450-
syncEngineImpl.remoteStore.listen(targetData);
1458+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
14511459
}
14521460

14531461
for (const targetId of removed) {
@@ -1464,7 +1472,7 @@ export async function applyActiveTargetsChange(
14641472
/* keepPersistedTargetData */ false
14651473
)
14661474
.then(() => {
1467-
syncEngineImpl.remoteStore.unlisten(targetId);
1475+
remoteStoreUnlisten(syncEngineImpl.remoteStore, targetId);
14681476
removeAndCleanupTarget(syncEngineImpl, targetId);
14691477
})
14701478
.catch(ignoreIfPrimaryLeaseLoss);
@@ -1473,15 +1481,15 @@ export async function applyActiveTargetsChange(
14731481

14741482
function ensureWatchCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14751483
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
1476-
syncEngineImpl.remoteStore.remoteSyncer.applyRemoteEvent = applyRemoteEvent.bind(
1484+
syncEngineImpl.remoteStore.syncEngine.applyRemoteEvent = applyRemoteEvent.bind(
14771485
null,
14781486
syncEngineImpl
14791487
);
1480-
syncEngineImpl.remoteStore.remoteSyncer.getRemoteKeysForTarget = getRemoteKeysForTarget.bind(
1488+
syncEngineImpl.remoteStore.syncEngine.getRemoteKeysForTarget = getRemoteKeysForTarget.bind(
14811489
null,
14821490
syncEngineImpl
14831491
);
1484-
syncEngineImpl.remoteStore.remoteSyncer.rejectListen = rejectListen.bind(
1492+
syncEngineImpl.remoteStore.syncEngine.rejectListen = rejectListen.bind(
14851493
null,
14861494
syncEngineImpl
14871495
);
@@ -1490,11 +1498,11 @@ function ensureWatchCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14901498

14911499
function ensureWriteCallbacks(syncEngine: SyncEngine): SyncEngineImpl {
14921500
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
1493-
syncEngineImpl.remoteStore.remoteSyncer.applySuccessfulWrite = applySuccessfulWrite.bind(
1501+
syncEngineImpl.remoteStore.syncEngine.applySuccessfulWrite = applySuccessfulWrite.bind(
14941502
null,
14951503
syncEngineImpl
14961504
);
1497-
syncEngineImpl.remoteStore.remoteSyncer.rejectFailedWrite = rejectFailedWrite.bind(
1505+
syncEngineImpl.remoteStore.syncEngine.rejectFailedWrite = rejectFailedWrite.bind(
14981506
null,
14991507
syncEngineImpl
15001508
);

0 commit comments

Comments
 (0)