Skip to content

Commit 7b43c61

Browse files
authored
gRPC C++: don't wait for initial server metadata during GrpcStream::Start (#1786)
By default, StartCall in gRPC C++ client would wait until the initial server metadata is received, after which the corresponding tag will come back from the completion queue. It can alternatively be configured so that initial server metadata is only received together with the first write operation, effectively making StartCall a synchronous operation. The benefit is that Firestore streams won't have to wait for a network roundtrip before they consider themselves open. It is also consistent with the other platforms.
1 parent cba0b44 commit 7b43c61

File tree

2 files changed

+17
-21
lines changed

2 files changed

+17
-21
lines changed

Firestore/core/src/firebase/firestore/remote/grpc_stream.cc

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,20 @@ GrpcStream::~GrpcStream() {
9494
}
9595

9696
void GrpcStream::Start() {
97-
GrpcCompletion* completion =
98-
NewCompletion([this](const GrpcCompletion*) { OnStart(); });
99-
call_->StartCall(completion);
97+
// Make starting a quick operation that avoids a roundtrip to the server by
98+
// skipping the wait for initial server metadata (instead, it will be
99+
// automatically coalesced with the first write operation).
100+
context_->set_initial_metadata_corked(true);
101+
// It's generally okay to pass a null pointer as a tag; in this case in
102+
// particular, the tag will never come back from the completion queue (by
103+
// design).
104+
call_->StartCall(nullptr);
105+
106+
if (observer_) {
107+
observer_->OnStreamStart();
108+
// Start listening for new messages.
109+
Read();
110+
}
100111
}
101112

102113
void GrpcStream::Read() {
@@ -203,14 +214,6 @@ GrpcStream::MetadataT GrpcStream::GetResponseHeaders() const {
203214

204215
// Callbacks
205216

206-
void GrpcStream::OnStart() {
207-
if (observer_) {
208-
observer_->OnStreamStart();
209-
// Start listening for new messages.
210-
Read();
211-
}
212-
}
213-
214217
void GrpcStream::OnRead(const grpc::ByteBuffer& message) {
215218
if (observer_) {
216219
observer_->OnStreamRead(message);

Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class GrpcStreamTest : public testing::Test {
108108

109109
void StartStream() {
110110
async_queue().EnqueueBlocking([&] { stream().Start(); });
111-
ForceFinish({/*Start*/ Ok});
112111
}
113112

114113
private:
@@ -221,19 +220,13 @@ TEST_F(GrpcStreamTest, WriteAndFinish) {
221220
});
222221
}
223222

224-
TEST_F(GrpcStreamTest, ErrorOnStart) {
225-
async_queue().EnqueueBlocking([&] { stream().Start(); });
226-
ForceFinish({/*Start*/ Error, /*Finish*/ Ok});
227-
EXPECT_EQ(observed_states(), States({"OnStreamError"}));
228-
}
229-
230223
TEST_F(GrpcStreamTest, ErrorOnWrite) {
231224
StartStream();
232225
async_queue().EnqueueBlocking([&] { stream().Write({}); });
233226

234-
ForceFinish({/*Read*/ Ok, /*Write*/ Error});
227+
ForceFinish({/*Write*/ Error, /*Read*/ Error});
235228
// Give `GrpcStream` a chance to enqueue a finish operation
236-
ForceFinish({/*Read*/ Error, /*Finish*/ Ok});
229+
ForceFinish({/*Finish*/ Ok});
237230

238231
EXPECT_EQ(observed_states().back(), "OnStreamError");
239232
}
@@ -245,7 +238,7 @@ TEST_F(GrpcStreamTest, ErrorWithPendingWrites) {
245238
stream().Write({});
246239
});
247240

248-
ForceFinish({/*Read*/ Ok, /*Write*/ Error});
241+
ForceFinish({/*Write*/ Ok, /*Write*/ Error});
249242
// Give `GrpcStream` a chance to enqueue a finish operation
250243
ForceFinish({/*Read*/ Error, /*Finish*/ Ok});
251244

0 commit comments

Comments
 (0)