Skip to content

Commit 8ef104d

Browse files
authored
feat(middleware-retry): add client side rate limiter for adaptive mode (#2439)
* chore(middleware-retry): add RateLimiter interface * chore: add scaffolding for default RateLimiter * feat: add getSendToken implementation * chore: code to achieve RateLimiter using functions Discontinuing as it's getting complex. Will use class instead. * feat: add DefaultRateLimiter * test: rateLimiter.updateClientSendingRate * test: rateLimiter.cubicSuccess * test: rateLimiter.cubicThrottle * test: rateLimiter.getSendToken
1 parent f4c8f09 commit 8ef104d

File tree

3 files changed

+288
-0
lines changed

3 files changed

+288
-0
lines changed
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import { isThrottlingError } from "@aws-sdk/service-error-classification";
2+
3+
import { DefaultRateLimiter } from "./DefaultRateLimiter";
4+
5+
jest.mock("@aws-sdk/service-error-classification");
6+
7+
describe(DefaultRateLimiter.name, () => {
8+
beforeEach(() => {
9+
(isThrottlingError as jest.Mock).mockReturnValue(false);
10+
});
11+
12+
afterEach(() => {
13+
jest.clearAllMocks();
14+
});
15+
16+
describe("getSendToken", () => {
17+
beforeEach(() => {
18+
jest.useFakeTimers();
19+
});
20+
21+
afterEach(() => {
22+
jest.useRealTimers();
23+
});
24+
25+
it.each([
26+
[0.5, 892.8571428571428],
27+
[1, 1785.7142857142856],
28+
[2, 2000],
29+
])("timestamp: %d, delay: %d", async (timestamp, delay) => {
30+
jest.spyOn(Date, "now").mockImplementation(() => 0);
31+
const rateLimiter = new DefaultRateLimiter();
32+
33+
(isThrottlingError as jest.Mock).mockReturnValueOnce(true);
34+
jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
35+
rateLimiter.updateClientSendingRate({});
36+
37+
rateLimiter.getSendToken();
38+
jest.runAllTimers();
39+
expect(setTimeout).toHaveBeenLastCalledWith(expect.any(Function), delay);
40+
});
41+
});
42+
43+
describe("cubicSuccess", () => {
44+
it.each([
45+
[5, 7],
46+
[6, 9.64893601],
47+
[7, 10.00003085],
48+
[8, 10.45328452],
49+
[9, 13.40869703],
50+
[10, 21.26626836],
51+
[11, 36.42599853],
52+
])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => {
53+
jest.spyOn(Date, "now").mockImplementation(() => 0);
54+
const rateLimiter = new DefaultRateLimiter();
55+
rateLimiter["lastMaxRate"] = 10;
56+
rateLimiter["lastThrottleTime"] = 5;
57+
58+
jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
59+
60+
const cubicSuccessSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicSuccess");
61+
rateLimiter.updateClientSendingRate({});
62+
expect(cubicSuccessSpy).toHaveLastReturnedWith(calculatedRate);
63+
});
64+
});
65+
66+
describe("cubicThrottle", () => {
67+
it.each([
68+
[5, 0.112],
69+
[6, 0.09333333],
70+
[7, 0.08],
71+
[8, 0.07],
72+
[9, 0.06222222],
73+
])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => {
74+
jest.spyOn(Date, "now").mockImplementation(() => 0);
75+
const rateLimiter = new DefaultRateLimiter();
76+
rateLimiter["lastMaxRate"] = 10;
77+
rateLimiter["lastThrottleTime"] = 5;
78+
79+
(isThrottlingError as jest.Mock).mockReturnValueOnce(true);
80+
jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
81+
const cubicThrottleSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicThrottle");
82+
rateLimiter.updateClientSendingRate({});
83+
expect(cubicThrottleSpy).toHaveLastReturnedWith(calculatedRate);
84+
});
85+
});
86+
87+
it("updateClientSendingRate", () => {
88+
jest.spyOn(Date, "now").mockImplementation(() => 0);
89+
const rateLimiter = new DefaultRateLimiter();
90+
91+
const testCases: [boolean, number, number, number][] = [
92+
[false, 0.2, 0, 0.5],
93+
[false, 0.4, 0, 0.5],
94+
[false, 0.6, 4.8, 0.5],
95+
[false, 0.8, 4.8, 0.5],
96+
[false, 1, 4.16, 0.5],
97+
[false, 1.2, 4.16, 0.6912],
98+
[false, 1.4, 4.16, 1.0976],
99+
[false, 1.6, 5.632, 1.6384],
100+
[false, 1.8, 5.632, 2.3328],
101+
[true, 2, 4.3264, 3.02848],
102+
[false, 2.2, 4.3264, 3.486639],
103+
[false, 2.4, 4.3264, 3.821874],
104+
[false, 2.6, 5.66528, 4.053386],
105+
[false, 2.8, 5.66528, 4.200373],
106+
[false, 3.0, 4.333056, 4.282037],
107+
[true, 3.2, 4.333056, 2.997426],
108+
[false, 3.4, 4.333056, 3.452226],
109+
];
110+
111+
testCases.forEach(([isThrottlingErrorReturn, timestamp, measuredTxRate, fillRate]) => {
112+
(isThrottlingError as jest.Mock).mockReturnValue(isThrottlingErrorReturn);
113+
jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
114+
115+
rateLimiter.updateClientSendingRate({});
116+
expect(rateLimiter["measuredTxRate"]).toEqual(measuredTxRate);
117+
expect(parseFloat(rateLimiter["fillRate"].toFixed(6))).toEqual(fillRate);
118+
});
119+
});
120+
});

Diff for: packages/middleware-retry/src/DefaultRateLimiter.ts

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import { isThrottlingError } from "@aws-sdk/service-error-classification";
2+
3+
import { RateLimiter } from "./types";
4+
5+
export interface DefaultRateLimiterOptions {
6+
beta?: number;
7+
minCapacity?: number;
8+
minFillRate?: number;
9+
scaleConstant?: number;
10+
smooth?: number;
11+
}
12+
13+
export class DefaultRateLimiter implements RateLimiter {
14+
// User configurable constants
15+
private beta: number;
16+
private minCapacity: number;
17+
private minFillRate: number;
18+
private scaleConstant: number;
19+
private smooth: number;
20+
21+
// Pre-set state variables
22+
private currentCapacity = 0;
23+
private enabled = false;
24+
private lastMaxRate = 0;
25+
private measuredTxRate = 0;
26+
private requestCount = 0;
27+
28+
// Other state variables
29+
private fillRate: number;
30+
private lastThrottleTime: number;
31+
private lastTimestamp = 0;
32+
private lastTxRateBucket: number;
33+
private maxCapacity: number;
34+
private timeWindow = 0;
35+
36+
constructor(options?: DefaultRateLimiterOptions) {
37+
this.beta = options?.beta ?? 0.7;
38+
this.minCapacity = options?.minCapacity ?? 1;
39+
this.minFillRate = options?.minFillRate ?? 0.5;
40+
this.scaleConstant = options?.scaleConstant ?? 0.4;
41+
this.smooth = options?.smooth ?? 0.8;
42+
43+
const currentTimeInSeconds = this.getCurrentTimeInSeconds();
44+
this.lastThrottleTime = currentTimeInSeconds;
45+
this.lastTxRateBucket = Math.floor(this.getCurrentTimeInSeconds());
46+
47+
this.fillRate = this.minFillRate;
48+
this.maxCapacity = this.minCapacity;
49+
}
50+
51+
private getCurrentTimeInSeconds() {
52+
return Date.now() / 1000;
53+
}
54+
55+
public async getSendToken() {
56+
return this.acquireTokenBucket(1);
57+
}
58+
59+
private async acquireTokenBucket(amount: number) {
60+
// Client side throttling is not enabled until we see a throttling error.
61+
if (!this.enabled) {
62+
return;
63+
}
64+
65+
this.refillTokenBucket();
66+
if (amount > this.currentCapacity) {
67+
const delay = ((amount - this.currentCapacity) / this.fillRate) * 1000;
68+
await new Promise((resolve) => setTimeout(resolve, delay));
69+
}
70+
this.currentCapacity = this.currentCapacity - amount;
71+
}
72+
73+
private refillTokenBucket() {
74+
const timestamp = this.getCurrentTimeInSeconds();
75+
if (!this.lastTimestamp) {
76+
this.lastTimestamp = timestamp;
77+
return;
78+
}
79+
80+
const fillAmount = (timestamp - this.lastTimestamp) * this.fillRate;
81+
this.currentCapacity = Math.min(this.maxCapacity, this.currentCapacity + fillAmount);
82+
this.lastTimestamp = timestamp;
83+
}
84+
85+
public updateClientSendingRate(response: any) {
86+
let calculatedRate: number;
87+
this.updateMeasuredRate();
88+
89+
if (isThrottlingError(response)) {
90+
const rateToUse = !this.enabled ? this.measuredTxRate : Math.min(this.measuredTxRate, this.fillRate);
91+
this.lastMaxRate = rateToUse;
92+
this.calculateTimeWindow();
93+
this.lastThrottleTime = this.getCurrentTimeInSeconds();
94+
calculatedRate = this.cubicThrottle(rateToUse);
95+
this.enableTokenBucket();
96+
} else {
97+
this.calculateTimeWindow();
98+
calculatedRate = this.cubicSuccess(this.getCurrentTimeInSeconds());
99+
}
100+
101+
const newRate = Math.min(calculatedRate, 2 * this.measuredTxRate);
102+
this.updateTokenBucketRate(newRate);
103+
}
104+
105+
private calculateTimeWindow() {
106+
this.timeWindow = this.getPrecise(Math.pow((this.lastMaxRate * (1 - this.beta)) / this.scaleConstant, 1 / 3));
107+
}
108+
109+
private cubicThrottle(rateToUse: number) {
110+
return this.getPrecise(rateToUse * this.beta);
111+
}
112+
113+
private cubicSuccess(timestamp: number) {
114+
return this.getPrecise(
115+
this.scaleConstant * Math.pow(timestamp - this.lastThrottleTime - this.timeWindow, 3) + this.lastMaxRate
116+
);
117+
}
118+
119+
private enableTokenBucket() {
120+
this.enabled = true;
121+
}
122+
123+
private updateTokenBucketRate(newRate: number) {
124+
// Refill based on our current rate before we update to the new fill rate.
125+
this.refillTokenBucket();
126+
127+
this.fillRate = Math.max(newRate, this.minFillRate);
128+
this.maxCapacity = Math.max(newRate, this.minCapacity);
129+
130+
// When we scale down we can't have a current capacity that exceeds our maxCapacity.
131+
this.currentCapacity = Math.min(this.currentCapacity, this.maxCapacity);
132+
}
133+
134+
private updateMeasuredRate() {
135+
const t = this.getCurrentTimeInSeconds();
136+
const timeBucket = Math.floor(t * 2) / 2;
137+
this.requestCount++;
138+
139+
if (timeBucket > this.lastTxRateBucket) {
140+
const currentRate = this.requestCount / (timeBucket - this.lastTxRateBucket);
141+
this.measuredTxRate = this.getPrecise(currentRate * this.smooth + this.measuredTxRate * (1 - this.smooth));
142+
this.requestCount = 0;
143+
this.lastTxRateBucket = timeBucket;
144+
}
145+
}
146+
147+
private getPrecise(num: number) {
148+
return parseFloat(num.toFixed(8));
149+
}
150+
}

Diff for: packages/middleware-retry/src/types.ts

+18
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,21 @@ export interface RetryQuota {
4040
*/
4141
releaseRetryTokens: (releaseCapacityAmount?: number) => void;
4242
}
43+
44+
export interface RateLimiter {
45+
/**
46+
* If there is sufficient capacity (tokens) available, it immediately returns.
47+
* If there is not sufficient capacity, it will either sleep a certain amount
48+
* of time until the rate limiter can retrieve a token from its token bucket
49+
* or raise an exception indicating there is insufficient capacity.
50+
*/
51+
getSendToken: () => Promise<void>;
52+
53+
/**
54+
* Updates the client sending rate based on response.
55+
* If the response was successful, the capacity and fill rate are increased.
56+
* If the response was a throttling response, the capacity and fill rate are
57+
* decreased. Transient errors do not affect the rate limiter.
58+
*/
59+
updateClientSendingRate: (response: any) => void;
60+
}

0 commit comments

Comments
 (0)