Skip to content

gRPC: fix bugs found during testing #2039

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
enableASanStackUseAfterReturn = "YES"
enableThreadSanitizer = "YES"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
enableASanStackUseAfterReturn = "YES"
enableThreadSanitizer = "YES"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ bool HasSpecialConfig(const std::string& host) {
call->FinishAndNotify(Status{FirestoreErrorCode::Unavailable,
"Network connectivity changed"});
}
// The old channel may hang for a long time trying to reestablish
// connection before eventually failing. Note that gRPC Objective-C
// client does the same thing:
// https://github.com/grpc/grpc/blob/fe11db09575f2dfbe1f88cd44bd417acc168e354/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314
grpc_channel_.reset();
});
}

Expand Down
54 changes: 19 additions & 35 deletions Firestore/core/src/firebase/firestore/remote/grpc_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,20 @@ void GrpcStream::FinishAndNotify(const Status& status) {
void GrpcStream::Shutdown() {
MaybeUnregister();
if (completions_.empty()) {
// Nothing to cancel.
// Nothing to cancel -- either the call was already finished, or it has
// never been started.
return;
}

// Important: since the stream always has a pending read operation,
// cancellation has to be called, or else the read would hang forever, and
// finish operation will never get completed.
//
// (on the other hand, when an operation fails, cancellation should not be
// called, otherwise the real failure cause will be overwritten by status
// "canceled".)
context_->TryCancel();

// The observer is not interested in this event -- since it initiated the
// finish operation, the observer must know the reason.
GrpcCompletion* completion = NewCompletion(Type::Finish, {});
// TODO(varconst): is issuing a finish operation necessary in this case? We
// don't care about the status, but perhaps it will make the server notice
// client disconnecting sooner?
call_->Finish(completion->status(), completion);

FinishCall({});
// Wait until "finish" is off the queue.
FastFinishCompletionsBlocking();
}

Expand All @@ -206,6 +199,14 @@ void GrpcStream::MaybeUnregister() {
}
}

void GrpcStream::FinishCall(const OnSuccess& callback) {
// All completions issued by this call must be taken off the queue before
// finish operation can be enqueued.
FastFinishCompletionsBlocking();
Copy link
Contributor Author

@var-const var-const Nov 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in line to what gRPC folks described:

  • calls should always be finished;
  • all tags must be taken off the queue before Finish can be called.

The crash I saw happened when there were two calls to GenericClientAsyncReaderWriter::Finish. I'm not 100% sure, but one way it could happen is if network connectivity changed in-between OnOperationFailed and OnFinishedByServer (which is line to how I ran into this, trying to switch off network randomly):

  • a stream has both a pending read and a pending write;
  • one of those, say, the read, failed and triggered OnOperationFailed. At this point, the read is already removed from completions_, but the write is still there;
  • OnOperationFailed calls Finish on the gRPC call. The write is still on the completion queue;
  • after this, but before the finish operation completes and invokes OnFinishedByServer, network connectivity changes, which invokes FinishAndNotify on this stream;
  • FinishAndNotify invokes Shutdown, which notices completions_ are not empty (there is still that write there), which it takes as a signal to call Finish on the gRPC call. However, this call has already been finished, which triggers an assertion failure in gRPC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see what you mean. In general performing all operations on the async queue should help alleviate the need for this kind of reasoning, but that's hard to do down here.

At the higher level we impose the restriction that callbacks are only honored if they were fired before a major state transition. Such a strategy doesn't work well for listeners registered to get callbacks indefinitely. An alternative is to ensure that routines called by the network monitor are idempotent.

If I'm reading correctly it seems that the latter is what you've done here. I'm down with that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think so. The new logic is that before Finish is invoked on the gRPC call, pending completions are always drained. Because Finish is a no-op when there are no completions, it effectively does make Finish idempotent.

GrpcCompletion* completion = NewCompletion(Type::Finish, callback);
call_->Finish(completion->status(), completion);
}

void GrpcStream::FastFinishCompletionsBlocking() {
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
// should never be called again after a call to Finish.
Expand Down Expand Up @@ -276,29 +277,10 @@ void GrpcStream::OnWrite() {
}

void GrpcStream::OnOperationFailed() {
if (is_finishing_) {
// `Finish` itself cannot fail. If another failed operation already
// triggered `Finish`, there's nothing to do.
return;
}

is_finishing_ = true;

if (observer_) {
GrpcCompletion* completion =
NewCompletion(Type::Finish, [this](const GrpcCompletion* completion) {
OnFinishedByServer(*completion->status());
});
call_->Finish(completion->status(), completion);
} else {
// The only reason to finish would be to get the status; if the observer is
// no longer interested, there is no need to do that.
Shutdown();
}
}

void GrpcStream::OnFinishedByServer(const grpc::Status& status) {
FinishAndNotify(ConvertStatus(status));
FinishCall([this](const GrpcCompletion* completion) {
Status status = ConvertStatus(*completion->status());
FinishAndNotify(status);
});
}

void GrpcStream::RemoveCompletion(const GrpcCompletion* to_remove) {
Expand All @@ -315,7 +297,9 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
RemoveCompletion(completion);

if (ok) {
on_success(completion);
if (on_success) {
on_success(completion);
}
} else {
// Use the same error-handling for all operations; all errors are
// unrecoverable.
Expand Down
8 changes: 5 additions & 3 deletions Firestore/core/src/firebase/firestore/remote/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,16 @@ class GrpcStream : public GrpcCall {
void OnRead(const grpc::ByteBuffer& message);
void OnWrite();
void OnOperationFailed();
void OnFinishedByServer(const grpc::Status& status);
void RemoveCompletion(const GrpcCompletion* to_remove);

using OnSuccess = std::function<void(const GrpcCompletion*)>;
GrpcCompletion* NewCompletion(GrpcCompletion::Type type,
const OnSuccess& callback);
// Finishes the underlying gRPC call. Must always be invoked on any call that
// was started. Presumes that any pending completions will quickly come off
// the queue and will block until they do, so this must only be invoked when
// the current call either failed (`OnOperationFailed`) or canceled.
void FinishCall(const OnSuccess& callback);

// Blocks until all the completions issued by this stream come out from the
// gRPC completion queue. Once they do, it is safe to delete this `GrpcStream`
Expand Down Expand Up @@ -227,8 +231,6 @@ class GrpcStream : public GrpcCall {
internal::BufferedWriter buffered_writer_;

std::vector<GrpcCompletion*> completions_;

bool is_finishing_ = false;
};

} // namespace remote
Expand Down
59 changes: 34 additions & 25 deletions Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ class GrpcStreamTest : public testing::Test {
return {states};
}

void UnexpectedType(GrpcCompletion* completion) {
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
}

AsyncQueue worker_queue;

std::unique_ptr<ConnectivityMonitor> connectivity_monitor;
Expand Down Expand Up @@ -239,8 +244,7 @@ TEST_F(GrpcStreamTest, CanAddSeveralWrites) {
completion->Complete(true);
break;
default:
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
UnexpectedType(completion);
break;
}

Expand Down Expand Up @@ -316,28 +320,31 @@ TEST_F(GrpcStreamTest, ErrorOnWrite) {
});

bool failed_write = false;
ForceFinish([&](GrpcCompletion* completion) {
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
switch (completion->type()) {
case Type::Read:
completion->Complete(true);
break;
// After a write is failed, fail the read too.
completion->Complete(!failed_write);
return false;

case Type::Write:
failed_write = true;
completion->Complete(false);
break;
return false;

case Type::Finish:
EXPECT_TRUE(failed_write);
*completion->status() = grpc::Status{grpc::ABORTED, ""};
completion->Complete(true);
return true;

default:
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
break;
UnexpectedType(completion);
return false;
}

return failed_write;
});

ForceFinish(
{{Type::Read, Error}, {Type::Finish, grpc::Status{grpc::ABORTED, ""}}});
future.wait();
worker_queue.EnqueueBlocking([] {});

EXPECT_EQ(observed_states().back(), "OnStreamFinish(Aborted)");
}
Expand All @@ -351,25 +358,27 @@ TEST_F(GrpcStreamTest, ErrorWithPendingWrites) {
});

bool failed_write = false;
ForceFinish([&](GrpcCompletion* completion) {
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
switch (completion->type()) {
case Type::Read:
completion->Complete(true);
break;
completion->Complete(!failed_write);
return false;
case Type::Write:
failed_write = true;
completion->Complete(false);
break;
return false;
case Type::Finish:
EXPECT_TRUE(failed_write);
*completion->status() = grpc::Status{grpc::UNAVAILABLE, ""};
completion->Complete(true);
return true;
default:
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
break;
UnexpectedType(completion);
return false;
}

return failed_write;
});
ForceFinish({{Type::Read, Error},
{Type::Finish, grpc::Status{grpc::UNAVAILABLE, ""}}});
future.wait();
worker_queue.EnqueueBlocking([] {});

EXPECT_EQ(observed_states().back(), "OnStreamFinish(Unavailable)");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,31 +214,33 @@ TEST_F(GrpcStreamingReaderTest, ErrorOnWrite) {
StartReader();

bool failed_write = false;
// Callback is used because it's indeterminate whether one or two read
// operations will have a chance to succeed.
ForceFinish([&](GrpcCompletion* completion) {
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
switch (completion->type()) {
case Type::Read:
completion->Complete(true);
break;
// After a write is failed, fail the read too.
completion->Complete(!failed_write);
return false;

case Type::Write:
failed_write = true;
completion->Complete(false);
break;
return false;

case Type::Finish:
EXPECT_TRUE(failed_write);
*completion->status() = grpc::Status{grpc::RESOURCE_EXHAUSTED, ""};
completion->Complete(true);
return true;

default:
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
break;
return false;
}

return failed_write;
});
future.wait();
worker_queue.EnqueueBlocking([] {});

ForceFinish(
{{Type::Read, Error},
{Type::Finish, grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, ""}}});
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status.value().code(), FirestoreErrorCode::ResourceExhausted);
EXPECT_TRUE(responses.empty());
Expand Down
27 changes: 14 additions & 13 deletions Firestore/core/test/firebase/firestore/remote/stream_test.mm
Original file line number Diff line number Diff line change
Expand Up @@ -461,31 +461,32 @@ void StartStream() {
worker_queue.EnqueueBlocking([&] { firestore_stream->WriteEmptyBuffer(); });

bool failed_write = false;
// Callback is used because it's indeterminate whether one or two read
// operations will have a chance to succeed.
ForceFinish([&](GrpcCompletion* completion) {
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
switch (completion->type()) {
case Type::Read:
completion->Complete(true);
break;
// After a write is failed, fail the read too.
completion->Complete(!failed_write);
return false;

case Type::Write:
failed_write = true;
completion->Complete(false);
break;
return false;

case Type::Finish:
EXPECT_TRUE(failed_write);
*completion->status() = grpc::Status{grpc::UNAUTHENTICATED, ""};
completion->Complete(true);
return true;

default:
ADD_FAILURE() << "Unexpected completion type "
<< static_cast<int>(completion->type());
break;
return false;
}

return failed_write;
});

ForceFinish(
{{Type::Read, Error},
{Type::Finish, grpc::Status{grpc::StatusCode::UNAUTHENTICATED, ""}}});
future.wait();
worker_queue.EnqueueBlocking([] {});

worker_queue.EnqueueBlocking([&] {
EXPECT_FALSE(firestore_stream->IsStarted());
Expand Down
Loading