diff --git a/extended/pom.xml b/extended/pom.xml index 5b207ab2b2..58db0cbc17 100644 --- a/extended/pom.xml +++ b/extended/pom.xml @@ -37,6 +37,11 @@ guava 25.1-jre + + com.github.vladimir-bukhtoyarov + bucket4j-core + ${bucket4jVersion} + junit @@ -130,5 +135,6 @@ ${java.version} ${java.version} 1.7.7 + 4.4.1 diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitingQueue.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitingQueue.java index ad563e72f5..6f6202fd2f 100644 --- a/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitingQueue.java +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitingQueue.java @@ -1,22 +1,21 @@ package io.kubernetes.client.extended.workqueue; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import io.kubernetes.client.extended.workqueue.ratelimiter.DefaultControllerRateLimiter; +import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter; import java.util.concurrent.ExecutorService; /** The default rate limiting queue implementation. */ public class DefaultRateLimitingQueue extends DefaultDelayingQueue implements RateLimitingQueue { - private RateLimiter rateLimiter; + private RateLimiter rateLimiter; public DefaultRateLimitingQueue(ExecutorService waitingWorker) { super(waitingWorker); - this.rateLimiter = new ExponentialRateLimiter(); + this.rateLimiter = new DefaultControllerRateLimiter<>(); } - public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) { + public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) { super(waitingWorker); this.rateLimiter = rateLimiter; } @@ -27,7 +26,7 @@ public int numRequeues(T item) { } @Override - public void forget(Object item) { + public void forget(T item) { rateLimiter.forget(item); } @@ -35,46 +34,4 @@ public void forget(Object item) { public void addRateLimited(T item) { super.addAfter(item, rateLimiter.when(item)); } - - public static class ExponentialRateLimiter implements RateLimiter { - - Duration baseDelay; - Duration maxDelay; - - private Map failures = new ConcurrentHashMap<>(); - - public ExponentialRateLimiter() { - this.baseDelay = Duration.ofMillis(5); - this.maxDelay = Duration.ofSeconds(1000); - } - - public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) { - this.baseDelay = baseDelay; - this.maxDelay = maxDelay; - } - - @Override - public void forget(Object item) { - failures.remove(item); - } - - @Override - public int numRequeues(Object item) { - return failures.get(item); - } - - @Override - public Duration when(Object item) { - Integer exp = failures.getOrDefault(item, 0); - failures.put(item, exp + 1); - double backoff = baseDelay.toNanos() * Math.pow(2, exp); - if (backoff > Long.MAX_VALUE) { - return maxDelay; - } - if (backoff > maxDelay.toNanos()) { - return maxDelay; - } - return Duration.ofNanos(Double.valueOf(backoff).longValue()); - } - } } diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/RateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/RateLimiter.java deleted file mode 100644 index 856328bf9a..0000000000 --- a/extended/src/main/java/io/kubernetes/client/extended/workqueue/RateLimiter.java +++ /dev/null @@ -1,30 +0,0 @@ -package io.kubernetes.client.extended.workqueue; - -import java.time.Duration; - -public interface RateLimiter { - - /** - * when gets an item and gets to decide how long that item should wait. - * - * @param item specific item - * @return how long the item should wait - */ - Duration when(Object item); - - /** - * forget indicates that an item is finished being retried. Doesn't matter whether its for perm - * failing or for success, we'll stop tracking it - * - * @param item item that is finished being retried - */ - void forget(Object item); - - /** - * numRequeues returns back how many failures the item has had. - * - * @param item specific item - * @return how many failures the item has had - */ - int numRequeues(Object item); -} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiter.java new file mode 100644 index 0000000000..967dfe0cdc --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiter.java @@ -0,0 +1,64 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Bucket4j; +import io.github.bucket4j.Refill; +import io.github.bucket4j.local.SynchronizationStrategy; +import java.time.Duration; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** A light-weight token bucket implementation for RateLimiter. */ +public class BucketRateLimiter implements RateLimiter { + private Bucket bucket; + + /** + * @param capacity Capacity is the maximum number of tokens can be consumed. + * @param tokensGeneratedInPeriod Tokens generated in period. + * @param period Period that generating specific number of tokens. + */ + public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration period) { + Bandwidth bandwidth = + Bandwidth.classic(capacity, Refill.greedy(tokensGeneratedInPeriod, period)); + this.bucket = + Bucket4j.builder() + .addLimit(bandwidth) + .withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED) + .build(); + } + + @Override + public Duration when(T item) { + DelayGetter delayGetter = new DelayGetter(); + bucket.asAsyncScheduler().consume(1, delayGetter).complete(null); + return delayGetter.getDelay(); + } + + @Override + public void forget(T item) {} + + @Override + public int numRequeues(T item) { + return 0; + } + + private class DelayGetter extends ScheduledThreadPoolExecutor { + private Duration delay = Duration.ZERO; + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + this.delay = Duration.ofNanos(unit.toNanos(delay)); + return null; + } + + private DelayGetter() { + super(0); + } + + private Duration getDelay() { + return delay; + } + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/DefaultControllerRateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/DefaultControllerRateLimiter.java new file mode 100644 index 0000000000..3090992447 --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/DefaultControllerRateLimiter.java @@ -0,0 +1,37 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import java.time.Duration; +import java.util.Arrays; + +/** + * DefaultControllerRateLimiter is a default rate limiter for workqueue. It has both overall and + * per-item rate limiting. The overall is a token bucket and the per-item is exponential + */ +public class DefaultControllerRateLimiter implements RateLimiter { + + private RateLimiter internalRateLimiter; + + public DefaultControllerRateLimiter() { + this.internalRateLimiter = + new MaxOfRateLimiter<>( + Arrays.asList( + new ItemExponentialFailureRateLimiter<>( + Duration.ofMillis(5), Duration.ofSeconds(1000)), + new BucketRateLimiter<>(100, 10, Duration.ofMinutes(1)))); + } + + @Override + public Duration when(T item) { + return internalRateLimiter.when(item); + } + + @Override + public void forget(T item) { + internalRateLimiter.forget(item); + } + + @Override + public int numRequeues(T item) { + return internalRateLimiter.numRequeues(item); + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiter.java new file mode 100644 index 0000000000..9a6101b6f5 --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiter.java @@ -0,0 +1,40 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import com.google.common.util.concurrent.AtomicLongMap; +import java.time.Duration; + +/** + * ItemExponentialFailureRateLimiter does a simple baseDelay*10num-failures limit dealing + * with max failures and expiration are up to the caller + */ +public class ItemExponentialFailureRateLimiter implements RateLimiter { + + private Duration baseDelay; + private Duration maxDelay; + + private AtomicLongMap failures; + + public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) { + this.baseDelay = baseDelay; + this.maxDelay = maxDelay; + + failures = AtomicLongMap.create(); + } + + @Override + public Duration when(T item) { + long exp = failures.getAndIncrement(item); + long d = maxDelay.toMillis() >> exp; + return d > baseDelay.toMillis() ? baseDelay.multipliedBy(1 << exp) : maxDelay; + } + + @Override + public void forget(T item) { + failures.remove(item); + } + + @Override + public int numRequeues(T item) { + return (int) failures.get(item); + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiter.java new file mode 100644 index 0000000000..87d739c74a --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiter.java @@ -0,0 +1,44 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import com.google.common.util.concurrent.AtomicLongMap; +import java.time.Duration; + +/** + * ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry + * after that + */ +public class ItemFastSlowRateLimiter implements RateLimiter { + + private Duration fastDelay; + private Duration slowDelay; + private int maxFastAttempts; + + private AtomicLongMap failures; + + public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) { + this.fastDelay = fastDelay; + this.slowDelay = slowDelay; + this.maxFastAttempts = maxFastAttempts; + + failures = AtomicLongMap.create(); + } + + @Override + public Duration when(T item) { + long attempts = failures.incrementAndGet(item); + if (attempts <= maxFastAttempts) { + return fastDelay; + } + return slowDelay; + } + + @Override + public void forget(T item) { + failures.remove(item); + } + + @Override + public int numRequeues(T item) { + return (int) failures.get(item); + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiter.java new file mode 100644 index 0000000000..22361496f7 --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiter.java @@ -0,0 +1,55 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +/** + * MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a + * token bucket limiter, the burst could be apparently exceeded in cases where particular items were + * separately delayed a longer time. + */ +public class MaxOfRateLimiter implements RateLimiter { + private List> rateLimiters; + + public MaxOfRateLimiter(List> rateLimiters) { + this.rateLimiters = rateLimiters; + } + + @SafeVarargs + @SuppressWarnings("varargs") + public MaxOfRateLimiter(RateLimiter... rateLimiters) { + this(Arrays.asList(rateLimiters)); + } + + @Override + public Duration when(T item) { + Duration max = Duration.ZERO; + for (RateLimiter r : rateLimiters) { + Duration current = r.when(item); + if (current.compareTo(max) > 0) { + max = current; + } + } + + return max; + } + + @Override + public void forget(T item) { + rateLimiters.forEach(r -> r.forget(item)); + } + + @Override + public int numRequeues(T item) { + int max = 0; + for (RateLimiter r : rateLimiters) { + int current = r.numRequeues(item); + if (current > max) { + max = current; + } + } + + return max; + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/RateLimiter.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/RateLimiter.java new file mode 100644 index 0000000000..aa4ce333fa --- /dev/null +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/RateLimiter.java @@ -0,0 +1,23 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import java.time.Duration; + +public interface RateLimiter { + /** + * When gets an item and gets to decide how long that item should wait + * + * @param item Item that should wait + */ + Duration when(T item); + + /** + * Forget indicates that an item is finished being retried. Doesn't matter whether its for perm + * failing or for success, we'll stop tracking it + * + * @param item Item will be forget + */ + void forget(T item); + + /** @return number of how many failures the item has had */ + int numRequeues(T item); +} diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitQueueTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitQueueTest.java index 05fc1812b2..c474ed287f 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitQueueTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitQueueTest.java @@ -1,8 +1,8 @@ package io.kubernetes.client.extended.workqueue; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter; import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -10,7 +10,7 @@ public class DefaultRateLimitQueueTest { - private static class MockRateLimiter implements RateLimiter { + private static class MockRateLimiter implements RateLimiter { private int count; @@ -34,30 +34,13 @@ public int numRequeues(Object item) { @Test public void testSimpleRateLimitQueue() throws Exception { - MockRateLimiter mockRateLimiter = new MockRateLimiter(); + MockRateLimiter mockRateLimiter = new MockRateLimiter<>(); DefaultRateLimitingQueue rlq = - new DefaultRateLimitingQueue(Executors.newSingleThreadExecutor(), mockRateLimiter); + new DefaultRateLimitingQueue<>(Executors.newSingleThreadExecutor(), mockRateLimiter); long t1 = System.nanoTime(); rlq.addRateLimited("foo"); rlq.get(); long t2 = System.nanoTime(); assertTrue(t2 - t1 > TimeUnit.MILLISECONDS.toNanos(500)); } - - @Test - public void testExponentialRateLimit() throws Exception { - DefaultRateLimitingQueue.ExponentialRateLimiter rateLimiter = - new DefaultRateLimitingQueue.ExponentialRateLimiter(); - String foo = "foo"; - for (int i = 0; i < 9999; i++) { - long expected = Double.valueOf(rateLimiter.baseDelay.toNanos() * Math.pow(2, i)).longValue(); - if (expected > rateLimiter.maxDelay.toNanos()) { - expected = rateLimiter.maxDelay.toNanos(); - } - long actual = rateLimiter.when(foo).toNanos(); - if (expected != actual) { - assertEquals(expected, actual); - } - } - } } diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiterTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiterTest.java new file mode 100644 index 0000000000..51979ae95c --- /dev/null +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiterTest.java @@ -0,0 +1,81 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class BucketRateLimiterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testBucketRateLimiterBasic() { + RateLimiter rateLimiter = new BucketRateLimiter<>(2, 1, Duration.ofMinutes(10)); + assertEquals(Duration.ZERO, rateLimiter.when("one")); + assertEquals(Duration.ZERO, rateLimiter.when("one")); + + Duration waitDuration = rateLimiter.when("one"); + Duration expectDuration = Duration.ofMinutes(10); + + Duration diff = waitDuration.minus(expectDuration); + // waitDuration might be smaller than expect duration because of time is elapsed. + assertTrue(diff.isZero() || (diff.isNegative() && !diff.plusSeconds(1).isNegative())); + + waitDuration = rateLimiter.when("one"); + expectDuration = Duration.ofMinutes(20); + diff = waitDuration.minus(expectDuration); + + assertTrue(diff.isZero() || (diff.isNegative() && !diff.plusSeconds(1).isNegative())); + } + + @Test + public void testBucketRateLimiterTokenAdded() throws InterruptedException { + RateLimiter rateLimiter = new BucketRateLimiter<>(2, 1, Duration.ofSeconds(2)); + + assertEquals(Duration.ZERO, rateLimiter.when("one")); + assertEquals(Duration.ZERO, rateLimiter.when("one")); + + Duration waitDuration = rateLimiter.when("one"); + assertTrue(waitDuration.getSeconds() > 0); + + Thread.sleep(4000); + + assertEquals(Duration.ZERO, rateLimiter.when("two")); + + waitDuration = rateLimiter.when("two"); + assertTrue(waitDuration.getSeconds() > 0); + } + + @Test + public void testNegativeCapacity() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("-2 is wrong value for capacity, because capacity should be positive"); + RateLimiter rateLimiter = new BucketRateLimiter<>(-2, 1, Duration.ofSeconds(2)); + } + + @Test + public void testNegativeTokensGeneratedInPeriod() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("-1 is wrong value for period tokens, because tokens should be positive"); + RateLimiter rateLimiter = new BucketRateLimiter<>(2, -1, Duration.ofSeconds(2)); + } + + @Test + public void testNegativePeriod() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "-1 is wrong value for period of bandwidth, because period should be positive"); + RateLimiter rateLimiter = new BucketRateLimiter<>(2, 1, Duration.ofNanos(-1)); + } + + @Test + public void testTokensLargerThanNanos() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "100 token/nanosecond is not permitted refill rate, because highest supported rate is 1 token/nanosecond"); + RateLimiter rateLimiter = new BucketRateLimiter<>(2, 100, Duration.ofNanos(1)); + } +} diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiterTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiterTest.java new file mode 100644 index 0000000000..27fcf506d6 --- /dev/null +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemExponentialFailureRateLimiterTest.java @@ -0,0 +1,86 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import org.junit.Test; + +public class ItemExponentialFailureRateLimiterTest { + + @Test + public void testItemExponentialFailureRateLimiter() { + RateLimiter rateLimiter = + new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(1), Duration.ofSeconds(1)); + + assertEquals(Duration.ofMillis(1), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(2), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(4), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(8), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(16), rateLimiter.when("one")); + + assertEquals(5, rateLimiter.numRequeues("one")); + + assertEquals(Duration.ofMillis(1), rateLimiter.when("two")); + assertEquals(Duration.ofMillis(2), rateLimiter.when("two")); + assertEquals(2, rateLimiter.numRequeues("two")); + + rateLimiter.forget("one"); + assertEquals(0, rateLimiter.numRequeues("one")); + assertEquals(Duration.ofMillis(1), rateLimiter.when("one")); + } + + @Test + public void testItemExponentialFailureRateLimiterOverFlow() { + RateLimiter rateLimiter = + new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(1), Duration.ofSeconds(1000)); + + for (int i = 0; i < 5; i++) { + rateLimiter.when("one"); + } + assertEquals(Duration.ofMillis(32), rateLimiter.when("one")); + + for (int i = 0; i < 1000; i++) { + rateLimiter.when("overflow1"); + } + assertEquals(Duration.ofSeconds(1000), rateLimiter.when("overflow1")); + + rateLimiter = + new ItemExponentialFailureRateLimiter<>(Duration.ofMinutes(1), Duration.ofHours(1000)); + for (int i = 0; i < 2; i++) { + rateLimiter.when("two"); + } + assertEquals(Duration.ofMinutes(4), rateLimiter.when("two")); + + for (int i = 0; i < 1000; i++) { + rateLimiter.when("overflow2"); + } + assertEquals(Duration.ofHours(1000), rateLimiter.when("overflow2")); + } + + @Test + public void testNegativeBaseDelay() { + RateLimiter rateLimiter = + new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(-1), Duration.ofSeconds(1000)); + + for (int i = 0; i < 5; i++) { + rateLimiter.when("one"); + } + assertEquals(Duration.ofMillis(-32), rateLimiter.when("one")); + for (int i = 0; i < 1000; i++) { + rateLimiter.when("overflow1"); + } + assertTrue(rateLimiter.when("overflow1").isNegative()); + } + + @Test + public void testNegativeMaxDelay() { + RateLimiter rateLimiter = + new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(1), Duration.ofSeconds(-1000)); + + assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(-1000), rateLimiter.when("one")); + } +} diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiterTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiterTest.java new file mode 100644 index 0000000000..79e8610b7d --- /dev/null +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/ItemFastSlowRateLimiterTest.java @@ -0,0 +1,47 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import static org.junit.Assert.assertEquals; + +import java.time.Duration; +import org.junit.Test; + +public class ItemFastSlowRateLimiterTest { + + @Test + public void testFastSlowRateLimiter() { + RateLimiter rateLimiter = + new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(10), 3); + + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + + assertEquals(Duration.ofSeconds(10), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("one")); + + assertEquals(5, rateLimiter.numRequeues("one")); + + assertEquals(Duration.ofMillis(5), rateLimiter.when("two")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("two")); + assertEquals(2, rateLimiter.numRequeues("two")); + + rateLimiter.forget("one"); + assertEquals(0, rateLimiter.numRequeues("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + } + + @Test + public void testNegativeOrZeroAttempts() { + RateLimiter rateLimiter = + new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(10), -1); + + assertEquals(Duration.ofSeconds(10), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("one")); + + rateLimiter = new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(10), 0); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("two")); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("two")); + assertEquals(Duration.ofSeconds(10), rateLimiter.when("two")); + } +} diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiterTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiterTest.java new file mode 100644 index 0000000000..5bfcf65562 --- /dev/null +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/ratelimiter/MaxOfRateLimiterTest.java @@ -0,0 +1,58 @@ +package io.kubernetes.client.extended.workqueue.ratelimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import org.junit.Test; + +public class MaxOfRateLimiterTest { + + @Test + public void testMaxOfRateLimiter() { + RateLimiter rateLimiter = + new MaxOfRateLimiter<>( + new ItemFastSlowRateLimiter<>(Duration.ofMillis(5), Duration.ofSeconds(3), 3), + new ItemExponentialFailureRateLimiter<>(Duration.ofMillis(1), Duration.ofSeconds(1))); + + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + + assertEquals(Duration.ofSeconds(3), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(3), rateLimiter.when("one")); + + assertEquals(5, rateLimiter.numRequeues("one")); + + assertEquals(Duration.ofMillis(5), rateLimiter.when("two")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("two")); + assertEquals(2, rateLimiter.numRequeues("two")); + + rateLimiter.forget("one"); + assertEquals(0, rateLimiter.numRequeues("one")); + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + } + + @Test + public void testDefaultRateLimiter() { + RateLimiter rateLimiter = new DefaultControllerRateLimiter<>(); + + assertEquals(Duration.ofMillis(5), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(10), rateLimiter.when("one")); + assertEquals(Duration.ofMillis(20), rateLimiter.when("one")); + + for (int i = 0; i < 20; i++) { + rateLimiter.when("one"); + } + + assertEquals(Duration.ofSeconds(1000), rateLimiter.when("one")); + assertEquals(Duration.ofSeconds(1000), rateLimiter.when("one")); + + for (int i = 0; i < 75; i++) { + rateLimiter.when("one"); + } + + assertTrue(rateLimiter.when("one").getSeconds() > 0); + assertTrue(rateLimiter.when("two").getSeconds() > 0); + } +}