Skip to content

Commit bd4ada9

Browse files
committed
workqueue implementation for kube-style controller
1 parent 7413e98 commit bd4ada9

File tree

12 files changed

+751
-0
lines changed

12 files changed

+751
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.kubernetes.client.extended.function;
2+
3+
@FunctionalInterface
4+
public interface ConditionFunc {
5+
boolean cond() throws Exception;
6+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.kubernetes.client.extended.wait;
2+
3+
import io.kubernetes.client.extended.function.ConditionFunc;
4+
import java.time.Duration;
5+
import java.util.Timer;
6+
import java.util.TimerTask;
7+
import java.util.concurrent.ExecutorCompletionService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Future;
10+
import java.util.concurrent.LinkedBlockingQueue;
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class Wait {
14+
15+
/**
16+
* Poll tries a condition func until it returns true, an exception, or the timeout is reached.
17+
*
18+
* @param interval the check interval
19+
* @param timeout the timeout period
20+
* @param condition the condition func
21+
*/
22+
public static boolean poll(Duration interval, Duration timeout, ConditionFunc condition) {
23+
ExecutorCompletionService<Boolean> completionService =
24+
new ExecutorCompletionService<>(
25+
Executors.newSingleThreadExecutor(), new LinkedBlockingQueue<>());
26+
Timer timer = new Timer("poll-timer", true);
27+
try {
28+
timer.scheduleAtFixedRate(
29+
new TimerTask() {
30+
@Override
31+
public void run() {
32+
completionService.submit(condition::cond);
33+
}
34+
},
35+
interval.toMillis(),
36+
interval.toMillis());
37+
Future<Boolean> future = completionService.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
38+
if (future == null) {
39+
return false;
40+
}
41+
if (future.isDone()) {
42+
return true;
43+
}
44+
} catch (Exception e) {
45+
return false;
46+
} finally {
47+
timer.cancel();
48+
}
49+
return false;
50+
}
51+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
import java.time.Duration;
5+
import java.util.concurrent.DelayQueue;
6+
import java.util.concurrent.Delayed;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.TimeUnit;
9+
10+
/** The default delaying queue implementation. */
11+
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
12+
13+
protected DelayQueue<WaitForEntry> delayQueue = new DelayQueue<>();
14+
15+
public DefaultDelayingQueue(ExecutorService waitingWorker) {
16+
waitingWorker.submit(this::waitingLoop);
17+
}
18+
19+
public void addAfter(T item, Duration duration) {
20+
// don't add if we're already shutting down
21+
if (super.shuttingDown()) {
22+
return;
23+
}
24+
25+
// immediately add things w/o delay
26+
if (duration.isZero()) {
27+
super.add(item);
28+
return;
29+
}
30+
WaitForEntry entry = new WaitForEntry();
31+
entry.data = item;
32+
entry.readyAtMillis = System.currentTimeMillis() + duration.toMillis();
33+
delayQueue.offer(entry);
34+
}
35+
36+
private void waitingLoop() {
37+
try {
38+
while (true) {
39+
if (super.shuttingDown()) {
40+
return;
41+
}
42+
WaitForEntry entry = delayQueue.take();
43+
super.add((T) entry.data);
44+
}
45+
} catch (InterruptedException e) {
46+
// empty block
47+
}
48+
}
49+
50+
private static class WaitForEntry implements Delayed {
51+
52+
private Object data;
53+
private long readyAtMillis;
54+
55+
@Override
56+
public long getDelay(TimeUnit unit) {
57+
return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
58+
}
59+
60+
@Override
61+
public int compareTo(Delayed o) {
62+
return Longs.compare(readyAtMillis, ((WaitForEntry) o).readyAtMillis);
63+
}
64+
}
65+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
7+
8+
/** The default rate limiting queue implementation. */
9+
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
10+
implements RateLimitingQueue<T> {
11+
12+
private RateLimiter rateLimiter;
13+
14+
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
15+
super(waitingWorker);
16+
this.rateLimiter = new ExponentialRateLimiter();
17+
}
18+
19+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
20+
super(waitingWorker);
21+
this.rateLimiter = rateLimiter;
22+
}
23+
24+
@Override
25+
public int numRequeues(T item) {
26+
return rateLimiter.numRequeues(item);
27+
}
28+
29+
@Override
30+
public void forget(Object item) {
31+
rateLimiter.forget(item);
32+
}
33+
34+
@Override
35+
public void addRateLimited(T item) {
36+
super.addAfter(item, rateLimiter.when(item));
37+
}
38+
39+
public static class ExponentialRateLimiter implements RateLimiter {
40+
41+
Duration baseDelay;
42+
Duration maxDelay;
43+
44+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45+
46+
public ExponentialRateLimiter() {
47+
this.baseDelay = Duration.ofMillis(5);
48+
this.maxDelay = Duration.ofSeconds(1000);
49+
}
50+
51+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52+
this.baseDelay = baseDelay;
53+
this.maxDelay = maxDelay;
54+
}
55+
56+
@Override
57+
public void forget(Object item) {
58+
failures.remove(item);
59+
}
60+
61+
@Override
62+
public int numRequeues(Object item) {
63+
return failures.get(item);
64+
}
65+
66+
@Override
67+
public Duration when(Object item) {
68+
Integer exp = failures.getOrDefault(item, 0);
69+
failures.put(item, exp + 1);
70+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71+
if (backoff > Long.MAX_VALUE) {
72+
return maxDelay;
73+
}
74+
if (backoff > maxDelay.toNanos()) {
75+
return maxDelay;
76+
}
77+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78+
}
79+
}
80+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import com.google.common.collect.Lists;
4+
import com.google.common.collect.Sets;
5+
import java.util.LinkedList;
6+
import java.util.Set;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
9+
/** The default workqueue implementation. */
10+
public class DefaultWorkQueue<T> implements WorkQueue<T> {
11+
12+
// queue defines the order in which we will work on items. Every element of queue
13+
// should be in the dirty set and not in the processing set.
14+
private LinkedList<T> queue;
15+
16+
// dirty defines all of the items that need to be processed.
17+
private Set<T> dirty;
18+
19+
// Things that are currently being processed are in the processing set.
20+
// These things may be simultaneously in the dirty set. When we finish
21+
// processing something and remove it from this set, we'll check if
22+
// it's in the dirty set, and if so, add it to the queue.
23+
private Set<T> processing;
24+
25+
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
26+
27+
public DefaultWorkQueue() {
28+
this.queue = Lists.newLinkedList();
29+
this.dirty = Sets.newHashSet();
30+
this.processing = Sets.newHashSet();
31+
}
32+
33+
@Override
34+
public synchronized void add(T item) {
35+
if (shuttingDown.get()) {
36+
return;
37+
}
38+
39+
if (this.dirty.contains(item)) {
40+
return;
41+
}
42+
43+
this.dirty.add(item);
44+
if (this.processing.contains(item)) {
45+
return;
46+
}
47+
48+
this.queue.add(item);
49+
this.notify();
50+
}
51+
52+
@Override
53+
public synchronized int length() {
54+
return this.queue.size();
55+
}
56+
57+
@Override
58+
public synchronized T get() throws InterruptedException {
59+
while (queue.size() == 0 && !shuttingDown.get()) {
60+
this.wait();
61+
}
62+
if (queue.size() == 0) {
63+
// We must be shutting down
64+
return null;
65+
}
66+
T obj = this.queue.poll();
67+
this.processing.add(obj);
68+
this.dirty.remove(obj);
69+
return obj;
70+
}
71+
72+
@Override
73+
public synchronized void done(T item) {
74+
this.processing.remove(item);
75+
if (this.dirty.contains(item)) {
76+
this.queue.add(item);
77+
this.notify();
78+
}
79+
}
80+
81+
@Override
82+
public synchronized void shutDown() {
83+
this.shuttingDown.compareAndSet(false, true);
84+
this.notifyAll();
85+
}
86+
87+
@Override
88+
public synchronized boolean shuttingDown() {
89+
return shuttingDown.get();
90+
}
91+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue<T> extends WorkQueue<T> {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(T item, Duration duration);
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
5+
public interface RateLimiter {
6+
7+
/**
8+
* when gets an item and gets to decide how long that item should wait.
9+
*
10+
* @param item specific item
11+
* @return how long the item should wait
12+
*/
13+
Duration when(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter whether its for perm
17+
* failing or for success, we'll stop tracking it
18+
*
19+
* @param item item that is finished being retried
20+
*/
21+
void forget(Object item);
22+
23+
/**
24+
* numRequeues returns back how many failures the item has had.
25+
*
26+
* @param item specific item
27+
* @return how many failures the item has had
28+
*/
29+
int numRequeues(Object item);
30+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
/** RateLimitingQueue defines a queue that rate limits items being added to the queue. */
4+
public interface RateLimitingQueue<T> extends DelayingQueue<T> {
5+
6+
/**
7+
* addRateLimited adds an item to the workqueue after the rate limiter says its ok.
8+
*
9+
* @param item item to add
10+
*/
11+
void addRateLimited(T item);
12+
13+
/**
14+
* forget indicates that an item is finished being retried. Doesn't matter whether its for perm
15+
* failing or for success, we'll stop the rate limiter from tracking it. This only clears the
16+
* `rateLimiter`, you still have to call `Done` on the queue.
17+
*
18+
* @param item item which is finished being retried
19+
*/
20+
void forget(T item);
21+
22+
/**
23+
* numRequeues returns back how many times the item was requeued.
24+
*
25+
* @param item specific item
26+
* @return times the item was requeued
27+
*/
28+
int numRequeues(T item);
29+
}

0 commit comments

Comments
 (0)