Skip to content

Commit 1267707

Browse files
Can Gulermenghanl
Can Guler
authored andcommitted
transport: Fix the inconsistency between headerChan and headerDone (#2818)
transport: Fix the inconsistency between headerChan and headerDone
1 parent 69d1464 commit 1267707

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

internal/transport/http2_client.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
549549
s.write(recvMsg{err: err})
550550
close(s.done)
551551
// If headerChan isn't closed, then close it.
552-
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
552+
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
553553
close(s.headerChan)
554554
}
555555

@@ -713,7 +713,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
713713
s.write(recvMsg{err: err})
714714
}
715715
// If headerChan isn't closed, then close it.
716-
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
716+
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
717717
s.noHeaders = true
718718
close(s.headerChan)
719719
}
@@ -1142,26 +1142,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
11421142
}
11431143
endStream := frame.StreamEnded()
11441144
atomic.StoreUint32(&s.bytesReceived, 1)
1145-
initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0
1145+
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
11461146

11471147
if !initialHeader && !endStream {
1148-
// As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear
1149-
// at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1148+
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
11501149
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
11511150
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
11521151
return
11531152
}
11541153

11551154
state := &decodeState{}
1156-
// Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received
1157-
// which indicates peer speaking gRPC, we are in gRPC mode.
1155+
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
11581156
state.data.isGRPC = !initialHeader
11591157
if err := state.decodeHeader(frame); err != nil {
11601158
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
11611159
return
11621160
}
11631161

1164-
var isHeader bool
1162+
isHeader := false
11651163
defer func() {
11661164
if t.statsHandler != nil {
11671165
if isHeader {
@@ -1180,10 +1178,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
11801178
}
11811179
}()
11821180

1183-
// If headers haven't been received yet.
1184-
if initialHeader {
1181+
// If headerChan hasn't been closed yet
1182+
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
11851183
if !endStream {
1186-
// Headers frame is ResponseHeader.
1184+
// HEADERS frame block carries a Response-Headers.
11871185
isHeader = true
11881186
// These values can be set without any synchronization because
11891187
// stream goroutine will read it only after seeing a closed
@@ -1192,14 +1190,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
11921190
if len(state.data.mdata) > 0 {
11931191
s.header = state.data.mdata
11941192
}
1195-
close(s.headerChan)
1196-
return
1193+
} else {
1194+
// HEADERS frame block carries a Trailers-Only.
1195+
s.noHeaders = true
11971196
}
1198-
// Headers frame is Trailers-only.
1199-
s.noHeaders = true
12001197
close(s.headerChan)
12011198
}
12021199

1200+
if !endStream {
1201+
return
1202+
}
1203+
12031204
// if client received END_STREAM from server while stream was still active, send RST_STREAM
12041205
rst := s.getState() == streamActive
12051206
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)

internal/transport/transport.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ type Stream struct {
204204
// is used to adjust flow control, if needed.
205205
requestRead func(int)
206206

207-
headerChan chan struct{} // closed to indicate the end of header metadata.
208-
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
207+
headerChan chan struct{} // closed to indicate the end of header metadata.
208+
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
209209

210210
// hdrMu protects header and trailer metadata on the server-side.
211211
hdrMu sync.Mutex

internal/transport/transport_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,24 @@ func TestInvalidHeaderField(t *testing.T) {
17171717
server.stop()
17181718
}
17191719

1720+
func TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) {
1721+
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
1722+
defer cancel()
1723+
defer server.stop()
1724+
defer ct.Close()
1725+
s, err := ct.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo"})
1726+
if err != nil {
1727+
t.Fatalf("failed to create the stream")
1728+
}
1729+
timer := time.NewTimer(time.Second)
1730+
defer timer.Stop()
1731+
select {
1732+
case <-s.headerChan:
1733+
case <-timer.C:
1734+
t.Errorf("s.headerChan: got open, want closed")
1735+
}
1736+
}
1737+
17201738
func TestIsReservedHeader(t *testing.T) {
17211739
tests := []struct {
17221740
h string

0 commit comments

Comments
 (0)