Skip to content

Commit 40982b5

Browse files
authored
Merge pull request #525 from adohe/master
add workqueue implementation for kube-style controller
2 parents c3d3629 + 9fcb7dd commit 40982b5

File tree

11 files changed

+812
-0
lines changed

11 files changed

+812
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.kubernetes.client.extended.wait;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.ScheduledFuture;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.function.Supplier;
10+
11+
public class Wait {
12+
13+
/**
14+
* Poll tries a condition func until it returns true, an exception, or the timeout is reached.
15+
*
16+
* @param interval the check interval
17+
* @param timeout the timeout period
18+
* @param condition the condition func
19+
*/
20+
public static boolean poll(Duration interval, Duration timeout, Supplier<Boolean> condition) {
21+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
22+
AtomicBoolean result = new AtomicBoolean(false);
23+
long dueDate = System.currentTimeMillis() + timeout.toMillis();
24+
ScheduledFuture<?> future =
25+
executorService.scheduleAtFixedRate(
26+
() -> {
27+
try {
28+
result.set(condition.get());
29+
} catch (Exception e) {
30+
result.set(false);
31+
}
32+
},
33+
interval.toMillis(),
34+
interval.toMillis(),
35+
TimeUnit.MILLISECONDS);
36+
try {
37+
while (System.currentTimeMillis() < dueDate) {
38+
if (result.get()) {
39+
future.cancel(true);
40+
return true;
41+
}
42+
}
43+
} catch (Exception e) {
44+
return result.get();
45+
}
46+
future.cancel(true);
47+
return result.get();
48+
}
49+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
import java.time.Duration;
5+
import java.time.Instant;
6+
import java.time.temporal.Temporal;
7+
import java.util.Map;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
import java.util.concurrent.ConcurrentMap;
11+
import java.util.concurrent.DelayQueue;
12+
import java.util.concurrent.Delayed;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.LinkedBlockingQueue;
16+
import java.util.concurrent.TimeUnit;
17+
18+
/** The default delaying queue implementation. */
19+
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
20+
21+
public static Duration heartBeatInterval = Duration.ofSeconds(10);
22+
23+
private DelayQueue<WaitForEntry<T>> delayQueue;
24+
private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
25+
protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
26+
27+
public DefaultDelayingQueue(ExecutorService waitingWorker) {
28+
this.delayQueue = new DelayQueue<>();
29+
this.waitingEntryByData = new ConcurrentHashMap<>();
30+
this.waitingForAddQueue = new LinkedBlockingQueue<>(1000);
31+
waitingWorker.submit(this::waitingLoop);
32+
}
33+
34+
public DefaultDelayingQueue() {
35+
this(Executors.newSingleThreadExecutor());
36+
}
37+
38+
public void addAfter(T item, Duration duration) {
39+
// don't add if we're already shutting down
40+
if (super.isShuttingDown()) {
41+
return;
42+
}
43+
44+
// immediately add things w/o delay
45+
if (duration.isZero()) {
46+
super.add(item);
47+
return;
48+
}
49+
WaitForEntry<T> entry = new WaitForEntry<>(item, duration.addTo(Instant.now()));
50+
this.waitingForAddQueue.offer(entry);
51+
}
52+
53+
private void waitingLoop() {
54+
try {
55+
while (true) {
56+
// underlying work-queue is shutting down, quit the loop.
57+
if (super.isShuttingDown()) {
58+
return;
59+
}
60+
// peek the first item from the delay queue
61+
WaitForEntry<T> entry = delayQueue.peek();
62+
// default next ready-at time to "never"
63+
Duration nextReadyAt = heartBeatInterval;
64+
if (entry != null) {
65+
// the delay-queue isn't empty, so we deal with the item in the following logic:
66+
// 1. check if the item is ready to fire
67+
// a. if ready, remove it from the delay-queue and push it into underlying work-queue
68+
// b. if not, refresh the next ready-at time.
69+
Instant now = Instant.now();
70+
if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
71+
delayQueue.remove(entry);
72+
super.add(entry.data);
73+
this.waitingEntryByData.remove(entry.data);
74+
continue;
75+
} else {
76+
nextReadyAt = Duration.between(now, entry.readyAtMillis);
77+
}
78+
}
79+
80+
WaitForEntry<T> waitForEntry =
81+
waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS);
82+
if (waitForEntry != null) {
83+
if (Duration.between(waitForEntry.readyAtMillis, Instant.now()).isNegative()) {
84+
// the item is not yet ready, insert it to the delay-queue
85+
insert(this.delayQueue, this.waitingEntryByData, waitForEntry);
86+
} else {
87+
// the item is ready as soon as received, fire it to the work-queue directly
88+
super.add(waitForEntry.data);
89+
}
90+
}
91+
}
92+
} catch (InterruptedException e) {
93+
// empty block
94+
}
95+
}
96+
97+
private void insert(
98+
DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
99+
WaitForEntry existing = knownEntries.get((T) entry.data);
100+
if (existing != null) {
101+
if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) {
102+
q.remove(existing);
103+
existing.readyAtMillis = entry.readyAtMillis;
104+
q.add(existing);
105+
}
106+
107+
return;
108+
}
109+
110+
q.offer(entry);
111+
knownEntries.put((T) entry.data, entry);
112+
}
113+
114+
// WaitForEntry holds the data to add and the time it should be added.
115+
private class WaitForEntry<T> implements Delayed {
116+
117+
private WaitForEntry(T data, Temporal readyAtMillis) {
118+
this.data = data;
119+
this.readyAtMillis = readyAtMillis;
120+
}
121+
122+
private T data;
123+
private Temporal readyAtMillis;
124+
125+
@Override
126+
public long getDelay(TimeUnit unit) {
127+
Duration duration = Duration.between(Instant.now(), readyAtMillis);
128+
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
129+
}
130+
131+
@Override
132+
public int compareTo(Delayed o) {
133+
return Longs.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
134+
}
135+
}
136+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
7+
8+
/** The default rate limiting queue implementation. */
9+
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
10+
implements RateLimitingQueue<T> {
11+
12+
private RateLimiter rateLimiter;
13+
14+
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
15+
super(waitingWorker);
16+
this.rateLimiter = new ExponentialRateLimiter();
17+
}
18+
19+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
20+
super(waitingWorker);
21+
this.rateLimiter = rateLimiter;
22+
}
23+
24+
@Override
25+
public int numRequeues(T item) {
26+
return rateLimiter.numRequeues(item);
27+
}
28+
29+
@Override
30+
public void forget(Object item) {
31+
rateLimiter.forget(item);
32+
}
33+
34+
@Override
35+
public void addRateLimited(T item) {
36+
super.addAfter(item, rateLimiter.when(item));
37+
}
38+
39+
public static class ExponentialRateLimiter implements RateLimiter {
40+
41+
Duration baseDelay;
42+
Duration maxDelay;
43+
44+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45+
46+
public ExponentialRateLimiter() {
47+
this.baseDelay = Duration.ofMillis(5);
48+
this.maxDelay = Duration.ofSeconds(1000);
49+
}
50+
51+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52+
this.baseDelay = baseDelay;
53+
this.maxDelay = maxDelay;
54+
}
55+
56+
@Override
57+
public void forget(Object item) {
58+
failures.remove(item);
59+
}
60+
61+
@Override
62+
public int numRequeues(Object item) {
63+
return failures.get(item);
64+
}
65+
66+
@Override
67+
public Duration when(Object item) {
68+
Integer exp = failures.getOrDefault(item, 0);
69+
failures.put(item, exp + 1);
70+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71+
if (backoff > Long.MAX_VALUE) {
72+
return maxDelay;
73+
}
74+
if (backoff > maxDelay.toNanos()) {
75+
return maxDelay;
76+
}
77+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78+
}
79+
}
80+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.collect.Lists;
4+
import com.google.common.collect.Sets;
5+
import java.util.LinkedList;
6+
import java.util.Set;
7+
8+
/**
9+
* The default {@link WorkQueue} implementation that uses a doubly-linked list to store work items.
10+
* This class ensures the added work items are, not in dirty set or currently processing set, before
11+
* append them to the list.
12+
*
13+
* <p>Usage example, based on a typical producer-consumer scenario. </pre>
14+
*/
15+
public class DefaultWorkQueue<T> implements WorkQueue<T> {
16+
17+
// queue defines the order in which we will work on items. Every element of queue
18+
// should be in the dirty set and not in the processing set.
19+
private LinkedList<T> queue;
20+
21+
// dirty defines all of the items that need to be processed.
22+
private Set<T> dirty;
23+
24+
// Things that are currently being processed are in the processing set.
25+
// These things may be simultaneously in the dirty set. When we finish
26+
// processing something and remove it from this set, we'll check if
27+
// it's in the dirty set, and if so, add it to the queue.
28+
private Set<T> processing;
29+
30+
private boolean shuttingDown = false;
31+
32+
public DefaultWorkQueue() {
33+
this.queue = Lists.newLinkedList();
34+
this.dirty = Sets.newHashSet();
35+
this.processing = Sets.newHashSet();
36+
}
37+
38+
@Override
39+
public synchronized void add(T item) {
40+
if (shuttingDown) {
41+
return;
42+
}
43+
44+
if (this.dirty.contains(item)) {
45+
return;
46+
}
47+
48+
this.dirty.add(item);
49+
if (this.processing.contains(item)) {
50+
return;
51+
}
52+
53+
this.queue.add(item);
54+
this.notify();
55+
}
56+
57+
@Override
58+
public synchronized int length() {
59+
return this.queue.size();
60+
}
61+
62+
@Override
63+
public synchronized T get() throws InterruptedException {
64+
while (queue.size() == 0 && !shuttingDown) {
65+
this.wait();
66+
}
67+
if (queue.size() == 0) {
68+
// We must be shutting down
69+
return null;
70+
}
71+
T obj = this.queue.poll();
72+
this.processing.add(obj);
73+
this.dirty.remove(obj);
74+
return obj;
75+
}
76+
77+
@Override
78+
public synchronized void done(T item) {
79+
this.processing.remove(item);
80+
if (this.dirty.contains(item)) {
81+
this.queue.add(item);
82+
this.notify();
83+
}
84+
}
85+
86+
@Override
87+
public synchronized void shutDown() {
88+
this.shuttingDown = true;
89+
this.notifyAll();
90+
}
91+
92+
@Override
93+
public synchronized boolean isShuttingDown() {
94+
return shuttingDown;
95+
}
96+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue<T> extends WorkQueue<T> {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(T item, Duration duration);
18+
}

0 commit comments

Comments
 (0)