Skip to content

Commit 44731f9

Browse files
Remove support for lastStreamToken
1 parent 7de1a7c commit 44731f9

12 files changed

+30
-282
lines changed

packages/firestore/src/local/indexeddb_mutation_queue.ts

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch';
2626
import { ResourcePath } from '../model/path';
2727
import { debugAssert, fail, hardAssert } from '../util/assert';
2828
import { primitiveComparator } from '../util/misc';
29-
import { ByteString } from '../util/byte_string';
3029
import { SortedMap } from '../util/sorted_map';
3130
import { SortedSet } from '../util/sorted_set';
3231
import { decodeResourcePath } from './encoded_resource_path';
@@ -118,40 +117,6 @@ export class IndexedDbMutationQueue implements MutationQueue {
118117
.next(() => empty);
119118
}
120119

121-
acknowledgeBatch(
122-
transaction: PersistenceTransaction,
123-
batch: MutationBatch,
124-
streamToken: ByteString
125-
): PersistencePromise<void> {
126-
return this.getMutationQueueMetadata(transaction).next(metadata => {
127-
// We can't store the resumeToken as a ByteString in IndexedDB, so we
128-
// convert it to a Base64 string for storage.
129-
metadata.lastStreamToken = streamToken.toBase64();
130-
131-
return mutationQueuesStore(transaction).put(metadata);
132-
});
133-
}
134-
135-
getLastStreamToken(
136-
transaction: PersistenceTransaction
137-
): PersistencePromise<ByteString> {
138-
return this.getMutationQueueMetadata(transaction).next<ByteString>(
139-
metadata => ByteString.fromBase64String(metadata.lastStreamToken)
140-
);
141-
}
142-
143-
setLastStreamToken(
144-
transaction: PersistenceTransaction,
145-
streamToken: ByteString
146-
): PersistencePromise<void> {
147-
return this.getMutationQueueMetadata(transaction).next(metadata => {
148-
// We can't store the resumeToken as a ByteString in IndexedDB, so we
149-
// convert it to a Base64 string for storage.
150-
metadata.lastStreamToken = streamToken.toBase64();
151-
return mutationQueuesStore(transaction).put(metadata);
152-
});
153-
}
154-
155120
addMutationBatch(
156121
transaction: PersistenceTransaction,
157122
localWriteTime: Timestamp,

packages/firestore/src/local/indexeddb_schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,8 @@ export class DbMutationQueue {
423423
*
424424
* After sending this token, earlier tokens may not be used anymore so
425425
* only a single stream token is retained.
426+
*
427+
* NOTE: this is deprecated and no longer used by the code.
426428
*/
427429
public lastStreamToken: string
428430
) {}

packages/firestore/src/local/local_store.ts

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ import { RemoteDocumentCache } from './remote_document_cache';
6060
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
6161
import { ClientId } from './shared_client_state';
6262
import { TargetData, TargetPurpose } from './target_data';
63-
import { ByteString } from '../util/byte_string';
6463
import { IndexedDbPersistence } from './indexeddb_persistence';
6564
import { IndexedDbMutationQueue } from './indexeddb_mutation_queue';
6665
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
@@ -378,11 +377,11 @@ export class LocalStore {
378377
const documentBuffer = this.remoteDocuments.newChangeBuffer({
379378
trackRemovals: true // Make sure document removals show up in `getNewDocumentChanges()`
380379
});
381-
return this.mutationQueue
382-
.acknowledgeBatch(txn, batchResult.batch, batchResult.streamToken)
383-
.next(() =>
384-
this.applyWriteToRemoteDocuments(txn, batchResult, documentBuffer)
385-
)
380+
return this.applyWriteToRemoteDocuments(
381+
txn,
382+
batchResult,
383+
documentBuffer
384+
)
386385
.next(() => documentBuffer.apply(txn))
387386
.next(() => this.mutationQueue.performConsistencyCheck(txn))
388387
.next(() => this.localDocuments.getDocuments(txn, affected));
@@ -433,32 +432,6 @@ export class LocalStore {
433432
);
434433
}
435434

436-
/** Returns the last recorded stream token for the current user. */
437-
getLastStreamToken(): Promise<ByteString> {
438-
return this.persistence.runTransaction(
439-
'Get last stream token',
440-
'readonly',
441-
txn => {
442-
return this.mutationQueue.getLastStreamToken(txn);
443-
}
444-
);
445-
}
446-
447-
/**
448-
* Sets the stream token for the current user without acknowledging any
449-
* mutation batch. This is usually only useful after a stream handshake or in
450-
* response to an error that requires clearing the stream token.
451-
*/
452-
setLastStreamToken(streamToken: ByteString): Promise<void> {
453-
return this.persistence.runTransaction(
454-
'Set last stream token',
455-
'readwrite-primary',
456-
txn => {
457-
return this.mutationQueue.setLastStreamToken(txn, streamToken);
458-
}
459-
);
460-
}
461-
462435
/**
463436
* Returns the last consistent snapshot processed (used by the RemoteStore to
464437
* determine whether to buffer incoming snapshots from the backend).

packages/firestore/src/local/memory_mutation_queue.ts

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import { Mutation } from '../model/mutation';
2323
import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch';
2424
import { debugAssert, hardAssert } from '../util/assert';
2525
import { primitiveComparator } from '../util/misc';
26-
import { ByteString } from '../util/byte_string';
2726
import { SortedMap } from '../util/sorted_map';
2827
import { SortedSet } from '../util/sorted_set';
2928

@@ -43,12 +42,6 @@ export class MemoryMutationQueue implements MutationQueue {
4342
/** Next value to use when assigning sequential IDs to each mutation batch. */
4443
private nextBatchId: BatchId = 1;
4544

46-
/** The last received stream token from the server, used to acknowledge which
47-
* responses the client has processed. Stream tokens are opaque checkpoint
48-
* markers whose only real value is their inclusion in the next request.
49-
*/
50-
private lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
51-
5245
/** An ordered mapping between documents and the mutations batch IDs. */
5346
private batchesByDocumentKey = new SortedSet(DocReference.compareByKey);
5447

@@ -61,46 +54,6 @@ export class MemoryMutationQueue implements MutationQueue {
6154
return PersistencePromise.resolve(this.mutationQueue.length === 0);
6255
}
6356

64-
acknowledgeBatch(
65-
transaction: PersistenceTransaction,
66-
batch: MutationBatch,
67-
streamToken: ByteString
68-
): PersistencePromise<void> {
69-
const batchId = batch.batchId;
70-
const batchIndex = this.indexOfExistingBatchId(batchId, 'acknowledged');
71-
hardAssert(
72-
batchIndex === 0,
73-
'Can only acknowledge the first batch in the mutation queue'
74-
);
75-
76-
// Verify that the batch in the queue is the one to be acknowledged.
77-
const check = this.mutationQueue[batchIndex];
78-
debugAssert(
79-
batchId === check.batchId,
80-
'Queue ordering failure: expected batch ' +
81-
batchId +
82-
', got batch ' +
83-
check.batchId
84-
);
85-
86-
this.lastStreamToken = streamToken;
87-
return PersistencePromise.resolve();
88-
}
89-
90-
getLastStreamToken(
91-
transaction: PersistenceTransaction
92-
): PersistencePromise<ByteString> {
93-
return PersistencePromise.resolve(this.lastStreamToken);
94-
}
95-
96-
setLastStreamToken(
97-
transaction: PersistenceTransaction,
98-
streamToken: ByteString
99-
): PersistencePromise<void> {
100-
this.lastStreamToken = streamToken;
101-
return PersistencePromise.resolve();
102-
}
103-
10457
addMutationBatch(
10558
transaction: PersistenceTransaction,
10659
localWriteTime: Timestamp,

packages/firestore/src/local/mutation_queue.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import { BatchId } from '../core/types';
2121
import { DocumentKey } from '../model/document_key';
2222
import { Mutation } from '../model/mutation';
2323
import { MutationBatch } from '../model/mutation_batch';
24-
import { ByteString } from '../util/byte_string';
2524
import { SortedMap } from '../util/sorted_map';
2625

2726
import { PersistenceTransaction } from './persistence';
@@ -32,26 +31,6 @@ export interface MutationQueue {
3231
/** Returns true if this queue contains no mutation batches. */
3332
checkEmpty(transaction: PersistenceTransaction): PersistencePromise<boolean>;
3433

35-
/**
36-
* Acknowledges the given batch.
37-
*/
38-
acknowledgeBatch(
39-
transaction: PersistenceTransaction,
40-
batch: MutationBatch,
41-
streamToken: ByteString
42-
): PersistencePromise<void>;
43-
44-
/** Returns the current stream token for this mutation queue. */
45-
getLastStreamToken(
46-
transaction: PersistenceTransaction
47-
): PersistencePromise<ByteString>;
48-
49-
/** Sets the stream token for this mutation queue. */
50-
setLastStreamToken(
51-
transaction: PersistenceTransaction,
52-
streamToken: ByteString
53-
): PersistencePromise<void>;
54-
5534
/**
5635
* Creates a new mutation batch and adds it to this mutation queue.
5736
*

packages/firestore/src/model/mutation_batch.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import { SnapshotVersion } from '../core/snapshot_version';
2020
import { BatchId } from '../core/types';
2121
import { hardAssert, debugAssert } from '../util/assert';
2222
import { arrayEquals } from '../util/misc';
23-
import { ByteString } from '../util/byte_string';
2423
import {
2524
documentKeySet,
2625
DocumentKeySet,
@@ -189,7 +188,6 @@ export class MutationBatchResult {
189188
readonly batch: MutationBatch,
190189
readonly commitVersion: SnapshotVersion,
191190
readonly mutationResults: MutationResult[],
192-
readonly streamToken: ByteString,
193191
/**
194192
* A pre-computed mapping from each mutated document to the resulting
195193
* version.
@@ -205,8 +203,7 @@ export class MutationBatchResult {
205203
static from(
206204
batch: MutationBatch,
207205
commitVersion: SnapshotVersion,
208-
results: MutationResult[],
209-
streamToken: ByteString
206+
results: MutationResult[]
210207
): MutationBatchResult {
211208
hardAssert(
212209
batch.mutations.length === results.length,
@@ -222,12 +219,6 @@ export class MutationBatchResult {
222219
versionMap = versionMap.insert(mutations[i].key, results[i].version);
223220
}
224221

225-
return new MutationBatchResult(
226-
batch,
227-
commitVersion,
228-
results,
229-
streamToken,
230-
versionMap
231-
);
222+
return new MutationBatchResult(batch, commitVersion, results, versionMap);
232223
}
233224
}

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ export class PersistentWriteStream extends PersistentStream<
669669
* PersistentWriteStream manages propagating this value from responses to the
670670
* next request.
671671
*/
672-
lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
672+
private lastStreamToken: ByteString = ByteString.EMPTY_BYTE_STRING;
673673

674674
/**
675675
* Tracks whether or not a handshake has been successfully exchanged and

packages/firestore/src/remote/remote_store.ts

Lines changed: 17 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import { SnapshotVersion } from '../core/snapshot_version';
1919
import { Transaction } from '../core/transaction';
2020
import { OnlineState, TargetId } from '../core/types';
21-
import { ignoreIfPrimaryLeaseLoss, LocalStore } from '../local/local_store';
21+
import { LocalStore } from '../local/local_store';
2222
import { TargetData, TargetPurpose } from '../local/target_data';
2323
import { MutationResult } from '../model/mutation';
2424
import {
@@ -43,7 +43,7 @@ import {
4343
PersistentWriteStream
4444
} from './persistent_stream';
4545
import { RemoteSyncer } from './remote_syncer';
46-
import { isPermanentError, isPermanentWriteError } from './rpc_error';
46+
import { isPermanentWriteError } from './rpc_error';
4747
import {
4848
DocumentWatchChange,
4949
ExistenceFilterChange,
@@ -199,8 +199,6 @@ export class RemoteStore implements TargetMetadataProvider {
199199

200200
private async enableNetworkInternal(): Promise<void> {
201201
if (this.canUseNetwork()) {
202-
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
203-
204202
if (this.shouldStartWatchStream()) {
205203
this.startWatchStream();
206204
} else {
@@ -644,17 +642,11 @@ export class RemoteStore implements TargetMetadataProvider {
644642
this.writeStream.writeHandshake();
645643
}
646644

647-
private onWriteHandshakeComplete(): Promise<void> {
648-
// Record the stream token.
649-
return this.localStore
650-
.setLastStreamToken(this.writeStream.lastStreamToken)
651-
.then(() => {
652-
// Send the write pipeline now that the stream is established.
653-
for (const batch of this.writePipeline) {
654-
this.writeStream.writeMutations(batch.mutations);
655-
}
656-
})
657-
.catch(ignoreIfPrimaryLeaseLoss);
645+
private async onWriteHandshakeComplete(): Promise<void> {
646+
// Send the write pipeline now that the stream is established.
647+
for (const batch of this.writePipeline) {
648+
this.writeStream.writeMutations(batch.mutations);
649+
}
658650
}
659651

660652
private onMutationResult(
@@ -668,12 +660,7 @@ export class RemoteStore implements TargetMetadataProvider {
668660
'Got result for empty write pipeline'
669661
);
670662
const batch = this.writePipeline.shift()!;
671-
const success = MutationBatchResult.from(
672-
batch,
673-
commitVersion,
674-
results,
675-
this.writeStream.lastStreamToken
676-
);
663+
const success = MutationBatchResult.from(batch, commitVersion, results);
677664
return this.syncEngine.applySuccessfulWrite(success).then(() => {
678665
// It's possible that with the completion of this mutation another
679666
// slot has freed up.
@@ -691,46 +678,17 @@ export class RemoteStore implements TargetMetadataProvider {
691678
);
692679
}
693680

694-
// If the write stream closed due to an error, invoke the error callbacks if
695-
// there are pending writes.
696-
if (error && this.writePipeline.length > 0) {
697-
if (this.writeStream.handshakeComplete) {
698-
// This error affects the actual write.
699-
await this.handleWriteError(error!);
700-
} else {
701-
// If there was an error before the handshake has finished, it's
702-
// possible that the server is unable to process the stream token
703-
// we're sending. (Perhaps it's too old?)
704-
await this.handleHandshakeError(error!);
705-
}
706-
707-
// The write stream might have been started by refilling the write
708-
// pipeline for failed writes
709-
if (this.shouldStartWriteStream()) {
710-
this.startWriteStream();
711-
}
681+
// If the write stream closed after the write handshake completes, a write
682+
// operation failed and we fail the pending operation.
683+
if (error && this.writeStream.handshakeComplete) {
684+
// This error affects the actual write.
685+
await this.handleWriteError(error!);
712686
}
713-
// No pending writes, nothing to do
714-
}
715-
716-
private async handleHandshakeError(error: FirestoreError): Promise<void> {
717-
// Reset the token if it's a permanent error, signaling the write stream is
718-
// no longer valid. Note that the handshake does not count as a write: see
719-
// comments on isPermanentWriteError for details.
720-
if (isPermanentError(error.code)) {
721-
logDebug(
722-
LOG_TAG,
723-
'RemoteStore error before completed handshake; resetting stream token: ',
724-
this.writeStream.lastStreamToken
725-
);
726-
this.writeStream.lastStreamToken = ByteString.EMPTY_BYTE_STRING;
727687

728-
return this.localStore
729-
.setLastStreamToken(ByteString.EMPTY_BYTE_STRING)
730-
.catch(ignoreIfPrimaryLeaseLoss);
731-
} else {
732-
// Some other error, don't reset stream token. Our stream logic will
733-
// just retry with exponential backoff.
688+
// The write stream might have been started by refilling the write
689+
// pipeline for failed writes
690+
if (this.shouldStartWriteStream()) {
691+
this.startWriteStream();
734692
}
735693
}
736694

0 commit comments

Comments
 (0)