Skip to content

Commit 88c85ef

Browse files
Sending an empty write request before tearing down the WriteStream. (#297)
1 parent c8bcb67 commit 88c85ef

File tree

6 files changed

+199
-30
lines changed

6 files changed

+199
-30
lines changed

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,16 +323,16 @@ export abstract class PersistentStream<
323323
this.backoff.resetToMax();
324324
}
325325

326-
// This state must be assigned before calling onClose() to allow the callback to
327-
// inhibit backoff or otherwise manipulate the state in its non-started state.
328-
this.state = finalState;
329-
330326
// Clean up the underlying stream because we are no longer interested in events.
331327
if (this.stream !== null) {
328+
this.tearDown();
332329
this.stream.close();
333330
this.stream = null;
334331
}
335332

333+
// This state must be assigned before calling onClose() to allow the callback to
334+
// inhibit backoff or otherwise manipulate the state in its non-started state.
335+
this.state = finalState;
336336
const listener = this.listener!;
337337

338338
// Clear the listener to avoid bleeding of events from the underlying streams.
@@ -347,6 +347,12 @@ export abstract class PersistentStream<
347347
}
348348
}
349349

350+
/**
351+
* Can be overridden to perform additional cleanup before the stream is closed.
352+
* Calling super.tearDown() is not required.
353+
*/
354+
protected tearDown(): void {}
355+
350356
/**
351357
* Used by subclasses to start the concrete RPC and return the underlying
352358
* connection stream.
@@ -646,6 +652,12 @@ export class PersistentWriteStream extends PersistentStream<
646652
super.start(listener);
647653
}
648654

655+
protected tearDown() {
656+
if (this.handshakeComplete_) {
657+
this.writeMutations([]);
658+
}
659+
}
660+
649661
protected startRpc(
650662
token: Token | null
651663
): Stream<api.WriteRequest, api.WriteResponse> {

packages/firestore/test/unit/specs/listen_spec.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,28 @@ describeSpec('Listens:', [], () => {
237237
.expectEvents(allQuery, { fromCache: false })
238238
);
239239
});
240+
241+
specTest('Listens are reestablished after network disconnect', [], () => {
242+
const expectRequestCount = requestCounts =>
243+
requestCounts.addTarget + requestCounts.removeTarget;
244+
245+
const query = Query.atPath(path('collection'));
246+
const docA = doc('collection/a', 1000, { key: 'a' });
247+
const docB = doc('collection/b', 2000, { key: 'b' });
248+
return spec()
249+
.userListens(query)
250+
.expectWatchStreamRequestCount(
251+
expectRequestCount({ addTarget: 1, removeTarget: 0 })
252+
)
253+
.watchAcksFull(query, 1000, docA)
254+
.expectEvents(query, { added: [docA] })
255+
.disableNetwork()
256+
.enableNetwork()
257+
.restoreListen(query, 'resume-token-1000')
258+
.expectWatchStreamRequestCount(
259+
expectRequestCount({ addTarget: 2, removeTarget: 0 })
260+
)
261+
.watchAcksFull(query, 2000, docB)
262+
.expectEvents(query, { added: [docB] });
263+
});
240264
});

packages/firestore/test/unit/specs/spec_builder.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,30 @@ export class SpecBuilder {
129129
return this;
130130
}
131131

132+
/**
133+
* Registers a previously active target with the test expectations after a
134+
* stream disconnect.
135+
*/
136+
restoreListen(query: Query, resumeToken: string): SpecBuilder {
137+
let targetId = this.queryMapping[query.canonicalId()];
138+
139+
if (isNullOrUndefined(targetId)) {
140+
throw new Error("Can't restore an unknown query: " + query);
141+
}
142+
143+
this.activeTargets[targetId] = {
144+
query: SpecBuilder.queryToSpec(query),
145+
resumeToken: resumeToken
146+
};
147+
148+
const currentStep = this.currentStep!;
149+
currentStep.stateExpect = currentStep.stateExpect || {};
150+
currentStep.stateExpect.activeTargets = objUtils.shallowCopy(
151+
this.activeTargets
152+
);
153+
return this;
154+
}
155+
132156
userUnlistens(query: Query): SpecBuilder {
133157
this.nextStep();
134158
if (!objUtils.contains(this.queryMapping, query.canonicalId())) {
@@ -176,6 +200,26 @@ export class SpecBuilder {
176200
return this;
177201
}
178202

203+
disableNetwork(): SpecBuilder {
204+
this.nextStep();
205+
this.currentStep = {
206+
enableNetwork: false,
207+
stateExpect: {
208+
activeTargets: {},
209+
limboDocs: []
210+
}
211+
};
212+
return this;
213+
}
214+
215+
enableNetwork(): SpecBuilder {
216+
this.nextStep();
217+
this.currentStep = {
218+
enableNetwork: true
219+
};
220+
return this;
221+
}
222+
179223
restart(): SpecBuilder {
180224
this.nextStep();
181225
this.currentStep = {
@@ -493,6 +537,30 @@ export class SpecBuilder {
493537
return this;
494538
}
495539

540+
/**
541+
* Verifies the total number of requests sent to the write backend since test
542+
* initialization.
543+
*/
544+
expectWriteStreamRequestCount(num: number): SpecBuilder {
545+
this.assertStep('Expectations require previous step');
546+
const currentStep = this.currentStep!;
547+
currentStep.stateExpect = currentStep.stateExpect || {};
548+
currentStep.stateExpect.writeStreamRequestCount = num;
549+
return this;
550+
}
551+
552+
/**
553+
* Verifies the total number of requests sent to the watch backend since test
554+
* initialization.
555+
*/
556+
expectWatchStreamRequestCount(num: number): SpecBuilder {
557+
this.assertStep('Expectations require previous step');
558+
const currentStep = this.currentStep!;
559+
currentStep.stateExpect = currentStep.stateExpect || {};
560+
currentStep.stateExpect.watchStreamRequestCount = num;
561+
return this;
562+
}
563+
496564
expectNumOutstandingWrites(num: number): SpecBuilder {
497565
this.assertStep('Expectations require previous step');
498566
const currentStep = this.currentStep!;

packages/firestore/test/unit/specs/spec_test_runner.ts

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,19 @@ class MockConnection implements Connection {
9999
* Used to make sure a write was actually sent out on the network before the
100100
* test runner continues.
101101
*/
102-
writeSendBarriers: Array<Deferred<api.Write>> = [];
102+
writeSendBarriers: Array<Deferred<api.WriteRequest>> = [];
103103

104104
/**
105105
* The set of mutations sent out before there was a corresponding
106106
* writeSendBarrier.
107107
*/
108-
earlyWrites: api.Write[] = [];
108+
earlyWrites: api.WriteRequest[] = [];
109+
110+
/** The total number of requests sent to the watch stream. */
111+
watchStreamRequestCount = 0;
112+
113+
/** The total number of requests sent to the write stream. */
114+
writeStreamRequestCount = 0;
109115

110116
nextWriteStreamToken = 0;
111117

@@ -120,15 +126,22 @@ class MockConnection implements Connection {
120126
/** A Deferred that is resolved once watch opens. */
121127
watchOpen = new Deferred<void>();
122128

129+
reset(): void {
130+
this.watchStreamRequestCount = 0;
131+
this.writeStreamRequestCount = 0;
132+
this.earlyWrites = [];
133+
this.activeTargets = [];
134+
}
135+
123136
invoke(rpcName: string, request: any): Promise<any> {
124137
throw new Error('Not implemented!');
125138
}
126139

127-
waitForWriteSend(): Promise<api.Write> {
140+
waitForWriteRequest(): Promise<api.WriteRequest> {
128141
if (this.earlyWrites.length > 0) {
129142
return Promise.resolve(this.earlyWrites.shift()) as AnyDuringMigration;
130143
}
131-
const barrier = new Deferred<api.Write>();
144+
const barrier = new Deferred<WriteRequest>();
132145
this.writeSendBarriers.push(barrier);
133146
return barrier.promise;
134147
}
@@ -176,6 +189,7 @@ class MockConnection implements Connection {
176189
let firstCall = true;
177190
const writeStream = new StreamBridge<WriteRequest, api.WriteResponse>({
178191
sendFn: (request: WriteRequest) => {
192+
++this.writeStreamRequestCount;
179193
if (firstCall) {
180194
assert(
181195
!!request.database,
@@ -195,27 +209,19 @@ class MockConnection implements Connection {
195209
'streamToken must be set on all writes'
196210
);
197211
assert(!!request.writes, 'writes must be set on all writes');
198-
assert(
199-
request.writes!.length > 0,
200-
'there must be non-zero mutations'
201-
);
202-
if (request.writes!.length !== 1) {
203-
// TODO(dimond): support batching?
204-
fail('Unexpected batched mutation found!');
205-
}
206212

207213
const barrier = this.writeSendBarriers.shift();
208214
if (!barrier) {
209215
// The test runner hasn't set up the barrier yet, so we queue
210216
// up this mutation to provide to the barrier promise when it
211217
// arrives.
212-
this.earlyWrites.push(request.writes![0]);
218+
this.earlyWrites.push(request);
213219
} else {
214220
// The test runner is waiting on a write invocation, now that we
215221
// have it we can resolve the write send barrier. If we add
216222
// (automatic) batching support we need to make sure the number of
217-
// batches matches the number of calls to waitForWriteSend.
218-
barrier.resolve(request.writes![0]);
223+
// batches matches the number of calls to waitForWriteRequest.
224+
barrier.resolve(request);
219225
}
220226
},
221227
closeFn: () => {
@@ -240,6 +246,7 @@ class MockConnection implements Connection {
240246
api.ListenResponse
241247
>({
242248
sendFn: (request: api.ListenRequest) => {
249+
++this.watchStreamRequestCount;
243250
if (request.addTarget) {
244251
const targetId = request.addTarget.targetId!;
245252
this.activeTargets[targetId] = request.addTarget;
@@ -407,6 +414,7 @@ abstract class TestRunner {
407414
protected abstract destroyPersistence(): Promise<void>;
408415

409416
async start(): Promise<void> {
417+
this.connection.reset();
410418
await this.persistence.start();
411419
await this.localStore.start();
412420
await this.remoteStore.start();
@@ -458,6 +466,10 @@ abstract class TestRunner {
458466
return this.doWriteAck(step.writeAck!);
459467
} else if ('failWrite' in step) {
460468
return this.doFailWrite(step.failWrite!);
469+
} else if ('enableNetwork' in step) {
470+
return step.enableNetwork!
471+
? this.doEnableNetwork()
472+
: this.doDisableNetwork();
461473
} else if ('restart' in step) {
462474
assert(step.restart!, 'Restart cannot be false');
463475
return this.doRestart();
@@ -517,9 +529,9 @@ abstract class TestRunner {
517529
private doMutations(mutations: Mutation[]): Promise<void> {
518530
const userCallback = new Deferred<void>();
519531
this.outstandingWrites.push({ mutations, userCallback });
520-
return this.queue.schedule(() =>
521-
this.syncEngine.write(mutations, userCallback)
522-
);
532+
return this.queue.schedule(() => {
533+
return this.syncEngine.write(mutations, userCallback);
534+
});
523535
}
524536

525537
private doWatchAck(
@@ -693,19 +705,24 @@ abstract class TestRunner {
693705
}
694706

695707
/** Validates that a write was sent and matches the expected write. */
696-
private validateNextWriteSent(mutations: Mutation[]): Promise<void> {
708+
private validateNextWriteRequest(mutations: Mutation[]): Promise<void> {
697709
// Make sure this write was sent on the wire and it matches the expected
698710
// write.
699-
return this.connection.waitForWriteSend().then(write => {
700-
assert(mutations.length === 1, "We don't support multiple mutations.");
701-
expect(write).to.deep.equal(this.serializer.toMutation(mutations[0]));
711+
return this.connection.waitForWriteRequest().then(request => {
712+
const writes = request.writes!;
713+
expect(writes.length).to.equal(mutations.length);
714+
for (let i = 0; i < writes.length; ++i) {
715+
expect(writes[i]).to.deep.equal(
716+
this.serializer.toMutation(mutations[i])
717+
);
718+
}
702719
});
703720
}
704721

705722
private doWriteAck(writeAck: SpecWriteAck): Promise<void> {
706723
const updateTime = this.serializer.toVersion(version(writeAck.version));
707724
const nextWrite = this.outstandingWrites.shift()!;
708-
return this.validateNextWriteSent(nextWrite.mutations).then(() => {
725+
return this.validateNextWriteRequest(nextWrite.mutations).then(() => {
709726
this.connection.ackWrite(updateTime, [{ updateTime }]);
710727
if (writeAck.expectUserCallback) {
711728
return nextWrite.userCallback.promise;
@@ -722,7 +739,7 @@ abstract class TestRunner {
722739
specError.message
723740
);
724741
const nextWrite = this.outstandingWrites.shift()!;
725-
return this.validateNextWriteSent(nextWrite.mutations).then(() => {
742+
return this.validateNextWriteRequest(nextWrite.mutations).then(() => {
726743
// If this is not a permanent error, the write is expected to be sent
727744
// again.
728745
if (!isPermanentError(error.code)) {
@@ -745,6 +762,17 @@ abstract class TestRunner {
745762
});
746763
}
747764

765+
private async doDisableNetwork(): Promise<void> {
766+
// Make sure to execute all writes that are currently queued. This allows us
767+
// to assert on the total number of requests sent before shutdown.
768+
await this.remoteStore.fillWritePipeline();
769+
await this.remoteStore.disableNetwork();
770+
}
771+
772+
private async doEnableNetwork(): Promise<void> {
773+
await this.remoteStore.enableNetwork();
774+
}
775+
748776
private async doRestart(): Promise<void> {
749777
// Reinitialize everything, except the persistence.
750778
// No local store to shutdown.
@@ -793,6 +821,16 @@ abstract class TestRunner {
793821
expectation.numOutstandingWrites
794822
);
795823
}
824+
if ('writeStreamRequestCount' in expectation) {
825+
expect(this.connection.writeStreamRequestCount).to.deep.equal(
826+
expectation.writeStreamRequestCount
827+
);
828+
}
829+
if ('watchStreamRequestCount' in expectation) {
830+
expect(this.connection.watchStreamRequestCount).to.deep.equal(
831+
expectation.watchStreamRequestCount
832+
);
833+
}
796834
if ('limboDocs' in expectation) {
797835
this.expectedLimboDocs = expectation.limboDocs!.map(key);
798836
}
@@ -1052,6 +1090,9 @@ export interface SpecStep {
10521090
/** Fail a write */
10531091
failWrite?: SpecWriteFailure;
10541092

1093+
/** Enable or disable RemoteStore's network connection. */
1094+
enableNetwork?: boolean;
1095+
10551096
/** Change to a new active user (specified by uid or null for anonymous). */
10561097
changeUser?: string | null;
10571098

@@ -1193,6 +1234,10 @@ export interface SpecExpectation {
11931234
export interface StateExpectation {
11941235
/** Number of outstanding writes in the datastore queue. */
11951236
numOutstandingWrites?: number;
1237+
/** Number of requests sent to the write stream. */
1238+
writeStreamRequestCount?: number;
1239+
/** Number of requests sent to the watch stream. */
1240+
watchStreamRequestCount?: number;
11961241
/** Current documents in limbo. Verified in each step until overwritten. */
11971242
limboDocs?: string[];
11981243
/**

0 commit comments

Comments
 (0)