Skip to content

Commit 367bcbd

Browse files
committed
[WIP] refactoring..
1 parent e750b98 commit 367bcbd

35 files changed

+2470
-2
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package io.kubernetes.client.examples;
2+
3+
public class CacheExample {}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package io.kubernetes.client.examples;
2+
3+
import io.kubernetes.client.ApiClient;
4+
import io.kubernetes.client.ApiException;
5+
import io.kubernetes.client.Configuration;
6+
import io.kubernetes.client.apis.CoreV1Api;
7+
import io.kubernetes.client.informer.*;
8+
import io.kubernetes.client.informer.cache.Lister;
9+
import io.kubernetes.client.models.V1Node;
10+
import io.kubernetes.client.models.V1NodeSpec;
11+
import io.kubernetes.client.models.V1ObjectMeta;
12+
import io.kubernetes.client.util.CallGeneratorParams;
13+
import io.kubernetes.client.util.Config;
14+
15+
public class InformerExample {
16+
public static void main(String[] args) throws Exception {
17+
18+
ApiClient client = Config.defaultClient();
19+
Configuration.setDefaultApiClient(client);
20+
21+
SharedInformerFactory factory = new SharedInformerFactory();
22+
23+
CoreV1Api coreV1Api = new CoreV1Api();
24+
25+
// Node informer
26+
SharedIndexInformer<V1Node> nodeInformer =
27+
factory.sharedIndexInformerFor(
28+
(CallGeneratorParams params) -> {
29+
try {
30+
return coreV1Api.listNodeCall(
31+
null,
32+
null,
33+
null,
34+
null,
35+
null,
36+
null,
37+
params.resourceVersion,
38+
params.timeoutSeconds,
39+
params.watch,
40+
null,
41+
null);
42+
} catch (ApiException e) {
43+
throw new RuntimeException(e);
44+
}
45+
});
46+
nodeInformer.addEventHandler(
47+
new ResourceEventHandler<V1Node>() {
48+
@Override
49+
public void onAdd(V1Node node) {
50+
System.out.printf("%s node added!", node.getMetadata().getName());
51+
}
52+
53+
@Override
54+
public void onUpdate(V1Node oldNode, V1Node newNode) {
55+
System.out.printf(
56+
"%s => %s node added!",
57+
oldNode.getMetadata().getName(), newNode.getMetadata().getName());
58+
}
59+
60+
@Override
61+
public void onDelete(V1Node node) {
62+
System.out.printf("%s node deleted!", node.getMetadata().getName());
63+
}
64+
});
65+
66+
factory.start();
67+
68+
coreV1Api.createNode(
69+
new V1Node() {
70+
{
71+
setMetadata(
72+
new V1ObjectMeta() {
73+
{
74+
setName("noxu");
75+
}
76+
});
77+
setSpec(
78+
new V1NodeSpec() {
79+
{
80+
setExternalID("noxu_id");
81+
}
82+
});
83+
}
84+
},
85+
null,
86+
null,
87+
null);
88+
Thread.sleep(3000);
89+
90+
Lister<V1Node> nodeLister = new Lister<V1Node>(nodeInformer);
91+
V1Node node = nodeLister.get("noxu");
92+
System.out.printf("noxu created! %s", node.getMetadata().getCreationTimestamp());
93+
}
94+
}

util/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@
139139
<groupId>org.apache.maven.plugins</groupId>
140140
<artifactId>maven-compiler-plugin</artifactId>
141141
<configuration>
142-
<source>1.7</source>
143-
<target>1.7</target>
142+
<source>8</source>
143+
<target>8</target>
144144
</configuration>
145145
</plugin>
146146
<plugin>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.kubernetes.client.informer;
2+
3+
public enum EventType {
4+
ADDED,
5+
6+
MODIFIED,
7+
8+
DELETED,
9+
10+
ERROR;
11+
12+
/**
13+
* getByType returns the corresponding EventType by type.
14+
*
15+
* @param type specific code
16+
* @return corresponding EventType
17+
*/
18+
public static EventType getByType(String type) {
19+
if (type != null && type.length() > 0) {
20+
for (EventType eventType : EventType.values()) {
21+
if (eventType.name().equalsIgnoreCase(type)) {
22+
return eventType;
23+
}
24+
}
25+
}
26+
return null;
27+
}
28+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.ApiException;
4+
import io.kubernetes.client.util.CallGeneratorParams;
5+
import io.kubernetes.client.util.Watch;
6+
7+
public interface ListerWatcher<ApiType, ApiListType> {
8+
9+
ApiListType list(CallGeneratorParams params) throws ApiException;
10+
11+
Watch<ApiType> watch(CallGeneratorParams params) throws ApiException;
12+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kubernetes.client.informer;
2+
3+
public interface ResourceEventHandler<ApiType> {
4+
5+
void onAdd(ApiType obj);
6+
7+
void onUpdate(ApiType oldObj, ApiType newObj);
8+
9+
void onDelete(ApiType obj);
10+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Store;
4+
import io.kubernetes.client.informer.cache.function.ResyncFunc;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public class ResyncRunnable<ApiType> implements Runnable {
9+
10+
private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);
11+
12+
private Store store;
13+
private ResyncFunc resyncFuncMillis;
14+
15+
public ResyncRunnable(Store<ApiType> store, ResyncFunc resyncFunc) {
16+
this.store = store;
17+
this.resyncFuncMillis = resyncFunc;
18+
}
19+
20+
public void run() {
21+
if (log.isDebugEnabled()) {
22+
log.debug("ResyncRunnable#resync ticker tick");
23+
}
24+
25+
if (resyncFuncMillis == null || resyncFuncMillis.shouldResync()) {
26+
if (log.isDebugEnabled()) {
27+
log.debug("ResyncRunnable#force resync");
28+
}
29+
this.store.resync();
30+
}
31+
}
32+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.informer.cache.Indexer;
4+
import io.kubernetes.client.informer.cache.function.IndexFunc;
5+
import java.util.Map;
6+
7+
public interface SharedIndexInformer<T> extends SharedInformer<T> {
8+
9+
/**
10+
* Add indexers.
11+
*
12+
* @param indexers the indexers
13+
*/
14+
void addIndexers(Map<String, IndexFunc> indexers);
15+
16+
/**
17+
* getIndexer returns the internal indexer store.
18+
*
19+
* @return the internal indexer store
20+
*/
21+
Indexer<T> getIndexer();
22+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.kubernetes.client.informer;
2+
3+
public interface SharedInformer<ApiType> {
4+
5+
/**
6+
* Add event handler.
7+
*
8+
* @param handler the handler
9+
*/
10+
void addEventHandler(ResourceEventHandler<ApiType> handler);
11+
12+
/**
13+
* addEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
14+
* specified resync period. Events to a single handler are delivered sequentially, but there is no
15+
* coordination between different handlers.
16+
*
17+
* @param handler the event handler
18+
* @param resyncPeriod the specific resync period
19+
*/
20+
void addEventHandlerWithResyncPeriod(ResourceEventHandler<ApiType> handler, long resyncPeriod);
21+
22+
/** run starts the shared informer, which will be stopped until stop() is called. */
23+
void run();
24+
25+
/** stop stops the shared informer. */
26+
void stop();
27+
28+
/** hasSynced returns true if the shared informer's store has synced. */
29+
boolean hasSynced();
30+
31+
/**
32+
* Last sync resource version string.
33+
*
34+
* @return the string
35+
*/
36+
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
37+
// store. The value returned is not synchronized with access to the underlying store and is not
38+
// thread-safe.
39+
String lastSyncResourceVersion();
40+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package io.kubernetes.client.informer;
2+
3+
import com.google.gson.reflect.TypeToken;
4+
import com.squareup.okhttp.Call;
5+
import com.squareup.okhttp.Response;
6+
import com.squareup.okhttp.ResponseBody;
7+
import io.kubernetes.client.*;
8+
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
9+
import io.kubernetes.client.util.CallGeneratorParams;
10+
import io.kubernetes.client.util.Watch;
11+
import io.kubernetes.client.util.common.Collections;
12+
import java.io.IOException;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.function.Function;
19+
20+
public class SharedInformerFactory {
21+
22+
private Map<Class, SharedIndexInformer> informers;
23+
24+
private Map<Class, Boolean> startedInformers;
25+
26+
private ExecutorService informerExecutor;
27+
28+
/** Constructor. */
29+
public SharedInformerFactory() {
30+
informers = new ConcurrentHashMap<>();
31+
startedInformers = new HashMap<>();
32+
informerExecutor = Executors.newCachedThreadPool();
33+
}
34+
35+
public <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
36+
Function<CallGeneratorParams, Call> callGenerator) {
37+
return sharedIndexInformerFor(callGenerator, 0);
38+
}
39+
40+
public <ApiType, ApiListType> SharedIndexInformer<ApiType> sharedIndexInformerFor(
41+
Function<CallGeneratorParams, Call> callGenerator, long resyncPeriodInMillis) {
42+
ListerWatcher<ApiType, ApiListType> listerWatcher = listerWatcherFor(callGenerator);
43+
return new DefaultSharedIndexInformer<ApiType, ApiListType>(
44+
listerWatcher, resyncPeriodInMillis);
45+
}
46+
47+
public <ApiType, ApiListType> ListerWatcher<ApiType, ApiListType> listerWatcherFor(
48+
Function<CallGeneratorParams, Call> callGenerator) {
49+
ApiClient apiClient = Configuration.getDefaultApiClient();
50+
return new ListerWatcher<ApiType, ApiListType>() {
51+
@Override
52+
public ApiListType list(CallGeneratorParams params) throws ApiException {
53+
Call call = callGenerator.apply(params);
54+
Response response = null;
55+
try {
56+
response = call.execute();
57+
return (ApiListType)
58+
apiClient.execute(call, new TypeToken<ApiListType>() {}.getType()).getData();
59+
} catch (IOException e) {
60+
if (response != null) {
61+
throw new ApiException(
62+
response.message(), e, response.code(), response.headers().toMultimap());
63+
} else {
64+
throw new ApiException(e);
65+
}
66+
}
67+
}
68+
69+
@Override
70+
public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {
71+
Call call = callGenerator.apply(params);
72+
Response response = null;
73+
try {
74+
response = call.execute();
75+
ResponseBody body = response.body();
76+
77+
if (!response.isSuccessful()) {
78+
String respBody = null;
79+
if (body != null) {
80+
respBody = response.body().string();
81+
}
82+
return null;
83+
}
84+
85+
return Watch.createWatch(apiClient, call, new TypeToken<ApiType>() {}.getType());
86+
87+
} catch (IOException e) {
88+
if (response != null) {
89+
throw new ApiException(
90+
response.message(), e, response.code(), response.headers().toMultimap());
91+
} else {
92+
throw new ApiException(e);
93+
}
94+
}
95+
}
96+
};
97+
}
98+
99+
// Start initializes all requested informers.
100+
public synchronized void start() {
101+
if (Collections.isEmptyMap(informers)) {
102+
return;
103+
}
104+
informers.forEach(
105+
(informerType, informer) -> {
106+
Boolean started = startedInformers.get(informerType);
107+
if (started == null) {
108+
informerExecutor.execute(informer::run);
109+
startedInformers.put(informerType, true);
110+
}
111+
});
112+
}
113+
}

0 commit comments

Comments
 (0)