-
Notifications
You must be signed in to change notification settings - Fork 1.6k
gRPC: fix cases where gRPC call could be finished twice #2146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
372013b
12f58f9
8a3ddef
2830823
779c0d4
09d151f
5f779b4
785f7e2
0ba8b55
0dd6b08
11cd356
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
|
||
#include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h" | ||
#include "Firestore/core/src/firebase/firestore/remote/grpc_util.h" | ||
#include "Firestore/core/src/firebase/firestore/util/log.h" | ||
|
||
namespace firebase { | ||
namespace firestore { | ||
|
@@ -96,6 +97,7 @@ GrpcStream::GrpcStream( | |
} | ||
|
||
GrpcStream::~GrpcStream() { | ||
LOG_DEBUG("GrpcStream('%s'): destroying stream", this); | ||
HARD_ASSERT(completions_.empty(), | ||
"GrpcStream is being destroyed without proper shutdown"); | ||
MaybeUnregister(); | ||
|
@@ -156,11 +158,15 @@ void GrpcStream::MaybeWrite(absl::optional<BufferedWrite> maybe_write) { | |
} | ||
|
||
void GrpcStream::FinishImmediately() { | ||
LOG_DEBUG("GrpcStream('%s'): finishing without notifying observers", this); | ||
|
||
Shutdown(); | ||
UnsetObserver(); | ||
} | ||
|
||
void GrpcStream::FinishAndNotify(const Status& status) { | ||
LOG_DEBUG("GrpcStream('%s'): finishing and notifying observers", this); | ||
|
||
Shutdown(); | ||
|
||
if (observer_) { | ||
|
@@ -173,22 +179,28 @@ void GrpcStream::FinishAndNotify(const Status& status) { | |
} | ||
|
||
void GrpcStream::Shutdown() { | ||
LOG_DEBUG( | ||
"GrpcStream('%s'): shutting down; completion(s): %s, is finished: %s", | ||
this, completions_.size(), is_grpc_call_finished_); | ||
|
||
MaybeUnregister(); | ||
if (completions_.empty()) { | ||
// Nothing to cancel -- either the call was already finished, or it has | ||
// never been started. | ||
return; | ||
|
||
if (!completions_.empty() && !is_grpc_call_finished_) { | ||
// Important: since the stream always has a pending read operation, | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is hard to parse because:
Consider phrasing something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I like this wording. |
||
// cancellation has to be called, or else the read would hang forever, and | ||
// finish operation will never get completed. | ||
// (on the other hand, when an operation fails, cancellation should not be | ||
// called, otherwise the real failure cause will be overwritten by status | ||
// "canceled".) | ||
context_->TryCancel(); | ||
FinishGrpcCall({}); | ||
} | ||
|
||
// Important: since the stream always has a pending read operation, | ||
// cancellation has to be called, or else the read would hang forever, and | ||
// finish operation will never get completed. | ||
// (on the other hand, when an operation fails, cancellation should not be | ||
// called, otherwise the real failure cause will be overwritten by status | ||
// "canceled".) | ||
context_->TryCancel(); | ||
FinishCall({}); | ||
// Wait until "finish" is off the queue. | ||
// Drain the completions -- `Shutdown` guarantees to bring the stream into | ||
// destructible state. Two possibilities here: | ||
// - completions are empty -- nothing to block on; | ||
// - the only completion is "finish" (whether enqueued by this function or | ||
// previously) -- "finish" is a very fast operation. | ||
FastFinishCompletionsBlocking(); | ||
} | ||
|
||
|
@@ -199,7 +211,12 @@ void GrpcStream::MaybeUnregister() { | |
} | ||
} | ||
|
||
void GrpcStream::FinishCall(const OnSuccess& callback) { | ||
void GrpcStream::FinishGrpcCall(const OnSuccess& callback) { | ||
LOG_DEBUG("GrpcStream('%s'): finishing the underlying call", this); | ||
|
||
HARD_ASSERT(!is_grpc_call_finished_, "FinishGrpcCall called twice"); | ||
is_grpc_call_finished_ = true; | ||
|
||
// All completions issued by this call must be taken off the queue before | ||
// finish operation can be enqueued. | ||
FastFinishCompletionsBlocking(); | ||
|
@@ -208,6 +225,9 @@ void GrpcStream::FinishCall(const OnSuccess& callback) { | |
} | ||
|
||
void GrpcStream::FastFinishCompletionsBlocking() { | ||
LOG_DEBUG("GrpcStream('%s'): fast finishing %s completion(s)", this, | ||
completions_.size()); | ||
|
||
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it | ||
// should never be called again after a call to Finish. | ||
|
||
|
@@ -226,30 +246,35 @@ void GrpcStream::FastFinishCompletionsBlocking() { | |
|
||
bool GrpcStream::WriteAndFinish(grpc::ByteBuffer&& message) { | ||
bool did_last_write = false; | ||
if (!is_grpc_call_finished_) { | ||
did_last_write = TryLastWrite(std::move(message)); | ||
wilhuff marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
FinishImmediately(); | ||
return did_last_write; | ||
} | ||
|
||
bool GrpcStream::TryLastWrite(grpc::ByteBuffer&& message) { | ||
absl::optional<BufferedWrite> maybe_write = | ||
buffered_writer_.EnqueueWrite(std::move(message)); | ||
// Only bother with the last write if there is no active write at the moment. | ||
if (maybe_write) { | ||
BufferedWrite last_write = std::move(maybe_write).value(); | ||
GrpcCompletion* completion = NewCompletion(Type::Write, {}); | ||
*completion->message() = last_write.message; | ||
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion); | ||
|
||
// Empirically, the write normally takes less than a millisecond to finish | ||
// (both with and without network connection), and never more than several | ||
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if | ||
// there happen to be circumstances under which the write may block | ||
// indefinitely (in that case, rely on the fact that canceling a gRPC call | ||
// makes all pending operations come back from the queue quickly). | ||
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500)); | ||
if (status == std::future_status::ready) { | ||
did_last_write = true; | ||
} | ||
if (!maybe_write) { | ||
return false; | ||
} | ||
|
||
FinishImmediately(); | ||
return did_last_write; | ||
BufferedWrite last_write = std::move(maybe_write).value(); | ||
GrpcCompletion* completion = NewCompletion(Type::Write, {}); | ||
*completion->message() = last_write.message; | ||
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion); | ||
|
||
// Empirically, the write normally takes less than a millisecond to finish | ||
// (both with and without network connection), and never more than several | ||
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if | ||
// there happen to be circumstances under which the write may block | ||
// indefinitely (in that case, rely on the fact that canceling a gRPC call | ||
// makes all pending operations come back from the queue quickly). | ||
|
||
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500)); | ||
return status == std::future_status::ready; | ||
} | ||
|
||
GrpcStream::Metadata GrpcStream::GetResponseHeaders() const { | ||
|
@@ -277,7 +302,13 @@ void GrpcStream::OnWrite() { | |
} | ||
|
||
void GrpcStream::OnOperationFailed() { | ||
FinishCall([this](const GrpcCompletion* completion) { | ||
if (is_grpc_call_finished_) { | ||
// If a finish operation has been enqueued already (possibly by a previous | ||
// failed operation), there's nothing to do. | ||
return; | ||
} | ||
|
||
FinishGrpcCall([this](const GrpcCompletion* completion) { | ||
Status status = ConvertStatus(*completion->status()); | ||
FinishAndNotify(status); | ||
}); | ||
|
@@ -303,6 +334,8 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag, | |
} else { | ||
// Use the same error-handling for all operations; all errors are | ||
// unrecoverable. | ||
LOG_DEBUG("GrpcStream('%s'): operation of type %s failed", this, | ||
completion->type()); | ||
OnOperationFailed(); | ||
} | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't entirely understand why both conditions must hold here. It seems as if `is_grpc_call_finished_` should be sufficient, no?

In particular this suggests that it's possible for e.g. `completions_` to be empty but `is_grpc_call_finished_` to be false and we wouldn't want to call `FinishGrpcCall`, but that seems wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The stream is in this state before it is started. Finishing a non-started stream would also fail a gRPC assertion. I don't think we should ever run into the case when `Stream` tries to `Finish` a `GrpcStream` before it is started. I'm mainly doing this because it's an easy check to make.

If you think protecting against this case isn't worthwhile, I'll simplify the check. Otherwise, I'll add a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Looking at the prior state more closely there was a comment there that explained this and it made sense. It's possible you may want to merge something of that comment with what you have below.