Skip to content

Commit f5f5a9c

Browse files
committed
Use AtomicLongMap to calculate attempts
1 parent e85e988 commit f5f5a9c

File tree

3 files changed

+21
-25
lines changed

3 files changed

+21
-25
lines changed

extended/src/main/java/io/kubernetes/client/extended/workqueue/ratelimiter/BucketRateLimiter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration p
3030
}
3131

3232
@Override
33-
public synchronized Duration when(T item) {
33+
public Duration when(T item) {
3434
DelayGetter delayGetter = new DelayGetter();
3535
bucket.asAsyncScheduler().consume(1, delayGetter).complete(null);
3636
return delayGetter.getDelay();
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

3+
import com.google.common.util.concurrent.AtomicLongMap;
34
import java.time.Duration;
4-
import java.util.HashMap;
5-
import java.util.Map;
65

76
/**
87
* ItemExponentialFailureRateLimiter does a simple baseDelay*10<sup>num-failures</sup> limit dealing
@@ -13,30 +12,29 @@ public class ItemExponentialFailureRateLimiter<T> implements RateLimiter<T> {
1312
private Duration baseDelay;
1413
private Duration maxDelay;
1514

16-
private Map<T, Integer> failures;
15+
private AtomicLongMap<T> failures;
1716

1817
public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
1918
this.baseDelay = baseDelay;
2019
this.maxDelay = maxDelay;
2120

22-
failures = new HashMap<>();
21+
failures = AtomicLongMap.create();
2322
}
2423

2524
@Override
26-
public synchronized Duration when(T item) {
27-
int exp = failures.getOrDefault(item, 0);
28-
failures.put(item, exp + 1);
25+
public Duration when(T item) {
26+
long exp = failures.getAndIncrement(item);
2927
long d = maxDelay.toMillis() >> exp;
3028
return d > baseDelay.toMillis() ? baseDelay.multipliedBy(1 << exp) : maxDelay;
3129
}
3230

3331
@Override
34-
public synchronized void forget(T item) {
32+
public void forget(T item) {
3533
failures.remove(item);
3634
}
3735

3836
@Override
39-
public synchronized int numRequeues(T item) {
40-
return failures.getOrDefault(item, 0);
37+
public int numRequeues(T item) {
38+
return (int) failures.get(item);
4139
}
4240
}
Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,46 @@
11
package io.kubernetes.client.extended.workqueue.ratelimiter;
22

3+
import com.google.common.util.concurrent.AtomicLongMap;
34
import java.time.Duration;
4-
import java.util.HashMap;
5-
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
66

77
/**
88
* ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry
99
* after that
1010
*/
1111
public class ItemFastSlowRateLimiter<T> implements RateLimiter<T> {
1212

13-
private Map<T, Integer> failures;
14-
1513
private Duration fastDelay;
1614
private Duration slowDelay;
1715
private int maxFastAttempts;
1816

17+
private AtomicLongMap<T> failures;
18+
1919
public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
2020
this.fastDelay = fastDelay;
2121
this.slowDelay = slowDelay;
2222
this.maxFastAttempts = maxFastAttempts;
2323

24-
failures = new HashMap<>();
24+
failures = AtomicLongMap.create();
2525
}
2626

2727
@Override
28-
public synchronized Duration when(T item) {
29-
int attempts = failures.getOrDefault(item, 0);
30-
failures.put(item, attempts + 1);
31-
32-
if (attempts + 1 <= maxFastAttempts) {
28+
public Duration when(T item) {
29+
new ConcurrentHashMap<>().compute()
30+
long attempts = failures.incrementAndGet(item);
31+
if (attempts <= maxFastAttempts) {
3332
return fastDelay;
3433
}
35-
3634
return slowDelay;
3735
}
3836

3937
@Override
40-
public synchronized void forget(T item) {
38+
public void forget(T item) {
4139
failures.remove(item);
4240
}
4341

4442
@Override
45-
public synchronized int numRequeues(T item) {
46-
return failures.getOrDefault(item, 0);
43+
public int numRequeues(T item) {
44+
return (int) failures.get(item);
4745
}
4846
}

0 commit comments

Comments
 (0)