Skip to content

Commit 5d24ee2

Browse files
authored
xds: store server config for LRS server in xdsresource.ClusterUpdate (#7191)
* xds: support LRS server config * switch to the new bootstrap package in internal/xds
1 parent c76f686 commit 5d24ee2

11 files changed

+126
-166
lines changed

xds/internal/balancer/cdsbalancer/cdsbalancer.go

+1-15
Original file line numberDiff line numberDiff line change
@@ -609,21 +609,7 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
609609
Cluster: cluster.ClusterName,
610610
EDSServiceName: cluster.EDSServiceName,
611611
MaxConcurrentRequests: cluster.MaxRequests,
612-
}
613-
if cluster.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
614-
bootstrapConfig := b.xdsClient.BootstrapConfig()
615-
parsedName := xdsresource.ParseName(cluster.ClusterName)
616-
if parsedName.Scheme == xdsresource.FederationScheme {
617-
// Is a federation resource name, find the corresponding
618-
// authority server config.
619-
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
620-
dm.LoadReportingServer = cfg.XDSServer
621-
}
622-
} else {
623-
// Not a federation resource name, use the default
624-
// authority.
625-
dm.LoadReportingServer = bootstrapConfig.XDSServer
626-
}
612+
LoadReportingServer: cluster.LRSServerConfig,
627613
}
628614
case xdsresource.ClusterTypeLogicalDNS:
629615
dm = clusterresolver.DiscoveryMechanism{

xds/internal/xdsclient/authority.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,6 @@ type authorityArgs struct {
9494
// (although the former is part of the latter) is because authorities in the
9595
// bootstrap config might contain an empty server config, and in this case,
9696
// the top-level server config is to be used.
97-
//
98-
// There are two code paths from where a new authority struct might be
99-
// created. One is when a watch is registered for a resource, and one is
100-
// when load reporting needs to be started. We have the authority name in
101-
// the first case, but do in the second. We only have the server config in
102-
// the second case.
10397
serverCfg *bootstrap.ServerConfig
10498
bootstrapCfg *bootstrap.Config
10599
serializer *grpcsync.CallbackSerializer
@@ -156,7 +150,10 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate
156150
return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
157151
}
158152

159-
opts := &xdsresource.DecodeOptions{BootstrapConfig: a.bootstrapCfg}
153+
opts := &xdsresource.DecodeOptions{
154+
BootstrapConfig: a.bootstrapCfg,
155+
ServerConfig: a.serverCfg,
156+
}
160157
updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
161158
a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
162159
return err

xds/internal/xdsclient/clientimpl_authority.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
// authority, without holding c.authorityMu.
3737
//
3838
// Caller must not hold c.authorityMu.
39-
func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref func(), _ error) {
39+
func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) {
4040
scheme, authority := n.Scheme, n.Authority
4141

4242
c.authorityMu.Lock()

xds/internal/xdsclient/clientimpl_watchers.go

-9
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,6 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
4848
return func() {}
4949
}
5050

51-
// TODO: replace this with the code does the following when we have
52-
// implemented generic watch API on the authority:
53-
// - Parse the resource name and extract the authority.
54-
// - Locate the corresponding authority object and acquire a reference to
55-
// it. If the authority is not found, error out.
56-
// - Call the watchResource() method on the authority.
57-
// - Return a cancel function to cancel the watch on the authority and to
58-
// release the reference.
59-
6051
// TODO: Make ParseName return an error if parsing fails, and
6152
// schedule the OnError callback in that case.
6253
n := xdsresource.ParseName(resourceName)

xds/internal/xdsclient/tests/resource_update_test.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -669,9 +669,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
669669
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
670670
},
671671
wantUpdate: xdsresource.ClusterUpdate{
672-
ClusterName: "resource-name-1",
673-
EDSServiceName: "eds-service-name",
674-
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
672+
ClusterName: "resource-name-1",
673+
EDSServiceName: "eds-service-name",
675674
},
676675
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
677676
"resource-name-1": {
@@ -689,9 +688,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
689688
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
690689
},
691690
wantUpdate: xdsresource.ClusterUpdate{
692-
ClusterName: "resource-name-1",
693-
EDSServiceName: "eds-service-name",
694-
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
691+
ClusterName: "resource-name-1",
692+
EDSServiceName: "eds-service-name",
695693
},
696694
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
697695
"resource-name-1": {
@@ -763,6 +761,16 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
763761
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
764762
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
765763
}
764+
765+
// For tests expected to succeed, we expect an LRS server config in
766+
// the update from the xDS client, because the LRS bit is turned on
767+
// in the cluster resource. We *cannot* set the LRS server config in
768+
// the test table because we do not have the address of the xDS
769+
// server at that point, hence we do it here before verifying the
770+
// received update.
771+
if test.wantErr == "" {
772+
test.wantUpdate.LRSServerConfig = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)
773+
}
766774
cmpOpts := []cmp.Option{
767775
cmpopts.EquateEmpty(),
768776
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),

xds/internal/xdsclient/xdsresource/cluster_resource_type.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type clusterResourceType struct {
5555
// Decode deserializes and validates an xDS resource serialized inside the
5656
// provided `Any` proto, as received from the xDS management server.
5757
func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
58-
name, cluster, err := unmarshalClusterResource(resource)
58+
name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig)
5959
switch {
6060
case name == "":
6161
// Name is unset only when protobuf deserialization fails.

xds/internal/xdsclient/xdsresource/resource_type.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,13 @@ type ResourceData interface {
133133
// DecodeOptions wraps the options required by ResourceType implementation for
134134
// decoding configuration received from the xDS management server.
135135
type DecodeOptions struct {
136-
// BootstrapConfig contains the bootstrap configuration passed to the
137-
// top-level xdsClient. This contains useful data for resource validation.
136+
// BootstrapConfig contains the complete bootstrap configuration passed to
137+
// the xDS client. This contains useful data for resource validation.
138138
BootstrapConfig *bootstrap.Config
139+
// ServerConfig contains the server config (from the above bootstrap
140+
// configuration) of the xDS server from which the current resource, for
141+
// which Decode() is being invoked, was received.
142+
ServerConfig *bootstrap.ServerConfig
139143
}
140144

141145
// DecodeResult is the result of a decode operation.

xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go

+21-15
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ import (
2626
"github.com/google/go-cmp/cmp"
2727
"github.com/google/go-cmp/cmp/cmpopts"
2828
"google.golang.org/grpc/balancer/leastrequest"
29-
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
3029
"google.golang.org/grpc/internal/balancer/stub"
3130
"google.golang.org/grpc/internal/envconfig"
3231
"google.golang.org/grpc/internal/grpctest"
3332
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
3433
"google.golang.org/grpc/internal/testutils"
3534
"google.golang.org/grpc/internal/testutils/xds/e2e"
35+
"google.golang.org/grpc/internal/xds/bootstrap"
3636
"google.golang.org/grpc/serviceconfig"
37-
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
3837
"google.golang.org/grpc/xds/internal/balancer/ringhash"
3938
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
4039
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
40+
"google.golang.org/protobuf/proto"
41+
"google.golang.org/protobuf/types/known/anypb"
42+
"google.golang.org/protobuf/types/known/structpb"
4143
"google.golang.org/protobuf/types/known/wrapperspb"
4244

4345
v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
@@ -48,9 +50,9 @@ import (
4850
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
4951
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
5052
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
51-
"google.golang.org/protobuf/proto"
52-
"google.golang.org/protobuf/types/known/anypb"
53-
"google.golang.org/protobuf/types/known/structpb"
53+
54+
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
55+
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
5456
)
5557

5658
type s struct {
@@ -66,8 +68,6 @@ const (
6668
serviceName = "service"
6769
)
6870

69-
var emptyUpdate = xdsresource.ClusterUpdate{ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff}
70-
7171
func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality {
7272
return &v3wrrlocalitypb.WrrLocality{
7373
EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
@@ -105,6 +105,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
105105
tests := []struct {
106106
name string
107107
cluster *v3clusterpb.Cluster
108+
serverCfg *bootstrap.ServerConfig
108109
wantUpdate xdsresource.ClusterUpdate
109110
wantLBConfig *iserviceconfig.BalancerConfig
110111
}{
@@ -164,7 +165,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
164165
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
165166
},
166167
wantUpdate: xdsresource.ClusterUpdate{
167-
ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff, ClusterType: xdsresource.ClusterTypeAggregate,
168+
ClusterName: clusterName,
169+
ClusterType: xdsresource.ClusterTypeAggregate,
168170
PrioritizedClusterNames: []string{"a", "b", "c"},
169171
},
170172
wantLBConfig: &iserviceconfig.BalancerConfig{
@@ -179,7 +181,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
179181
{
180182
name: "happy-case-no-service-name-no-lrs",
181183
cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
182-
wantUpdate: emptyUpdate,
184+
wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName},
183185
wantLBConfig: &iserviceconfig.BalancerConfig{
184186
Name: wrrlocality.Name,
185187
Config: &wrrlocality.LBConfig{
@@ -206,16 +208,17 @@ func (s) TestValidateCluster_Success(t *testing.T) {
206208
},
207209
},
208210
{
209-
name: "happiest-case",
211+
name: "happiest-case-with-lrs",
210212
cluster: e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
211213
ClusterName: clusterName,
212214
ServiceName: serviceName,
213215
EnableLRS: true,
214216
}),
217+
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
215218
wantUpdate: xdsresource.ClusterUpdate{
216219
ClusterName: clusterName,
217220
EDSServiceName: serviceName,
218-
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
221+
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
219222
},
220223
wantLBConfig: &iserviceconfig.BalancerConfig{
221224
Name: wrrlocality.Name,
@@ -248,10 +251,11 @@ func (s) TestValidateCluster_Success(t *testing.T) {
248251
}
249252
return c
250253
}(),
254+
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
251255
wantUpdate: xdsresource.ClusterUpdate{
252256
ClusterName: clusterName,
253257
EDSServiceName: serviceName,
254-
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
258+
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
255259
MaxRequests: func() *uint32 { i := uint32(512); return &i }(),
256260
},
257261
wantLBConfig: &iserviceconfig.BalancerConfig{
@@ -298,7 +302,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
298302
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
299303
},
300304
wantUpdate: xdsresource.ClusterUpdate{
301-
ClusterName: clusterName, EDSServiceName: serviceName,
305+
ClusterName: clusterName,
306+
EDSServiceName: serviceName,
302307
},
303308
wantLBConfig: &iserviceconfig.BalancerConfig{
304309
Name: "least_request_experimental",
@@ -353,7 +358,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
353358
},
354359
},
355360
wantUpdate: xdsresource.ClusterUpdate{
356-
ClusterName: clusterName, EDSServiceName: serviceName,
361+
ClusterName: clusterName,
362+
EDSServiceName: serviceName,
357363
},
358364
wantLBConfig: &iserviceconfig.BalancerConfig{
359365
Name: "least_request_experimental",
@@ -527,7 +533,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
527533

528534
for _, test := range tests {
529535
t.Run(test.name, func(t *testing.T) {
530-
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster)
536+
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster, test.serverCfg)
531537
if err != nil {
532538
t.Errorf("validateClusterAndConstructClusterUpdate(%+v) failed: %v", test.cluster, err)
533539
}

xds/internal/xdsclient/xdsresource/type_cds.go

+5-16
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package xdsresource
2020
import (
2121
"encoding/json"
2222

23+
"google.golang.org/grpc/internal/xds/bootstrap"
2324
"google.golang.org/protobuf/types/known/anypb"
2425
)
2526

@@ -39,18 +40,6 @@ const (
3940
ClusterTypeAggregate
4041
)
4142

42-
// ClusterLRSServerConfigType is the type of LRS server config.
43-
type ClusterLRSServerConfigType int
44-
45-
const (
46-
// ClusterLRSOff indicates LRS is off (loads are not reported for this
47-
// cluster).
48-
ClusterLRSOff ClusterLRSServerConfigType = iota
49-
// ClusterLRSServerSelf indicates loads should be reported to the same
50-
// server (the authority) where the CDS resp is received from.
51-
ClusterLRSServerSelf
52-
)
53-
5443
// ClusterUpdate contains information from a received CDS response, which is of
5544
// interest to the registered CDS watcher.
5645
type ClusterUpdate struct {
@@ -60,10 +49,10 @@ type ClusterUpdate struct {
6049
// EDSServiceName is an optional name for EDS. If it's not set, the balancer
6150
// should watch ClusterName for the EDS resources.
6251
EDSServiceName string
63-
// LRSServerConfig contains the server where the load reports should be sent
64-
// to. This can be change to an interface, to support other types, e.g. a
65-
// ServerConfig with ServerURI, creds.
66-
LRSServerConfig ClusterLRSServerConfigType
52+
// LRSServerConfig contains configuration about the xDS server that sent
53+
// this cluster resource. This is also the server where load reports are to
54+
// be sent, for this cluster.
55+
LRSServerConfig *bootstrap.ServerConfig
6756
// SecurityCfg contains security configuration sent by the control plane.
6857
SecurityCfg *SecurityConfig
6958
// MaxRequests for circuit breaking, if any (otherwise nil).

xds/internal/xdsclient/xdsresource/unmarshal_cds.go

+5-14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/internal/envconfig"
3535
"google.golang.org/grpc/internal/pretty"
3636
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
37+
"google.golang.org/grpc/internal/xds/bootstrap"
3738
"google.golang.org/grpc/internal/xds/matcher"
3839
"google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
3940
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
@@ -50,7 +51,7 @@ var ValidateClusterAndConstructClusterUpdateForTesting = validateClusterAndConst
5051
// to this value by the management server.
5152
const transportSocketName = "envoy.transport_sockets.tls"
5253

53-
func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
54+
func unmarshalClusterResource(r *anypb.Any, serverCfg *bootstrap.ServerConfig) (string, ClusterUpdate, error) {
5455
r, err := UnwrapResource(r)
5556
if err != nil {
5657
return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
@@ -64,7 +65,7 @@ func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
6465
if err := proto.Unmarshal(r.GetValue(), cluster); err != nil {
6566
return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
6667
}
67-
cu, err := validateClusterAndConstructClusterUpdate(cluster)
68+
cu, err := validateClusterAndConstructClusterUpdate(cluster, serverCfg)
6869
if err != nil {
6970
return cluster.GetName(), ClusterUpdate{}, err
7071
}
@@ -81,7 +82,7 @@ const (
8182
defaultLeastRequestChoiceCount = 2
8283
)
8384

84-
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
85+
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serverCfg *bootstrap.ServerConfig) (ClusterUpdate, error) {
8586
telemetryLabels := make(map[string]string)
8687
if fmd := cluster.GetMetadata().GetFilterMetadata(); fmd != nil {
8788
if val, ok := fmd["com.google.csm.telemetry_labels"]; ok {
@@ -182,21 +183,11 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
182183
TelemetryLabels: telemetryLabels,
183184
}
184185

185-
// Note that this is different from the gRFC (gRFC A47 says to include the
186-
// full ServerConfig{URL,creds,server feature} here). This information is
187-
// not available here, because this function doesn't have access to the
188-
// xdsclient bootstrap information now (can be added if necessary). The
189-
// ServerConfig will be read and populated by the CDS balancer when
190-
// processing this field.
191-
// According to A27:
192-
// If the `lrs_server` field is set, it must have its `self` field set, in
193-
// which case the client should use LRS for load reporting. Otherwise
194-
// (the `lrs_server` field is not set), LRS load reporting will be disabled.
195186
if lrs := cluster.GetLrsServer(); lrs != nil {
196187
if lrs.GetSelf() == nil {
197188
return ClusterUpdate{}, fmt.Errorf("unsupported config_source_specifier %T in lrs_server field", lrs.ConfigSourceSpecifier)
198189
}
199-
ret.LRSServerConfig = ClusterLRSServerSelf
190+
ret.LRSServerConfig = serverCfg
200191
}
201192

202193
// Validate and set cluster type from the response.

0 commit comments

Comments
 (0)