Skip to content

Commit c3f4ba4

Browse files
committed
java informer initial implement
1 parent e750b98 commit c3f4ba4

35 files changed

+2490
-2
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package io.kubernetes.client.examples;
2+
3+
public class CacheExample {}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.kubernetes.client.examples;
2+
3+
import io.kubernetes.client.ApiClient;
4+
import io.kubernetes.client.ApiException;
5+
import io.kubernetes.client.Configuration;
6+
import io.kubernetes.client.apis.CoreV1Api;
7+
import io.kubernetes.client.informer.*;
8+
import io.kubernetes.client.informer.cache.Lister;
9+
import io.kubernetes.client.models.V1Node;
10+
import io.kubernetes.client.models.V1NodeList;
11+
import io.kubernetes.client.models.V1ObjectMeta;
12+
import io.kubernetes.client.util.CallGeneratorParams;
13+
import io.kubernetes.client.util.Config;
14+
15+
public class InformerExample {
16+
public static void main(String[] args) throws Exception {
17+
18+
ApiClient client = Config.defaultClient();
19+
Configuration.setDefaultApiClient(client);
20+
21+
SharedInformerFactory factory = new SharedInformerFactory();
22+
23+
CoreV1Api coreV1Api = new CoreV1Api();
24+
25+
// Node informer
26+
SharedIndexInformer<V1Node> nodeInformer =
27+
factory.sharedIndexInformerFor(
28+
(CallGeneratorParams params) -> {
29+
try {
30+
return coreV1Api.listNodeCall(
31+
null,
32+
null,
33+
null,
34+
null,
35+
null,
36+
null,
37+
params.resourceVersion,
38+
params.timeoutSeconds,
39+
params.watch,
40+
null,
41+
null);
42+
} catch (ApiException e) {
43+
throw new RuntimeException(e);
44+
}
45+
},
46+
V1Node.class,
47+
V1NodeList.class);
48+
49+
nodeInformer.addEventHandler(
50+
new ResourceEventHandler<V1Node>() {
51+
@Override
52+
public void onAdd(V1Node node) {
53+
System.out.printf("%s node added!\n", node.getMetadata().getName());
54+
}
55+
56+
@Override
57+
public void onUpdate(V1Node oldNode, V1Node newNode) {
58+
System.out.printf(
59+
"%s => %s node updated!\n",
60+
oldNode.getMetadata().getName(), newNode.getMetadata().getName());
61+
}
62+
63+
@Override
64+
public void onDelete(V1Node node, boolean deletedFinalStateUnknown) {
65+
System.out.printf("%s node deleted!\n", node.getMetadata().getName());
66+
}
67+
});
68+
69+
factory.startAllRegisteredInformers();
70+
71+
V1Node nodeToCreate = new V1Node();
72+
V1ObjectMeta metadata = new V1ObjectMeta();
73+
metadata.setName("noxu");
74+
nodeToCreate.setMetadata(metadata);
75+
String s = coreV1Api.getApiClient().getJSON().serialize(nodeToCreate);
76+
V1Node createdNode = coreV1Api.createNode(nodeToCreate, null, null, null);
77+
Thread.sleep(3000);
78+
79+
Lister<V1Node> nodeLister = new Lister<V1Node>(nodeInformer);
80+
V1Node node = nodeLister.get("noxu");
81+
System.out.printf("noxu created! %s\n", node.getMetadata().getCreationTimestamp());
82+
factory.stopAllRegisteredInformers();
83+
Thread.sleep(3000);
84+
System.out.println("informer stopped..");
85+
}
86+
}

util/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@
139139
<groupId>org.apache.maven.plugins</groupId>
140140
<artifactId>maven-compiler-plugin</artifactId>
141141
<configuration>
142-
<source>1.7</source>
143-
<target>1.7</target>
142+
<source>8</source>
143+
<target>8</target>
144144
</configuration>
145145
</plugin>
146146
<plugin>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.kubernetes.client.informer;
2+
3+
public enum EventType {
4+
ADDED,
5+
6+
MODIFIED,
7+
8+
DELETED,
9+
10+
ERROR;
11+
12+
/**
13+
* getByType returns the corresponding EventType by type.
14+
*
15+
* @param type specific code
16+
* @return corresponding EventType
17+
*/
18+
public static EventType getByType(String type) {
19+
if (type != null && type.length() > 0) {
20+
for (EventType eventType : EventType.values()) {
21+
if (eventType.name().equalsIgnoreCase(type)) {
22+
return eventType;
23+
}
24+
}
25+
}
26+
return null;
27+
}
28+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.ApiException;
4+
import io.kubernetes.client.util.CallGeneratorParams;
5+
import io.kubernetes.client.util.Watch;
6+
7+
public interface ListerWatcher<ApiType, ApiListType> {
8+
9+
ApiListType list(CallGeneratorParams params) throws ApiException;
10+
11+
Watch<ApiType> watch(CallGeneratorParams params) throws ApiException;
12+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kubernetes.client.informer;
2+
3+
public interface ResourceEventHandler<ApiType> {
4+
5+
void onAdd(ApiType obj);
6+
7+
void onUpdate(ApiType oldObj, ApiType newObj);
8+
9+
void onDelete(ApiType obj, boolean deletedFinalStateUnknown);
10+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Store;
4+
import io.kubernetes.client.informer.cache.function.ResyncFunc;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public class ResyncRunnable<ApiType> implements Runnable {
9+
10+
private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);
11+
12+
private Store store;
13+
private ResyncFunc resyncFuncMillis;
14+
15+
public ResyncRunnable(Store<ApiType> store, ResyncFunc resyncFunc) {
16+
this.store = store;
17+
this.resyncFuncMillis = resyncFunc;
18+
}
19+
20+
public void run() {
21+
if (log.isDebugEnabled()) {
22+
log.debug("ResyncRunnable#resync ticker tick");
23+
}
24+
25+
if (resyncFuncMillis == null || resyncFuncMillis.shouldResync()) {
26+
if (log.isDebugEnabled()) {
27+
log.debug("ResyncRunnable#force resync");
28+
}
29+
this.store.resync();
30+
}
31+
}
32+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Indexer;
4+
import io.kubernetes.client.informer.cache.function.IndexFunc;
5+
import java.util.Map;
6+
7+
public interface SharedIndexInformer<T> extends SharedInformer<T> {
8+
9+
/**
10+
* Add indexers.
11+
*
12+
* @param indexers the indexers
13+
*/
14+
void addIndexers(Map<String, IndexFunc> indexers);
15+
16+
/**
17+
* getIndexer returns the internal indexer store.
18+
*
19+
* @return the internal indexer store
20+
*/
21+
Indexer<T> getIndexer();
22+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.kubernetes.client.informer;
2+
3+
public interface SharedInformer<ApiType> {
4+
5+
/**
6+
* Add event handler.
7+
*
8+
* @param handler the handler
9+
*/
10+
void addEventHandler(ResourceEventHandler<ApiType> handler);
11+
12+
/**
13+
* addEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
14+
* specified resync period. Events to a single handler are delivered sequentially, but there is no
15+
* coordination between different handlers.
16+
*
17+
* @param handler the event handler
18+
* @param resyncPeriod the specific resync period
19+
*/
20+
void addEventHandlerWithResyncPeriod(ResourceEventHandler<ApiType> handler, long resyncPeriod);
21+
22+
/** run starts the shared informer, which will be stopped until stop() is called. */
23+
void run();
24+
25+
/** stop stops the shared informer. */
26+
void stop();
27+
28+
/** hasSynced returns true if the shared informer's store has synced. */
29+
boolean hasSynced();
30+
31+
/**
32+
* Last sync resource version string.
33+
*
34+
* @return the string
35+
*/
36+
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
37+
// store. The value returned is not synchronized with access to the underlying store and is not
38+
// thread-safe.
39+
String lastSyncResourceVersion();
40+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package io.kubernetes.client.informer;
2+
3+
import com.google.gson.reflect.TypeToken;
4+
import com.squareup.okhttp.Call;
5+
import io.kubernetes.client.*;
6+
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
7+
import io.kubernetes.client.util.CallGeneratorParams;
8+
import io.kubernetes.client.util.Watch;
9+
import io.kubernetes.client.util.common.Collections;
10+
import java.lang.reflect.Type;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.Future;
17+
import java.util.function.Function;
18+
19+
public class SharedInformerFactory {
20+
21+
private Map<Type, SharedIndexInformer> informers;
22+
23+
private Map<Type, Future> startedInformers;
24+
25+
private ExecutorService informerExecutor;
26+
27+
/** Constructor. */
28+
public SharedInformerFactory() {
29+
informers = new ConcurrentHashMap<>();
30+
startedInformers = new HashMap<>();
31+
informerExecutor = Executors.newCachedThreadPool();
32+
}
33+
34+
public <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
35+
Function<CallGeneratorParams, Call> callGenerator,
36+
Class<ApiType> apiTypeClass,
37+
Class<ApiListType> apiListTypeClass) {
38+
return sharedIndexInformerFor(callGenerator, apiTypeClass, apiListTypeClass, 0);
39+
}
40+
41+
public <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
42+
Function<CallGeneratorParams, Call> callGenerator,
43+
Class<ApiType> apiTypeClass,
44+
Class<ApiListType> apiListTypeClass,
45+
long resyncPeriodInMillis) {
46+
ListerWatcher<ApiType, ApiListType> listerWatcher =
47+
listerWatcherFor(callGenerator, apiTypeClass, apiListTypeClass);
48+
SharedIndexInformer<ApiType> informer =
49+
new DefaultSharedIndexInformer<ApiType, ApiListType>(listerWatcher, resyncPeriodInMillis);
50+
this.informers.put(TypeToken.get(apiTypeClass).getType(), informer);
51+
return informer;
52+
}
53+
54+
public <ApiType, ApiListType> ListerWatcher<ApiType, ApiListType> listerWatcherFor(
55+
Function<CallGeneratorParams, Call> callGenerator,
56+
Class<ApiType> apiTypeClass,
57+
Class<ApiListType> apiListTypeClass) {
58+
ApiClient apiClient = Configuration.getDefaultApiClient();
59+
return new ListerWatcher<ApiType, ApiListType>() {
60+
@Override
61+
public ApiListType list(CallGeneratorParams params) throws ApiException {
62+
Call call = callGenerator.apply(params);
63+
return (ApiListType) apiClient.execute(call, apiListTypeClass).getData();
64+
}
65+
66+
@Override
67+
public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {
68+
Call call = callGenerator.apply(params);
69+
return Watch.createWatch(
70+
apiClient,
71+
call,
72+
TypeToken.getParameterized(Watch.Response.class, apiTypeClass).getType());
73+
}
74+
};
75+
}
76+
77+
// Start initializes all requested informers.
78+
public synchronized void startAllRegisteredInformers() {
79+
if (Collections.isEmptyMap(informers)) {
80+
return;
81+
}
82+
informers.forEach(
83+
(informerType, informer) -> {
84+
Future future = startedInformers.get(informerType);
85+
if (future == null) {
86+
startedInformers.put(informerType, informerExecutor.submit(informer::run));
87+
}
88+
});
89+
}
90+
91+
public synchronized void stopAllRegisteredInformers() {
92+
if (Collections.isEmptyMap(informers)) {
93+
return;
94+
}
95+
informers.forEach(
96+
(informerType, informer) -> {
97+
Future future = startedInformers.get(informerType);
98+
if (future != null) {
99+
startedInformers.put(informerType, null);
100+
informer.stop();
101+
}
102+
});
103+
informerExecutor.shutdown();
104+
}
105+
}

0 commit comments

Comments
 (0)