Skip to content

Commit 6e384cf

Browse files
authored
Cherry-pick #6856 to v1.60.x release branch (#6864)
1 parent 6430548 commit 6e384cf

File tree

2 files changed

+101
-9
lines changed

2 files changed

+101
-9
lines changed

Diff for: server.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ type Server struct {
144144
channelzID *channelz.Identifier
145145
czData *channelzData
146146

147-
serverWorkerChannel chan func()
147+
serverWorkerChannel chan func()
148+
serverWorkerChannelClose func()
148149
}
149150

150151
type serverOptions struct {
@@ -623,15 +624,14 @@ func (s *Server) serverWorker() {
623624
// connections to reduce the time spent overall on runtime.morestack.
624625
func (s *Server) initServerWorkers() {
625626
s.serverWorkerChannel = make(chan func())
627+
s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
628+
close(s.serverWorkerChannel)
629+
})
626630
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
627631
go s.serverWorker()
628632
}
629633
}
630634

631-
func (s *Server) stopServerWorkers() {
632-
close(s.serverWorkerChannel)
633-
}
634-
635635
// NewServer creates a gRPC server which has no service registered and has not
636636
// started to accept requests yet.
637637
func NewServer(opt ...ServerOption) *Server {
@@ -1898,15 +1898,19 @@ func (s *Server) stop(graceful bool) {
18981898
s.closeServerTransportsLocked()
18991899
}
19001900

1901-
if s.opts.numServerWorkers > 0 {
1902-
s.stopServerWorkers()
1903-
}
1904-
19051901
for len(s.conns) != 0 {
19061902
s.cv.Wait()
19071903
}
19081904
s.conns = nil
19091905

1906+
if s.opts.numServerWorkers > 0 {
1907+
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
1908+
// connections have been closed above ensures that there are no
1909+
// goroutines executing the callback passed to st.HandleStreams (where
1910+
// the channel is written to).
1911+
s.serverWorkerChannelClose()
1912+
}
1913+
19101914
if s.events != nil {
19111915
s.events.Finish()
19121916
s.events = nil

Diff for: server_ext_test.go

+88
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@ package grpc_test
2121
import (
2222
"context"
2323
"io"
24+
"runtime"
25+
"sync"
2426
"testing"
2527
"time"
2628

2729
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/credentials/insecure"
2832
"google.golang.org/grpc/internal/grpcsync"
2933
"google.golang.org/grpc/internal/stubserver"
34+
"google.golang.org/grpc/status"
3035

3136
testgrpc "google.golang.org/grpc/interop/grpc_testing"
37+
testpb "google.golang.org/grpc/interop/grpc_testing"
3238
)
3339

3440
// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
@@ -97,3 +103,85 @@ func (s) TestServer_MaxHandlers(t *testing.T) {
97103
t.Fatal("Received unexpected RPC error:", err)
98104
}
99105
}
106+
107+
// Tests the case where the stream worker goroutine option is enabled, and a
108+
// number of RPCs are initiated around the same time that Stop() is called. This
109+
// used to result in a write to a closed channel. This test verifies that there
110+
// is no panic.
111+
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
112+
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
113+
// This deferred stop takes care of stopping the server when one of the
114+
// below grpc.Dials fail, and the test exits early.
115+
defer ss.Stop()
116+
117+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
118+
defer cancel()
119+
const numChannels = 20
120+
const numRPCLoops = 20
121+
122+
// Create a bunch of clientconns and ensure that they are READY by making an
123+
// RPC on them.
124+
ccs := make([]*grpc.ClientConn, numChannels)
125+
for i := 0; i < numChannels; i++ {
126+
var err error
127+
ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
128+
if err != nil {
129+
t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
130+
}
131+
defer ccs[i].Close()
132+
client := testgrpc.NewTestServiceClient(ccs[i])
133+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
134+
t.Fatalf("EmptyCall() failed: %v", err)
135+
}
136+
}
137+
138+
// Make a bunch of concurrent RPCs on the above clientconns. These will
139+
// eventually race with Stop(), and will start to fail.
140+
var wg sync.WaitGroup
141+
for i := 0; i < numChannels; i++ {
142+
client := testgrpc.NewTestServiceClient(ccs[i])
143+
for j := 0; j < numRPCLoops; j++ {
144+
wg.Add(1)
145+
go func(client testgrpc.TestServiceClient) {
146+
defer wg.Done()
147+
for {
148+
_, err := client.EmptyCall(ctx, &testpb.Empty{})
149+
if err == nil {
150+
continue
151+
}
152+
if code := status.Code(err); code == codes.Unavailable {
153+
// Once Stop() has been called on the server, we expect
154+
// subsequent calls to fail with Unavailable.
155+
return
156+
}
157+
t.Errorf("EmptyCall() failed: %v", err)
158+
return
159+
}
160+
}(client)
161+
}
162+
}
163+
164+
// Call Stop() concurrently with the above RPC attempts.
165+
ss.Stop()
166+
wg.Wait()
167+
}
168+
169+
// Tests the case where the stream worker goroutine option is enabled, and both
170+
// Stop() and GracefulStop() care called. This used to result in a close of a
171+
// closed channel. This test verifies that there is no panic.
172+
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
173+
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
174+
defer ss.Stop()
175+
176+
if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
177+
t.Fatalf("Failed to create client to stub server: %v", err)
178+
}
179+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
180+
defer cancel()
181+
client := testgrpc.NewTestServiceClient(ss.CC)
182+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
183+
t.Fatalf("EmptyCall() failed: %v", err)
184+
}
185+
186+
ss.S.GracefulStop()
187+
}

0 commit comments

Comments
 (0)