Skip to content

Commit a69d851

Browse files
authored
Merge pull request #2429 from brendandburns/revit
Cherry-pick in two watch changes from the 0.x branch, clean up tests, fix list -> resourceVersion handling
2 parents 3bd21ad + 2a961a0 commit a69d851

File tree

2 files changed

+122
-33
lines changed

2 files changed

+122
-33
lines changed

src/cache.ts

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,21 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3434
private readonly watch: Watch;
3535
private readonly listFn: ListPromise<T>;
3636
private readonly labelSelector?: string;
37+
private readonly fieldSelector?: string;
3738

3839
public constructor(
3940
path: string,
4041
watch: Watch,
4142
listFn: ListPromise<T>,
4243
autoStart: boolean = true,
4344
labelSelector?: string,
45+
fieldSelector?: string,
4446
) {
4547
this.path = path;
4648
this.watch = watch;
4749
this.listFn = listFn;
4850
this.labelSelector = labelSelector;
51+
this.fieldSelector = fieldSelector;
4952

5053
this.callbackCache[ADD] = [];
5154
this.callbackCache[UPDATE] = [];
@@ -140,7 +143,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
140143

141144
private async doneHandler(err: any): Promise<void> {
142145
this._stop();
143-
if (err && err.statusCode === 410) {
146+
if (
147+
err &&
148+
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
149+
) {
144150
this.resourceVersion = '';
145151
} else if (err) {
146152
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -162,17 +168,21 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
162168
}
163169
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
164170
this.addOrUpdateItems(list.items);
165-
this.resourceVersion = list.metadata!.resourceVersion || '';
171+
this.resourceVersion = list.metadata ? list.metadata!.resourceVersion || '' : '';
166172
}
167173
const queryParams = {
168174
resourceVersion: this.resourceVersion,
169175
} as {
170176
resourceVersion: string | undefined;
171177
labelSelector: string | undefined;
178+
fieldSelector: string | undefined;
172179
};
173180
if (this.labelSelector !== undefined) {
174181
queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string');
175182
}
183+
if (this.fieldSelector !== undefined) {
184+
queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string');
185+
}
176186
this.request = await this.watch.watch(
177187
this.path,
178188
queryParams,
@@ -182,6 +192,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
182192
}
183193

184194
private addOrUpdateItems(items: T[]): void {
195+
if (items === undefined || items === null) {
196+
return;
197+
}
185198
items.forEach((obj: T) => {
186199
addOrUpdateObject(
187200
this.objects,
@@ -192,13 +205,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
192205
});
193206
}
194207

195-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
208+
private async watchHandler(
209+
phase: string,
210+
obj: T,
211+
watchObj?: { type: string; object: KubernetesObject },
212+
): Promise<void> {
196213
switch (phase) {
197214
case 'ERROR':
198215
if ((obj as { code?: number }).code === 410) {
199216
this.resourceVersion = '';
200217
}
201-
break;
218+
// We don't restart here, because it should be handled by the watch exiting if necessary
219+
return;
202220
case 'ADDED':
203221
case 'MODIFIED':
204222
addOrUpdateObject(
@@ -215,15 +233,16 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
215233
// nothing to do, here for documentation, mostly.
216234
break;
217235
}
218-
if (watchObj && watchObj.metadata) {
219-
this.resourceVersion = watchObj.metadata.resourceVersion;
220-
}
236+
this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : '';
221237
}
222238
}
223239

224240
// exported for testing
225241
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
226242
const objects: CacheMap<T> = new Map();
243+
if (newObjects === undefined || newObjects === null) {
244+
return objects;
245+
}
227246
// build up the new list
228247
for (const obj of newObjects) {
229248
let namespaceObjects = objects.get(obj.metadata!.namespace || '');

src/cache_test.ts

Lines changed: 96 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ describe('ListWatchCache', () => {
235235
watchHandler('ADDED', {
236236
metadata: {
237237
name: 'name3',
238+
resourceVersion: 'blah',
238239
} as V1ObjectMeta,
239240
} as V1Namespace);
240241

@@ -245,40 +246,28 @@ describe('ListWatchCache', () => {
245246
} as V1ObjectMeta,
246247
} as V1Namespace);
247248

248-
watchHandler(
249-
'DELETED',
250-
{
251-
metadata: {
252-
name: 'name2',
253-
resourceVersion: 'blah',
254-
} as V1ObjectMeta,
255-
} as V1Namespace,
256-
{
257-
metadata: {
258-
resourceVersion: '54321',
259-
},
260-
},
261-
);
249+
watchHandler('DELETED', {
250+
metadata: {
251+
name: 'name2',
252+
resourceVersion: '54321',
253+
} as V1ObjectMeta,
254+
} as V1Namespace);
262255

263256
const [addResult, updateResult, deleteResult] = await Promise.all([
264257
addPromise,
265258
updatePromise,
266259
deletePromise,
267260
]);
268-
deepStrictEqual(addResult.metadata, { name: 'name3' });
261+
deepStrictEqual(addResult.metadata, { name: 'name3', resourceVersion: 'blah' });
269262
deepStrictEqual(updateResult.metadata, { name: 'name3', resourceVersion: 'baz' });
270-
deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: 'blah' });
263+
deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: '54321' });
271264
strictEqual(informer.latestResourceVersion(), '54321');
272265

273-
watchHandler(
274-
'BOOKMARK',
275-
{},
276-
{
277-
metadata: {
278-
resourceVersion: '5454',
279-
},
266+
watchHandler('BOOKMARK', {
267+
metadata: {
268+
resourceVersion: '5454',
280269
},
281-
);
270+
});
282271
strictEqual(informer.latestResourceVersion(), '5454');
283272
});
284273

@@ -1205,9 +1194,10 @@ describe('ListWatchCache', () => {
12051194
{
12061195
metadata: {
12071196
name: 'name3',
1197+
resourceVersion: '23456',
12081198
} as V1ObjectMeta,
12091199
} as V1Namespace,
1210-
{ metadata: { resourceVersion: '23456' } },
1200+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12111201
);
12121202

12131203
await informer.stop();
@@ -1259,9 +1249,87 @@ describe('ListWatchCache', () => {
12591249
{
12601250
metadata: {
12611251
name: 'name3',
1252+
resourceVersion: '23456',
12621253
} as V1ObjectMeta,
12631254
} as V1Namespace,
1264-
{ metadata: { resourceVersion: '23456' } },
1255+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
1256+
);
1257+
1258+
await informer.stop();
1259+
1260+
let errorEmitted = false;
1261+
informer.on('error', () => (errorEmitted = true));
1262+
1263+
promise = new Promise((resolve) => {
1264+
mock.when(
1265+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1266+
).thenCall(() => {
1267+
resolve({});
1268+
});
1269+
});
1270+
1271+
informer.start();
1272+
await promise;
1273+
1274+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1275+
1276+
const object = {
1277+
kind: 'Status',
1278+
apiVersion: 'v1',
1279+
metadata: {},
1280+
status: 'Failure',
1281+
message: 'too old resource version: 12345 (1234)',
1282+
reason: 'Expired',
1283+
code: 410,
1284+
};
1285+
await watchHandler('ERROR', object, { type: 'ERROR', object });
1286+
await doneHandler(null);
1287+
1288+
mock.verify(
1289+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1290+
).thrice();
1291+
strictEqual(errorEmitted, false);
1292+
strictEqual(listCalls, 2);
1293+
});
1294+
1295+
it('should list if the watch errors from the last version', async () => {
1296+
const fakeWatch = mock.mock(Watch);
1297+
1298+
let listCalls = 0;
1299+
const listFn: ListPromise<V1Namespace> = function (): Promise<V1NamespaceList> {
1300+
return new Promise<V1NamespaceList>((resolve, reject) => {
1301+
listCalls++;
1302+
resolve({
1303+
metadata: {
1304+
resourceVersion: '12345',
1305+
} as V1ListMeta,
1306+
items: [],
1307+
} as V1NamespaceList);
1308+
});
1309+
};
1310+
let promise = new Promise((resolve) => {
1311+
mock.when(
1312+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1313+
).thenCall(() => {
1314+
resolve({});
1315+
});
1316+
});
1317+
1318+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1319+
1320+
informer.start();
1321+
await promise;
1322+
1323+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1324+
watchHandler(
1325+
'ADDED',
1326+
{
1327+
metadata: {
1328+
name: 'name3',
1329+
resourceVersion: '23456',
1330+
} as V1ObjectMeta,
1331+
} as V1Namespace,
1332+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12651333
);
12661334

12671335
await informer.stop();
@@ -1335,6 +1403,8 @@ describe('ListWatchCache', () => {
13351403
);
13361404

13371405
await informer.stop();
1406+
strictEqual(listCalls, 1);
1407+
listCalls = 0;
13381408

13391409
let errorEmitted = false;
13401410
informer.on('error', () => (errorEmitted = true));

0 commit comments

Comments
 (0)