Skip to content

Commit e85e988

Browse files
committed
Change the implementation of BucketRateLimiter and ItemExponentialFailureRateLimiter, add more unit tests
1 parent 9f73d82 commit e85e988

File tree

6 files changed

+152
-41
lines changed

6 files changed

+152
-41
lines changed
Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

3-
import io.github.bucket4j.*;
3+
import io.github.bucket4j.Bandwidth;
4+
import io.github.bucket4j.Bucket;
5+
import io.github.bucket4j.Bucket4j;
6+
import io.github.bucket4j.Refill;
7+
import io.github.bucket4j.local.SynchronizationStrategy;
48
import java.time.Duration;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.concurrent.ScheduledThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
512

613
/** A light-weight token bucket implementation for RateLimiter. */
714
public class BucketRateLimiter<T> implements RateLimiter<T> {
8-
915
private Bucket bucket;
10-
private long tokensInQueue;
11-
private long tokensGeneratedInPeriod;
12-
private Duration period;
1316

1417
/**
1518
* @param capacity Capacity is the maximum number of tokens can be consumed.
@@ -19,26 +22,18 @@ public class BucketRateLimiter<T> implements RateLimiter<T> {
1922
public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration period) {
2023
Bandwidth bandwidth =
2124
Bandwidth.classic(capacity, Refill.greedy(tokensGeneratedInPeriod, period));
22-
23-
this.bucket = Bucket4j.builder().addLimit(bandwidth).build();
24-
this.tokensInQueue = 0;
25-
this.tokensGeneratedInPeriod = tokensGeneratedInPeriod;
26-
this.period = period;
25+
this.bucket =
26+
Bucket4j.builder()
27+
.addLimit(bandwidth)
28+
.withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED)
29+
.build();
2730
}
2831

2932
@Override
3033
public synchronized Duration when(T item) {
31-
tokensInQueue++;
32-
33-
long consumedTokens = bucket.tryConsumeAsMuchAsPossible(tokensInQueue);
34-
if (tokensInQueue - consumedTokens == 0) {
35-
tokensInQueue = 0;
36-
return Duration.ZERO;
37-
}
38-
39-
tokensInQueue = tokensInQueue - consumedTokens;
40-
41-
return durationFromTokens(tokensInQueue, tokensGeneratedInPeriod, period);
34+
DelayGetter delayGetter = new DelayGetter();
35+
bucket.asAsyncScheduler().consume(1, delayGetter).complete(null);
36+
return delayGetter.getDelay();
4237
}
4338

4439
@Override
@@ -49,8 +44,21 @@ public int numRequeues(T item) {
4944
return 0;
5045
}
5146

52-
private Duration durationFromTokens(
53-
long tokensNeedToBeConsumed, long tokensGeneratedInPeriod, Duration period) {
54-
return period.dividedBy(tokensGeneratedInPeriod).multipliedBy(tokensNeedToBeConsumed);
47+
private class DelayGetter extends ScheduledThreadPoolExecutor {
48+
private Duration delay = Duration.ZERO;
49+
50+
@Override
51+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
52+
this.delay = Duration.ofNanos(unit.toNanos(delay));
53+
return null;
54+
}
55+
56+
private DelayGetter() {
57+
super(0);
58+
}
59+
60+
private Duration getDelay() {
61+
return delay;
62+
}
5563
}
5664
}

extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiter.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,8 @@ public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay)
2626
public synchronized Duration when(T item) {
2727
int exp = failures.getOrDefault(item, 0);
2828
failures.put(item, exp + 1);
29-
30-
double backOff = baseDelay.toNanos() * Math.pow(2, exp);
31-
if (backOff > Long.MAX_VALUE) {
32-
return maxDelay;
33-
}
34-
35-
if (backOff > maxDelay.toNanos()) {
36-
return maxDelay;
37-
}
38-
39-
return Duration.ofNanos((long) backOff);
29+
long d = maxDelay.toMillis() >> exp;
30+
return d > baseDelay.toMillis() ? baseDelay.multipliedBy(1 << exp) : maxDelay;
4031
}
4132

4233
@Override
Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,34 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

33
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
45

56
import java.time.Duration;
7+
import org.junit.Rule;
68
import org.junit.Test;
9+
import org.junit.rules.ExpectedException;
710

811
public class BucketRateLimiterTest {
12+
@Rule public ExpectedException thrown = ExpectedException.none();
913

1014
@Test
1115
public void testBucketRateLimiterBasic() {
1216
RateLimiter<String> rateLimiter = new BucketRateLimiter<>(2, 1, Duration.ofMinutes(10));
1317
assertEquals(Duration.ZERO, rateLimiter.when("one"));
1418
assertEquals(Duration.ZERO, rateLimiter.when("one"));
1519

16-
assertEquals(Duration.ofMinutes(10), rateLimiter.when("one"));
17-
assertEquals(Duration.ofMinutes(20), rateLimiter.when("one"));
18-
assertEquals(Duration.ofMinutes(30), rateLimiter.when("one"));
20+
Duration waitDuration = rateLimiter.when("one");
21+
Duration expectDuration = Duration.ofMinutes(10);
22+
23+
Duration diff = waitDuration.minus(expectDuration);
24+
// waitDuration might be smaller than expect duration because of time is elapsed.
25+
assertTrue(diff.isZero() || (diff.isNegative() && !diff.plusSeconds(1).isNegative()));
26+
27+
waitDuration = rateLimiter.when("one");
28+
expectDuration = Duration.ofMinutes(20);
29+
diff = waitDuration.minus(expectDuration);
30+
31+
assertTrue(diff.isZero() || (diff.isNegative() && !diff.plusSeconds(1).isNegative()));
1932
}
2033

2134
@Test
@@ -25,11 +38,44 @@ public void testBucketRateLimiterTokenAdded() throws InterruptedException {
2538
assertEquals(Duration.ZERO, rateLimiter.when("one"));
2639
assertEquals(Duration.ZERO, rateLimiter.when("one"));
2740

28-
assertEquals(Duration.ofSeconds(2), rateLimiter.when("one"));
41+
Duration waitDuration = rateLimiter.when("one");
42+
assertTrue(waitDuration.getSeconds() > 0);
2943

3044
Thread.sleep(4000);
3145

32-
assertEquals(Duration.ZERO, rateLimiter.when("one"));
33-
assertEquals(Duration.ofSeconds(2), rateLimiter.when("one"));
46+
assertEquals(Duration.ZERO, rateLimiter.when("two"));
47+
48+
waitDuration = rateLimiter.when("two");
49+
assertTrue(waitDuration.getSeconds() > 0);
50+
}
51+
52+
@Test
53+
public void testNegativeCapacity() {
54+
thrown.expect(IllegalArgumentException.class);
55+
thrown.expectMessage("-2 is wrong value for capacity, because capacity should be positive");
56+
RateLimiter<String> rateLimiter = new BucketRateLimiter<>(-2, 1, Duration.ofSeconds(2));
57+
}
58+
59+
@Test
60+
public void testNegativeTokensGeneratedInPeriod() {
61+
thrown.expect(IllegalArgumentException.class);
62+
thrown.expectMessage("-1 is wrong value for period tokens, because tokens should be positive");
63+
RateLimiter<String> rateLimiter = new BucketRateLimiter<>(2, -1, Duration.ofSeconds(2));
64+
}
65+
66+
@Test
67+
public void testNegativePeriod() {
68+
thrown.expect(IllegalArgumentException.class);
69+
thrown.expectMessage(
70+
"-1 is wrong value for period of bandwidth, because period should be positive");
71+
RateLimiter<String> rateLimiter = new BucketRateLimiter<>(2, 1, Duration.ofNanos(-1));
72+
}
73+
74+
@Test
75+
public void testTokensLargerThanNanos() {
76+
thrown.expect(IllegalArgumentException.class);
77+
thrown.expectMessage(
78+
"100 token/nanosecond is not permitted refill rate, because highest supported rate is 1 token/nanosecond");
79+
RateLimiter<String> rateLimiter = new BucketRateLimiter<>(2, 100, Duration.ofNanos(1));
3480
}
3581
}

extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiterTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

33
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
45

56
import java.time.Duration;
67
import org.junit.Test;
@@ -56,4 +57,30 @@ public void testItemExponentialFailureRateLimiterOverFlow() {
5657
}
5758
assertEquals(Duration.ofHours(1000), rateLimiter.when("overflow2"));
5859
}
60+
61+
@Test
62+
public void testNegativeBaseDelay() {
63+
RateLimiter<String> rateLimiter =
64+
new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(-1), Duration.ofSeconds(1000));
65+
66+
for (int i = 0; i < 5; i++) {
67+
rateLimiter.when("one");
68+
}
69+
assertEquals(Duration.ofMillis(-32), rateLimiter.when("one"));
70+
for (int i = 0; i < 1000; i++) {
71+
rateLimiter.when("overflow1");
72+
}
73+
assertTrue(rateLimiter.when("overflow1").isNegative());
74+
}
75+
76+
@Test
77+
public void testNegativeMaxDelay() {
78+
RateLimiter<String> rateLimiter =
79+
new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(1), Duration.ofSeconds(-1000));
80+
81+
assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one"));
82+
assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one"));
83+
assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one"));
84+
assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one"));
85+
}
5986
}

extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiterTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,19 @@ public void testFastSlowRateLimiter() {
2929
assertEquals(0, rateLimiter.numRequeues("one"));
3030
assertEquals(Duration.ofMillis(5), rateLimiter.when("one"));
3131
}
32+
33+
@Test
34+
public void testNegativeOrZeroAttempts() {
35+
RateLimiter<String> rateLimiter =
36+
new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(10), -1);
37+
38+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("one"));
39+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("one"));
40+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("one"));
41+
42+
rateLimiter = new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(10), 0);
43+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("two"));
44+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("two"));
45+
assertEquals(Duration.ofSeconds(10), rateLimiter.when("two"));
46+
}
3247
}

extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiterTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

33
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
45

56
import java.time.Duration;
67
import org.junit.Test;
@@ -31,4 +32,27 @@ public void testMaxOfRateLimiter() {
3132
assertEquals(0, rateLimiter.numRequeues("one"));
3233
assertEquals(Duration.ofMillis(5), rateLimiter.when("one"));
3334
}
35+
36+
@Test
37+
public void testDefaultRateLimiter() {
38+
RateLimiter<String> rateLimiter = new DefaultControllerRateLimiter<>();
39+
40+
assertEquals(Duration.ofMillis(5), rateLimiter.when("one"));
41+
assertEquals(Duration.ofMillis(10), rateLimiter.when("one"));
42+
assertEquals(Duration.ofMillis(20), rateLimiter.when("one"));
43+
44+
for (int i = 0; i < 20; i++) {
45+
rateLimiter.when("one");
46+
}
47+
48+
assertEquals(Duration.ofSeconds(1000), rateLimiter.when("one"));
49+
assertEquals(Duration.ofSeconds(1000), rateLimiter.when("one"));
50+
51+
for (int i = 0; i < 75; i++) {
52+
rateLimiter.when("one");
53+
}
54+
55+
assertTrue(rateLimiter.when("one").getSeconds() > 0);
56+
assertTrue(rateLimiter.when("two").getSeconds() > 0);
57+
}
3458
}

0 commit comments

Comments
 (0)