Skip to content

Commit 48b57a3

Browse files
authored
gRPC: fix cases where gRPC call could be finished twice (#2146)
* add a flag, `is_grpc_call_finished_`, to track whether the underlying call has been finished without trying to interpret the current state of `GrpcStream` to infer that information; * add logging to the shutdown sequence of `GrpcStream`, in case more related crashes are discovered; * add tests for two situations where double finish could occur previously. Manually verified that none of the tests pass on master (they lead to a failed gRPC assertion and a crash).
1 parent 4f4d683 commit 48b57a3

File tree

3 files changed

+121
-34
lines changed

3 files changed

+121
-34
lines changed

Firestore/core/src/firebase/firestore/remote/grpc_stream.cc

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h"
2323
#include "Firestore/core/src/firebase/firestore/remote/grpc_util.h"
24+
#include "Firestore/core/src/firebase/firestore/util/log.h"
2425

2526
namespace firebase {
2627
namespace firestore {
@@ -96,6 +97,7 @@ GrpcStream::GrpcStream(
9697
}
9798

9899
GrpcStream::~GrpcStream() {
100+
LOG_DEBUG("GrpcStream('%s'): destroying stream", this);
99101
HARD_ASSERT(completions_.empty(),
100102
"GrpcStream is being destroyed without proper shutdown");
101103
MaybeUnregister();
@@ -156,11 +158,15 @@ void GrpcStream::MaybeWrite(absl::optional<BufferedWrite> maybe_write) {
156158
}
157159

158160
void GrpcStream::FinishImmediately() {
161+
LOG_DEBUG("GrpcStream('%s'): finishing without notifying observers", this);
162+
159163
Shutdown();
160164
UnsetObserver();
161165
}
162166

163167
void GrpcStream::FinishAndNotify(const Status& status) {
168+
LOG_DEBUG("GrpcStream('%s'): finishing and notifying observers", this);
169+
164170
Shutdown();
165171

166172
if (observer_) {
@@ -173,22 +179,30 @@ void GrpcStream::FinishAndNotify(const Status& status) {
173179
}
174180

175181
void GrpcStream::Shutdown() {
182+
LOG_DEBUG("GrpcStream('%s'): shutting down; completions: %s, is finished: %s",
183+
this, completions_.size(), is_grpc_call_finished_);
184+
176185
MaybeUnregister();
177-
if (completions_.empty()) {
178-
// Nothing to cancel -- either the call was already finished, or it has
179-
// never been started.
180-
return;
186+
187+
// If completions are empty but the call hasn't been finished, it means this
188+
// stream has never started. Calling `Finish` on the underlying gRPC call is
189+
// invalid if it wasn't started previously.
190+
if (!completions_.empty() && !is_grpc_call_finished_) {
191+
// Important: during normal operation, the stream always has a pending read
192+
// operation, so `Shutdown` would hang indefinitely if we didn't cancel the
193+
// `context_`. However, if the stream has already failed, avoid canceling
194+
// the context to avoid overwriting the status captured during the
195+
// `OnOperationFailed`.
196+
197+
context_->TryCancel();
198+
FinishGrpcCall({});
181199
}
182200

183-
// Important: since the stream always has a pending read operation,
184-
// cancellation has to be called, or else the read would hang forever, and
185-
// finish operation will never get completed.
186-
// (on the other hand, when an operation fails, cancellation should not be
187-
// called, otherwise the real failure cause will be overwritten by status
188-
// "canceled".)
189-
context_->TryCancel();
190-
FinishCall({});
191-
// Wait until "finish" is off the queue.
201+
// Drain the completions -- `Shutdown` guarantees to bring the stream into
202+
// destructible state. Two possibilities here:
203+
// - completions are empty -- nothing to block on;
204+
// - the only completion is "finish" (whether enqueued by this function or
205+
// previously) -- "finish" is a very fast operation.
192206
FastFinishCompletionsBlocking();
193207
}
194208

@@ -199,7 +213,12 @@ void GrpcStream::MaybeUnregister() {
199213
}
200214
}
201215

202-
void GrpcStream::FinishCall(const OnSuccess& callback) {
216+
void GrpcStream::FinishGrpcCall(const OnSuccess& callback) {
217+
LOG_DEBUG("GrpcStream('%s'): finishing the underlying call", this);
218+
219+
HARD_ASSERT(!is_grpc_call_finished_, "FinishGrpcCall called twice");
220+
is_grpc_call_finished_ = true;
221+
203222
// All completions issued by this call must be taken off the queue before
204223
// finish operation can be enqueued.
205224
FastFinishCompletionsBlocking();
@@ -208,6 +227,9 @@ void GrpcStream::FinishCall(const OnSuccess& callback) {
208227
}
209228

210229
void GrpcStream::FastFinishCompletionsBlocking() {
230+
LOG_DEBUG("GrpcStream('%s'): fast finishing %s completion(s)", this,
231+
completions_.size());
232+
211233
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
212234
// should never be called again after a call to Finish.
213235

@@ -226,30 +248,35 @@ void GrpcStream::FastFinishCompletionsBlocking() {
226248

227249
bool GrpcStream::WriteAndFinish(grpc::ByteBuffer&& message) {
228250
bool did_last_write = false;
251+
if (!is_grpc_call_finished_) {
252+
did_last_write = TryLastWrite(std::move(message));
253+
}
254+
FinishImmediately();
255+
return did_last_write;
256+
}
229257

258+
bool GrpcStream::TryLastWrite(grpc::ByteBuffer&& message) {
230259
absl::optional<BufferedWrite> maybe_write =
231260
buffered_writer_.EnqueueWrite(std::move(message));
232261
// Only bother with the last write if there is no active write at the moment.
233-
if (maybe_write) {
234-
BufferedWrite last_write = std::move(maybe_write).value();
235-
GrpcCompletion* completion = NewCompletion(Type::Write, {});
236-
*completion->message() = last_write.message;
237-
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion);
238-
239-
// Empirically, the write normally takes less than a millisecond to finish
240-
// (both with and without network connection), and never more than several
241-
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
242-
// there happen to be circumstances under which the write may block
243-
// indefinitely (in that case, rely on the fact that canceling a gRPC call
244-
// makes all pending operations come back from the queue quickly).
245-
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500));
246-
if (status == std::future_status::ready) {
247-
did_last_write = true;
248-
}
262+
if (!maybe_write) {
263+
return false;
249264
}
250265

251-
FinishImmediately();
252-
return did_last_write;
266+
BufferedWrite last_write = std::move(maybe_write).value();
267+
GrpcCompletion* completion = NewCompletion(Type::Write, {});
268+
*completion->message() = last_write.message;
269+
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion);
270+
271+
// Empirically, the write normally takes less than a millisecond to finish
272+
// (both with and without network connection), and never more than several
273+
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
274+
// there happen to be circumstances under which the write may block
275+
// indefinitely (in that case, rely on the fact that canceling a gRPC call
276+
// makes all pending operations come back from the queue quickly).
277+
278+
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500));
279+
return status == std::future_status::ready;
253280
}
254281

255282
GrpcStream::Metadata GrpcStream::GetResponseHeaders() const {
@@ -277,7 +304,13 @@ void GrpcStream::OnWrite() {
277304
}
278305

279306
void GrpcStream::OnOperationFailed() {
280-
FinishCall([this](const GrpcCompletion* completion) {
307+
if (is_grpc_call_finished_) {
308+
// If a finish operation has been enqueued already (possibly by a previous
309+
// failed operation), there's nothing to do.
310+
return;
311+
}
312+
313+
FinishGrpcCall([this](const GrpcCompletion* completion) {
281314
Status status = ConvertStatus(*completion->status());
282315
FinishAndNotify(status);
283316
});
@@ -303,6 +336,8 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
303336
} else {
304337
// Use the same error-handling for all operations; all errors are
305338
// unrecoverable.
339+
LOG_DEBUG("GrpcStream('%s'): operation of type %s failed", this,
340+
completion->type());
306341
OnOperationFailed();
307342
}
308343
};

Firestore/core/src/firebase/firestore/remote/grpc_stream.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ class GrpcStream : public GrpcCall {
180180
private:
181181
void Read();
182182
void MaybeWrite(absl::optional<internal::BufferedWrite> maybe_write);
183+
bool TryLastWrite(grpc::ByteBuffer&& message);
183184

184185
void Shutdown();
185186
void UnsetObserver() {
@@ -200,7 +201,7 @@ class GrpcStream : public GrpcCall {
200201
// was started. Presumes that any pending completions will quickly come off
201202
// the queue and will block until they do, so this must only be invoked when
202203
// the current call either failed (`OnOperationFailed`) or canceled.
203-
void FinishCall(const OnSuccess& callback);
204+
void FinishGrpcCall(const OnSuccess& callback);
204205

205206
// Blocks until all the completions issued by this stream come out from the
206207
// gRPC completion queue. Once they do, it is safe to delete this `GrpcStream`
@@ -231,6 +232,9 @@ class GrpcStream : public GrpcCall {
231232
internal::BufferedWriter buffered_writer_;
232233

233234
std::vector<GrpcCompletion*> completions_;
235+
236+
// gRPC asserts that a call is finished exactly once.
237+
bool is_grpc_call_finished_ = false;
234238
};
235239

236240
} // namespace remote

Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,54 @@ TEST_F(GrpcStreamTest, ObserverCanImmediatelyDestroyStreamOnFinishAndNotify) {
448448
});
449449
}
450450

451+
// Double finish
452+
453+
TEST_F(GrpcStreamTest, DoubleFinish_FailThenFinishImmediately) {
454+
worker_queue.EnqueueBlocking([&] { stream->Start(); });
455+
456+
ForceFinish({{Type::Read, Error}});
457+
KeepPollingGrpcQueue();
458+
worker_queue.EnqueueBlocking(
459+
[&] { EXPECT_NO_THROW(stream->FinishImmediately()); });
460+
}
461+
462+
TEST_F(GrpcStreamTest, DoubleFinish_FailThenWriteAndFinish) {
463+
worker_queue.EnqueueBlocking([&] { stream->Start(); });
464+
465+
ForceFinish({{Type::Read, Error}});
466+
KeepPollingGrpcQueue();
467+
worker_queue.EnqueueBlocking(
468+
[&] { EXPECT_NO_THROW(stream->WriteAndFinish({})); });
469+
}
470+
471+
TEST_F(GrpcStreamTest, DoubleFinish_FailThenFailAgain) {
472+
worker_queue.EnqueueBlocking([&] {
473+
stream->Start();
474+
stream->Write({});
475+
});
476+
477+
int failures_count = 0;
478+
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
479+
switch (completion->type()) {
480+
case Type::Read:
481+
case Type::Write:
482+
++failures_count;
483+
completion->Complete(false);
484+
return failures_count == 2;
485+
default:
486+
UnexpectedType(completion);
487+
return true;
488+
}
489+
});
490+
future.wait();
491+
worker_queue.EnqueueBlocking([] {});
492+
493+
// Normally, "Finish" never fails, but for the test it's easier to abuse the
494+
// finish operation that has already been enqueued by `OnOperationFailed`
495+
// rather than adding a new operation.
496+
ForceFinish({{Type::Finish, Error}});
497+
}
498+
451499
} // namespace remote
452500
} // namespace firestore
453501
} // namespace firebase

0 commit comments

Comments
 (0)