-
Notifications
You must be signed in to change notification settings - Fork 1.6k
gRPC: fix bugs found during testing #2039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,19 +120,26 @@ | |
} | ||
}; | ||
|
||
bool force_refresh = false; | ||
{ | ||
std::lock_guard<std::mutex> lock{contents_->mutex}; | ||
force_refresh = contents_->force_refresh; | ||
contents_->force_refresh = false; | ||
} | ||
|
||
// TODO(wilhuff): Need a better abstraction over a missing auth provider. | ||
if (contents_->auth) { | ||
[contents_->auth getTokenForcingRefresh:contents_->force_refresh | ||
[contents_->auth getTokenForcingRefresh:force_refresh | ||
withCallback:get_token_callback]; | ||
} else { | ||
// If there's no Auth provider, call back immediately with a nil | ||
// (unauthenticated) token. | ||
get_token_callback(nil, nil); | ||
} | ||
contents_->force_refresh = false; | ||
} | ||
|
||
void FirebaseCredentialsProvider::InvalidateToken() { | ||
std::lock_guard<std::mutex> lock{contents_->mutex}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I presume this is also necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the surface it seems this shouldn't be necessary since However, I can also see how this might be considered a race by tsan because we're accessing this object with a lock held in some cases but not in others. Meanwhile it's definitely easier to reason about this code if the contents of However, that line of reasoning suggests that unprotected access to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this wasn't introduced by gRPC changes, I think I'll just keep this change off this PR for now and upload it separately later on. |
||
contents_->force_refresh = true; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -227,6 +227,11 @@ bool HasSpecialConfig(const std::string& host) { | |
call->FinishAndNotify(Status{FirestoreErrorCode::Unavailable, | ||
"Network connectivity changed"}); | ||
} | ||
// The old channel may hang for a long time trying to reestablish | ||
// connection before eventually failing. Note that gRPC Objective-C | ||
// client does the same thing: | ||
// https://github.com/grpc/grpc/blob/master/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These line number references aren't stable. Use a permalink There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done (I actually had that at first, but was concerned about the gargantuan length of the link). |
||
grpc_channel_.reset(); | ||
}); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,27 +175,20 @@ void GrpcStream::FinishAndNotify(const Status& status) { | |
void GrpcStream::Shutdown() { | ||
MaybeUnregister(); | ||
if (completions_.empty()) { | ||
// Nothing to cancel. | ||
// Nothing to cancel -- either the call was already finished, or it has | ||
// never been started. | ||
return; | ||
} | ||
|
||
// Important: since the stream always has a pending read operation, | ||
// cancellation has to be called, or else the read would hang forever, and | ||
// finish operation will never get completed. | ||
// | ||
// (on the other hand, when an operation fails, cancellation should not be | ||
// called, otherwise the real failure cause will be overwritten by status | ||
// "canceled".) | ||
context_->TryCancel(); | ||
|
||
// The observer is not interested in this event -- since it initiated the | ||
// finish operation, the observer must know the reason. | ||
GrpcCompletion* completion = NewCompletion(Type::Finish, {}); | ||
// TODO(varconst): is issuing a finish operation necessary in this case? We | ||
// don't care about the status, but perhaps it will make the server notice | ||
// client disconnecting sooner? | ||
call_->Finish(completion->status(), completion); | ||
|
||
FinishCall({}); | ||
// Wait until "finish" is off the queue. | ||
FastFinishCompletionsBlocking(); | ||
} | ||
|
||
|
@@ -206,6 +199,14 @@ void GrpcStream::MaybeUnregister() { | |
} | ||
} | ||
|
||
void GrpcStream::FinishCall(const OnSuccess& callback) { | ||
// All completions issued by this call must be taken off the queue before | ||
// finish operation can be enqueued. | ||
FastFinishCompletionsBlocking(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is in line to what gRPC folks described:
The crash I saw happened when there were two calls to
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can see what you mean. In general performing all operations on the async queue should help alleviate the need for this kind of reasoning, but that's hard to do down here. At the higher level we impose the restriction that callbacks are only honored if they were fired before a major state transition. Such a strategy doesn't work well for listeners registered to get callbacks indefinitely. An alternative is to ensure that routines called by the network monitor are idempotent. If I'm reading correctly it seems that the latter is what you've done here. I'm down with that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I think so. The new logic is that before |
||
GrpcCompletion* completion = NewCompletion(Type::Finish, callback); | ||
call_->Finish(completion->status(), completion); | ||
} | ||
|
||
void GrpcStream::FastFinishCompletionsBlocking() { | ||
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it | ||
// should never be called again after a call to Finish. | ||
|
@@ -276,29 +277,10 @@ void GrpcStream::OnWrite() { | |
} | ||
|
||
void GrpcStream::OnOperationFailed() { | ||
if (is_finishing_) { | ||
// `Finish` itself cannot fail. If another failed operation already | ||
// triggered `Finish`, there's nothing to do. | ||
return; | ||
} | ||
|
||
is_finishing_ = true; | ||
|
||
if (observer_) { | ||
GrpcCompletion* completion = | ||
NewCompletion(Type::Finish, [this](const GrpcCompletion* completion) { | ||
OnFinishedByServer(*completion->status()); | ||
}); | ||
call_->Finish(completion->status(), completion); | ||
} else { | ||
// The only reason to finish would be to get the status; if the observer is | ||
// no longer interested, there is no need to do that. | ||
Shutdown(); | ||
} | ||
} | ||
|
||
void GrpcStream::OnFinishedByServer(const grpc::Status& status) { | ||
FinishAndNotify(ConvertStatus(status)); | ||
FinishCall([this](const GrpcCompletion* completion) { | ||
Status status = ConvertStatus(*completion->status()); | ||
FinishAndNotify(status); | ||
}); | ||
} | ||
|
||
void GrpcStream::RemoveCompletion(const GrpcCompletion* to_remove) { | ||
|
@@ -315,7 +297,9 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag, | |
RemoveCompletion(completion); | ||
|
||
if (ok) { | ||
on_success(completion); | ||
if (on_success) { | ||
on_success(completion); | ||
} | ||
} else { | ||
// Use the same error-handling for all operations; all errors are | ||
// unrecoverable. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread sanitizer reported a race here.