Skip to content

Add workqueue support #624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions extended/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>guava</artifactId>
<version>25.1-jre</version>
</dependency>
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>${bucket4jVersion}</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -130,5 +135,6 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<slf4jVersion>1.7.7</slf4jVersion>
<bucket4jVersion>4.4.1</bucket4jVersion>
</properties>
</project>
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package io.kubernetes.client.extended.workqueue;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.kubernetes.client.extended.workqueue.ratelimiter.DefaultControllerRateLimiter;
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;
import java.util.concurrent.ExecutorService;

/** The default rate limiting queue implementation. */
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
implements RateLimitingQueue<T> {

private RateLimiter rateLimiter;
private RateLimiter<T> rateLimiter;

public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
super(waitingWorker);
this.rateLimiter = new ExponentialRateLimiter();
this.rateLimiter = new DefaultControllerRateLimiter<>();
}

public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter<T> rateLimiter) {
super(waitingWorker);
this.rateLimiter = rateLimiter;
}
Expand All @@ -27,54 +26,12 @@ public int numRequeues(T item) {
}

@Override
public void forget(Object item) {
public void forget(T item) {
rateLimiter.forget(item);
}

@Override
public void addRateLimited(T item) {
super.addAfter(item, rateLimiter.when(item));
}

public static class ExponentialRateLimiter implements RateLimiter {

Duration baseDelay;
Duration maxDelay;

private Map<Object, Integer> failures = new ConcurrentHashMap<>();

public ExponentialRateLimiter() {
this.baseDelay = Duration.ofMillis(5);
this.maxDelay = Duration.ofSeconds(1000);
}

public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
this.baseDelay = baseDelay;
this.maxDelay = maxDelay;
}

@Override
public void forget(Object item) {
failures.remove(item);
}

@Override
public int numRequeues(Object item) {
return failures.get(item);
}

@Override
public Duration when(Object item) {
Integer exp = failures.getOrDefault(item, 0);
failures.put(item, exp + 1);
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
if (backoff > Long.MAX_VALUE) {
return maxDelay;
}
if (backoff > maxDelay.toNanos()) {
return maxDelay;
}
return Duration.ofNanos(Double.valueOf(backoff).longValue());
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.kubernetes.client.extended.workqueue.ratelimiter;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill;
import io.github.bucket4j.local.SynchronizationStrategy;
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** A light-weight token bucket implementation for RateLimiter. */
public class BucketRateLimiter<T> implements RateLimiter<T> {
private Bucket bucket;

/**
* @param capacity Capacity is the maximum number of tokens can be consumed.
* @param tokensGeneratedInPeriod Tokens generated in period.
* @param period Period that generating specific number of tokens.
*/
public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration period) {
Bandwidth bandwidth =
Bandwidth.classic(capacity, Refill.greedy(tokensGeneratedInPeriod, period));
this.bucket =
Bucket4j.builder()
.addLimit(bandwidth)
.withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED)
.build();
}

@Override
public synchronized Duration when(T item) {
DelayGetter delayGetter = new DelayGetter();
bucket.asAsyncScheduler().consume(1, delayGetter).complete(null);
return delayGetter.getDelay();
}

@Override
public void forget(T item) {}

@Override
public int numRequeues(T item) {
return 0;
}

private class DelayGetter extends ScheduledThreadPoolExecutor {
private Duration delay = Duration.ZERO;

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
this.delay = Duration.ofNanos(unit.toNanos(delay));
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this really return null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brendanburns

The return value will not be used by Bucket4j according to the comment of bucket4j/bucket4j#96, so I think it ok to return a null value.

ScheduledFuture that returned also not used by Bucket4j, so do not spend efforts to provide correctly result.

}

private DelayGetter() {
super(0);
}

private Duration getDelay() {
return delay;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.kubernetes.client.extended.workqueue.ratelimiter;

import java.time.Duration;
import java.util.Arrays;

/**
* DefaultControllerRateLimiter is a default rate limiter for workqueue. It has both overall and
* per-item rate limiting. The overall is a token bucket and the per-item is exponential
*/
public class DefaultControllerRateLimiter<T> implements RateLimiter<T> {

private RateLimiter<T> internalRateLimiter;

public DefaultControllerRateLimiter() {
this.internalRateLimiter =
new MaxOfRateLimiter<>(
Arrays.asList(
new ItemExponentialFailureRateLimiter<>(
Duration.ofMillis(5), Duration.ofSeconds(1000)),
new BucketRateLimiter<>(100, 10, Duration.ofMinutes(1))));
}

@Override
public Duration when(T item) {
return internalRateLimiter.when(item);
}

@Override
public void forget(T item) {
internalRateLimiter.forget(item);
}

@Override
public int numRequeues(T item) {
return internalRateLimiter.numRequeues(item);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.kubernetes.client.extended.workqueue.ratelimiter;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
* ItemExponentialFailureRateLimiter does a simple baseDelay*10<sup>num-failures</sup> limit dealing
* with max failures and expiration are up to the caller
*/
public class ItemExponentialFailureRateLimiter<T> implements RateLimiter<T> {

private Duration baseDelay;
private Duration maxDelay;

private Map<T, Integer> failures;

public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
this.baseDelay = baseDelay;
this.maxDelay = maxDelay;

failures = new HashMap<>();
}

@Override
public synchronized Duration when(T item) {
int exp = failures.getOrDefault(item, 0);
failures.put(item, exp + 1);
long d = maxDelay.toMillis() >> exp;
return d > baseDelay.toMillis() ? baseDelay.multipliedBy(1 << exp) : maxDelay;
}

@Override
public synchronized void forget(T item) {
failures.remove(item);
}

@Override
public synchronized int numRequeues(T item) {
return failures.getOrDefault(item, 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.kubernetes.client.extended.workqueue.ratelimiter;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
* ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry
* after that
*/
public class ItemFastSlowRateLimiter<T> implements RateLimiter<T> {

private Map<T, Integer> failures;

private Duration fastDelay;
private Duration slowDelay;
private int maxFastAttempts;

public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
this.fastDelay = fastDelay;
this.slowDelay = slowDelay;
this.maxFastAttempts = maxFastAttempts;

failures = new HashMap<>();
}

@Override
public synchronized Duration when(T item) {
int attempts = failures.getOrDefault(item, 0);
failures.put(item, attempts + 1);

if (attempts + 1 <= maxFastAttempts) {
return fastDelay;
}

return slowDelay;
}

@Override
public synchronized void forget(T item) {
failures.remove(item);
}

@Override
public synchronized int numRequeues(T item) {
return failures.getOrDefault(item, 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.kubernetes.client.extended.workqueue.ratelimiter;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
* MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a
* token bucket limiter, the burst could be apparently exceeded in cases where particular items were
* separately delayed a longer time.
*/
public class MaxOfRateLimiter<T> implements RateLimiter<T> {
private List<RateLimiter<T>> rateLimiters;

public MaxOfRateLimiter(List<RateLimiter<T>> rateLimiters) {
this.rateLimiters = rateLimiters;
}

@SafeVarargs
@SuppressWarnings("varargs")
public MaxOfRateLimiter(RateLimiter<T>... rateLimiters) {
this(Arrays.asList(rateLimiters));
}

@Override
public Duration when(T item) {
Duration max = Duration.ZERO;
for (RateLimiter<T> r : rateLimiters) {
Duration current = r.when(item);
if (current.compareTo(max) > 0) {
max = current;
}
}

return max;
}

@Override
public void forget(T item) {
rateLimiters.forEach(r -> r.forget(item));
}

@Override
public int numRequeues(T item) {
int max = 0;
for (RateLimiter<T> r : rateLimiters) {
int current = r.numRequeues(item);
if (current > max) {
max = current;
}
}

return max;
}
}
Loading