Skip to content

Take RemoteStore offline during user change #3193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ export class FirestoreClient {
persistenceResult
).then(initializationDone.resolve, initializationDone.reject);
} else {
this.asyncQueue.enqueueRetryable(() => {
return this.handleCredentialChange(user);
});
this.asyncQueue.enqueueRetryable(() =>
this.remoteStore.handleCredentialChange(user)
);
}
});

Expand Down Expand Up @@ -339,13 +339,6 @@ export class FirestoreClient {
}
}

private handleCredentialChange(user: User): Promise<void> {
this.asyncQueue.verifyOperationInProgress();

logDebug(LOG_TAG, 'Credential Changed. Current user: ' + user.uid);
return this.syncEngine.handleCredentialChange(user);
}

/** Disables the network connection. Pending operations will not complete. */
disableNetwork(): Promise<void> {
this.verifyNotTerminated();
Expand Down
4 changes: 2 additions & 2 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ export class SyncEngine implements RemoteSyncer {
const userChanged = !this.currentUser.isEqual(user);

if (userChanged) {
logDebug(LOG_TAG, 'User change. New user:', user.toKey());

const result = await this.localStore.handleUserChange(user);
this.currentUser = user;

Expand All @@ -879,8 +881,6 @@ export class SyncEngine implements RemoteSyncer {
);
await this.emitNewSnapsAndNotifyLocalStore(result.affectedDocuments);
}

await this.remoteStore.handleCredentialChange();
}

enableNetwork(): Promise<void> {
Expand Down
82 changes: 49 additions & 33 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,29 @@ import {
} from './watch_change';
import { ByteString } from '../util/byte_string';
import { isIndexedDbTransactionError } from '../local/simple_db';
import { User } from '../auth/user';

const LOG_TAG = 'RemoteStore';

// TODO(b/35853402): Negotiate this with the stream.
const MAX_PENDING_WRITES = 10;

/** Reasons for why the RemoteStore may be offline. */
const enum OfflineCause {
/** The user has explicitly disabled the network (via `disableNetwork()`). */
UserDisabled,
/** An IndexedDb failure occurred while persisting a stream update. */
IndexedDbFailed,
/** The tab is not the primary tab (only relevant with multi-tab). */
IsSecondary,
/** We are restarting the streams due to an Auth credential change. */
CredentialChange,
/** The connectivity state of the environment has changed. */
ConnectivityChange,
/** The RemoteStore has been shut down. */
Shutdown
}

/**
* RemoteStore - An interface to remotely stored data, basically providing a
* wrapper around the Datastore that is more reliable for the rest of the
Expand Down Expand Up @@ -117,19 +134,10 @@ export class RemoteStore implements TargetMetadataProvider {
private watchChangeAggregator: WatchChangeAggregator | null = null;

/**
* Set to true by enableNetwork() and false by disableNetwork() and indicates
* the user-preferred network state.
* A set of reasons for why the RemoteStore may be offline. If empty, the
* RemoteStore may start its network connections.
*/
private networkEnabled = false;

private isPrimary = false;

/**
* When set to `true`, the network was taken offline due to an IndexedDB
* failure. The state is flipped to `false` when access becomes available
* again.
*/
private indexedDbFailed = false;
private offlineCauses = new Set<OfflineCause>();

private onlineStateTracker: OnlineStateTracker;

Expand Down Expand Up @@ -193,7 +201,7 @@ export class RemoteStore implements TargetMetadataProvider {

/** Re-enables the network. Idempotent. */
enableNetwork(): Promise<void> {
this.networkEnabled = true;
this.offlineCauses.delete(OfflineCause.UserDisabled);
return this.enableNetworkInternal();
}

Expand All @@ -215,7 +223,7 @@ export class RemoteStore implements TargetMetadataProvider {
* enableNetwork().
*/
async disableNetwork(): Promise<void> {
this.networkEnabled = false;
this.offlineCauses.add(OfflineCause.UserDisabled);
await this.disableNetworkInternal();

// Set the OnlineState to Offline so get()s return from cache, etc.
Expand All @@ -239,7 +247,7 @@ export class RemoteStore implements TargetMetadataProvider {

async shutdown(): Promise<void> {
logDebug(LOG_TAG, 'RemoteStore shutting down.');
this.networkEnabled = false;
this.offlineCauses.add(OfflineCause.Shutdown);
await this.disableNetworkInternal();
this.connectivityMonitor.shutdown();

Expand Down Expand Up @@ -348,7 +356,7 @@ export class RemoteStore implements TargetMetadataProvider {
}

canUseNetwork(): boolean {
return !this.indexedDbFailed && this.isPrimary && this.networkEnabled;
return this.offlineCauses.size === 0;
}

private cleanUpWatchStreamState(): void {
Expand Down Expand Up @@ -456,10 +464,10 @@ export class RemoteStore implements TargetMetadataProvider {
): Promise<void> {
if (isIndexedDbTransactionError(e)) {
debugAssert(
!this.indexedDbFailed,
!this.offlineCauses.has(OfflineCause.IndexedDbFailed),
'Unexpected network event when IndexedDB was marked failed.'
);
this.indexedDbFailed = true;
this.offlineCauses.add(OfflineCause.IndexedDbFailed);

// Disable network and raise offline snapshots
await this.disableNetworkInternal();
Expand All @@ -476,7 +484,7 @@ export class RemoteStore implements TargetMetadataProvider {
this.asyncQueue.enqueueRetryable(async () => {
logDebug(LOG_TAG, 'Retrying IndexedDB access');
await op!();
this.indexedDbFailed = false;
this.offlineCauses.delete(OfflineCause.IndexedDbFailed);
await this.enableNetworkInternal();
});
} else {
Expand Down Expand Up @@ -750,31 +758,39 @@ export class RemoteStore implements TargetMetadataProvider {
}

private async restartNetwork(): Promise<void> {
this.networkEnabled = false;
this.offlineCauses.add(OfflineCause.ConnectivityChange);
await this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Unknown);
await this.enableNetwork();
this.offlineCauses.delete(OfflineCause.ConnectivityChange);
await this.enableNetworkInternal();
}

async handleCredentialChange(): Promise<void> {
if (this.canUseNetwork()) {
// Tear down and re-create our network streams. This will ensure we get a fresh auth token
// for the new user and re-fill the write pipeline with new mutations from the LocalStore
// (since mutations are per-user).
logDebug(LOG_TAG, 'RemoteStore restarting streams for new credential');
await this.restartNetwork();
}
async handleCredentialChange(user: User): Promise<void> {
this.asyncQueue.verifyOperationInProgress();

// Tear down and re-create our network streams. This will ensure we get a
// fresh auth token for the new user and re-fill the write pipeline with
// new mutations from the LocalStore (since mutations are per-user).
logDebug(LOG_TAG, 'RemoteStore received new credentials');
this.offlineCauses.add(OfflineCause.CredentialChange);

await this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Unknown);
await this.syncEngine.handleCredentialChange(user);

this.offlineCauses.delete(OfflineCause.CredentialChange);
await this.enableNetworkInternal();
}

/**
* Toggles the network state when the client gains or loses its primary lease.
*/
async applyPrimaryState(isPrimary: boolean): Promise<void> {
this.isPrimary = isPrimary;

if (isPrimary && this.networkEnabled) {
await this.enableNetwork();
if (isPrimary) {
this.offlineCauses.delete(OfflineCause.IsSecondary);
await this.enableNetworkInternal();
} else if (!isPrimary) {
this.offlineCauses.add(OfflineCause.IsSecondary);
await this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Unknown);
}
Expand Down
7 changes: 7 additions & 0 deletions packages/firestore/src/remote/remote_syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { DocumentKeySet } from '../model/collections';
import { MutationBatchResult } from '../model/mutation_batch';
import { FirestoreError } from '../util/error';
import { RemoteEvent } from './remote_event';
import { User } from '../auth/user';

/**
* An interface that describes the actions the RemoteStore needs to perform on
Expand Down Expand Up @@ -65,4 +66,10 @@ export interface RemoteSyncer {
* the last snapshot.
*/
getRemoteKeysForTarget(targetId: TargetId): DocumentKeySet;

/**
* Updates all local state to match the pending mutations for the given user.
* May be called repeatedly for the same user.
*/
handleCredentialChange(user: User): Promise<void>;
}
96 changes: 73 additions & 23 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -721,29 +721,79 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
{ foo: 'a' },
{ hasLocalMutations: true }
);
return spec()
.changeUser('user1')
.userSets('collection/key1', { foo: 'a' })
.userListens(query)
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
})
.failDatabaseTransactions('Handle user change')
.changeUser('user2')
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectEvents(query, { removed: [doc1], fromCache: true })
.failDatabaseTransactions('Handle user change')
.changeUser('user1')
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
});
return (
spec()
.changeUser('user1')
.userSets('collection/key1', { foo: 'a' })
.userListens(query)
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
})
.failDatabaseTransactions('Handle user change')
.changeUser('user2')
// The network is offline due to the failed user change
.expectActiveTargets()
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({ query })
.expectEvents(query, { removed: [doc1], fromCache: true })
.failDatabaseTransactions('Handle user change')
.changeUser('user1')
// The network is offline due to the failed user change
.expectActiveTargets()
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({ query })
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
})
);
}
);

specTest(
'Multiple user changes during transaction failure (with recovery)',
['durable-persistence'],
() => {
const query = Query.atPath(path('collection'));
const doc1 = doc(
'collection/key1',
0,
{ foo: 'a' },
{ hasLocalMutations: true }
);
return (
spec()
.changeUser('user1')
.userSets('collection/key1', { foo: 'a' })
.userListens(query)
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
})
// Change the user to user2 and back to user1 while IndexedDB is failed
.failDatabaseTransactions('Handle user change')
.changeUser('user2')
// The network is offline due to the failed user change
.expectActiveTargets()
.changeUser('user1')
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({ query })
// We are now user 2
.expectEvents(query, { removed: [doc1], fromCache: true })
// We are now user 1
.expectEvents(query, {
added: [doc1],
fromCache: true,
hasPendingWrites: true
})
);
}
);
});
17 changes: 6 additions & 11 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,17 +716,12 @@ abstract class TestRunner {

private async doChangeUser(user: string | null): Promise<void> {
this.user = new User(user);
const deferred = new Deferred<void>();
await this.queue.enqueueRetryable(async () => {
try {
await this.syncEngine.handleCredentialChange(this.user);
} finally {
// Resolve the deferred Promise even if the operation failed. This allows
// the spec tests to manually retry the failed user change.
deferred.resolve();
}
});
return deferred.promise;
// We don't block on `handleUserChange` as it may not get executed
// during an IndexedDb failure. Non-recovery tests will pick up the user
// change when the AsyncQueue is drained.
this.queue.enqueueRetryable(() =>
this.remoteStore.handleCredentialChange(new User(user))
);
}

private async doFailDatabase(
Expand Down