@@ -58,18 +58,24 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
58
58
mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {
59
59
Listener : lis ,
60
60
OnStreamRequest : func (_ int64 , req * v3discoverypb.DiscoveryRequest ) error {
61
+ t .Logf ("Received request for resources: %v of type %s" , req .GetResourceNames (), req .GetTypeUrl ())
62
+
63
+ // Drain the resource name channels before writing to them to ensure
64
+ // that the most recently requested names are made available to the
65
+ // test.
61
66
switch req .GetTypeUrl () {
62
67
case version .V3ClusterURL :
63
68
select {
64
- case cdsResourcesCh <- req . GetResourceNames () :
69
+ case <- cdsResourcesCh :
65
70
default :
66
71
}
72
+ cdsResourcesCh <- req .GetResourceNames ()
67
73
case version .V3ListenerURL :
68
- t .Logf ("Received LDS request for resources: %v" , req .GetResourceNames ())
69
74
select {
70
- case ldsResourcesCh <- req . GetResourceNames () :
75
+ case <- ldsResourcesCh :
71
76
default :
72
77
}
78
+ ldsResourcesCh <- req .GetResourceNames ()
73
79
}
74
80
return nil
75
81
},
@@ -130,6 +136,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
130
136
t .Fatal (err )
131
137
}
132
138
139
+ // Verify the update received by the watcher.
140
+ wantListenerUpdate := listenerUpdateErrTuple {
141
+ update : xdsresource.ListenerUpdate {
142
+ RouteConfigName : routeConfigName ,
143
+ HTTPFilters : []xdsresource.HTTPFilter {{Name : "router" }},
144
+ },
145
+ }
146
+ if err := verifyListenerUpdate (ctx , lw .updateCh , wantListenerUpdate ); err != nil {
147
+ t .Fatal (err )
148
+ }
149
+
133
150
// Cancel the watch for the above listener resource, and verify that an LDS
134
151
// request with no resource names is sent.
135
152
ldsCancel ()
@@ -171,6 +188,11 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
171
188
}
172
189
defer ldsCancel ()
173
190
191
+ // Verify the update received by the watcher.
192
+ if err := verifyListenerUpdate (ctx , lw .updateCh , wantListenerUpdate ); err != nil {
193
+ t .Fatal (err )
194
+ }
195
+
174
196
// Create a cluster resource on the management server, in addition to the
175
197
// existing listener resource.
176
198
const clusterName = "cluster"
@@ -192,6 +214,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
192
214
t .Fatal (err )
193
215
}
194
216
217
+ // Verify the update received by the watcher.
218
+ wantClusterUpdate := clusterUpdateErrTuple {
219
+ update : xdsresource.ClusterUpdate {
220
+ ClusterName : clusterName ,
221
+ EDSServiceName : clusterName ,
222
+ },
223
+ }
224
+ if err := verifyClusterUpdate (ctx , cw .updateCh , wantClusterUpdate ); err != nil {
225
+ t .Fatal (err )
226
+ }
227
+
195
228
// Cancel the watch for the above cluster resource, and verify that a CDS
196
229
// request with no resource names is sent.
197
230
cdsCancel ()
0 commit comments