
gRPC: fix cases where gRPC call could be finished twice #2146

Merged
merged 11 commits into from
Dec 5, 2018
Changes from 8 commits
99 changes: 66 additions & 33 deletions Firestore/core/src/firebase/firestore/remote/grpc_stream.cc
@@ -21,6 +21,7 @@

#include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h"
#include "Firestore/core/src/firebase/firestore/remote/grpc_util.h"
#include "Firestore/core/src/firebase/firestore/util/log.h"

namespace firebase {
namespace firestore {
@@ -96,6 +97,7 @@ GrpcStream::GrpcStream(
}

GrpcStream::~GrpcStream() {
LOG_DEBUG("GrpcStream('%s'): destroying stream", this);
HARD_ASSERT(completions_.empty(),
"GrpcStream is being destroyed without proper shutdown");
MaybeUnregister();
@@ -156,11 +158,15 @@ void GrpcStream::MaybeWrite(absl::optional<BufferedWrite> maybe_write) {
}

void GrpcStream::FinishImmediately() {
LOG_DEBUG("GrpcStream('%s'): finishing without notifying observers", this);

Shutdown();
UnsetObserver();
}

void GrpcStream::FinishAndNotify(const Status& status) {
LOG_DEBUG("GrpcStream('%s'): finishing and notifying observers", this);

Shutdown();

if (observer_) {
@@ -173,22 +179,28 @@ void GrpcStream::FinishAndNotify(const Status& status) {
}

void GrpcStream::Shutdown() {
LOG_DEBUG(
"GrpcStream('%s'): shutting down; completion(s): %s, is finished: %s",
this, completions_.size(), is_grpc_call_finished_);

MaybeUnregister();
if (completions_.empty()) {
// Nothing to cancel -- either the call was already finished, or it has
// never been started.
return;

if (!completions_.empty() && !is_grpc_call_finished_) {
Contributor

I don't entirely understand why both conditions must hold here. It seems as if is_grpc_call_finished_ should be sufficient, no?

In particular this suggests that it's possible for e.g. completions_ to be empty but is_grpc_call_finished_ to be false and we wouldn't want to call FinishGrpcCall, but that seems wrong.

Contributor Author

> this suggests that it's possible for e.g. completions_ to be empty but is_grpc_call_finished_ to be false and we wouldn't want to call FinishGrpcCall, but that seems wrong.

The stream is in this state before it is started. Finishing a non-started stream would also fail a gRPC assertion. I don't think we should ever run into the case when Stream tries to Finish a GrpcStream before it is started. I'm mainly doing this because it's an easy check to make.

If you think protecting against this case isn't worthwhile, I'll simplify the check. Otherwise, I'll add a comment.
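
The states discussed in this thread (not started, started, already finished) can be sketched with a toy model. This is a minimal illustration under stated assumptions, not the Firestore implementation: `ToyStream`, `pending_completions`, `is_call_finished`, and `finish_calls` are hypothetical names standing in for `GrpcStream`, `completions_`, `is_grpc_call_finished_`, and the underlying gRPC finish operation.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for GrpcStream; models only the two fields discussed
// in this thread.
struct ToyStream {
  std::size_t pending_completions = 0;  // stands in for completions_.size()
  bool is_call_finished = false;        // stands in for is_grpc_call_finished_
  int finish_calls = 0;                 // how many times the call was finished

  // Assumption: a started stream always has one pending read.
  void Start() { pending_completions = 1; }

  // Models a failed operation: the failure handler enqueues the finish
  // before Shutdown runs.
  void OnOperationFailed() {
    if (is_call_finished) return;
    FinishCall();
  }

  void FinishCall() {
    assert(!is_call_finished && "call finished twice");
    is_call_finished = true;
    ++finish_calls;
  }

  void Shutdown() {
    // Finish only a call that was started and has not been finished yet.
    if (pending_completions != 0 && !is_call_finished) {
      FinishCall();
    }
    pending_completions = 0;  // models draining the completion queue
  }
};
```

In this model, calling `Shutdown` on a stream that was never started leaves `finish_calls` at 0, and calling it after `OnOperationFailed` leaves it at 1, which is the behavior the combined check is meant to protect.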

Contributor

OK. Looking at the prior state more closely there was a comment there that explained this and it made sense. It's possible you may want to merge something of that comment with what you have below.

// Important: since the stream always has a pending read operation,
Contributor

This comment is hard to parse because:

  • it conflates pre- and post-conditions
  • it doesn't clearly distinguish normal operation from error operation
  • it uses ambiguous phrasing (e.g. "cancellation" could seemingly apply to either the context or the call)

Consider phrasing something like:

Important: during normal operation, the stream always has a pending read operation so Shutdown would hang indefinitely if we didn't cancel the context_. However, if the stream has already failed, avoid canceling the context to avoid overwriting the status captured during the OnOperationFailed.

Contributor Author

Thanks, I like this wording.
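
The reasoning in the suggested wording can be illustrated with a toy model. It assumes, as the thread describes, that canceling before finishing makes the finish operation report "canceled" instead of the original failure. `ToyCall`, `Code`, and `ShutdownStatus` are hypothetical names and not part of gRPC or Firestore.

```cpp
#include <cassert>

// Hypothetical status codes; real gRPC uses grpc::StatusCode.
enum class Code { kOk, kUnavailable, kCancelled };

struct ToyCall {
  Code failure = Code::kOk;  // status captured when an operation failed
  bool cancelled = false;
  bool finished = false;

  void TryCancel() { cancelled = true; }

  // Assumption for this sketch: once canceled, finish reports "canceled"
  // regardless of any earlier failure.
  Code Finish() {
    assert(!finished && "call finished twice");
    finished = true;
    return cancelled ? Code::kCancelled : failure;
  }
};

// Cancels only during normal operation. After a failure, the context is left
// alone so that Finish reports the real cause.
Code ShutdownStatus(ToyCall& call, bool already_failed) {
  if (!already_failed) {
    call.TryCancel();
  }
  return call.Finish();
}
```

With this model, a healthy call shut down by the client reports `kCancelled`, while a call that already failed with `kUnavailable` keeps that status because the cancel is skipped.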

// cancellation has to be called, or else the read would hang forever, and
// finish operation will never get completed.
// (on the other hand, when an operation fails, cancellation should not be
// called, otherwise the real failure cause will be overwritten by status
// "canceled".)
context_->TryCancel();
FinishGrpcCall({});
}

// Important: since the stream always has a pending read operation,
// cancellation has to be called, or else the read would hang forever, and
// finish operation will never get completed.
// (on the other hand, when an operation fails, cancellation should not be
// called, otherwise the real failure cause will be overwritten by status
// "canceled".)
context_->TryCancel();
FinishCall({});
// Wait until "finish" is off the queue.
// Drain the completions -- `Shutdown` guarantees to bring the stream into
// destructible state. Two possibilities here:
// - completions are empty -- nothing to block on;
// - the only completion is "finish" (whether enqueued by this function or
// previously) -- "finish" is a very fast operation.
FastFinishCompletionsBlocking();
}

@@ -199,7 +211,12 @@ void GrpcStream::MaybeUnregister() {
}
}

void GrpcStream::FinishCall(const OnSuccess& callback) {
void GrpcStream::FinishGrpcCall(const OnSuccess& callback) {
LOG_DEBUG("GrpcStream('%s'): finishing the underlying call", this);

HARD_ASSERT(!is_grpc_call_finished_, "FinishGrpcCall called twice");
is_grpc_call_finished_ = true;

// All completions issued by this call must be taken off the queue before
// finish operation can be enqueued.
FastFinishCompletionsBlocking();
@@ -208,6 +225,9 @@ void GrpcStream::FinishCall(const OnSuccess& callback) {
}

void GrpcStream::FastFinishCompletionsBlocking() {
LOG_DEBUG("GrpcStream('%s'): fast finishing %s completion(s)", this,
completions_.size());

// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
// should never be called again after a call to Finish.

@@ -226,30 +246,35 @@ void GrpcStream::FastFinishCompletionsBlocking() {

bool GrpcStream::WriteAndFinish(grpc::ByteBuffer&& message) {
bool did_last_write = false;
if (!is_grpc_call_finished_) {
did_last_write = TryLastWrite(std::move(message));
}
FinishImmediately();
return did_last_write;
}

bool GrpcStream::TryLastWrite(grpc::ByteBuffer&& message) {
absl::optional<BufferedWrite> maybe_write =
buffered_writer_.EnqueueWrite(std::move(message));
// Only bother with the last write if there is no active write at the moment.
if (maybe_write) {
BufferedWrite last_write = std::move(maybe_write).value();
GrpcCompletion* completion = NewCompletion(Type::Write, {});
*completion->message() = last_write.message;
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion);

// Empirically, the write normally takes less than a millisecond to finish
// (both with and without network connection), and never more than several
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
// there happen to be circumstances under which the write may block
// indefinitely (in that case, rely on the fact that canceling a gRPC call
// makes all pending operations come back from the queue quickly).
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500));
if (status == std::future_status::ready) {
did_last_write = true;
}
if (!maybe_write) {
return false;
}

FinishImmediately();
return did_last_write;
BufferedWrite last_write = std::move(maybe_write).value();
GrpcCompletion* completion = NewCompletion(Type::Write, {});
*completion->message() = last_write.message;
call_->WriteLast(*completion->message(), grpc::WriteOptions{}, completion);

// Empirically, the write normally takes less than a millisecond to finish
// (both with and without network connection), and never more than several
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
// there happen to be circumstances under which the write may block
// indefinitely (in that case, rely on the fact that canceling a gRPC call
// makes all pending operations come back from the queue quickly).

auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500));
return status == std::future_status::ready;
}

GrpcStream::Metadata GrpcStream::GetResponseHeaders() const {
@@ -277,7 +302,13 @@ void GrpcStream::OnWrite() {
}

void GrpcStream::OnOperationFailed() {
FinishCall([this](const GrpcCompletion* completion) {
if (is_grpc_call_finished_) {
// If a finish operation has been enqueued already (possibly by a previous
// failed operation), there's nothing to do.
return;
}

FinishGrpcCall([this](const GrpcCompletion* completion) {
Status status = ConvertStatus(*completion->status());
FinishAndNotify(status);
});
@@ -303,6 +334,8 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
} else {
// Use the same error-handling for all operations; all errors are
// unrecoverable.
LOG_DEBUG("GrpcStream('%s'): operation of type %s failed", this,
completion->type());
OnOperationFailed();
}
};
6 changes: 5 additions & 1 deletion Firestore/core/src/firebase/firestore/remote/grpc_stream.h
@@ -180,6 +180,7 @@ class GrpcStream : public GrpcCall {
private:
void Read();
void MaybeWrite(absl::optional<internal::BufferedWrite> maybe_write);
bool TryLastWrite(grpc::ByteBuffer&& message);

void Shutdown();
void UnsetObserver() {
@@ -200,7 +201,7 @@ class GrpcStream : public GrpcCall {
// was started. Presumes that any pending completions will quickly come off
// the queue and will block until they do, so this must only be invoked when
// the current call either failed (`OnOperationFailed`) or canceled.
void FinishCall(const OnSuccess& callback);
void FinishGrpcCall(const OnSuccess& callback);

// Blocks until all the completions issued by this stream come out from the
// gRPC completion queue. Once they do, it is safe to delete this `GrpcStream`
@@ -231,6 +232,9 @@ class GrpcStream : public GrpcCall {
internal::BufferedWriter buffered_writer_;

std::vector<GrpcCompletion*> completions_;

// gRPC asserts that a call is finished exactly once.
bool is_grpc_call_finished_ = false;
};

} // namespace remote
48 changes: 48 additions & 0 deletions Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc
@@ -448,6 +448,54 @@ TEST_F(GrpcStreamTest, ObserverCanImmediatelyDestroyStreamOnFinishAndNotify) {
});
}

// Double finish

TEST_F(GrpcStreamTest, DoubleFinish_FailThenFinishImmediately) {
worker_queue.EnqueueBlocking([&] { stream->Start(); });

ForceFinish({{Type::Read, Error}});
KeepPollingGrpcQueue();
worker_queue.EnqueueBlocking(
[&] { EXPECT_NO_THROW(stream->FinishImmediately()); });
}

TEST_F(GrpcStreamTest, DoubleFinish_FailThenWriteAndFinish) {
worker_queue.EnqueueBlocking([&] { stream->Start(); });

ForceFinish({{Type::Read, Error}});
KeepPollingGrpcQueue();
worker_queue.EnqueueBlocking(
[&] { EXPECT_NO_THROW(stream->WriteAndFinish({})); });
}

TEST_F(GrpcStreamTest, DoubleFinish_FailThenFailAgain) {
worker_queue.EnqueueBlocking([&] {
stream->Start();
stream->Write({});
});

int failures_count = 0;
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
switch (completion->type()) {
case Type::Read:
case Type::Write:
++failures_count;
completion->Complete(false);
return failures_count == 2;
default:
UnexpectedType(completion);
return true;
}
});
future.wait();
worker_queue.EnqueueBlocking([] {});

// Normally, "Finish" never fails, but for the test it's easier to abuse the
// finish operation that has already been enqueued by `OnOperationFailed`
// rather than adding a new operation.
ForceFinish({{Type::Finish, Error}});
}

} // namespace remote
} // namespace firestore
} // namespace firebase