Skip to content

Commit 5fc35cd

Browse files
committed
workqueue implementation for kube-style controller
1 parent 7413e98 commit 5fc35cd

File tree

7 files changed

+371
-0
lines changed

7 files changed

+371
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
5+
import java.time.Duration;
6+
import java.util.concurrent.DelayQueue;
7+
import java.util.concurrent.Delayed;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
12+
* The default delaying queue implementation.
13+
*/
14+
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
15+
16+
private DelayQueue<WaitForEntry> delayQueue = new DelayQueue<>();
17+
18+
public DefaultDelayingQueue(ExecutorService waitingWorker) {
19+
waitingWorker.submit(this::waitingLoop);
20+
}
21+
22+
public void addAfter(T item, Duration duration) {
23+
// don't add if we're already shutting down
24+
if (super.shuttingDown()) {
25+
return;
26+
}
27+
28+
// immediately add things w/o delay
29+
if (duration.isZero()) {
30+
super.add(item);
31+
return;
32+
}
33+
WaitForEntry entry = new WaitForEntry();
34+
entry.data = item;
35+
entry.readyAtMillis = System.currentTimeMillis() + duration.toMillis();
36+
delayQueue.offer(entry);
37+
}
38+
39+
private void waitingLoop() {
40+
try {
41+
while (true) {
42+
if (super.shuttingDown()) {
43+
return;
44+
}
45+
WaitForEntry entry = delayQueue.take();
46+
super.add((T) entry.data);
47+
}
48+
} catch (InterruptedException e) {
49+
// empty block
50+
}
51+
}
52+
53+
private static class WaitForEntry implements Delayed {
54+
55+
private Object data;
56+
private long readyAtMillis;
57+
58+
@Override
59+
public long getDelay(TimeUnit unit) {
60+
return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
61+
}
62+
63+
@Override
64+
public int compareTo(Delayed o) {
65+
return Longs.compare(readyAtMillis, ((WaitForEntry) o).readyAtMillis);
66+
}
67+
}
68+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
7+
8+
/**
9+
* The default rate limiting queue implementation.
10+
*/
11+
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T> implements RateLimitingQueue<T> {
12+
13+
private RateLimiter rateLimiter;
14+
15+
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
16+
super(waitingWorker);
17+
this.rateLimiter = new ExponentialRateLimiter();
18+
}
19+
20+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
21+
super(waitingWorker);
22+
this.rateLimiter = rateLimiter;
23+
}
24+
25+
@Override
26+
public int numRequeues(T item) {
27+
return rateLimiter.numRequeues(item);
28+
}
29+
30+
@Override
31+
public void forget(Object item) {
32+
rateLimiter.forget(item);
33+
}
34+
35+
@Override
36+
public void addRateLimited(T item) {
37+
super.addAfter(item, rateLimiter.when(item));
38+
}
39+
40+
public static class ExponentialRateLimiter implements RateLimiter {
41+
42+
Duration baseDelay;
43+
Duration maxDelay;
44+
45+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
46+
47+
public ExponentialRateLimiter() {
48+
this.baseDelay = Duration.ofMillis(5);
49+
this.maxDelay = Duration.ofSeconds(1000);
50+
}
51+
52+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
53+
this.baseDelay = baseDelay;
54+
this.maxDelay = maxDelay;
55+
}
56+
57+
@Override
58+
public void forget(Object item) {
59+
failures.remove(item);
60+
}
61+
62+
@Override
63+
public int numRequeues(Object item) {
64+
return failures.get(item);
65+
}
66+
67+
@Override
68+
public Duration when(Object item) {
69+
Integer exp = failures.getOrDefault(item, 0);
70+
failures.put(item, exp + 1);
71+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
72+
if (backoff > Long.MAX_VALUE) {
73+
return maxDelay;
74+
}
75+
if (backoff > maxDelay.toNanos()) {
76+
return maxDelay;
77+
}
78+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
79+
}
80+
}
81+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import com.google.common.collect.Lists;
4+
import com.google.common.collect.Sets;
5+
6+
import java.util.LinkedList;
7+
import java.util.Set;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
/**
11+
* The default workqueue implementation.
12+
*/
13+
public class DefaultWorkQueue<T> implements WorkQueue<T> {
14+
15+
// queue defines the order in which we will work on items. Every element of queue
16+
// should be in the dirty set and not in the processing set.
17+
private LinkedList<T> queue;
18+
19+
// dirty defines all of the items that need to be processed.
20+
private Set<T> dirty;
21+
22+
// Things that are currently being processed are in the processing set.
23+
// These things may be simultaneously in the dirty set. When we finish
24+
// processing something and remove it from this set, we'll check if
25+
// it's in the dirty set, and if so, add it to the queue.
26+
private Set<T> processing;
27+
28+
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
29+
30+
public DefaultWorkQueue() {
31+
this.queue = Lists.newLinkedList();
32+
this.dirty = Sets.newHashSet();
33+
this.processing = Sets.newHashSet();
34+
}
35+
36+
@Override
37+
public synchronized void add(T item) {
38+
if (shuttingDown.get()) {
39+
return;
40+
}
41+
42+
if (this.dirty.contains(item)) {
43+
return;
44+
}
45+
46+
this.dirty.add(item);
47+
if (this.processing.contains(item)) {
48+
return;
49+
}
50+
51+
this.queue.add(item);
52+
this.notify();
53+
}
54+
55+
@Override
56+
public synchronized int length() {
57+
return this.queue.size();
58+
}
59+
60+
@Override
61+
public synchronized T get() throws InterruptedException {
62+
while (queue.size() == 0 && !shuttingDown.get()) {
63+
this.wait();
64+
}
65+
if (queue.size() == 0) {
66+
// We must be shutting down
67+
return null;
68+
}
69+
T obj = this.queue.poll();
70+
this.processing.add(obj);
71+
this.dirty.remove(obj);
72+
return obj;
73+
}
74+
75+
@Override
76+
public synchronized void done(T item) {
77+
this.processing.remove(item);
78+
if (this.dirty.contains(item)) {
79+
this.queue.add(item);
80+
this.notify();
81+
}
82+
}
83+
84+
@Override
85+
public synchronized void shutDown() {
86+
this.shuttingDown.compareAndSet(false, true);
87+
this.notifyAll();
88+
}
89+
90+
@Override
91+
public synchronized boolean shuttingDown() {
92+
return shuttingDown.get();
93+
}
94+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue<T> extends WorkQueue<T> {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(T item, Duration duration);
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
5+
public interface RateLimiter {
6+
7+
/**
8+
* when gets an item and gets to decide how long that item should wait.
9+
*
10+
* @param item specific item
11+
* @return how long the item should wait
12+
*/
13+
Duration when(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter
17+
* whether its for perm failing or for success, we'll stop tracking it
18+
*
19+
* @param item item that is finished being retried
20+
*/
21+
void forget(Object item);
22+
23+
/**
24+
* numRequeues returns back how many failures the item has had.
25+
*
26+
* @param item specific item
27+
* @return how many failures the item has had
28+
*/
29+
int numRequeues(Object item);
30+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
/**
4+
* RateLimitingQueue defines a queue that rate limits items being added to the queue.
5+
*/
6+
public interface RateLimitingQueue<T> extends DelayingQueue<T> {
7+
8+
/**
9+
* addRateLimited adds an item to the workqueue after the rate limiter says its ok.
10+
*
11+
* @param item item to add
12+
*/
13+
void addRateLimited(T item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter whether
17+
* its for perm failing or for success, we'll stop the rate limiter from tracking it.
18+
* This only clears the `rateLimiter`, you still have to call `Done` on the queue.
19+
*
20+
* @param item item which is finished being retried
21+
*/
22+
void forget(T item);
23+
24+
/**
25+
* numRequeues returns back how many times the item was requeued.
26+
*
27+
* @param item specific item
28+
* @return times the item was requeued
29+
*/
30+
int numRequeues(T item);
31+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
/**
4+
* The workqueue interface defines the queue behavior.
5+
*/
6+
public interface WorkQueue<T> {
7+
8+
/**
9+
* add marks item as needing processing.
10+
*
11+
* @param item item to add
12+
*/
13+
void add(T item);
14+
15+
/**
16+
* length returns the current queue length, for informational
17+
* purposes only.
18+
*
19+
* @return current queue length
20+
*/
21+
int length();
22+
23+
/**
24+
* Get blocks until it can return an item to be processed. If shutdown = true,
25+
* the caller should end their process.
26+
*
27+
* @return the object
28+
*/
29+
T get() throws InterruptedException;
30+
31+
/**
32+
* Done marks item as done processing, and if it has been marked as dirty again,
33+
*
34+
* @param item specific item
35+
*/
36+
void done(T item);
37+
38+
/**
39+
* ShutDown will cause q to ignore all new items added to it.
40+
*/
41+
void shutDown();
42+
43+
/**
44+
* returns whether the queue shutdown.
45+
*
46+
* @return returns {@code true} if the
47+
*/
48+
boolean shuttingDown();
49+
}

0 commit comments

Comments
 (0)