Skip to content

Commit f1e5a04

Browse files
authored
Merge pull request #461 from yue9944882/feature/informer-impl
Java informer portal implementation
2 parents 19cb6f5 + 296e569 commit f1e5a04

36 files changed

+3116
-9
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.kubernetes.client.examples;
2+
3+
import io.kubernetes.client.ApiClient;
4+
import io.kubernetes.client.ApiException;
5+
import io.kubernetes.client.Configuration;
6+
import io.kubernetes.client.apis.CoreV1Api;
7+
import io.kubernetes.client.informer.*;
8+
import io.kubernetes.client.informer.cache.Lister;
9+
import io.kubernetes.client.models.V1Node;
10+
import io.kubernetes.client.models.V1NodeList;
11+
import io.kubernetes.client.models.V1ObjectMeta;
12+
import io.kubernetes.client.util.CallGeneratorParams;
13+
import io.kubernetes.client.util.Config;
14+
15+
public class InformerExample {
16+
public static void main(String[] args) throws Exception {
17+
18+
ApiClient client = Config.defaultClient();
19+
Configuration.setDefaultApiClient(client);
20+
21+
SharedInformerFactory factory = new SharedInformerFactory();
22+
23+
CoreV1Api coreV1Api = new CoreV1Api();
24+
25+
// Node informer
26+
SharedIndexInformer<V1Node> nodeInformer =
27+
factory.sharedIndexInformerFor(
28+
(CallGeneratorParams params) -> {
29+
try {
30+
return coreV1Api.listNodeCall(
31+
null,
32+
null,
33+
null,
34+
null,
35+
null,
36+
null,
37+
params.resourceVersion,
38+
params.timeoutSeconds,
39+
params.watch,
40+
null,
41+
null);
42+
} catch (ApiException e) {
43+
throw new RuntimeException(e);
44+
}
45+
},
46+
V1Node.class,
47+
V1NodeList.class);
48+
49+
nodeInformer.addEventHandler(
50+
new ResourceEventHandler<V1Node>() {
51+
@Override
52+
public void onAdd(V1Node node) {
53+
System.out.printf("%s node added!\n", node.getMetadata().getName());
54+
}
55+
56+
@Override
57+
public void onUpdate(V1Node oldNode, V1Node newNode) {
58+
System.out.printf(
59+
"%s => %s node updated!\n",
60+
oldNode.getMetadata().getName(), newNode.getMetadata().getName());
61+
}
62+
63+
@Override
64+
public void onDelete(V1Node node, boolean deletedFinalStateUnknown) {
65+
System.out.printf("%s node deleted!\n", node.getMetadata().getName());
66+
}
67+
});
68+
69+
factory.startAllRegisteredInformers();
70+
71+
V1Node nodeToCreate = new V1Node();
72+
V1ObjectMeta metadata = new V1ObjectMeta();
73+
metadata.setName("noxu");
74+
nodeToCreate.setMetadata(metadata);
75+
String s = coreV1Api.getApiClient().getJSON().serialize(nodeToCreate);
76+
V1Node createdNode = coreV1Api.createNode(nodeToCreate, null, null, null);
77+
Thread.sleep(3000);
78+
79+
Lister<V1Node> nodeLister = new Lister<V1Node>(nodeInformer.getIndexer());
80+
V1Node node = nodeLister.get("noxu");
81+
System.out.printf("noxu created! %s\n", node.getMetadata().getCreationTimestamp());
82+
factory.stopAllRegisteredInformers();
83+
Thread.sleep(3000);
84+
System.out.println("informer stopped..");
85+
}
86+
}

util/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@
140140
<groupId>org.apache.maven.plugins</groupId>
141141
<artifactId>maven-compiler-plugin</artifactId>
142142
<configuration>
143-
<source>1.7</source>
144-
<target>1.7</target>
143+
<source>8</source>
144+
<target>8</target>
145145
</configuration>
146146
</plugin>
147147
<plugin>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.kubernetes.client.informer;
2+
3+
public enum EventType {
4+
ADDED,
5+
6+
MODIFIED,
7+
8+
DELETED,
9+
10+
ERROR;
11+
12+
/**
13+
* getByType returns the corresponding EventType by type.
14+
*
15+
* @param type specific code
16+
* @return corresponding EventType
17+
*/
18+
public static EventType getByType(String type) {
19+
if (type != null && type.length() > 0) {
20+
for (EventType eventType : EventType.values()) {
21+
if (eventType.name().equalsIgnoreCase(type)) {
22+
return eventType;
23+
}
24+
}
25+
}
26+
return null;
27+
}
28+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.ApiException;
4+
import io.kubernetes.client.util.CallGeneratorParams;
5+
import io.kubernetes.client.util.Watchable;
6+
7+
public interface ListerWatcher<ApiType, ApiListType> {
8+
9+
ApiListType list(CallGeneratorParams params) throws ApiException;
10+
11+
Watchable<ApiType> watch(CallGeneratorParams params) throws ApiException;
12+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kubernetes.client.informer;
2+
3+
public interface ResourceEventHandler<ApiType> {
4+
5+
void onAdd(ApiType obj);
6+
7+
void onUpdate(ApiType oldObj, ApiType newObj);
8+
9+
void onDelete(ApiType obj, boolean deletedFinalStateUnknown);
10+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Store;
4+
import java.util.function.Supplier;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
/**
9+
* ResyncRunnable class implements Runnable interface. It calls the resync function of Store
10+
* interface which is actually always implemented by DeltaFIFO.
11+
*/
12+
public class ResyncRunnable<ApiType> implements Runnable {
13+
14+
private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);
15+
16+
private Store<ApiType> store;
17+
private Supplier<Boolean> shouldResyncFunc;
18+
19+
public ResyncRunnable(Store<ApiType> store, Supplier<Boolean> shouldResyncFunc) {
20+
this.store = store;
21+
this.shouldResyncFunc = shouldResyncFunc;
22+
}
23+
24+
public void run() {
25+
if (log.isDebugEnabled()) {
26+
log.debug("ResyncRunnable#resync ticker tick");
27+
}
28+
29+
if (shouldResyncFunc == null || shouldResyncFunc.get()) {
30+
if (log.isDebugEnabled()) {
31+
log.debug("ResyncRunnable#force resync");
32+
}
33+
this.store.resync();
34+
}
35+
}
36+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Indexer;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.function.Function;
7+
8+
/*
9+
* SharedIndexInformer extends SharedInformer and provides indexer operability additionally.
10+
*/
11+
public interface SharedIndexInformer<ApiType> extends SharedInformer<ApiType> {
12+
13+
/**
14+
* Add indexers.
15+
*
16+
* @param indexers the indexers
17+
*/
18+
void addIndexers(Map<String, Function<ApiType, List<String>>> indexers);
19+
20+
/**
21+
* getIndexer returns the internal indexer store.
22+
*
23+
* @return the internal indexer store
24+
*/
25+
Indexer<ApiType> getIndexer();
26+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.kubernetes.client.informer;
2+
3+
/*
4+
* SharedInformer defines basic methods of a informer.
5+
*/
6+
public interface SharedInformer<ApiType> {
7+
8+
/**
9+
* Add event handler.
10+
*
11+
* @param handler the handler
12+
*/
13+
void addEventHandler(ResourceEventHandler<ApiType> handler);
14+
15+
/**
16+
* addEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
17+
* specified resync period. Events to a single handler are delivered sequentially, but there is no
18+
* coordination between different handlers.
19+
*
20+
* @param handler the event handler
21+
* @param resyncPeriod the specific resync period
22+
*/
23+
void addEventHandlerWithResyncPeriod(ResourceEventHandler<ApiType> handler, long resyncPeriod);
24+
25+
/** run starts the shared informer, which will be stopped until stop() is called. */
26+
void run();
27+
28+
/** stop stops the shared informer. */
29+
void stop();
30+
31+
/** hasSynced returns true if the shared informer's store has synced. */
32+
boolean hasSynced();
33+
34+
/**
35+
* Last sync resource version string.
36+
*
37+
* @return the string
38+
*/
39+
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
40+
// store. The value returned is not synchronized with access to the underlying store and is not
41+
// thread-safe.
42+
String lastSyncResourceVersion();
43+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package io.kubernetes.client.informer;
2+
3+
import com.google.gson.reflect.TypeToken;
4+
import com.squareup.okhttp.Call;
5+
import io.kubernetes.client.*;
6+
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
7+
import io.kubernetes.client.util.CallGeneratorParams;
8+
import io.kubernetes.client.util.Watch;
9+
import io.kubernetes.client.util.common.Collections;
10+
import java.lang.reflect.Type;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.Future;
16+
import java.util.function.Function;
17+
18+
/** SharedInformerFactory class constructs and caches informers for api types. */
19+
public class SharedInformerFactory {
20+
21+
private Map<Type, SharedIndexInformer> informers;
22+
23+
private Map<Type, Future> startedInformers;
24+
25+
private ExecutorService informerExecutor;
26+
27+
/** Constructor w/ default thread pool. */
28+
public SharedInformerFactory() {
29+
this(Executors.newCachedThreadPool());
30+
}
31+
32+
/**
33+
* Constructor w/ thread pool specified.
34+
*
35+
* @param threadPool specified thread pool
36+
*/
37+
public SharedInformerFactory(ExecutorService threadPool) {
38+
informerExecutor = threadPool;
39+
informers = new HashMap<>();
40+
startedInformers = new HashMap<>();
41+
}
42+
43+
/**
44+
* Shared index informer for shared index informer.
45+
*
46+
* @param <ApiType> the type parameter
47+
* @param <ApiListType> the type parameter
48+
* @param callGenerator the call generator
49+
* @param apiTypeClass the api type class
50+
* @param apiListTypeClass the api list type class
51+
* @return the shared index informer
52+
*/
53+
public synchronized <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
54+
Function<CallGeneratorParams, Call> callGenerator,
55+
Class<ApiType> apiTypeClass,
56+
Class<ApiListType> apiListTypeClass) {
57+
return sharedIndexInformerFor(callGenerator, apiTypeClass, apiListTypeClass, 0);
58+
}
59+
60+
/**
61+
* Constructs and returns a shared index informer w/ resync period specified. And the informer
62+
* cache will be overwritten.
63+
*
64+
* @param <ApiType> the type parameter
65+
* @param <ApiListType> the type parameter
66+
* @param callGenerator the call generator
67+
* @param apiTypeClass the api type class
68+
* @param apiListTypeClass the api list type class
69+
* @param resyncPeriodInMillis the resync period in millis
70+
* @return the shared index informer
71+
*/
72+
public synchronized <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
73+
Function<CallGeneratorParams, Call> callGenerator,
74+
Class<ApiType> apiTypeClass,
75+
Class<ApiListType> apiListTypeClass,
76+
long resyncPeriodInMillis) {
77+
ListerWatcher<ApiType, ApiListType> listerWatcher =
78+
listerWatcherFor(callGenerator, apiTypeClass, apiListTypeClass);
79+
SharedIndexInformer<ApiType> informer =
80+
new DefaultSharedIndexInformer<ApiType, ApiListType>(
81+
apiTypeClass, listerWatcher, resyncPeriodInMillis);
82+
this.informers.put(TypeToken.get(apiTypeClass).getType(), informer);
83+
return informer;
84+
}
85+
86+
private <ApiType, ApiListType> ListerWatcher<ApiType, ApiListType> listerWatcherFor(
87+
Function<CallGeneratorParams, Call> callGenerator,
88+
Class<ApiType> apiTypeClass,
89+
Class<ApiListType> apiListTypeClass) {
90+
ApiClient apiClient = Configuration.getDefaultApiClient();
91+
return new ListerWatcher<ApiType, ApiListType>() {
92+
@Override
93+
public ApiListType list(CallGeneratorParams params) throws ApiException {
94+
Call call = callGenerator.apply(params);
95+
return (ApiListType) apiClient.execute(call, apiListTypeClass).getData();
96+
}
97+
98+
@Override
99+
public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {
100+
Call call = callGenerator.apply(params);
101+
return Watch.createWatch(
102+
apiClient,
103+
call,
104+
TypeToken.getParameterized(Watch.Response.class, apiTypeClass).getType());
105+
}
106+
};
107+
}
108+
109+
/** Start all registered informers. */
110+
public synchronized void startAllRegisteredInformers() {
111+
if (Collections.isEmptyMap(informers)) {
112+
return;
113+
}
114+
informers.forEach(
115+
(informerType, informer) -> {
116+
if (!startedInformers.containsKey(informerType)) {
117+
startedInformers.put(informerType, informerExecutor.submit(informer::run));
118+
}
119+
});
120+
}
121+
122+
/** Stop all registered informers. */
123+
public synchronized void stopAllRegisteredInformers() {
124+
if (Collections.isEmptyMap(informers)) {
125+
return;
126+
}
127+
informers.forEach(
128+
(informerType, informer) -> {
129+
if (startedInformers.containsKey(informerType)) {
130+
startedInformers.remove(informerType);
131+
informer.stop();
132+
}
133+
});
134+
informerExecutor.shutdown();
135+
}
136+
}

0 commit comments

Comments
 (0)