Skip to content

Commit c2563f2

Browse files
adohezibo.hzb
authored and
zibo.hzb
committed
workqueue implementation for kube-style controller
1 parent 7413e98 commit c2563f2

File tree

7 files changed

+399
-0
lines changed

7 files changed

+399
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import com.google.common.primitives.Longs;
4+
5+
import java.time.Duration;
6+
import java.util.concurrent.DelayQueue;
7+
import java.util.concurrent.Delayed;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.TimeUnit;
11+
12+
/**
13+
* The default delaying queue implementation.
14+
*/
15+
public class DefaultDelayingQueue extends DefaultWorkQueue implements DelayingQueue {
16+
17+
private DelayQueue delayQueue = new DelayQueue();
18+
private ExecutorService waitingWorker = Executors.newSingleThreadExecutor();
19+
20+
public DefaultDelayingQueue() {
21+
waitingWorker.submit(this::waitingLoop);
22+
}
23+
24+
public void addAfter(Object item, Duration duration) {
25+
// don't add if we're already shutting down
26+
if (super.shuttingDown()) {
27+
return;
28+
}
29+
30+
// immediately add things w/o delay
31+
if (duration.isZero()) {
32+
super.add(item);
33+
return;
34+
}
35+
WaitForEntry entry = new WaitForEntry();
36+
entry.data = item;
37+
entry.readyAtMillis = System.currentTimeMillis() + duration.toMillis();
38+
delayQueue.offer(entry);
39+
}
40+
41+
public void waitingLoop() {
42+
try {
43+
while (true) {
44+
if (super.shuttingDown()) {
45+
return;
46+
}
47+
WaitForEntry entry = (WaitForEntry) delayQueue.take();
48+
super.add(entry.data);
49+
}
50+
} catch (InterruptedException e) {
51+
return;
52+
}
53+
}
54+
55+
private class WaitForEntry implements Delayed {
56+
57+
private Object data;
58+
private long readyAtMillis;
59+
60+
@Override
61+
public long getDelay(TimeUnit unit) {
62+
return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
63+
}
64+
65+
@Override
66+
public int compareTo(Delayed o) {
67+
return Longs.compare(readyAtMillis, ((WaitForEntry) o).readyAtMillis);
68+
}
69+
}
70+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
7+
/**
8+
* The default rate limiting queue implementation.
9+
*/
10+
public class DefaultRateLimitingQueue extends DefaultDelayingQueue implements RateLimitingQueue {
11+
12+
private RateLimiter rateLimiter;
13+
14+
public DefaultRateLimitingQueue() {
15+
rateLimiter = new ExponentialRateLimiter();
16+
}
17+
18+
public DefaultRateLimitingQueue(RateLimiter rateLimiter) {
19+
this.rateLimiter = rateLimiter;
20+
}
21+
22+
@Override
23+
public int numRequeues(Object item) {
24+
return rateLimiter.numRequeues(item);
25+
}
26+
27+
@Override
28+
public void forget(Object item) {
29+
rateLimiter.forget(item);
30+
}
31+
32+
@Override
33+
public void addRateLimited(Object item) {
34+
super.addAfter(item, rateLimiter.when(item));
35+
}
36+
37+
public static class ExponentialRateLimiter implements RateLimiter {
38+
39+
Duration baseDelay;
40+
Duration maxDelay;
41+
42+
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
43+
44+
public ExponentialRateLimiter() {
45+
this.baseDelay = Duration.ofMillis(5);
46+
this.maxDelay = Duration.ofSeconds(1000);
47+
}
48+
49+
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
50+
this.baseDelay = baseDelay;
51+
this.maxDelay = maxDelay;
52+
}
53+
54+
@Override
55+
public void forget(Object item) {
56+
failures.remove(item);
57+
}
58+
59+
@Override
60+
public int numRequeues(Object item) {
61+
return failures.get(item);
62+
}
63+
64+
@Override
65+
public Duration when(Object item) {
66+
Integer exp = failures.getOrDefault(item, 0);
67+
failures.put(item, exp + 1);
68+
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
69+
if (backoff > Long.MAX_VALUE) {
70+
return maxDelay;
71+
}
72+
if (backoff > maxDelay.toNanos()) {
73+
return maxDelay;
74+
}
75+
return Duration.ofNanos(Double.valueOf(backoff).longValue());
76+
}
77+
}
78+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import com.google.common.collect.Sets;
4+
5+
import java.util.LinkedList;
6+
import java.util.Set;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.concurrent.locks.Condition;
9+
import java.util.concurrent.locks.ReentrantLock;
10+
11+
/**
12+
* The default workqueue implementation.
13+
*/
14+
public class DefaultWorkQueue implements WorkQueue {
15+
16+
// queue defines the order in which we will work on items. Every element of queue
17+
// should be in the dirty set and not in the processing set.
18+
private LinkedList<Object> queue;
19+
20+
// dirty defines all of the items that need to be processed.
21+
private Set<Object> dirty;
22+
23+
// Things that are currently being processed are in the processing set.
24+
// These things may be simultaneously in the dirty set. When we finish
25+
// processing something and remove it from this set, we'll check if
26+
// it's in the dirty set, and if so, add it to the queue.
27+
private Set<Object> processing;
28+
29+
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
30+
31+
private final ReentrantLock lock = new ReentrantLock();
32+
33+
private final Condition notEmpty = lock.newCondition();
34+
35+
public DefaultWorkQueue() {
36+
this.queue = new LinkedList<>();
37+
this.dirty = Sets.newHashSet();
38+
this.processing = Sets.newHashSet();
39+
}
40+
41+
@Override
42+
public void add(Object item) {
43+
lock.lock();
44+
try {
45+
46+
if (shuttingDown.get()) {
47+
return;
48+
}
49+
50+
if (this.dirty.contains(item)) {
51+
return;
52+
}
53+
54+
this.dirty.add(item);
55+
if (this.processing.contains(item)) {
56+
return;
57+
}
58+
59+
this.queue.add(item);
60+
notEmpty.signal();
61+
} finally {
62+
lock.unlock();
63+
}
64+
}
65+
66+
@Override
67+
public int length() {
68+
lock.lock();
69+
try {
70+
return this.queue.size();
71+
} finally {
72+
lock.unlock();
73+
}
74+
}
75+
76+
@Override
77+
public Object get() {
78+
lock.lock();
79+
try {
80+
while (queue.size() == 0 && !shuttingDown.get()) {
81+
notEmpty.await();
82+
}
83+
if (queue.size() == 0) {
84+
// We must be shutting down
85+
return null;
86+
}
87+
Object obj = this.queue.poll();
88+
this.processing.add(obj);
89+
this.dirty.remove(obj);
90+
return obj;
91+
} catch (Throwable t) {
92+
// just use empty body
93+
// potential NPE danger?
94+
return null;
95+
} finally {
96+
lock.unlock();
97+
}
98+
}
99+
100+
@Override
101+
public void done(Object item) {
102+
lock.lock();
103+
try {
104+
this.processing.remove(item);
105+
if (this.dirty.contains(item)) {
106+
this.queue.add(item);
107+
notEmpty.signal();
108+
}
109+
} finally {
110+
lock.unlock();
111+
}
112+
}
113+
114+
@Override
115+
public void shutDown() {
116+
this.shuttingDown.compareAndSet(false, true);
117+
}
118+
119+
@Override
120+
public boolean shuttingDown() {
121+
return shuttingDown.get();
122+
}
123+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* DelayingQueue defines a queue that can Add an item at a later time. This makes it easier to
7+
* requeue items after failures without ending up in a hot-loop.
8+
*/
9+
public interface DelayingQueue extends WorkQueue {
10+
11+
/**
12+
* addAfter adds an item to the workqueue after the indicated duration has passed.
13+
*
14+
* @param item item to add
15+
* @param duration specific duration
16+
*/
17+
void addAfter(Object item, Duration duration);
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
import java.time.Duration;
4+
5+
public interface RateLimiter {
6+
7+
/**
8+
* when gets an item and gets to decide how long that item should wait.
9+
*
10+
* @param item specific item
11+
* @return how long the item should wait
12+
*/
13+
Duration when(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter
17+
* whether its for perm failing or for success, we'll stop tracking it
18+
*
19+
* @param item item that is finished being retried
20+
*/
21+
void forget(Object item);
22+
23+
/**
24+
* numRequeues returns back how many failures the item has had.
25+
*
26+
* @param item specific item
27+
* @return how many failures the item has had
28+
*/
29+
int numRequeues(Object item);
30+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.kubernetes.client.workqueue;
2+
3+
/**
4+
* RateLimitingQueue defines a queue that rate limits items being added to the queue.
5+
*/
6+
public interface RateLimitingQueue extends DelayingQueue {
7+
8+
/**
9+
* addRateLimited adds an item to the workqueue after the rate limiter says its ok.
10+
*
11+
* @param item item to add
12+
*/
13+
void addRateLimited(Object item);
14+
15+
/**
16+
* forget indicates that an item is finished being retried. Doesn't matter whether
17+
* its for perm failing or for success, we'll stop the rate limiter from tracking it.
18+
* This only clears the `rateLimiter`, you still have to call `Done` on the queue.
19+
*
20+
* @param item item which is finished being retried
21+
*/
22+
void forget(Object item);
23+
24+
/**
25+
* numRequeues returns back how many times the item was requeued.
26+
*
27+
* @param item specific item
28+
* @return times the item was requeued
29+
*/
30+
int numRequeues(Object item);
31+
}

0 commit comments

Comments
 (0)