Skip to content

Commit b9b5a43

Browse files
Merge
1 parent 871d955 commit b9b5a43

12 files changed

+270
-22
lines changed

packages/firestore/src/local/indexeddb_mutation_queue.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch';
2626
import { ResourcePath } from '../model/path';
2727
import { debugAssert, fail, hardAssert } from '../util/assert';
2828
import { primitiveComparator } from '../util/misc';
29+
import { ByteString } from '../util/byte_string';
2930
import { SortedMap } from '../util/sorted_map';
3031
import { SortedSet } from '../util/sorted_set';
3132
import { decodeResourcePath } from './encoded_resource_path';
@@ -117,6 +118,40 @@ export class IndexedDbMutationQueue implements MutationQueue {
117118
.next(() => empty);
118119
}
119120

121+
acknowledgeBatch(
122+
transaction: PersistenceTransaction,
123+
batch: MutationBatch,
124+
streamToken: ByteString
125+
): PersistencePromise<void> {
126+
return this.getMutationQueueMetadata(transaction).next(metadata => {
127+
// We can't store the resumeToken as a ByteString in IndexedDB, so we
128+
// convert it to a Base64 string for storage.
129+
metadata.lastStreamToken = streamToken.toBase64();
130+
131+
return mutationQueuesStore(transaction).put(metadata);
132+
});
133+
}
134+
135+
getLastStreamToken(
136+
transaction: PersistenceTransaction
137+
): PersistencePromise<ByteString> {
138+
return this.getMutationQueueMetadata(transaction).next<ByteString>(
139+
metadata => ByteString.fromBase64String(metadata.lastStreamToken)
140+
);
141+
}
142+
143+
setLastStreamToken(
144+
transaction: PersistenceTransaction,
145+
streamToken: ByteString
146+
): PersistencePromise<void> {
147+
return this.getMutationQueueMetadata(transaction).next(metadata => {
148+
// We can't store the resumeToken as a ByteString in IndexedDB, so we
149+
// convert it to a Base64 string for storage.
150+
metadata.lastStreamToken = streamToken.toBase64();
151+
return mutationQueuesStore(transaction).put(metadata);
152+
});
153+
}
154+
120155
addMutationBatch(
121156
transaction: PersistenceTransaction,
122157
localWriteTime: Timestamp,

packages/firestore/src/local/indexeddb_schema.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,8 +423,6 @@ export class DbMutationQueue {
423423
*
424424
* After sending this token, earlier tokens may not be used anymore so
425425
* only a single stream token is retained.
426-
*
427-
* NOTE: this is deprecated and no longer used by the code.
428426
*/
429427
public lastStreamToken: string
430428
) {}

packages/firestore/src/local/local_store.ts

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import { RemoteDocumentCache } from './remote_document_cache';
6060
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
6161
import { ClientId } from './shared_client_state';
6262
import { TargetData, TargetPurpose } from './target_data';
63+
import { ByteString } from '../util/byte_string';
6364
import { IndexedDbPersistence } from './indexeddb_persistence';
6465
import { IndexedDbMutationQueue } from './indexeddb_mutation_queue';
6566
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
@@ -377,11 +378,11 @@ export class LocalStore {
377378
const documentBuffer = this.remoteDocuments.newChangeBuffer({
378379
trackRemovals: true // Make sure document removals show up in `getNewDocumentChanges()`
379380
});
380-
return this.applyWriteToRemoteDocuments(
381-
txn,
382-
batchResult,
383-
documentBuffer
384-
)
381+
return this.mutationQueue
382+
.acknowledgeBatch(txn, batchResult.batch, batchResult.streamToken)
383+
.next(() =>
384+
this.applyWriteToRemoteDocuments(txn, batchResult, documentBuffer)
385+
)
385386
.next(() => documentBuffer.apply(txn))
386387
.next(() => this.mutationQueue.performConsistencyCheck(txn))
387388
.next(() => this.localDocuments.getDocuments(txn, affected));
@@ -432,6 +433,32 @@ export class LocalStore {
432433
);
433434
}
434435

436+
/** Returns the last recorded stream token for the current user. */
437+
getLastStreamToken(): Promise<ByteString> {
438+
return this.persistence.runTransaction(
439+
'Get last stream token',
440+
'readonly',
441+
txn => {
442+
return this.mutationQueue.getLastStreamToken(txn);
443+
}
444+
);
445+
}
446+
447+
/**
448+
* Sets the stream token for the current user without acknowledging any
449+
* mutation batch. This is usually only useful after a stream handshake or in
450+
* response to an error that requires clearing the stream token.
451+
*/
452+
setLastStreamToken(streamToken: ByteString): Promise<void> {
453+
return this.persistence.runTransaction(
454+
'Set last stream token',
455+
'readwrite-primary',
456+
txn => {
457+
return this.mutationQueue.setLastStreamToken(txn, streamToken);
458+
}
459+
);
460+
}
461+
435462
/**
436463
* Returns the last consistent snapshot processed (used by the RemoteStore to
437464
* determine whether to buffer incoming snapshots from the backend).

packages/firestore/src/local/memory_mutation_queue.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { Mutation } from '../model/mutation';
2323
import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch';
2424
import { debugAssert, hardAssert } from '../util/assert';
2525
import { primitiveComparator } from '../util/misc';
26+
import { ByteString } from '../util/byte_string';
2627
import { SortedMap } from '../util/sorted_map';
2728
import { SortedSet } from '../util/sorted_set';
2829

@@ -42,6 +43,12 @@ export class MemoryMutationQueue implements MutationQueue {
4243
/** Next value to use when assigning sequential IDs to each mutation batch. */
4344
private nextBatchId: BatchId = 1;
4445

46+
/** The last received stream token from the server, used to acknowledge which
47+
* responses the client has processed. Stream tokens are opaque checkpoint
48+
* markers whose only real value is their inclusion in the next request.
49+
*/
50+
private lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
51+
4552
/** An ordered mapping between documents and the mutations batch IDs. */
4653
private batchesByDocumentKey = new SortedSet(DocReference.compareByKey);
4754

@@ -54,6 +61,46 @@ export class MemoryMutationQueue implements MutationQueue {
5461
return PersistencePromise.resolve(this.mutationQueue.length === 0);
5562
}
5663

64+
acknowledgeBatch(
65+
transaction: PersistenceTransaction,
66+
batch: MutationBatch,
67+
streamToken: ByteString
68+
): PersistencePromise<void> {
69+
const batchId = batch.batchId;
70+
const batchIndex = this.indexOfExistingBatchId(batchId, 'acknowledged');
71+
hardAssert(
72+
batchIndex === 0,
73+
'Can only acknowledge the first batch in the mutation queue'
74+
);
75+
76+
// Verify that the batch in the queue is the one to be acknowledged.
77+
const check = this.mutationQueue[batchIndex];
78+
debugAssert(
79+
batchId === check.batchId,
80+
'Queue ordering failure: expected batch ' +
81+
batchId +
82+
', got batch ' +
83+
check.batchId
84+
);
85+
86+
this.lastStreamToken = streamToken;
87+
return PersistencePromise.resolve();
88+
}
89+
90+
getLastStreamToken(
91+
transaction: PersistenceTransaction
92+
): PersistencePromise<ByteString> {
93+
return PersistencePromise.resolve(this.lastStreamToken);
94+
}
95+
96+
setLastStreamToken(
97+
transaction: PersistenceTransaction,
98+
streamToken: ByteString
99+
): PersistencePromise<void> {
100+
this.lastStreamToken = streamToken;
101+
return PersistencePromise.resolve();
102+
}
103+
57104
addMutationBatch(
58105
transaction: PersistenceTransaction,
59106
localWriteTime: Timestamp,

packages/firestore/src/local/mutation_queue.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { BatchId } from '../core/types';
2121
import { DocumentKey } from '../model/document_key';
2222
import { Mutation } from '../model/mutation';
2323
import { MutationBatch } from '../model/mutation_batch';
24+
import { ByteString } from '../util/byte_string';
2425
import { SortedMap } from '../util/sorted_map';
2526

2627
import { PersistenceTransaction } from './persistence';
@@ -31,6 +32,26 @@ export interface MutationQueue {
3132
/** Returns true if this queue contains no mutation batches. */
3233
checkEmpty(transaction: PersistenceTransaction): PersistencePromise<boolean>;
3334

35+
/**
36+
* Acknowledges the given batch.
37+
*/
38+
acknowledgeBatch(
39+
transaction: PersistenceTransaction,
40+
batch: MutationBatch,
41+
streamToken: ByteString
42+
): PersistencePromise<void>;
43+
44+
/** Returns the current stream token for this mutation queue. */
45+
getLastStreamToken(
46+
transaction: PersistenceTransaction
47+
): PersistencePromise<ByteString>;
48+
49+
/** Sets the stream token for this mutation queue. */
50+
setLastStreamToken(
51+
transaction: PersistenceTransaction,
52+
streamToken: ByteString
53+
): PersistencePromise<void>;
54+
3455
/**
3556
* Creates a new mutation batch and adds it to this mutation queue.
3657
*

packages/firestore/src/model/mutation_batch.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { SnapshotVersion } from '../core/snapshot_version';
2020
import { BatchId } from '../core/types';
2121
import { hardAssert, debugAssert } from '../util/assert';
2222
import { arrayEquals } from '../util/misc';
23+
import { ByteString } from '../util/byte_string';
2324
import {
2425
documentKeySet,
2526
DocumentKeySet,
@@ -188,6 +189,7 @@ export class MutationBatchResult {
188189
readonly batch: MutationBatch,
189190
readonly commitVersion: SnapshotVersion,
190191
readonly mutationResults: MutationResult[],
192+
readonly streamToken: ByteString,
191193
/**
192194
* A pre-computed mapping from each mutated document to the resulting
193195
* version.
@@ -203,7 +205,8 @@ export class MutationBatchResult {
203205
static from(
204206
batch: MutationBatch,
205207
commitVersion: SnapshotVersion,
206-
results: MutationResult[]
208+
results: MutationResult[],
209+
streamToken: ByteString
207210
): MutationBatchResult {
208211
hardAssert(
209212
batch.mutations.length === results.length,
@@ -219,6 +222,12 @@ export class MutationBatchResult {
219222
versionMap = versionMap.insert(mutations[i].key, results[i].version);
220223
}
221224

222-
return new MutationBatchResult(batch, commitVersion, results, versionMap);
225+
return new MutationBatchResult(
226+
batch,
227+
commitVersion,
228+
results,
229+
streamToken,
230+
versionMap
231+
);
223232
}
224233
}

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ export class PersistentWriteStream extends PersistentStream<
669669
* PersistentWriteStream manages propagating this value from responses to the
670670
* next request.
671671
*/
672-
private lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
672+
lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
673673

674674
/**
675675
* Tracks whether or not a handshake has been successfully exchanged and

packages/firestore/src/remote/remote_store.ts

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import { SnapshotVersion } from '../core/snapshot_version';
1919
import { Transaction } from '../core/transaction';
2020
import { OnlineState, TargetId } from '../core/types';
21-
import { LocalStore } from '../local/local_store';
21+
import { ignoreIfPrimaryLeaseLoss, LocalStore } from '../local/local_store';
2222
import { TargetData, TargetPurpose } from '../local/target_data';
2323
import { MutationResult } from '../model/mutation';
2424
import {
@@ -43,7 +43,7 @@ import {
4343
PersistentWriteStream
4444
} from './persistent_stream';
4545
import { RemoteSyncer } from './remote_syncer';
46-
import { isPermanentWriteError } from './rpc_error';
46+
import { isPermanentError, isPermanentWriteError } from './rpc_error';
4747
import {
4848
DocumentWatchChange,
4949
ExistenceFilterChange,
@@ -199,6 +199,8 @@ export class RemoteStore implements TargetMetadataProvider {
199199

200200
private async enableNetworkInternal(): Promise<void> {
201201
if (this.canUseNetwork()) {
202+
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
203+
202204
if (this.shouldStartWatchStream()) {
203205
this.startWatchStream();
204206
} else {
@@ -656,11 +658,16 @@ export class RemoteStore implements TargetMetadataProvider {
656658
}
657659

658660
private onWriteHandshakeComplete(): Promise<void> {
659-
// Send the write pipeline now that the stream is established.
660-
for (const batch of this.writePipeline) {
661-
this.writeStream.writeMutations(batch.mutations);
662-
}
663-
return Promise.resolve();
661+
// Record the stream token.
662+
return this.localStore
663+
.setLastStreamToken(this.writeStream.lastStreamToken)
664+
.then(() => {
665+
// Send the write pipeline now that the stream is established.
666+
for (const batch of this.writePipeline) {
667+
this.writeStream.writeMutations(batch.mutations);
668+
}
669+
})
670+
.catch(ignoreIfPrimaryLeaseLoss);
664671
}
665672

666673
private async onMutationResult(
@@ -674,7 +681,12 @@ export class RemoteStore implements TargetMetadataProvider {
674681
'Got result for empty write pipeline'
675682
);
676683
const batch = this.writePipeline.shift()!;
677-
const success = MutationBatchResult.from(batch, commitVersion, results);
684+
const success = MutationBatchResult.from(
685+
batch,
686+
commitVersion,
687+
results,
688+
this.writeStream.lastStreamToken
689+
);
678690
try {
679691
await this.syncEngine.applySuccessfulWrite(success);
680692
} catch (e) {
@@ -698,19 +710,45 @@ export class RemoteStore implements TargetMetadataProvider {
698710
'Write stream was stopped gracefully while still needed.'
699711
);
700712
}
701-
702713
// An error that occurs after the write handshake completes is an indication
703714
// that the write operation itself failed.
704715
if (error && this.writeStream.handshakeComplete) {
705716
// This error affects the actual write.
706717
await this.handleWriteError(error!);
718+
} else if (error) {
719+
// If there was an error before the handshake has finished, it's
720+
// possible that the server is unable to process the stream token
721+
// we're sending. (Perhaps it's too old?)
722+
await this.handleHandshakeError(error!);
707723
}
708724

709725
// The write stream might have been started by refilling the write
710726
// pipeline for failed writes
711727
if (this.shouldStartWriteStream()) {
712728
this.startWriteStream();
713729
}
730+
// No pending writes, nothing to do
731+
}
732+
733+
private async handleHandshakeError(error: FirestoreError): Promise<void> {
734+
// Reset the token if it's a permanent error, signaling the write stream is
735+
// no longer valid. Note that the handshake does not count as a write: see
736+
// comments on isPermanentWriteError for details.
737+
if (isPermanentError(error.code)) {
738+
logDebug(
739+
LOG_TAG,
740+
'RemoteStore error before completed handshake; resetting stream token: ',
741+
this.writeStream.lastStreamToken
742+
);
743+
this.writeStream.lastStreamToken = ByteString.EMPTY_BYTE_STRING;
744+
745+
return this.localStore
746+
.setLastStreamToken(ByteString.EMPTY_BYTE_STRING)
747+
.catch(ignoreIfPrimaryLeaseLoss);
748+
} else {
749+
// Some other error, don't reset stream token. Our stream logic will
750+
// just retry with exponential backoff.
751+
}
714752
}
715753

716754
private async handleWriteError(error: FirestoreError): Promise<void> {

0 commit comments

Comments
 (0)