Skip to content

Commit ed1e579

Browse files
authored
Merge pull request #646 from StefanCenusa/feature/listwatch-labelselector
Watch with labelSelector
2 parents 2aa448f + 5e0367e commit ed1e579

File tree

4 files changed

+177
-37
lines changed

4 files changed

+177
-37
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// tslint:disable:no-console
2+
import * as k8s from '@kubernetes/client-node';
3+
4+
const kc = new k8s.KubeConfig();
5+
kc.loadFromDefault();
6+
7+
const k8sApi = kc.makeApiClient(k8s.CoreV1Api);
8+
9+
const APP_LABEL_SELECTOR = 'app=foo';
10+
11+
const listFn = () => k8sApi.listNamespacedPod(
12+
'default',
13+
undefined,
14+
undefined,
15+
undefined,
16+
undefined,
17+
APP_LABEL_SELECTOR,
18+
);
19+
20+
const createPod = async (name, app) => {
21+
const appPodContainer = {
22+
name: 'nginx',
23+
image: 'nginx:latest',
24+
} as k8s.V1Container;
25+
26+
const appPod = {
27+
metadata: {
28+
name,
29+
labels: {
30+
app,
31+
},
32+
},
33+
spec: {
34+
containers: [appPodContainer],
35+
},
36+
} as k8s.V1Pod;
37+
await k8sApi.createNamespacedPod('default', appPod).catch((e) => console.error(e));
38+
console.log('create', name);
39+
};
40+
41+
const deletePod = async (name, namespace) => {
42+
await k8sApi.deleteNamespacedPod(name, namespace);
43+
console.log('delete', name);
44+
};
45+
46+
const delay = (ms) => {
47+
return new Promise((resolve) => setTimeout(resolve, ms));
48+
};
49+
50+
const informer = k8s.makeInformer(
51+
kc,
52+
'/api/v1/namespaces/default/pods',
53+
listFn,
54+
APP_LABEL_SELECTOR,
55+
);
56+
57+
informer.on('add', (obj: k8s.V1Pod) => {
58+
console.log(`Added: ${obj.metadata!.name}`);
59+
});
60+
informer.on('update', (obj: k8s.V1Pod) => {
61+
console.log(`Updated: ${obj.metadata!.name}`);
62+
});
63+
informer.on('delete', (obj: k8s.V1Pod) => {
64+
console.log(`Deleted: ${obj.metadata!.name}`);
65+
});
66+
informer.on('error', (err: k8s.V1Pod) => {
67+
console.error(err);
68+
// Restart informer after 5sec
69+
setTimeout(() => {
70+
informer.start();
71+
}, 5000);
72+
});
73+
74+
informer.start().then(() => {
75+
setTimeout(async () => {
76+
await createPod('server-foo', 'foo');
77+
await delay(5000);
78+
await createPod('server-bar', 'bar');
79+
await delay(5000);
80+
await deletePod('server-foo', 'default');
81+
await deletePod('server-bar', 'default');
82+
}, 5000);
83+
});

src/cache.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { ObjectSerializer } from './api';
12
import {
23
ADD,
34
CHANGE,
@@ -15,6 +16,7 @@ import { RequestResult, Watch } from './watch';
1516

1617
export interface ObjectCache<T> {
1718
get(name: string, namespace?: string): T | undefined;
19+
1820
list(namespace?: string): ReadonlyArray<T>;
1921
}
2022

@@ -31,6 +33,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3133
private readonly watch: Watch,
3234
private readonly listFn: ListPromise<T>,
3335
autoStart: boolean = true,
36+
private readonly labelSelector?: string,
3437
) {
3538
this.callbackCache[ADD] = [];
3639
this.callbackCache[UPDATE] = [];
@@ -142,9 +145,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
142145
}
143146
});
144147
this.addOrUpdateItems(list.items);
148+
const queryParams = {
149+
resourceVersion: list.metadata!.resourceVersion,
150+
} as {
151+
resourceVersion: string | undefined;
152+
labelSelector: string | undefined;
153+
};
154+
if (this.labelSelector !== undefined) {
155+
queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string');
156+
}
145157
this.request = await this.watch.watch(
146158
this.path,
147-
{ resourceVersion: list.metadata!.resourceVersion },
159+
queryParams,
148160
this.watchHandler.bind(this),
149161
this.doneHandler.bind(this),
150162
);

src/cache_test.ts

Lines changed: 79 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { expect, use } from 'chai';
2+
import * as request from 'request';
23
import chaiAsPromised = require('chai-as-promised');
34

45
import * as mock from 'ts-mockito';
@@ -9,18 +10,46 @@ import { EventEmitter } from 'ws';
910

1011
import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
1112
import { deleteObject, ListWatch, deleteItems } from './cache';
12-
import { ListPromise } from './informer';
13+
import { KubeConfig } from './config';
14+
import { Cluster, Context, User } from './config_types';
15+
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
1316

1417
use(chaiAsPromised);
1518

16-
import { RequestResult, Watch } from './watch';
19+
import { DefaultRequest, RequestResult, Watch } from './watch';
1720

1821
// Object replacing real Request object in the test
1922
class FakeRequest extends EventEmitter implements RequestResult {
2023
pipe(stream: Duplex): void {}
2124
abort() {}
2225
}
2326

27+
const server = 'foo.company.com';
28+
29+
const fakeConfig: {
30+
clusters: Cluster[];
31+
contexts: Context[];
32+
users: User[];
33+
} = {
34+
clusters: [
35+
{
36+
name: 'cluster',
37+
server,
38+
} as Cluster,
39+
],
40+
contexts: [
41+
{
42+
cluster: 'cluster',
43+
user: 'user',
44+
} as Context,
45+
],
46+
users: [
47+
{
48+
name: 'user',
49+
} as User,
50+
],
51+
};
52+
2453
describe('ListWatchCache', () => {
2554
it('should throw on unknown update', () => {
2655
const fake = mock.mock(Watch);
@@ -1024,52 +1053,67 @@ describe('ListWatchCache', () => {
10241053
).once();
10251054
expect(errorEmitted).to.equal(true);
10261055
});
1027-
});
10281056

1029-
describe('delete items', () => {
1030-
it('should remove correctly', () => {
1031-
const listA: V1Pod[] = [
1057+
it('should send label selector', async () => {
1058+
const APP_LABEL_SELECTOR = 'app=foo';
1059+
1060+
const list: V1Namespace[] = [
10321061
{
10331062
metadata: {
10341063
name: 'name1',
1035-
namespace: 'ns1',
1064+
labels: {
1065+
app: 'foo',
1066+
},
10361067
} as V1ObjectMeta,
1037-
} as V1Pod,
1068+
} as V1Namespace,
10381069
{
10391070
metadata: {
10401071
name: 'name2',
1041-
namespace: 'ns2',
1042-
} as V1ObjectMeta,
1043-
} as V1Pod,
1044-
];
1045-
const listB: V1Pod[] = [
1046-
{
1047-
metadata: {
1048-
name: 'name1',
1049-
namespace: 'ns1',
1072+
labels: {
1073+
app: 'foo',
1074+
},
10501075
} as V1ObjectMeta,
1051-
} as V1Pod,
1052-
{
1053-
metadata: {
1054-
name: 'name3',
1055-
namespace: 'ns3',
1056-
} as V1ObjectMeta,
1057-
} as V1Pod,
1058-
];
1059-
const expected: V1Pod[] = [
1060-
{
1061-
metadata: {
1062-
name: 'name1',
1063-
namespace: 'ns1',
1064-
} as V1ObjectMeta,
1065-
} as V1Pod,
1076+
} as V1Namespace,
10661077
];
1078+
const listObj = {
1079+
metadata: {
1080+
resourceVersion: '12345',
1081+
} as V1ListMeta,
1082+
items: list,
1083+
} as V1NamespaceList;
10671084

1068-
const output = deleteItems(listA, listB);
1069-
expect(output).to.deep.equal(expected);
1085+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1086+
response: http.IncomingMessage;
1087+
body: V1NamespaceList;
1088+
}> {
1089+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
1090+
(resolve, reject) => {
1091+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1092+
},
1093+
);
1094+
};
1095+
1096+
const kc = new KubeConfig();
1097+
Object.assign(kc, fakeConfig);
1098+
const fakeRequestor = mock.mock(DefaultRequest);
1099+
const watch = new Watch(kc, mock.instance(fakeRequestor));
1100+
1101+
const fakeRequest = new FakeRequest();
1102+
mock.when(fakeRequestor.webRequest(mock.anything())).thenReturn(fakeRequest);
1103+
1104+
const informer = new ListWatch('/some/path', watch, listFn, false, APP_LABEL_SELECTOR);
1105+
1106+
await informer.start();
1107+
1108+
mock.verify(fakeRequestor.webRequest(mock.anything()));
1109+
const [opts] = mock.capture(fakeRequestor.webRequest).last();
1110+
const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri;
1111+
expect(reqOpts.qs.labelSelector).to.equal(APP_LABEL_SELECTOR);
10701112
});
1113+
});
10711114

1072-
it('should callback correctly', () => {
1115+
describe('delete items', () => {
1116+
it('should remove correctly', () => {
10731117
const listA: V1Pod[] = [
10741118
{
10751119
metadata: {

src/informer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ export function makeInformer<T>(
3535
kubeconfig: KubeConfig,
3636
path: string,
3737
listPromiseFn: ListPromise<T>,
38+
labelSelector?: string,
3839
): Informer<T> {
3940
const watch = new Watch(kubeconfig);
40-
return new ListWatch<T>(path, watch, listPromiseFn, false);
41+
return new ListWatch<T>(path, watch, listPromiseFn, false, labelSelector);
4142
}

0 commit comments

Comments
 (0)