From 090bd08712bcda2703f52e39bc8033c0c1910c89 Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Thu, 1 Nov 2018 19:02:55 -0400 Subject: [PATCH 1/4] Initial --- .../firebase_credentials_provider_apple.mm | 11 +++- .../firestore/remote/grpc_connection.mm | 5 ++ .../firebase/firestore/remote/grpc_stream.cc | 54 +++++++------------ .../firebase/firestore/remote/grpc_stream.h | 4 +- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm b/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm index f167d3f8d67..6c411b22613 100644 --- a/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm +++ b/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm @@ -120,19 +120,26 @@ } }; + bool force_refresh = false; + { + std::lock_guard lock{contents_->mutex}; + force_refresh = contents_->force_refresh; + contents_->force_refresh = false; + } + // TODO(wilhuff): Need a better abstraction over a missing auth provider. if (contents_->auth) { - [contents_->auth getTokenForcingRefresh:contents_->force_refresh + [contents_->auth getTokenForcingRefresh:force_refresh withCallback:get_token_callback]; } else { // If there's no Auth provider, call back immediately with a nil // (unauthenticated) token. get_token_callback(nil, nil); } - contents_->force_refresh = false; } void FirebaseCredentialsProvider::InvalidateToken() { + std::lock_guard lock{contents_->mutex}; contents_->force_refresh = true; } diff --git a/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm b/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm index 6c3aa43a5be..e9234e777b5 100644 --- a/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm +++ b/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm @@ -227,6 +227,11 @@ bool HasSpecialConfig(const std::string& host) { call->FinishAndNotify(Status{FirestoreErrorCode::Unavailable, "Network connectivity changed"}); } + // The old channel may hang for a long time trying to reestablish + // connection before eventually failing. Note that gRPC Objective-C + // client does the same thing: + // https://github.com/grpc/grpc/blob/master/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314 + grpc_channel_.reset(); }); } diff --git a/Firestore/core/src/firebase/firestore/remote/grpc_stream.cc b/Firestore/core/src/firebase/firestore/remote/grpc_stream.cc index 922c7734faa..289833ec3ba 100644 --- a/Firestore/core/src/firebase/firestore/remote/grpc_stream.cc +++ b/Firestore/core/src/firebase/firestore/remote/grpc_stream.cc @@ -175,27 +175,20 @@ void GrpcStream::FinishAndNotify(const Status& status) { void GrpcStream::Shutdown() { MaybeUnregister(); if (completions_.empty()) { - // Nothing to cancel. + // Nothing to cancel -- either the call was already finished, or it has + // never been started. return; } // Important: since the stream always has a pending read operation, // cancellation has to be called, or else the read would hang forever, and // finish operation will never get completed. - // // (on the other hand, when an operation fails, cancellation should not be // called, otherwise the real failure cause will be overwritten by status // "canceled".) context_->TryCancel(); - - // The observer is not interested in this event -- since it initiated the - // finish operation, the observer must know the reason. - GrpcCompletion* completion = NewCompletion(Type::Finish, {}); - // TODO(varconst): is issuing a finish operation necessary in this case? We - // don't care about the status, but perhaps it will make the server notice - // client disconnecting sooner? - call_->Finish(completion->status(), completion); - + FinishCall({}); + // Wait until "finish" is off the queue. FastFinishCompletionsBlocking(); } @@ -206,6 +199,14 @@ void GrpcStream::MaybeUnregister() { } } +void GrpcStream::FinishCall(const OnSuccess& callback) { + // All completions issued by this call must be taken off the queue before + // finish operation can be enqueued. + FastFinishCompletionsBlocking(); + GrpcCompletion* completion = NewCompletion(Type::Finish, callback); + call_->Finish(completion->status(), completion); +} + void GrpcStream::FastFinishCompletionsBlocking() { // TODO(varconst): reset buffered_writer_? Should not be necessary, because it // should never be called again after a call to Finish. @@ -276,29 +277,10 @@ void GrpcStream::OnWrite() { } void GrpcStream::OnOperationFailed() { - if (is_finishing_) { - // `Finish` itself cannot fail. If another failed operation already - // triggered `Finish`, there's nothing to do. - return; - } - - is_finishing_ = true; - - if (observer_) { - GrpcCompletion* completion = - NewCompletion(Type::Finish, [this](const GrpcCompletion* completion) { - OnFinishedByServer(*completion->status()); - }); - call_->Finish(completion->status(), completion); - } else { - // The only reason to finish would be to get the status; if the observer is - // no longer interested, there is no need to do that. - Shutdown(); - } -} - -void GrpcStream::OnFinishedByServer(const grpc::Status& status) { - FinishAndNotify(ConvertStatus(status)); + FinishCall([this](const GrpcCompletion* completion) { + Status status = ConvertStatus(*completion->status()); + FinishAndNotify(status); + }); } void GrpcStream::RemoveCompletion(const GrpcCompletion* to_remove) { @@ -315,7 +297,9 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag, RemoveCompletion(completion); if (ok) { - on_success(completion); + if (on_success) { + on_success(completion); + } } else { // Use the same error-handling for all operations; all errors are // unrecoverable. diff --git a/Firestore/core/src/firebase/firestore/remote/grpc_stream.h b/Firestore/core/src/firebase/firestore/remote/grpc_stream.h index cb627dd6448..355c6afd205 100644 --- a/Firestore/core/src/firebase/firestore/remote/grpc_stream.h +++ b/Firestore/core/src/firebase/firestore/remote/grpc_stream.h @@ -191,12 +191,12 @@ class GrpcStream : public GrpcCall { void OnRead(const grpc::ByteBuffer& message); void OnWrite(); void OnOperationFailed(); - void OnFinishedByServer(const grpc::Status& status); void RemoveCompletion(const GrpcCompletion* to_remove); using OnSuccess = std::function; GrpcCompletion* NewCompletion(GrpcCompletion::Type type, const OnSuccess& callback); + void FinishCall(const OnSuccess& callback); // Blocks until all the completions issued by this stream come out from the // gRPC completion queue. Once they do, it is safe to delete this `GrpcStream` @@ -227,8 +227,6 @@ class GrpcStream : public GrpcCall { internal::BufferedWriter buffered_writer_; std::vector completions_; - - bool is_finishing_ = false; }; } // namespace remote From 9408698d1ce5bd1de11dcae7853cd21f9fa1b1b3 Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Thu, 1 Nov 2018 19:13:53 -0400 Subject: [PATCH 2/4] Comment --- Firestore/core/src/firebase/firestore/remote/grpc_stream.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Firestore/core/src/firebase/firestore/remote/grpc_stream.h b/Firestore/core/src/firebase/firestore/remote/grpc_stream.h index 355c6afd205..9667ed6b708 100644 --- a/Firestore/core/src/firebase/firestore/remote/grpc_stream.h +++ b/Firestore/core/src/firebase/firestore/remote/grpc_stream.h @@ -196,6 +196,10 @@ class GrpcStream : public GrpcCall { using OnSuccess = std::function; GrpcCompletion* NewCompletion(GrpcCompletion::Type type, const OnSuccess& callback); + // Finishes the underlying gRPC call. Must always be invoked on any call that + // was started. Presumes that any pending completions will quickly come off + // the queue and will block until they do, so this must only be invoked when + // the current call either failed (`OnOperationFailed`) or canceled. void FinishCall(const OnSuccess& callback); // Blocks until all the completions issued by this stream come out from the From 5a1324d2ea197fbe588f267cd59262d41e716a01 Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Fri, 2 Nov 2018 18:11:20 -0400 Subject: [PATCH 3/4] Review feedback --- .../auth/firebase_credentials_provider_apple.mm | 11 ++--------- .../src/firebase/firestore/remote/grpc_connection.mm | 2 +- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm b/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm index 6c411b22613..f167d3f8d67 100644 --- a/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm +++ b/Firestore/core/src/firebase/firestore/auth/firebase_credentials_provider_apple.mm @@ -120,26 +120,19 @@ } }; - bool force_refresh = false; - { - std::lock_guard lock{contents_->mutex}; - force_refresh = contents_->force_refresh; - contents_->force_refresh = false; - } - // TODO(wilhuff): Need a better abstraction over a missing auth provider. if (contents_->auth) { - [contents_->auth getTokenForcingRefresh:force_refresh + [contents_->auth getTokenForcingRefresh:contents_->force_refresh withCallback:get_token_callback]; } else { // If there's no Auth provider, call back immediately with a nil // (unauthenticated) token. get_token_callback(nil, nil); } + contents_->force_refresh = false; } void FirebaseCredentialsProvider::InvalidateToken() { - std::lock_guard lock{contents_->mutex}; contents_->force_refresh = true; } diff --git a/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm b/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm index e9234e777b5..12f084513de 100644 --- a/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm +++ b/Firestore/core/src/firebase/firestore/remote/grpc_connection.mm @@ -230,7 +230,7 @@ bool HasSpecialConfig(const std::string& host) { // The old channel may hang for a long time trying to reestablish // connection before eventually failing. Note that gRPC Objective-C // client does the same thing: - // https://github.com/grpc/grpc/blob/master/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314 + // https://github.com/grpc/grpc/blob/fe11db09575f2dfbe1f88cd44bd417acc168e354/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314 grpc_channel_.reset(); }); } From 69db1e0caef8f7e52a6429aef2906f2712d1f0cb Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Mon, 5 Nov 2018 15:59:47 -0500 Subject: [PATCH 4/4] Fix unit tests which relied on the old finish logic --- .../Firestore_IntegrationTests_iOS.xcscheme | 2 + .../xcschemes/Firestore_Tests_iOS.xcscheme | 2 + .../firebase/firestore/remote/grpc_stream.cc | 6 +- .../firestore/remote/grpc_stream_test.cc | 59 +++++++++++-------- .../remote/grpc_streaming_reader_test.cc | 26 ++++---- .../firebase/firestore/remote/stream_test.mm | 27 +++++---- .../firestore/util/grpc_stream_tester.cc | 33 +++++++++-- .../firestore/util/grpc_stream_tester.h | 16 +++++ 8 files changed, 113 insertions(+), 58 deletions(-) diff --git a/Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_IntegrationTests_iOS.xcscheme b/Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_IntegrationTests_iOS.xcscheme index 4e6f82845e2..21f813cde3d 100644 --- a/Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_IntegrationTests_iOS.xcscheme +++ b/Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_IntegrationTests_iOS.xcscheme @@ -26,6 +26,8 @@ buildConfiguration = "Debug" selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB" selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB" + enableASanStackUseAfterReturn = "YES" + enableThreadSanitizer = "YES" shouldUseLaunchSchemeArgsEnv = "YES"> status()); - FinishAndNotify(status); - }); + Status status = ConvertStatus(*completion->status()); + FinishAndNotify(status); + }); } void GrpcStream::RemoveCompletion(const GrpcCompletion* to_remove) { diff --git a/Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc b/Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc index 7d07b94920b..3e7214f5ef6 100644 --- a/Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc +++ b/Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc @@ -149,6 +149,11 @@ class GrpcStreamTest : public testing::Test { return {states}; } + void UnexpectedType(GrpcCompletion* completion) { + ADD_FAILURE() << "Unexpected completion type " + << static_cast(completion->type()); + } + AsyncQueue worker_queue; std::unique_ptr connectivity_monitor; @@ -239,8 +244,7 @@ TEST_F(GrpcStreamTest, CanAddSeveralWrites) { completion->Complete(true); break; default: - ADD_FAILURE() << "Unexpected completion type " - << static_cast(completion->type()); + UnexpectedType(completion); break; } @@ -316,28 +320,31 @@ TEST_F(GrpcStreamTest, ErrorOnWrite) { }); bool failed_write = false; - ForceFinish([&](GrpcCompletion* completion) { + auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) { switch (completion->type()) { case Type::Read: - completion->Complete(true); - break; + // After a write is failed, fail the read too. + completion->Complete(!failed_write); + return false; case Type::Write: failed_write = true; completion->Complete(false); - break; + return false; + + case Type::Finish: + EXPECT_TRUE(failed_write); + *completion->status() = grpc::Status{grpc::ABORTED, ""}; + completion->Complete(true); + return true; default: - ADD_FAILURE() << "Unexpected completion type " - << static_cast(completion->type()); - break; + UnexpectedType(completion); + return false; } - - return failed_write; }); - - ForceFinish( - {{Type::Read, Error}, {Type::Finish, grpc::Status{grpc::ABORTED, ""}}}); + future.wait(); + worker_queue.EnqueueBlocking([] {}); EXPECT_EQ(observed_states().back(), "OnStreamFinish(Aborted)"); } @@ -351,25 +358,27 @@ TEST_F(GrpcStreamTest, ErrorWithPendingWrites) { }); bool failed_write = false; - ForceFinish([&](GrpcCompletion* completion) { + auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) { switch (completion->type()) { case Type::Read: - completion->Complete(true); - break; + completion->Complete(!failed_write); + return false; case Type::Write: failed_write = true; completion->Complete(false); - break; + return false; + case Type::Finish: + EXPECT_TRUE(failed_write); + *completion->status() = grpc::Status{grpc::UNAVAILABLE, ""}; + completion->Complete(true); + return true; default: - ADD_FAILURE() << "Unexpected completion type " - << static_cast(completion->type()); - break; + UnexpectedType(completion); + return false; } - - return failed_write; }); - ForceFinish({{Type::Read, Error}, - {Type::Finish, grpc::Status{grpc::UNAVAILABLE, ""}}}); + future.wait(); + worker_queue.EnqueueBlocking([] {}); EXPECT_EQ(observed_states().back(), "OnStreamFinish(Unavailable)"); } diff --git a/Firestore/core/test/firebase/firestore/remote/grpc_streaming_reader_test.cc b/Firestore/core/test/firebase/firestore/remote/grpc_streaming_reader_test.cc index 75fa9d12000..183326e5899 100644 --- a/Firestore/core/test/firebase/firestore/remote/grpc_streaming_reader_test.cc +++ b/Firestore/core/test/firebase/firestore/remote/grpc_streaming_reader_test.cc @@ -214,31 +214,33 @@ TEST_F(GrpcStreamingReaderTest, ErrorOnWrite) { StartReader(); bool failed_write = false; - // Callback is used because it's indeterminate whether one or two read - // operations will have a chance to succeed. - ForceFinish([&](GrpcCompletion* completion) { + auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) { switch (completion->type()) { case Type::Read: - completion->Complete(true); - break; + // After a write is failed, fail the read too. + completion->Complete(!failed_write); + return false; case Type::Write: failed_write = true; completion->Complete(false); - break; + return false; + + case Type::Finish: + EXPECT_TRUE(failed_write); + *completion->status() = grpc::Status{grpc::RESOURCE_EXHAUSTED, ""}; + completion->Complete(true); + return true; default: ADD_FAILURE() << "Unexpected completion type " << static_cast(completion->type()); - break; + return false; } - - return failed_write; }); + future.wait(); + worker_queue.EnqueueBlocking([] {}); - ForceFinish( - {{Type::Read, Error}, - {Type::Finish, grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, ""}}}); ASSERT_TRUE(status.has_value()); EXPECT_EQ(status.value().code(), FirestoreErrorCode::ResourceExhausted); EXPECT_TRUE(responses.empty()); diff --git a/Firestore/core/test/firebase/firestore/remote/stream_test.mm b/Firestore/core/test/firebase/firestore/remote/stream_test.mm index 921fc4bdd68..d138c9c3b8a 100644 --- a/Firestore/core/test/firebase/firestore/remote/stream_test.mm +++ b/Firestore/core/test/firebase/firestore/remote/stream_test.mm @@ -461,31 +461,32 @@ void StartStream() { worker_queue.EnqueueBlocking([&] { firestore_stream->WriteEmptyBuffer(); }); bool failed_write = false; - // Callback is used because it's indeterminate whether one or two read - // operations will have a chance to succeed. - ForceFinish([&](GrpcCompletion* completion) { + auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) { switch (completion->type()) { case Type::Read: - completion->Complete(true); - break; + // After a write is failed, fail the read too. + completion->Complete(!failed_write); + return false; case Type::Write: failed_write = true; completion->Complete(false); - break; + return false; + + case Type::Finish: + EXPECT_TRUE(failed_write); + *completion->status() = grpc::Status{grpc::UNAUTHENTICATED, ""}; + completion->Complete(true); + return true; default: ADD_FAILURE() << "Unexpected completion type " << static_cast(completion->type()); - break; + return false; } - - return failed_write; }); - - ForceFinish( - {{Type::Read, Error}, - {Type::Finish, grpc::Status{grpc::StatusCode::UNAUTHENTICATED, ""}}}); + future.wait(); + worker_queue.EnqueueBlocking([] {}); worker_queue.EnqueueBlocking([&] { EXPECT_FALSE(firestore_stream->IsStarted()); diff --git a/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.cc b/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.cc index 8ab4992200c..6dc69246865 100644 --- a/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.cc +++ b/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.cc @@ -149,7 +149,10 @@ GrpcCompletion* FakeGrpcQueue::ExtractCompletion() { "gRPC completion queue must only be polled on the dedicated executor"); bool ignored_ok = false; void* tag = nullptr; - grpc_queue_->Next(&tag, &ignored_ok); + bool has_more = grpc_queue_->Next(&tag, &ignored_ok); + if (!has_more) { + return nullptr; + } return static_cast(tag); } @@ -174,14 +177,29 @@ void FakeGrpcQueue::ExtractCompletions(const CompletionCallback& callback) { void FakeGrpcQueue::KeepPolling() { dedicated_executor_->Execute([&] { - void* tag = nullptr; - bool ignored_ok = false; - while (grpc_queue_->Next(&tag, &ignored_ok)) { - static_cast(tag)->Complete(true); + for (auto* completion = ExtractCompletion(); completion != nullptr; + completion = ExtractCompletion()) { + completion->Complete(true); } }); } +std::future FakeGrpcQueue::KeepPolling( + const CompletionCallback& callback) { + current_promise_ = {}; + + dedicated_executor_->Execute([=] { + bool done = false; + while (!done) { + auto* completion = ExtractCompletion(); + done = callback(completion); + } + current_promise_.set_value(); + }); + + return current_promise_.get_future(); +} + // GrpcStreamTester GrpcStreamTester::GrpcStreamTester(AsyncQueue* worker_queue, @@ -281,6 +299,11 @@ GrpcStreamTester::CreateAnyTypeOrderCallback( }; } +std::future GrpcStreamTester::ForceFinishAsync( + const CompletionCallback& callback) { + return fake_grpc_queue_.KeepPolling(callback); +} + void GrpcStreamTester::KeepPollingGrpcQueue() { fake_grpc_queue_.KeepPolling(); } diff --git a/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.h b/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.h index 9fd561bb694..3b242501f15 100644 --- a/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.h +++ b/Firestore/core/test/firebase/firestore/util/grpc_stream_tester.h @@ -18,6 +18,7 @@ #define FIRESTORE_CORE_TEST_FIREBASE_FIRESTORE_UTIL_GRPC_STREAM_TESTER_H_ #include +#include // NOLINT(build/c++11) #include #include #include @@ -106,6 +107,7 @@ class FakeGrpcQueue { void ExtractCompletions(std::initializer_list results); void ExtractCompletions(const CompletionCallback& callback); void KeepPolling(); + std::future KeepPolling(const CompletionCallback& callback); void Shutdown(); @@ -119,6 +121,8 @@ class FakeGrpcQueue { std::unique_ptr dedicated_executor_; grpc::CompletionQueue* grpc_queue_; bool is_shut_down_ = false; + + std::promise current_promise_; }; /** @@ -215,6 +219,14 @@ class GrpcStreamTester { grpc::ClientContext* context, std::initializer_list results); + /** + * Will asynchronously continuously pull gRPC completion queue and delegate + * handling all the completions taken off to the given `callback`, until the + * callback returns true (interpreted as "done"). Returns a future that will + * finish once the callback returns "done". + */ + std::future ForceFinishAsync(const CompletionCallback& callback); + /** * Creates a `CompletionCallback` from given `results` which is equivalent to * what `ForceFinishAnyTypeOrder` would use, but doesn't run it. @@ -222,6 +234,10 @@ class GrpcStreamTester { static CompletionCallback CreateAnyTypeOrderCallback( std::initializer_list results); + /** + * Will asynchronously continuously pull gRPC completion queue and apply "Ok" + * to every completion that comes off the queue. + */ void KeepPollingGrpcQueue(); void ShutdownGrpcQueue();