Skip to content

Commit a2d18d1

Browse files
committed
Add token bucket rate limiter
1 parent 0e17f42 commit a2d18d1

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
6+
/** A light-weight token bucket implementation for RateLimiter. */
7+
public class BucketRateLimiter<T> implements RateLimiter<T> {
8+
public static final double INFINITE_LIMIT = Double.POSITIVE_INFINITY;
9+
10+
private long limit;
11+
12+
private long capacity;
13+
private long tokens;
14+
15+
private Instant last;
16+
17+
/**
18+
* @param limit Limit defines the maximum frequency of some events. Limit is represented as number
19+
* of events per second. A zero Limit allows no events.
20+
* @param capacity Capacity is the maximum number of tokens can be consumed.
21+
*/
22+
public BucketRateLimiter(long limit, long capacity) {
23+
this.limit = limit;
24+
this.capacity = capacity;
25+
26+
last = Instant.EPOCH;
27+
}
28+
29+
@Override
30+
public Duration when(T item) {
31+
// if limit is infinite, it allows all event
32+
if (limit == INFINITE_LIMIT) {
33+
return Duration.ZERO;
34+
}
35+
36+
// if capacity isn't positive, it will return the infinite delay
37+
if (capacity <= 0) {
38+
return Duration.ofSeconds(Long.MAX_VALUE);
39+
}
40+
41+
Instant now = Instant.now();
42+
if (now.isBefore(last)) {
43+
last = now;
44+
}
45+
46+
// Avoid making delta overflow below when last is very old.
47+
Duration maxElapsed = durationFromTokens(capacity - tokens);
48+
Duration elapsed = Duration.between(last, now);
49+
if (elapsed.compareTo(maxElapsed) > 0) {
50+
elapsed = maxElapsed;
51+
}
52+
53+
// Calculate the new number of tokens, due to time that passed.
54+
long delta = tokensFromDurations(elapsed);
55+
tokens += delta;
56+
if (tokens > capacity) {
57+
tokens = capacity;
58+
}
59+
60+
tokens -= 1;
61+
if (tokens < 0) {
62+
return durationFromTokens(-tokens);
63+
}
64+
65+
return Duration.ZERO;
66+
}
67+
68+
@Override
69+
public void forget(T item) {}
70+
71+
@Override
72+
public int numRequests(T item) {
73+
return 0;
74+
}
75+
76+
private Duration durationFromTokens(long tokens) {
77+
long seconds = (tokens / limit);
78+
return Duration.ofSeconds(seconds);
79+
}
80+
81+
private long tokensFromDurations(Duration d) {
82+
return (d.getSeconds() * limit);
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
6+
/**
7+
* DefaultControllerRateLimiter is a default rate limiter for workqueue. It has both overall and
8+
* per-item rate limiting. The overall is a token bucket and the per-item is exponential
9+
*/
10+
public class DefaultControllerRateLimiter<T> implements RateLimiter<T> {
11+
12+
private RateLimiter<T> internalRateLimiter;
13+
14+
public DefaultControllerRateLimiter() {
15+
this.internalRateLimiter =
16+
new MaxOfRateLimiter<>(
17+
Arrays.asList(
18+
new ItemExponentialFailureRateLimiter<>(
19+
Duration.ofMillis(5), Duration.ofSeconds(1000)),
20+
new BucketRateLimiter<>(10, 100)));
21+
}
22+
23+
@Override
24+
public Duration when(T item) {
25+
return internalRateLimiter.when(item);
26+
}
27+
28+
@Override
29+
public void forget(T item) {
30+
internalRateLimiter.forget(item);
31+
}
32+
33+
@Override
34+
public int numRequests(T item) {
35+
return internalRateLimiter.numRequests(item);
36+
}
37+
}

extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiter.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ public Duration when(T item) {
3030

3131
@Override
3232
public void forget(T item) {
33-
rateLimiters.forEach(
34-
r -> {
35-
r.forget(item);
36-
});
33+
rateLimiters.forEach(r -> r.forget(item));
3734
}
3835

3936
@Override

extended/src/test/java/io/kubernetes/client/extended/workqueue/DelayingWorkQueueTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public void setUp() {
1919
public void testAddAfterWithinDelay() throws InterruptedException {
2020
String testStr = "test";
2121

22-
delayingWorkQueue.addAfter(testStr, Duration.ofSeconds(10));
22+
delayingWorkQueue.addAfter(testStr, Duration.ofSeconds(5));
2323
assertEquals(0, delayingWorkQueue.size());
2424

2525
Thread.sleep(2000);

0 commit comments

Comments
 (0)