Skip to content

Commit 0e17f42

Browse files
committed
Add workqueue support
1 parent a64e01f commit 0e17f42

File tree

10 files changed

+637
-0
lines changed

10 files changed

+637
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.time.temporal.ChronoUnit;
6+
import java.time.temporal.Temporal;
7+
import java.util.concurrent.*;
8+
import javax.annotation.Nonnull;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* DelayingWorkQueue can Add an item at a later time. This makes it easier to requeue items after
14+
* failures without ending up in a hot-loop.
15+
*/
16+
public class DelayingWorkQueue<T> extends WorkQueue<T> {
17+
private static final Logger log = LoggerFactory.getLogger(DelayingWorkQueue.class);
18+
19+
private DelayQueue<DelayEntry> delayItems;
20+
21+
public DelayingWorkQueue(ExecutorService executorService) {
22+
this.delayItems = new DelayQueue<>();
23+
executorService.submit(this::waitingLoop);
24+
}
25+
26+
public DelayingWorkQueue() {
27+
this(Executors.newSingleThreadExecutor());
28+
}
29+
30+
public synchronized void addAfter(T item, long amount, ChronoUnit timeUnit) {
31+
addAfter(item, Duration.of(amount, timeUnit));
32+
}
33+
34+
/**
35+
* AddAfter adds an item to the workqueue after the indicated duration has passed
36+
*
37+
* @param item Item will be added to the queue
38+
* @param duration Item will be added to the queue after the duration has passed
39+
*/
40+
public synchronized void addAfter(T item, Duration duration) {
41+
if (shuttingDown()) {
42+
return;
43+
}
44+
45+
if (duration.isZero() || duration.isNegative()) {
46+
add(item);
47+
return;
48+
}
49+
50+
delayItems.add(new DelayEntry(item, duration));
51+
}
52+
53+
private void waitingLoop() {
54+
try {
55+
while (true) {
56+
if (shuttingDown()) {
57+
break;
58+
}
59+
60+
DelayEntry delayEntry = delayItems.take();
61+
add(delayEntry.data);
62+
}
63+
} catch (InterruptedException e) {
64+
if (log.isDebugEnabled()) {
65+
log.debug("Waiting loop is stopped, reason: {}", e);
66+
}
67+
}
68+
}
69+
70+
private class DelayEntry implements Delayed {
71+
72+
private T data;
73+
private Temporal expectedEndTime;
74+
75+
public DelayEntry(T data, Duration delay) {
76+
this.data = data;
77+
78+
expectedEndTime = delay.addTo(Instant.now());
79+
}
80+
81+
@Override
82+
public long getDelay(@Nonnull TimeUnit unit) {
83+
Duration duration = Duration.between(Instant.now(), expectedEndTime);
84+
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
85+
}
86+
87+
@Override
88+
public int compareTo(@Nonnull Delayed o) {
89+
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
90+
}
91+
}
92+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
7+
public class RateLimitingQueue<T> extends DelayingWorkQueue<T> {
8+
private RateLimiter<T> rateLimiter;
9+
10+
public RateLimitingQueue(RateLimiter<T> rateLimiter, ExecutorService executorService) {
11+
super(executorService);
12+
13+
this.rateLimiter = rateLimiter;
14+
}
15+
16+
public RateLimitingQueue(RateLimiter<T> rateLimiter) {
17+
this(rateLimiter, Executors.newSingleThreadExecutor());
18+
}
19+
20+
public void addRateLimited(T item) {
21+
addAfter(item, rateLimiter.when(item));
22+
}
23+
24+
public void forget(T item) {
25+
rateLimiter.forget(item);
26+
}
27+
28+
public int numRequests(T item) {
29+
return rateLimiter.numRequests(item);
30+
}
31+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package io.kubernetes.client.extended.workqueue;
2+
3+
import java.util.HashSet;
4+
import java.util.LinkedList;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
7+
/**
8+
* workqueue provides a simple queue that supports the following features:
9+
*
10+
* <ul>
11+
* <li>Fair: items processed in the order in which they are added.
12+
* <li>Stingy: a single item will not be processed multiple times concurrently, and if an item is
13+
* added multiple times before it can be processed, it will only be processed once.
14+
* <li>Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued
15+
* while it is being processed.
16+
* <li>Shutdown notifications.
17+
* </ul>
18+
*/
19+
public class WorkQueue<T> {
20+
// queue defines the order in which we will work on items. Every
21+
// element of queue should be in the dirty set and not in the
22+
// processing set.
23+
private LinkedList<T> queue;
24+
25+
// dirty defines all of the items that need to be processed.
26+
private HashSet<T> dirty;
27+
28+
// Things that are currently being processed are in the processing set.
29+
// These things may be simultaneously in the dirty set. When we finish
30+
// processing something and remove it from this set, we'll check if
31+
// it's in the dirty set, and if so, add it to the queue.
32+
private HashSet<T> processing;
33+
34+
private AtomicBoolean shuttingDown;
35+
36+
public WorkQueue() {
37+
queue = new LinkedList<>();
38+
dirty = new HashSet<>();
39+
processing = new HashSet<>();
40+
41+
shuttingDown = new AtomicBoolean(false);
42+
}
43+
44+
/**
45+
* Add marks item as needing processing.
46+
*
47+
* @param item item need to be processed.
48+
*/
49+
public synchronized void add(T item) {
50+
if (shuttingDown.get()) {
51+
return;
52+
}
53+
54+
if (dirty.contains(item)) {
55+
return;
56+
}
57+
58+
dirty.add(item);
59+
if (processing.contains(item)) {
60+
return;
61+
}
62+
63+
queue.add(item);
64+
this.notify();
65+
}
66+
67+
/**
68+
* Get blocks until it can return an item to be processed.
69+
*
70+
* @return item to be processed
71+
* @throws InterruptedException if queue is shutting down
72+
*/
73+
public synchronized T get() throws InterruptedException {
74+
while (queue.size() == 0 && !shuttingDown.get()) {
75+
this.wait();
76+
}
77+
78+
if (queue.size() == 0) {
79+
// We must be shutting down
80+
throw new InterruptedException("WorkQueue is shutting down");
81+
}
82+
83+
T item = queue.poll();
84+
85+
processing.add(item);
86+
dirty.remove(item);
87+
88+
return item;
89+
}
90+
91+
public synchronized int size() {
92+
return queue.size();
93+
}
94+
95+
/**
96+
* Done marks item as done processing, and if it has been marked as dirty again while it was being
97+
* processed, it will be re-added to the queue for re-processing.
98+
*
99+
* @param item Item that marked as done processing
100+
*/
101+
public synchronized void done(T item) {
102+
processing.remove(item);
103+
if (dirty.contains(item)) {
104+
queue.add(item);
105+
this.notify();
106+
}
107+
}
108+
109+
/**
110+
* shutDown will cause queue to ignore all new items added to it. All threads that waiting to get
111+
* item will be interrupted.
112+
*/
113+
public synchronized void shutDown() {
114+
shuttingDown.set(true);
115+
this.notifyAll();
116+
}
117+
118+
/** @return true is this queue is shutting down */
119+
public synchronized boolean shuttingDown() {
120+
return shuttingDown.get();
121+
}
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* ItemExponentialFailureRateLimiter does a simple baseDelay*10<sup>num-failures</sup> limit dealing
9+
* with max failures and expiration are up to the caller
10+
*/
11+
public class ItemExponentialFailureRateLimiter<T> implements RateLimiter<T> {
12+
13+
private Duration baseDelay;
14+
private Duration maxDelay;
15+
16+
private Map<T, Integer> failures;
17+
18+
public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
19+
this.baseDelay = baseDelay;
20+
this.maxDelay = maxDelay;
21+
22+
failures = new HashMap<>();
23+
}
24+
25+
@Override
26+
public synchronized Duration when(T item) {
27+
int exp = failures.getOrDefault(item, 0);
28+
failures.put(item, exp + 1);
29+
30+
double backOff = baseDelay.toNanos() * Math.pow(2, exp);
31+
if (backOff > Long.MAX_VALUE) {
32+
return maxDelay;
33+
}
34+
35+
if (backOff > maxDelay.toNanos()) {
36+
return maxDelay;
37+
}
38+
39+
return Duration.ofNanos((long) backOff);
40+
}
41+
42+
@Override
43+
public synchronized void forget(T item) {
44+
failures.remove(item);
45+
}
46+
47+
@Override
48+
public synchronized int numRequests(T item) {
49+
return failures.getOrDefault(item, 0);
50+
}
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry
9+
* after that
10+
*/
11+
public class ItemFastSlowRateLimiter<T> implements RateLimiter<T> {
12+
13+
private Map<T, Integer> failures;
14+
15+
private Duration fastDelay;
16+
private Duration slowDelay;
17+
private int maxFastAttempts;
18+
19+
public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
20+
this.fastDelay = fastDelay;
21+
this.slowDelay = slowDelay;
22+
this.maxFastAttempts = maxFastAttempts;
23+
24+
failures = new HashMap<>();
25+
}
26+
27+
@Override
28+
public synchronized Duration when(T item) {
29+
int attempts = failures.getOrDefault(item, 0);
30+
failures.put(item, attempts + 1);
31+
32+
if (attempts <= maxFastAttempts) {
33+
return fastDelay;
34+
}
35+
36+
return slowDelay;
37+
}
38+
39+
@Override
40+
public synchronized void forget(T item) {
41+
failures.remove(item);
42+
}
43+
44+
@Override
45+
public synchronized int numRequests(T item) {
46+
return failures.getOrDefault(item, 0);
47+
}
48+
}

0 commit comments

Comments
 (0)