Commit 63853fd

rls: update picker synchronously on configuration update (#7412)
1 parent 86135c3 commit 63853fd

File tree

2 files changed: +182 −8 lines changed


balancer/rls/balancer.go

+9 −8
@@ -322,14 +322,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error

     // Update the copy of the config in the LB policy before releasing the lock.
     b.lbCfg = newCfg
-
-    // Enqueue an event which will notify us when the above update has been
-    // propagated to all child policies, and the child policies have all
-    // processed their updates, and we have sent a picker update.
-    done := make(chan struct{})
-    b.updateCh.Put(resumePickerUpdates{done: done})
     b.stateMu.Unlock()
-    <-done

     // We cannot do cache operations above because `cacheMu` needs to be grabbed
     // before `stateMu` if we are to hold both locks at the same time.
@@ -338,10 +331,18 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
     if resizeCache {
         // If the new config reduces the size of the data cache, we
         // might have to evict entries to get the cache size down to the newly
-        // specified size.
+        // specified size. If we do evict an entry with a valid backoff timer,
+        // the new picker needs to be sent to the channel to re-process any
+        // RPCs queued as a result of this backoff timer.
         b.dataCache.resize(newCfg.cacheSizeBytes)
     }
     b.cacheMu.Unlock()
+    // Enqueue an event which will notify us when the above update has been
+    // propagated to all child policies, and the child policies have all
+    // processed their updates, and we have sent a picker update.
+    done := make(chan struct{})
+    b.updateCh.Put(resumePickerUpdates{done: done})
+    <-done
     return nil
 }
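
The change above reorders `UpdateClientConnState`: the `resumePickerUpdates` event is now enqueued (and its `done` channel awaited) after the cache resize rather than before it, so the method does not return until a picker reflecting any resize-triggered evictions has been sent. A minimal sketch of this enqueue-and-wait pattern, using hypothetical names (`updateEvent`, `processEvents`) rather than the balancer's real types:

```go
package main

import "fmt"

// updateEvent is a hypothetical stand-in for resumePickerUpdates: it
// carries a channel that the event loop closes once the event (and all
// events queued before it) has been processed.
type updateEvent struct {
	done chan struct{}
}

// processEvents handles events one at a time on a dedicated goroutine,
// so closing ev.done guarantees all earlier work has completed.
func processEvents(ch <-chan updateEvent) {
	for ev := range ch {
		// ... propagate the config to children, send a picker update ...
		close(ev.done)
	}
}

func main() {
	ch := make(chan updateEvent)
	go processEvents(ch)

	// Enqueue and block, as UpdateClientConnState now does only after
	// the cache has been resized.
	done := make(chan struct{})
	ch <- updateEvent{done: done}
	<-done
	fmt.Println("picker update has been sent")
}
```

Note that in both the old and new code the blocking receive happens only after the locks are released; what changes is that the picker update is now guaranteed to account for the resized cache.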

balancer/rls/balancer_test.go

+173
@@ -652,6 +652,179 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
     verifyRLSRequest(t, rlsReqCh, true)
 }

+// Tests that when a data cache entry is evicted due to a config change
+// in the cache size, the picker is updated accordingly.
+func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
+    // Override the clientConn update hook to get notified.
+    clientConnUpdateDone := make(chan struct{}, 1)
+    origClientConnUpdateHook := clientConnUpdateHook
+    clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
+    defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
+
+    // Override the cache entry size func to always return 1.
+    origEntrySizeFunc := computeDataCacheEntrySize
+    computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 }
+    defer func() { computeDataCacheEntrySize = origEntrySizeFunc }()
+
+    // Override the backoff strategy to return a large backoff which
+    // will make sure the data cache entry remains in backoff for the
+    // duration of the test.
+    origBackoffStrategy := defaultBackoffStrategy
+    defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
+    defer func() { defaultBackoffStrategy = origBackoffStrategy }()
+
+    // Override the minEvictionDuration to ensure that when the config update
+    // reduces the cache size, the resize operation is not stopped because
+    // we find an entry whose minExpiryDuration has not elapsed.
+    origMinEvictDuration := minEvictDuration
+    minEvictDuration = time.Duration(0)
+    defer func() { minEvictDuration = origMinEvictDuration }()
+
+    // Register the top-level wrapping balancer which forwards calls to RLS.
+    topLevelBalancerName := t.Name() + "top-level"
+    var ccWrapper *testCCWrapper
+    stub.Register(topLevelBalancerName, stub.BalancerFuncs{
+        Init: func(bd *stub.BalancerData) {
+            ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
+            bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
+        },
+        ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+            parser := balancer.Get(Name).(balancer.ConfigParser)
+            return parser.ParseConfig(sc)
+        },
+        UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+            bal := bd.Data.(balancer.Balancer)
+            return bal.UpdateClientConnState(ccs)
+        },
+        Close: func(bd *stub.BalancerData) {
+            bal := bd.Data.(balancer.Balancer)
+            bal.Close()
+        },
+    })
+
+    // Start an RLS server and set the throttler to never throttle requests.
+    rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
+    overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+    // Register an LB policy to act as the child policy for the RLS LB policy.
+    childPolicyName := "test-child-policy" + t.Name()
+    e2e.RegisterRLSChildPolicy(childPolicyName, nil)
+    t.Logf("Registered child policy with name %q", childPolicyName)
+
+    // Start a couple of test backends, and set up the fake RLS server to return
+    // these as targets in the RLS response, based on request keys.
+    backendCh1, backendAddress1 := startBackend(t)
+    backendCh2, backendAddress2 := startBackend(t)
+    rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+        if req.KeyMap["k1"] == "v1" {
+            return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
+        }
+        if req.KeyMap["k2"] == "v2" {
+            return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
+        }
+        return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
+    })
+
+    // Register a manual resolver and push the RLS service config through it.
+    r := manual.NewBuilderWithScheme("rls-e2e")
+    headers := `
+[
+    {
+        "key": "k1",
+        "names": [
+            "n1"
+        ]
+    },
+    {
+        "key": "k2",
+        "names": [
+            "n2"
+        ]
+    }
+]
+`
+
+    configJSON := `
+{
+    "loadBalancingConfig": [
+        {
+            "%s": {
+                "routeLookupConfig": {
+                    "grpcKeybuilders": [{
+                        "names": [{"service": "grpc.testing.TestService"}],
+                        "headers": %s
+                    }],
+                    "lookupService": "%s",
+                    "cacheSizeBytes": %d
+                },
+                "childPolicy": [{"%s": {}}],
+                "childPolicyConfigTargetFieldName": "Backend"
+            }
+        }
+    ]
+}`
+    scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName)
+    sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
+    r.InitialState(resolver.State{ServiceConfig: sc})
+
+    cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
+    if err != nil {
+        t.Fatalf("grpc.Dial() failed: %v", err)
+    }
+    defer cc.Close()
+
+    <-clientConnUpdateDone
+
+    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+    defer cancel()
+    // Make an RPC with empty metadata. No key will match in the fake RLS
+    // server's response callback defined above, so the control channel
+    // returns an error and the corresponding cache entry goes into backoff.
+    makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
+
+    ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
+    makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)
+    verifyRLSRequest(t, rlsReqCh, true)
+
+    ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
+    makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
+    verifyRLSRequest(t, rlsReqCh, true)
+
+    initialStateCnt := len(ccWrapper.getStates())
+    // Reducing the cache size to 2 (with each entry costing 1) will cause
+    // an entry to be evicted.
+    scJSON1 := fmt.Sprintf(`
+{
+    "loadBalancingConfig": [
+        {
+            "%s": {
+                "routeLookupConfig": {
+                    "grpcKeybuilders": [{
+                        "names": [{"service": "grpc.testing.TestService"}],
+                        "headers": %s
+                    }],
+                    "lookupService": "%s",
+                    "cacheSizeBytes": 2
+                },
+                "childPolicy": [{"%s": {}}],
+                "childPolicyConfigTargetFieldName": "Backend"
+            }
+        }
+    ]
+}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
+    sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
+    r.UpdateState(resolver.State{ServiceConfig: sc1})
+    <-clientConnUpdateDone
+    finalStateCnt := len(ccWrapper.getStates())
+
+    if finalStateCnt != initialStateCnt+1 {
+        t.Errorf("Unexpected balancer state count: got %v, want %v", finalStateCnt, initialStateCnt+1)
+    }
+}
+
 // TestDataCachePurging verifies that the LB policy periodically evicts expired
 // entries from the data cache.
 func (s) TestDataCachePurging(t *testing.T) {
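
The new test leans on package-level seams (`clientConnUpdateHook`, `computeDataCacheEntrySize`, `defaultBackoffStrategy`, `minEvictDuration`) that it swaps out and restores with `defer`. A minimal, self-contained sketch of that override-and-notify pattern, with illustrative names (`hook`, `doWork`) that are not part of the RLS package:

```go
package main

import "fmt"

// hook is a package-level test seam; production code calls it after
// finishing an update, and tests may replace it to observe the event.
var hook = func() {}

// doWork stands in for the code under test: it performs an update and
// then invokes the hook.
func doWork() {
	// ... apply the configuration update ...
	hook()
}

func main() {
	// Replace the hook with one that signals a channel, and restore the
	// original on the way out, mirroring the test's defer pattern.
	done := make(chan struct{}, 1)
	orig := hook
	hook = func() { done <- struct{}{} }
	defer func() { hook = orig }()

	go doWork()
	<-done // block until the update has been observed
	fmt.Println("update observed")
}
```

With the hook in place, the assertion at the end of the test only needs to compare picker-update counts: the synchronous config update should push exactly one additional state.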
