Skip to content

Commit 6053b7b

Browse files
committed
workqueue implementation for kube-style controller
1 parent 7413e98 commit 6053b7b

File tree

12 files changed

+755
-0
lines changed

12 files changed

+755
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.kubernetes.client.extended.function;
2+
3+
@FunctionalInterface
4+
public interface ConditionFunc {
5+
boolean cond() throws Exception;
6+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.kubernetes.client.extended.wait;
2+
3+
import io.kubernetes.client.extended.function.ConditionFunc;
4+
import java.time.Duration;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ScheduledExecutorService;
8+
import java.util.concurrent.ScheduledFuture;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.TimeoutException;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
13+
public class Wait {
14+
15+
/**
16+
* Poll tries a condition func until it returns true, an exception, or the timeout is reached.
17+
*
18+
* @param interval the check interval
19+
* @param timeout the timeout period
20+
* @param condition the condition func
21+
*/
22+
public static boolean poll(Duration interval, Duration timeout, ConditionFunc condition) {
23+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
24+
AtomicBoolean result = new AtomicBoolean(false);
25+
long dueDate = System.currentTimeMillis() + timeout.toMillis();
26+
ScheduledFuture<?> future =
27+
executorService.scheduleAtFixedRate(
28+
() -> {
29+
try {
30+
result.set(condition.cond());
31+
} catch (Exception e) {
32+
result.set(false);
33+
}
34+
},
35+
interval.toMillis(),
36+
interval.toMillis(),
37+
TimeUnit.MILLISECONDS);
38+
try {
39+
while (future.get(dueDate - System.currentTimeMillis(), TimeUnit.MILLISECONDS) != null
40+
&& !result.get()) {
41+
if (System.currentTimeMillis() > dueDate) {
42+
future.cancel(true);
43+
return false;
44+
}
45+
}
46+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
47+
return result.get();
48+
}
49+
future.cancel(true);
50+
return result.get();
51+
}
52+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
import java.time.Duration;
5+
import java.util.concurrent.DelayQueue;
6+
import java.util.concurrent.Delayed;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.TimeUnit;
9+
10+
/** The default delaying queue implementation. */
11+
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
12+
13+
protected DelayQueue<WaitForEntry> delayQueue = new DelayQueue<>();
14+
15+
public DefaultDelayingQueue(ExecutorService waitingWorker) {
16+
waitingWorker.submit(this::waitingLoop);
17+
}
18+
19+
public void addAfter(T item, Duration duration) {
20+
// don't add if we're already shutting down
21+
if (super.shuttingDown()) {
22+
return;
23+
}
24+
25+
// immediately add things w/o delay
26+
if (duration.isZero()) {
27+
super.add(item);
28+
return;
29+
}
30+
WaitForEntry entry = new WaitForEntry();
31+
entry.data = item;
32+
entry.readyAtMillis = System.currentTimeMillis() + duration.toMillis();
33+
delayQueue.offer(entry);
34+
}
35+
36+
private void waitingLoop() {
37+
try {
38+
while (true) {
39+
if (super.shuttingDown()) {
40+
return;
41+
}
42+
WaitForEntry entry = delayQueue.take();
43+
super.add((T) entry.data);
44+
}
45+
} catch (InterruptedException e) {
46+
// empty block
47+
}
48+
}
49+
50+
private static class WaitForEntry implements Delayed {
51+
52+
private Object data;
53+
private long readyAtMillis;
54+
55+
@Override
56+
public long getDelay(TimeUnit unit) {
57+
return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
58+
}
59+
60+
@Override
61+
public int compareTo(Delayed o) {
62+
return Longs.compare(readyAtMillis, ((WaitForEntry) o).readyAtMillis);
63+
}
64+
}
65+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
7+
8+
/** The default rate limiting queue implementation. */
9+
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
10+
implements RateLimitingQueue<T> {
11+
12+
private RateLimiter rateLimiter;
13+
14+
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
15+
super(waitingWorker);
16+
this.rateLimiter = new ExponentialRateLimiter();
17+
}
18+
19+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
20+
super(waitingWorker);
21+
this.rateLimiter = rateLimiter;
22+
}
23+
24+
@Override
25+
public int numRequeues(T item) {
26+
return rateLimiter.numRequeues(item);
27+
}
28+
29+
@Override
30+
public void forget(Object item) {
31+
rateLimiter.forget(item);
32+
}
33+
34+
@Override
35+
public void addRateLimited(T item) {
36+
super.addAfter(item, rateLimiter.when(item));
37+
}
38+
39+
public static class ExponentialRateLimiter implements RateLimiter {
40+
41+
Duration baseDelay;
42+
Duration maxDelay;
43+
44+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45+
46+
public ExponentialRateLimiter() {
47+
this.baseDelay = Duration.ofMillis(5);
48+
this.maxDelay = Duration.ofSeconds(1000);
49+
}
50+
51+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52+
this.baseDelay = baseDelay;
53+
this.maxDelay = maxDelay;
54+
}
55+
56+
@Override
57+
public void forget(Object item) {
58+
failures.remove(item);
59+
}
60+
61+
@Override
62+
public int numRequeues(Object item) {
63+
return failures.get(item);
64+
}
65+
66+
@Override
67+
public Duration when(Object item) {
68+
Integer exp = failures.getOrDefault(item, 0);
69+
failures.put(item, exp + 1);
70+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71+
if (backoff > Long.MAX_VALUE) {
72+
return maxDelay;
73+
}
74+
if (backoff > maxDelay.toNanos()) {
75+
return maxDelay;
76+
}
77+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78+
}
79+
}
80+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.collect.Lists;
4+
import com.google.common.collect.Sets;
5+
import java.util.LinkedList;
6+
import java.util.Set;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
9+
/**
10+
* The default {@link WorkQueue} implementation that uses a doubly-linked list to store work items.
11+
* This class ensures the added work items are, not in dirty set or currently processing set, before
12+
* append them to the list.
13+
*
14+
* <p>Usage example, based on a typical producer-consumer scenario. </pre>
15+
*/
16+
public class DefaultWorkQueue<T> implements WorkQueue<T> {
17+
18+
// queue defines the order in which we will work on items. Every element of queue
19+
// should be in the dirty set and not in the processing set.
20+
private LinkedList<T> queue;
21+
22+
// dirty defines all of the items that need to be processed.
23+
private Set<T> dirty;
24+
25+
// Things that are currently being processed are in the processing set.
26+
// These things may be simultaneously in the dirty set. When we finish
27+
// processing something and remove it from this set, we'll check if
28+
// it's in the dirty set, and if so, add it to the queue.
29+
private Set<T> processing;
30+
31+
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
32+
33+
public DefaultWorkQueue() {
34+
this.queue = Lists.newLinkedList();
35+
this.dirty = Sets.newHashSet();
36+
this.processing = Sets.newHashSet();
37+
}
38+
39+
@Override
40+
public synchronized void add(T item) {
41+
if (shuttingDown.get()) {
42+
return;
43+
}
44+
45+
if (this.dirty.contains(item)) {
46+
return;
47+
}
48+
49+
this.dirty.add(item);
50+
if (this.processing.contains(item)) {
51+
return;
52+
}
53+
54+
this.queue.add(item);
55+
this.notify();
56+
}
57+
58+
@Override
59+
public synchronized int length() {
60+
return this.queue.size();
61+
}
62+
63+
@Override
64+
public synchronized T get() throws InterruptedException {
65+
while (queue.size() == 0 && !shuttingDown.get()) {
66+
this.wait();
67+
}
68+
if (queue.size() == 0) {
69+
// We must be shutting down
70+
return null;
71+
}
72+
T obj = this.queue.poll();
73+
this.processing.add(obj);
74+
this.dirty.remove(obj);
75+
return obj;
76+
}
77+
78+
@Override
79+
public synchronized void done(T item) {
80+
this.processing.remove(item);
81+
if (this.dirty.contains(item)) {
82+
this.queue.add(item);
83+
this.notify();
84+
}
85+
}
86+
87+
@Override
88+
public synchronized void shutDown() {
89+
this.shuttingDown.compareAndSet(false, true);
90+
this.notifyAll();
91+
}
92+
93+
@Override
94+
public synchronized boolean shuttingDown() {
95+
return shuttingDown.get();
96+
}
97+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue<T> extends WorkQueue<T> {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(T item, Duration duration);
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
public interface RateLimiter {
6+
7+
/**
8+
* when gets an item and gets to decide how long that item should wait.
9+
*
10+
* @param item specific item
11+
* @return how long the item should wait
12+
*/
13+
Duration when(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter whether its for perm
17+
* failing or for success, we'll stop tracking it
18+
*
19+
* @param item item that is finished being retried
20+
*/
21+
void forget(Object item);
22+
23+
/**
24+
* numRequeues returns back how many failures the item has had.
25+
*
26+
* @param item specific item
27+
* @return how many failures the item has had
28+
*/
29+
int numRequeues(Object item);
30+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
/** RateLimitingQueue defines a queue that rate limits items being added to the queue. */
4+
public interface RateLimitingQueue<T> extends DelayingQueue<T> {
5+
6+
/**
7+
* addRateLimited adds an item to the workqueue after the rate limiter says its ok.
8+
*
9+
* @param item item to add
10+
*/
11+
void addRateLimited(T item);
12+
13+
/**
14+
* forget indicates that an item is finished being retried. Doesn't matter whether its for perm
15+
* failing or for success, we'll stop the rate limiter from tracking it. This only clears the
16+
* `rateLimiter`, you still have to call `Done` on the queue.
17+
*
18+
* @param item item which is finished being retried
19+
*/
20+
void forget(T item);
21+
22+
/**
23+
* numRequeues returns back how many times the item was requeued.
24+
*
25+
* @param item specific item
26+
* @return times the item was requeued
27+
*/
28+
int numRequeues(T item);
29+
}

0 commit comments

Comments
 (0)