Skip to content

Commit 59c84a9

Browse files
rls: change lossy GetState() and WaitForStateChange() to use grpcsync.PubSub (#8055)
1 parent a26ff2a commit 59c84a9

File tree

1 file changed

+60
-30
lines changed

1 file changed

+60
-30
lines changed

balancer/rls/control_channel.go

+60-30
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ import (
2929
"google.golang.org/grpc/connectivity"
3030
"google.golang.org/grpc/credentials/insecure"
3131
"google.golang.org/grpc/internal"
32+
"google.golang.org/grpc/internal/buffer"
3233
internalgrpclog "google.golang.org/grpc/internal/grpclog"
34+
"google.golang.org/grpc/internal/grpcsync"
3335
"google.golang.org/grpc/internal/pretty"
3436
rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
3537
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
@@ -55,37 +57,58 @@ type controlChannel struct {
5557
// hammering the RLS service while it is overloaded or down.
5658
throttler adaptiveThrottler
5759

58-
cc *grpc.ClientConn
59-
client rlsgrpc.RouteLookupServiceClient
60-
logger *internalgrpclog.PrefixLogger
60+
cc *grpc.ClientConn
61+
client rlsgrpc.RouteLookupServiceClient
62+
logger *internalgrpclog.PrefixLogger
63+
connectivityStateCh *buffer.Unbounded
64+
unsubscribe func()
65+
monitorDoneCh chan struct{}
6166
}
6267

6368
// newControlChannel creates a controlChannel to rlsServerName and uses
6469
// serviceConfig, if non-empty, as the default service config for the underlying
6570
// gRPC channel.
6671
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
6772
ctrlCh := &controlChannel{
68-
rpcTimeout: rpcTimeout,
69-
backToReadyFunc: backToReadyFunc,
70-
throttler: newAdaptiveThrottler(),
73+
rpcTimeout: rpcTimeout,
74+
backToReadyFunc: backToReadyFunc,
75+
throttler: newAdaptiveThrottler(),
76+
connectivityStateCh: buffer.NewUnbounded(),
77+
monitorDoneCh: make(chan struct{}),
7178
}
7279
ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
7380

7481
dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
7582
if err != nil {
7683
return nil, err
7784
}
78-
ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
85+
ctrlCh.cc, err = grpc.NewClient(rlsServerName, dopts...)
7986
if err != nil {
8087
return nil, err
8188
}
89+
// Subscribe to connectivity state before connecting to avoid missing initial
90+
// updates, which are only delivered to active subscribers.
91+
ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh)
92+
ctrlCh.cc.Connect()
8293
ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
8394
ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
84-
85-
go ctrlCh.monitorConnectivityState()
95+
start := make(chan struct{})
96+
go func() {
97+
close(start)
98+
ctrlCh.monitorConnectivityState()
99+
}()
100+
<-start
86101
return ctrlCh, nil
87102
}
88103

104+
func (cc *controlChannel) OnMessage(msg any) {
105+
st, ok := msg.(connectivity.State)
106+
if !ok {
107+
panic(fmt.Sprintf("Unexpected message type %T , wanted connectectivity.State type", msg))
108+
}
109+
cc.connectivityStateCh.Put(st)
110+
}
111+
89112
// dialOpts constructs the dial options for the control plane channel.
90113
func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
91114
// The control plane channel will use the same authority as the parent
@@ -97,7 +120,6 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
97120
if bOpts.Dialer != nil {
98121
dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
99122
}
100-
101123
// The control channel will use the channel credentials from the parent
102124
// channel, including any call creds associated with the channel creds.
103125
var credsOpt grpc.DialOption
@@ -133,6 +155,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
133155

134156
func (cc *controlChannel) monitorConnectivityState() {
135157
cc.logger.Infof("Starting connectivity state monitoring goroutine")
158+
defer close(cc.monitorDoneCh)
159+
136160
// Since we use two mechanisms to deal with RLS server being down:
137161
// - adaptive throttling for the channel as a whole
138162
// - exponential backoff on a per-request basis
@@ -154,39 +178,45 @@ func (cc *controlChannel) monitorConnectivityState() {
154178
// returning only one new picker, regardless of how many backoff timers are
155179
// cancelled.
156180

157-
// Using the background context is fine here since we check for the ClientConn
158-
// entering SHUTDOWN and return early in that case.
159-
ctx := context.Background()
181+
// Wait for the control channel to become READY for the first time.
182+
for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() {
183+
if !ok {
184+
return
185+
}
186+
187+
cc.connectivityStateCh.Load()
188+
if s == connectivity.Shutdown {
189+
return
190+
}
191+
}
192+
cc.connectivityStateCh.Load()
193+
cc.logger.Infof("Connectivity state is READY")
160194

161-
first := true
162195
for {
163-
// Wait for the control channel to become READY.
164-
for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
165-
if s == connectivity.Shutdown {
166-
return
167-
}
168-
cc.cc.WaitForStateChange(ctx, s)
196+
s, ok := <-cc.connectivityStateCh.Get()
197+
if !ok {
198+
return
169199
}
170-
cc.logger.Infof("Connectivity state is READY")
200+
cc.connectivityStateCh.Load()
171201

172-
if !first {
202+
if s == connectivity.Shutdown {
203+
return
204+
}
205+
if s == connectivity.Ready {
173206
cc.logger.Infof("Control channel back to READY")
174207
cc.backToReadyFunc()
175208
}
176-
first = false
177209

178-
// Wait for the control channel to move out of READY.
179-
cc.cc.WaitForStateChange(ctx, connectivity.Ready)
180-
if cc.cc.GetState() == connectivity.Shutdown {
181-
return
182-
}
183-
cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
210+
cc.logger.Infof("Connectivity state is %s", s)
184211
}
185212
}
186213

187214
func (cc *controlChannel) close() {
188-
cc.logger.Infof("Closing control channel")
215+
cc.unsubscribe()
216+
cc.connectivityStateCh.Close()
217+
<-cc.monitorDoneCh
189218
cc.cc.Close()
219+
cc.logger.Infof("Shutdown")
190220
}
191221

192222
type lookupCallback func(targets []string, headerData string, err error)

0 commit comments

Comments
 (0)