@@ -175,20 +175,27 @@ void GrpcStream::FinishAndNotify(const Status& status) {
175
175
void GrpcStream::Shutdown () {
176
176
MaybeUnregister ();
177
177
if (completions_.empty ()) {
178
- // Nothing to cancel -- either the call was already finished, or it has
179
- // never been started.
178
+ // Nothing to cancel.
180
179
return ;
181
180
}
182
181
183
182
// Important: since the stream always has a pending read operation,
184
183
// cancellation has to be called, or else the read would hang forever, and
185
184
// finish operation will never get completed.
185
+ //
186
186
// (on the other hand, when an operation fails, cancellation should not be
187
187
// called, otherwise the real failure cause will be overwritten by status
188
188
// "canceled".)
189
189
context_->TryCancel ();
190
- FinishCall ({});
191
- // Wait until "finish" is off the queue.
190
+
191
+ // The observer is not interested in this event -- since it initiated the
192
+ // finish operation, the observer must know the reason.
193
+ GrpcCompletion* completion = NewCompletion (Type::Finish, {});
194
+ // TODO(varconst): is issuing a finish operation necessary in this case? We
195
+ // don't care about the status, but perhaps it will make the server notice
196
+ // client disconnecting sooner?
197
+ call_->Finish (completion->status (), completion);
198
+
192
199
FastFinishCompletionsBlocking ();
193
200
}
194
201
@@ -199,14 +206,6 @@ void GrpcStream::MaybeUnregister() {
199
206
}
200
207
}
201
208
202
- void GrpcStream::FinishCall (const OnSuccess& callback) {
203
- // All completions issued by this call must be taken off the queue before
204
- // finish operation can be enqueued.
205
- FastFinishCompletionsBlocking ();
206
- GrpcCompletion* completion = NewCompletion (Type::Finish, callback);
207
- call_->Finish (completion->status (), completion);
208
- }
209
-
210
209
void GrpcStream::FastFinishCompletionsBlocking () {
211
210
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
212
211
// should never be called again after a call to Finish.
@@ -277,10 +276,29 @@ void GrpcStream::OnWrite() {
277
276
}
278
277
279
278
void GrpcStream::OnOperationFailed () {
280
- FinishCall ([this ](const GrpcCompletion* completion) {
281
- Status status = ConvertStatus (*completion->status ());
282
- FinishAndNotify (status);
283
- });
279
+ if (is_finishing_) {
280
+ // `Finish` itself cannot fail. If another failed operation already
281
+ // triggered `Finish`, there's nothing to do.
282
+ return ;
283
+ }
284
+
285
+ is_finishing_ = true ;
286
+
287
+ if (observer_) {
288
+ GrpcCompletion* completion =
289
+ NewCompletion (Type::Finish, [this ](const GrpcCompletion* completion) {
290
+ OnFinishedByServer (*completion->status ());
291
+ });
292
+ call_->Finish (completion->status (), completion);
293
+ } else {
294
+ // The only reason to finish would be to get the status; if the observer is
295
+ // no longer interested, there is no need to do that.
296
+ Shutdown ();
297
+ }
298
+ }
299
+
300
+ void GrpcStream::OnFinishedByServer (const grpc::Status& status) {
301
+ FinishAndNotify (ConvertStatus (status));
284
302
}
285
303
286
304
void GrpcStream::RemoveCompletion (const GrpcCompletion* to_remove) {
@@ -297,9 +315,7 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
297
315
RemoveCompletion (completion);
298
316
299
317
if (ok) {
300
- if (on_success) {
301
- on_success (completion);
302
- }
318
+ on_success (completion);
303
319
} else {
304
320
// Use the same error-handling for all operations; all errors are
305
321
// unrecoverable.
0 commit comments