Skip to content

Commit e3f1514

Browse files
authored
Reapply "status: fix/improve status handling (#6662)" (#6673) (#6688)
1 parent 696faa9 commit e3f1514

File tree

8 files changed

+211
-43
lines changed

8 files changed

+211
-43
lines changed

Diff for: internal/status/status.go

+28
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ type Status struct {
4343
s *spb.Status
4444
}
4545

46+
// NewWithProto returns a new status including details from statusProto. This
47+
// is meant to be used by the gRPC library only.
48+
func NewWithProto(code codes.Code, message string, statusProto []string) *Status {
49+
if len(statusProto) != 1 {
50+
// No grpc-status-details bin header, or multiple; just ignore.
51+
return &Status{s: &spb.Status{Code: int32(code), Message: message}}
52+
}
53+
st := &spb.Status{}
54+
if err := proto.Unmarshal([]byte(statusProto[0]), st); err != nil {
55+
// Probably not a google.rpc.Status proto; do not provide details.
56+
return &Status{s: &spb.Status{Code: int32(code), Message: message}}
57+
}
58+
if st.Code == int32(code) {
59+
// The codes match between the grpc-status header and the
60+
// grpc-status-details-bin header; use the full details proto.
61+
return &Status{s: st}
62+
}
63+
return &Status{
64+
s: &spb.Status{
65+
Code: int32(codes.Internal),
66+
Message: fmt.Sprintf(
67+
"grpc-status-details-bin mismatch: grpc-status=%v, grpc-message=%q, grpc-status-details-bin=%+v",
68+
code, message, st,
69+
),
70+
},
71+
}
72+
}
73+
4674
// New returns a Status representing c and msg.
4775
func New(c codes.Code, msg string) *Status {
4876
return &Status{s: &spb.Status{Code: int32(c), Message: msg}}

Diff for: internal/stubserver/stubserver.go

+45-10
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"testing"
2828
"time"
2929

30+
"golang.org/x/net/http2"
3031
"google.golang.org/grpc"
3132
"google.golang.org/grpc/connectivity"
3233
"google.golang.org/grpc/credentials/insecure"
@@ -110,8 +111,7 @@ func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
110111
return &registerServiceServerOption{f: f}
111112
}
112113

113-
// StartServer only starts the server. It does not create a client to it.
114-
func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
114+
func (ss *StubServer) setupServer(sopts ...grpc.ServerOption) (net.Listener, error) {
115115
if ss.Network == "" {
116116
ss.Network = "tcp"
117117
}
@@ -127,24 +127,59 @@ func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
127127
var err error
128128
lis, err = net.Listen(ss.Network, ss.Address)
129129
if err != nil {
130-
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
130+
return nil, fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
131131
}
132132
}
133133
ss.Address = lis.Addr().String()
134-
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
135134

136-
s := grpc.NewServer(sopts...)
135+
ss.S = grpc.NewServer(sopts...)
137136
for _, so := range sopts {
138137
switch x := so.(type) {
139138
case *registerServiceServerOption:
140-
x.f(s)
139+
x.f(ss.S)
140+
}
141+
}
142+
143+
testgrpc.RegisterTestServiceServer(ss.S, ss)
144+
ss.cleanups = append(ss.cleanups, ss.S.Stop)
145+
return lis, nil
146+
}
147+
148+
// StartHandlerServer only starts an HTTP server with a gRPC server as the
149+
// handler. It does not create a client to it. Cannot be used in a StubServer
150+
// that also used StartServer.
151+
func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
152+
lis, err := ss.setupServer(sopts...)
153+
if err != nil {
154+
return err
155+
}
156+
157+
go func() {
158+
hs := &http2.Server{}
159+
opts := &http2.ServeConnOpts{Handler: ss.S}
160+
for {
161+
conn, err := lis.Accept()
162+
if err != nil {
163+
return
164+
}
165+
hs.ServeConn(conn, opts)
141166
}
167+
}()
168+
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
169+
170+
return nil
171+
}
172+
173+
// StartServer only starts the server. It does not create a client to it.
174+
// Cannot be used in a StubServer that also used StartHandlerServer.
175+
func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
176+
lis, err := ss.setupServer(sopts...)
177+
if err != nil {
178+
return err
142179
}
143180

144-
testgrpc.RegisterTestServiceServer(s, ss)
145-
go s.Serve(lis)
146-
ss.cleanups = append(ss.cleanups, s.Stop)
147-
ss.S = s
181+
go ss.S.Serve(lis)
182+
148183
return nil
149184
}
150185

Diff for: internal/transport/handler_server.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,20 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
220220
h.Set("Grpc-Message", encodeGrpcMessage(m))
221221
}
222222

223+
s.hdrMu.Lock()
223224
if p := st.Proto(); p != nil && len(p.Details) > 0 {
225+
delete(s.trailer, grpcStatusDetailsBinHeader)
224226
stBytes, err := proto.Marshal(p)
225227
if err != nil {
226228
// TODO: return error instead, when callers are able to handle it.
227229
panic(err)
228230
}
229231

230-
h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
232+
h.Set(grpcStatusDetailsBinHeader, encodeBinHeader(stBytes))
231233
}
232234

233-
if md := s.Trailer(); len(md) > 0 {
234-
for k, vv := range md {
235+
if len(s.trailer) > 0 {
236+
for k, vv := range s.trailer {
235237
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
236238
if isReservedHeader(k) {
237239
continue
@@ -243,6 +245,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
243245
}
244246
}
245247
}
248+
s.hdrMu.Unlock()
246249
})
247250

248251
if err == nil { // transport has not been closed
@@ -287,7 +290,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
287290
}
288291

289292
// writeCustomHeaders sets custom headers set on the stream via SetHeader
290-
// on the first write call (Write, WriteHeader, or WriteStatus).
293+
// on the first write call (Write, WriteHeader, or WriteStatus)
291294
func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
292295
h := ht.rw.Header()
293296

Diff for: internal/transport/http2_client.go

+2-11
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
13991399
mdata = make(map[string][]string)
14001400
contentTypeErr = "malformed header: missing HTTP content-type"
14011401
grpcMessage string
1402-
statusGen *status.Status
14031402
recvCompress string
14041403
httpStatusCode *int
14051404
httpStatusErr string
@@ -1434,12 +1433,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
14341433
rawStatusCode = codes.Code(uint32(code))
14351434
case "grpc-message":
14361435
grpcMessage = decodeGrpcMessage(hf.Value)
1437-
case "grpc-status-details-bin":
1438-
var err error
1439-
statusGen, err = decodeGRPCStatusDetails(hf.Value)
1440-
if err != nil {
1441-
headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
1442-
}
14431436
case ":status":
14441437
if hf.Value == "200" {
14451438
httpStatusErr = ""
@@ -1548,14 +1541,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
15481541
return
15491542
}
15501543

1551-
if statusGen == nil {
1552-
statusGen = status.New(rawStatusCode, grpcMessage)
1553-
}
1544+
status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
15541545

15551546
// If client received END_STREAM from server while stream was still active,
15561547
// send RST_STREAM.
15571548
rstStream := s.getState() == streamActive
1558-
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
1549+
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
15591550
}
15601551

15611552
// readServerPreface reads and handles the initial settings frame from the

Diff for: internal/transport/http2_server.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1057,12 +1057,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
10571057
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
10581058

10591059
if p := st.Proto(); p != nil && len(p.Details) > 0 {
1060+
// Do not use the user's grpc-status-details-bin (if present) if we are
1061+
// even attempting to set our own.
1062+
delete(s.trailer, grpcStatusDetailsBinHeader)
10601063
stBytes, err := proto.Marshal(p)
10611064
if err != nil {
10621065
// TODO: return error instead, when callers are able to handle it.
10631066
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
10641067
} else {
1065-
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
1068+
headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
10661069
}
10671070
}
10681071

Diff for: internal/transport/http_util.go

+2-16
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,9 @@ import (
3434
"time"
3535
"unicode/utf8"
3636

37-
"github.com/golang/protobuf/proto"
3837
"golang.org/x/net/http2"
3938
"golang.org/x/net/http2/hpack"
40-
spb "google.golang.org/genproto/googleapis/rpc/status"
4139
"google.golang.org/grpc/codes"
42-
"google.golang.org/grpc/status"
4340
)
4441

4542
const (
@@ -88,6 +85,8 @@ var (
8885
}
8986
)
9087

88+
var grpcStatusDetailsBinHeader = "grpc-status-details-bin"
89+
9190
// isReservedHeader checks whether hdr belongs to HTTP2 headers
9291
// reserved by gRPC protocol. Any other headers are classified as the
9392
// user-specified metadata.
@@ -103,7 +102,6 @@ func isReservedHeader(hdr string) bool {
103102
"grpc-message",
104103
"grpc-status",
105104
"grpc-timeout",
106-
"grpc-status-details-bin",
107105
// Intentionally exclude grpc-previous-rpc-attempts and
108106
// grpc-retry-pushback-ms, which are "reserved", but their API
109107
// intentionally works via metadata.
@@ -154,18 +152,6 @@ func decodeMetadataHeader(k, v string) (string, error) {
154152
return v, nil
155153
}
156154

157-
func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
158-
v, err := decodeBinHeader(rawDetails)
159-
if err != nil {
160-
return nil, err
161-
}
162-
st := &spb.Status{}
163-
if err = proto.Unmarshal(v, st); err != nil {
164-
return nil, err
165-
}
166-
return status.FromProto(st), nil
167-
}
168-
169155
type timeoutUnit uint8
170156

171157
const (

0 commit comments

Comments
 (0)