Skip to content

Commit a6c2951

Browse files
committed
workqueue implementation for kube-style controller
1 parent 7413e98 commit a6c2951

File tree

11 files changed

+772
-0
lines changed

11 files changed

+772
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.kubernetes.client.extended.wait;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.ScheduledFuture;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.function.Supplier;
10+
11+
public class Wait {
12+
13+
/**
14+
* Poll tries a condition func until it returns true, an exception, or the timeout is reached.
15+
*
16+
* @param interval the check interval
17+
* @param timeout the timeout period
18+
* @param condition the condition func
19+
*/
20+
public static boolean poll(Duration interval, Duration timeout, Supplier<Boolean> condition) {
21+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
22+
AtomicBoolean result = new AtomicBoolean(false);
23+
long dueDate = System.currentTimeMillis() + timeout.toMillis();
24+
ScheduledFuture<?> future =
25+
executorService.scheduleAtFixedRate(
26+
() -> {
27+
try {
28+
result.set(condition.get());
29+
} catch (Exception e) {
30+
result.set(false);
31+
}
32+
},
33+
interval.toMillis(),
34+
interval.toMillis(),
35+
TimeUnit.MILLISECONDS);
36+
try {
37+
while (System.currentTimeMillis() < dueDate) {
38+
if (result.get()) {
39+
future.cancel(true);
40+
return true;
41+
}
42+
}
43+
} catch (Exception e) {
44+
return result.get();
45+
}
46+
future.cancel(true);
47+
return result.get();
48+
}
49+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
import java.time.Duration;
5+
import java.time.Instant;
6+
import java.time.temporal.Temporal;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.DelayQueue;
9+
import java.util.concurrent.Delayed;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.LinkedBlockingQueue;
13+
import java.util.concurrent.TimeUnit;
14+
15+
/** The default delaying queue implementation. */
16+
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
17+
18+
private DelayQueue<WaitForEntry> delayQueue;
19+
protected BlockingQueue<WaitForEntry> waitingForAddQueue;
20+
21+
public DefaultDelayingQueue(ExecutorService waitingWorker) {
22+
this.delayQueue = new DelayQueue<>();
23+
this.waitingForAddQueue = new LinkedBlockingQueue<>();
24+
waitingWorker.submit(this::waitingLoop);
25+
}
26+
27+
public DefaultDelayingQueue() {
28+
this(Executors.newSingleThreadExecutor());
29+
}
30+
31+
public void addAfter(T item, Duration duration) {
32+
// don't add if we're already shutting down
33+
if (super.shuttingDown()) {
34+
return;
35+
}
36+
37+
// immediately add things w/o delay
38+
if (duration.isZero()) {
39+
super.add(item);
40+
return;
41+
}
42+
WaitForEntry entry = new WaitForEntry();
43+
entry.data = item;
44+
entry.readyAtMillis = duration.addTo(Instant.now());
45+
this.waitingForAddQueue.add(entry);
46+
}
47+
48+
private void waitingLoop() {
49+
try {
50+
while (true) {
51+
if (super.shuttingDown()) {
52+
return;
53+
}
54+
WaitForEntry entry = delayQueue.peek();
55+
Duration nextReadyAt = Duration.ofMillis(10 * 1000L);
56+
if (entry != null) {
57+
Instant now = Instant.now();
58+
if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
59+
delayQueue.remove(entry);
60+
super.add((T) entry.data);
61+
} else {
62+
nextReadyAt = Duration.between(now, entry.readyAtMillis);
63+
}
64+
}
65+
66+
WaitForEntry waitForEntry =
67+
waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS);
68+
if (waitForEntry != null
69+
&& Duration.between(waitForEntry.readyAtMillis, Instant.now()).isNegative()) {
70+
this.delayQueue.offer(waitForEntry);
71+
}
72+
}
73+
} catch (InterruptedException e) {
74+
// empty block
75+
}
76+
}
77+
78+
// WaitForEntry holds the data to add and the time it should be added.
79+
private static class WaitForEntry implements Delayed {
80+
81+
private Object data;
82+
private Temporal readyAtMillis;
83+
84+
@Override
85+
public long getDelay(TimeUnit unit) {
86+
Duration duration = Duration.between(Instant.now(), readyAtMillis);
87+
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
88+
}
89+
90+
@Override
91+
public int compareTo(Delayed o) {
92+
return Longs.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
93+
}
94+
}
95+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
7+
8+
/** The default rate limiting queue implementation. */
9+
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
10+
implements RateLimitingQueue<T> {
11+
12+
private RateLimiter rateLimiter;
13+
14+
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
15+
super(waitingWorker);
16+
this.rateLimiter = new ExponentialRateLimiter();
17+
}
18+
19+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
20+
super(waitingWorker);
21+
this.rateLimiter = rateLimiter;
22+
}
23+
24+
@Override
25+
public int numRequeues(T item) {
26+
return rateLimiter.numRequeues(item);
27+
}
28+
29+
@Override
30+
public void forget(Object item) {
31+
rateLimiter.forget(item);
32+
}
33+
34+
@Override
35+
public void addRateLimited(T item) {
36+
super.addAfter(item, rateLimiter.when(item));
37+
}
38+
39+
public static class ExponentialRateLimiter implements RateLimiter {
40+
41+
Duration baseDelay;
42+
Duration maxDelay;
43+
44+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45+
46+
public ExponentialRateLimiter() {
47+
this.baseDelay = Duration.ofMillis(5);
48+
this.maxDelay = Duration.ofSeconds(1000);
49+
}
50+
51+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52+
this.baseDelay = baseDelay;
53+
this.maxDelay = maxDelay;
54+
}
55+
56+
@Override
57+
public void forget(Object item) {
58+
failures.remove(item);
59+
}
60+
61+
@Override
62+
public int numRequeues(Object item) {
63+
return failures.get(item);
64+
}
65+
66+
@Override
67+
public Duration when(Object item) {
68+
Integer exp = failures.getOrDefault(item, 0);
69+
failures.put(item, exp + 1);
70+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71+
if (backoff > Long.MAX_VALUE) {
72+
return maxDelay;
73+
}
74+
if (backoff > maxDelay.toNanos()) {
75+
return maxDelay;
76+
}
77+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78+
}
79+
}
80+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.collect.Lists;
4+
import com.google.common.collect.Sets;
5+
import java.util.LinkedList;
6+
import java.util.Set;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
9+
/**
10+
* The default {@link WorkQueue} implementation that uses a doubly-linked list to store work items.
11+
* This class ensures the added work items are, not in dirty set or currently processing set, before
12+
* append them to the list.
13+
*
14+
* <p>Usage example, based on a typical producer-consumer scenario. </pre>
15+
*/
16+
public class DefaultWorkQueue<T> implements WorkQueue<T> {
17+
18+
// queue defines the order in which we will work on items. Every element of queue
19+
// should be in the dirty set and not in the processing set.
20+
private LinkedList<T> queue;
21+
22+
// dirty defines all of the items that need to be processed.
23+
private Set<T> dirty;
24+
25+
// Things that are currently being processed are in the processing set.
26+
// These things may be simultaneously in the dirty set. When we finish
27+
// processing something and remove it from this set, we'll check if
28+
// it's in the dirty set, and if so, add it to the queue.
29+
private Set<T> processing;
30+
31+
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
32+
33+
public DefaultWorkQueue() {
34+
this.queue = Lists.newLinkedList();
35+
this.dirty = Sets.newHashSet();
36+
this.processing = Sets.newHashSet();
37+
}
38+
39+
@Override
40+
public synchronized void add(T item) {
41+
if (shuttingDown.get()) {
42+
return;
43+
}
44+
45+
if (this.dirty.contains(item)) {
46+
return;
47+
}
48+
49+
this.dirty.add(item);
50+
if (this.processing.contains(item)) {
51+
return;
52+
}
53+
54+
this.queue.add(item);
55+
this.notify();
56+
}
57+
58+
@Override
59+
public synchronized int length() {
60+
return this.queue.size();
61+
}
62+
63+
@Override
64+
public synchronized T get() throws InterruptedException {
65+
while (queue.size() == 0 && !shuttingDown.get()) {
66+
this.wait();
67+
}
68+
if (queue.size() == 0) {
69+
// We must be shutting down
70+
return null;
71+
}
72+
T obj = this.queue.poll();
73+
this.processing.add(obj);
74+
this.dirty.remove(obj);
75+
return obj;
76+
}
77+
78+
@Override
79+
public synchronized void done(T item) {
80+
this.processing.remove(item);
81+
if (this.dirty.contains(item)) {
82+
this.queue.add(item);
83+
this.notify();
84+
}
85+
}
86+
87+
@Override
88+
public synchronized void shutDown() {
89+
this.shuttingDown.compareAndSet(false, true);
90+
this.notifyAll();
91+
}
92+
93+
@Override
94+
public synchronized boolean shuttingDown() {
95+
return shuttingDown.get();
96+
}
97+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue<T> extends WorkQueue<T> {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(T item, Duration duration);
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
public interface RateLimiter {
6+
7+
/**
8+
* when gets an item and gets to decide how long that item should wait.
9+
*
10+
* @param item specific item
11+
* @return how long the item should wait
12+
*/
13+
Duration when(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter whether its for perm
17+
* failing or for success, we'll stop tracking it
18+
*
19+
* @param item item that is finished being retried
20+
*/
21+
void forget(Object item);
22+
23+
/**
24+
* numRequeues returns back how many failures the item has had.
25+
*
26+
* @param item specific item
27+
* @return how many failures the item has had
28+
*/
29+
int numRequeues(Object item);
30+
}

0 commit comments

Comments
 (0)