|
6 | 6 | #pragma once
|
7 | 7 |
|
8 | 8 | #include <aws/core/Core_EXPORTS.h>
|
9 |
| - |
10 | 9 | #include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
|
11 | 10 |
|
12 | 11 | #include <algorithm>
|
| 12 | +#include <functional> |
13 | 13 | #include <mutex>
|
14 | 14 | #include <thread>
|
15 |
| -#include <functional> |
16 | 15 |
|
17 |
| -namespace Aws |
18 |
| -{ |
19 |
| - namespace Utils |
20 |
| - { |
21 |
| - namespace RateLimits |
22 |
| - { |
23 |
| - /** |
24 |
| - * High precision rate limiter. If you need to limit your bandwidth by a budget, this is very likely the implementation you want. |
25 |
| - */ |
26 |
| - template<typename CLOCK = std::chrono::high_resolution_clock, typename DUR = std::chrono::seconds, bool RENORMALIZE_RATE_CHANGES = true> |
27 |
| - class DefaultRateLimiter : public RateLimiterInterface |
28 |
| - { |
29 |
| - public: |
30 |
| - using Base = RateLimiterInterface; |
31 |
| - |
32 |
| - using InternalTimePointType = std::chrono::time_point<CLOCK>; |
33 |
| - using ElapsedTimeFunctionType = std::function< InternalTimePointType() >; |
34 |
| - |
35 |
| - /** |
36 |
| - * Initializes state, starts counts, does some basic validation. |
37 |
| - */ |
38 |
| - DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction = CLOCK::now) : |
39 |
| - m_elapsedTimeFunction(elapsedTimeFunction), |
40 |
| - m_maxRate(0), |
41 |
| - m_accumulatorLock(), |
42 |
| - m_accumulator(0), |
43 |
| - m_accumulatorFraction(0), |
44 |
| - m_accumulatorUpdated(), |
45 |
| - m_replenishNumerator(0), |
46 |
| - m_replenishDenominator(0), |
47 |
| - m_delayNumerator(0), |
48 |
| - m_delayDenominator(0) |
49 |
| - { |
50 |
| - // verify we're not going to divide by zero due to goofy type parameterization |
51 |
| - static_assert(DUR::period::num > 0, "Rate duration must have positive numerator"); |
52 |
| - static_assert(DUR::period::den > 0, "Rate duration must have positive denominator"); |
53 |
| - static_assert(CLOCK::duration::period::num > 0, "RateLimiter clock duration must have positive numerator"); |
54 |
| - static_assert(CLOCK::duration::period::den > 0, "RateLimiter clock duration must have positive denominator"); |
55 |
| - |
56 |
| - DefaultRateLimiter::SetRate(maxRate, true); |
57 |
| - } |
58 |
| - |
59 |
| - virtual ~DefaultRateLimiter() = default; |
60 |
| - |
61 |
| - /** |
62 |
| - * Calculates time in milliseconds that should be delayed before letting anymore data through. |
63 |
| - */ |
64 |
| - virtual DelayType ApplyCost(int64_t cost) override |
65 |
| - { |
66 |
| - std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock); |
67 |
| - |
68 |
| - auto now = m_elapsedTimeFunction(); |
69 |
| - auto elapsedTime = (now - m_accumulatorUpdated).count(); |
70 |
| - |
71 |
| - // replenish the accumulator based on how much time has passed |
72 |
| - auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction; |
73 |
| - m_accumulator += temp / m_replenishDenominator; |
74 |
| - m_accumulatorFraction = temp % m_replenishDenominator; |
75 |
| - |
76 |
| - // the accumulator is capped based on the maximum rate |
77 |
| - m_accumulator = (std::min)(m_accumulator, m_maxRate); |
78 |
| - if (m_accumulator == m_maxRate) |
79 |
| - { |
80 |
| - m_accumulatorFraction = 0; |
81 |
| - } |
82 |
| - |
83 |
| - // if the accumulator is still negative, then we'll have to wait |
84 |
| - DelayType delay(0); |
85 |
| - if (m_accumulator < 0) |
86 |
| - { |
87 |
| - delay = DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator); |
88 |
| - } |
89 |
| - |
90 |
| - // apply the cost to the accumulator after the delay has been calculated; the next call will end up paying for our cost |
91 |
| - m_accumulator -= cost; |
92 |
| - m_accumulatorUpdated = now; |
93 |
| - |
94 |
| - return delay; |
95 |
| - } |
96 |
| - |
97 |
| - /** |
98 |
| - * Same as ApplyCost() but then goes ahead and sleeps the current thread. |
99 |
| - */ |
100 |
| - virtual void ApplyAndPayForCost(int64_t cost) override |
101 |
| - { |
102 |
| - auto costInMilliseconds = ApplyCost(cost); |
103 |
| - if(costInMilliseconds.count() > 0) |
104 |
| - { |
105 |
| - std::this_thread::sleep_for(costInMilliseconds); |
106 |
| - } |
107 |
| - } |
108 |
| - |
109 |
| - /** |
110 |
| - * Update the bandwidth rate to allow. |
111 |
| - */ |
112 |
| - virtual void SetRate(int64_t rate, bool resetAccumulator = false) override |
113 |
| - { |
114 |
| - std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock); |
115 |
| - |
116 |
| - // rate must always be positive |
117 |
| - rate = (std::max)(static_cast<int64_t>(1), rate); |
118 |
| - |
119 |
| - if (resetAccumulator) |
120 |
| - { |
121 |
| - m_accumulator = rate; |
122 |
| - m_accumulatorFraction = 0; |
123 |
| - m_accumulatorUpdated = m_elapsedTimeFunction(); |
124 |
| - } |
125 |
| - else |
126 |
| - { |
127 |
| - // sync the accumulator to current time |
128 |
| - ApplyCost(0); // this call is why we need a recursive mutex |
129 |
| - |
130 |
| - if (ShouldRenormalizeAccumulatorOnRateChange()) |
131 |
| - { |
132 |
| - // now renormalize the accumulator and its fractional part against the new rate |
133 |
| - // the idea here is we want to preserve the desired wait based on the previous rate |
134 |
| - // |
135 |
| - // As an example: |
136 |
| - // Say we had a rate of 100/s and our accumulator was -500 (ie the next ApplyCost would incur a 5 second delay) |
137 |
| - // If we change the rate to 1000/s and want to preserve that delay, we need to scale the accumulator to -5000 |
138 |
| - m_accumulator = m_accumulator * rate / m_maxRate; |
139 |
| - m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate; |
140 |
| - } |
141 |
| - } |
142 |
| - |
143 |
| - m_maxRate = rate; |
144 |
| - |
145 |
| - // Helper constants that represent the amount replenished per CLOCK time period; use the gcd to reduce them in order to try and minimize the chance of integer overflow |
146 |
| - m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num; |
147 |
| - m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den; |
148 |
| - auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator); |
149 |
| - m_replenishNumerator /= gcd; |
150 |
| - m_replenishDenominator /= gcd; |
151 |
| - |
152 |
| - // Helper constants that represent the delay per unit of costAccumulator; use the gcd to reduce them in order to try and minimize the chance of integer overflow |
153 |
| - m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den; |
154 |
| - m_delayDenominator = DelayType::period::den * DUR::period::num; |
155 |
| - gcd = ComputeGCD(m_delayNumerator, m_delayDenominator); |
156 |
| - m_delayNumerator /= gcd; |
157 |
| - m_delayDenominator /= gcd; |
158 |
| - } |
159 |
| - |
160 |
| - private: |
161 |
| - |
162 |
| - int64_t ComputeGCD(int64_t num1, int64_t num2) const |
163 |
| - { |
164 |
| - // Euclid's |
165 |
| - while (num2 != 0) |
166 |
| - { |
167 |
| - int64_t rem = num1 % num2; |
168 |
| - num1 = num2; |
169 |
| - num2 = rem; |
170 |
| - } |
171 |
| - |
172 |
| - return num1; |
173 |
| - } |
174 |
| - |
175 |
| - bool ShouldRenormalizeAccumulatorOnRateChange() const { return RENORMALIZE_RATE_CHANGES; } |
176 |
| - |
177 |
| - /// Function that returns the current time |
178 |
| - ElapsedTimeFunctionType m_elapsedTimeFunction; |
179 |
| - |
180 |
| - /// The rate we want to limit to |
181 |
| - int64_t m_maxRate; |
182 |
| - |
183 |
| - /// We need to pretty much lock everything while either setting the rate or applying a cost |
184 |
| - std::recursive_mutex m_accumulatorLock; |
185 |
| - |
186 |
| - /// Tracks how much "rate" we currently have to give; if this drops below zero then we start having to wait in order to perform operations and maintain the rate |
187 |
| - /// Replenishes over time based on m_maxRate |
188 |
| - int64_t m_accumulator; |
189 |
| - |
190 |
| - /// Updates can occur at any time, leading to a fractional accumulation; represents the fraction (m_accumulatorFraction / m_replenishDenominator) |
191 |
| - int64_t m_accumulatorFraction; |
192 |
| - |
193 |
| - /// Last time point the accumulator was updated |
194 |
| - InternalTimePointType m_accumulatorUpdated; |
195 |
| - |
196 |
| - /// Some helper constants that represents fixed (per m_maxRate) ratios used in the delay and replenishment calculations |
197 |
| - int64_t m_replenishNumerator; |
198 |
| - int64_t m_replenishDenominator; |
199 |
| - int64_t m_delayNumerator; |
200 |
| - int64_t m_delayDenominator; |
201 |
| - }; |
202 |
| - |
203 |
| - } // namespace RateLimits |
204 |
| - } // namespace Utils |
205 |
| -} // namespace Aws |
| 16 | +namespace Aws { |
| 17 | +namespace Utils { |
| 18 | +namespace RateLimits { |
| 19 | +/** |
| 20 | + * High precision rate limiter. If you need to limit your bandwidth by a budget, this is very likely the implementation you want. |
| 21 | + */ |
| 22 | +template <typename CLOCK = std::chrono::steady_clock, typename DUR = std::chrono::seconds, bool RENORMALIZE_RATE_CHANGES = true> |
| 23 | +class DefaultRateLimiter : public RateLimiterInterface { |
| 24 | + public: |
| 25 | + using Base = RateLimiterInterface; |
| 26 | + |
| 27 | + using InternalTimePointType = std::chrono::time_point<CLOCK>; |
| 28 | + using ElapsedTimeFunctionType = std::function<InternalTimePointType()>; |
| 29 | + |
| 30 | + /** |
| 31 | + * Initializes state, starts counts, does some basic validation. |
| 32 | + */ |
| 33 | + DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction = CLOCK::now) |
| 34 | + : m_elapsedTimeFunction(elapsedTimeFunction), |
| 35 | + m_maxRate(0), |
| 36 | + m_accumulatorLock(), |
| 37 | + m_accumulator(0), |
| 38 | + m_accumulatorFraction(0), |
| 39 | + m_accumulatorUpdated(), |
| 40 | + m_replenishNumerator(0), |
| 41 | + m_replenishDenominator(0), |
| 42 | + m_delayNumerator(0), |
| 43 | + m_delayDenominator(0) { |
| 44 | + // verify we're not going to divide by zero due to goofy type parameterization |
| 45 | + static_assert(DUR::period::num > 0, "Rate duration must have positive numerator"); |
| 46 | + static_assert(DUR::period::den > 0, "Rate duration must have positive denominator"); |
| 47 | + static_assert(CLOCK::duration::period::num > 0, "RateLimiter clock duration must have positive numerator"); |
| 48 | + static_assert(CLOCK::duration::period::den > 0, "RateLimiter clock duration must have positive denominator"); |
| 49 | + |
| 50 | + DefaultRateLimiter::SetRate(maxRate, true); |
| 51 | + } |
| 52 | + |
| 53 | + virtual ~DefaultRateLimiter() = default; |
| 54 | + |
| 55 | + /** |
| 56 | + * Calculates time in milliseconds that should be delayed before letting anymore data through. |
| 57 | + */ |
| 58 | + virtual DelayType ApplyCost(int64_t cost) override { |
| 59 | + std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock); |
| 60 | + |
| 61 | + auto now = m_elapsedTimeFunction(); |
| 62 | + auto elapsedTime = (now - m_accumulatorUpdated).count(); |
| 63 | + |
| 64 | + // replenish the accumulator based on how much time has passed |
| 65 | + auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction; |
| 66 | + m_accumulator += temp / m_replenishDenominator; |
| 67 | + m_accumulatorFraction = temp % m_replenishDenominator; |
| 68 | + |
| 69 | + // the accumulator is capped based on the maximum rate |
| 70 | + m_accumulator = (std::min)(m_accumulator, m_maxRate); |
| 71 | + if (m_accumulator == m_maxRate) { |
| 72 | + m_accumulatorFraction = 0; |
| 73 | + } |
| 74 | + |
| 75 | + // if the accumulator is still negative, then we'll have to wait |
| 76 | + DelayType delay(0); |
| 77 | + if (m_accumulator < 0) { |
| 78 | + delay = DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator); |
| 79 | + } |
| 80 | + |
| 81 | + // apply the cost to the accumulator after the delay has been calculated; the next call will end up paying for our cost |
| 82 | + m_accumulator -= cost; |
| 83 | + m_accumulatorUpdated = now; |
| 84 | + |
| 85 | + return delay; |
| 86 | + } |
| 87 | + |
| 88 | + /** |
| 89 | + * Same as ApplyCost() but then goes ahead and sleeps the current thread. |
| 90 | + */ |
| 91 | + virtual void ApplyAndPayForCost(int64_t cost) override { |
| 92 | + auto costInMilliseconds = ApplyCost(cost); |
| 93 | + if (costInMilliseconds.count() > 0) { |
| 94 | + std::this_thread::sleep_for(costInMilliseconds); |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + /** |
| 99 | + * Update the bandwidth rate to allow. |
| 100 | + */ |
| 101 | + virtual void SetRate(int64_t rate, bool resetAccumulator = false) override { |
| 102 | + std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock); |
| 103 | + |
| 104 | + // rate must always be positive |
| 105 | + rate = (std::max)(static_cast<int64_t>(1), rate); |
| 106 | + |
| 107 | + if (resetAccumulator) { |
| 108 | + m_accumulator = rate; |
| 109 | + m_accumulatorFraction = 0; |
| 110 | + m_accumulatorUpdated = m_elapsedTimeFunction(); |
| 111 | + } else { |
| 112 | + // sync the accumulator to current time |
| 113 | + ApplyCost(0); // this call is why we need a recursive mutex |
| 114 | + |
| 115 | + if (ShouldRenormalizeAccumulatorOnRateChange()) { |
| 116 | + // now renormalize the accumulator and its fractional part against the new rate |
| 117 | + // the idea here is we want to preserve the desired wait based on the previous rate |
| 118 | + // |
| 119 | + // As an example: |
| 120 | + // Say we had a rate of 100/s and our accumulator was -500 (ie the next ApplyCost would incur a 5 second delay) |
| 121 | + // If we change the rate to 1000/s and want to preserve that delay, we need to scale the accumulator to -5000 |
| 122 | + m_accumulator = m_accumulator * rate / m_maxRate; |
| 123 | + m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate; |
| 124 | + } |
| 125 | + } |
| 126 | + |
| 127 | + m_maxRate = rate; |
| 128 | + |
| 129 | + // Helper constants that represent the amount replenished per CLOCK time period; use the gcd to reduce them in order to try and minimize |
| 130 | + // the chance of integer overflow |
| 131 | + m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num; |
| 132 | + m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den; |
| 133 | + auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator); |
| 134 | + m_replenishNumerator /= gcd; |
| 135 | + m_replenishDenominator /= gcd; |
| 136 | + |
| 137 | + // Helper constants that represent the delay per unit of costAccumulator; use the gcd to reduce them in order to try and minimize the |
| 138 | + // chance of integer overflow |
| 139 | + m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den; |
| 140 | + m_delayDenominator = DelayType::period::den * DUR::period::num; |
| 141 | + gcd = ComputeGCD(m_delayNumerator, m_delayDenominator); |
| 142 | + m_delayNumerator /= gcd; |
| 143 | + m_delayDenominator /= gcd; |
| 144 | + } |
| 145 | + |
| 146 | + private: |
| 147 | + int64_t ComputeGCD(int64_t num1, int64_t num2) const { |
| 148 | + // Euclid's |
| 149 | + while (num2 != 0) { |
| 150 | + int64_t rem = num1 % num2; |
| 151 | + num1 = num2; |
| 152 | + num2 = rem; |
| 153 | + } |
| 154 | + |
| 155 | + return num1; |
| 156 | + } |
| 157 | + |
| 158 | + bool ShouldRenormalizeAccumulatorOnRateChange() const { return RENORMALIZE_RATE_CHANGES; } |
| 159 | + |
| 160 | + /// Function that returns the current time |
| 161 | + ElapsedTimeFunctionType m_elapsedTimeFunction; |
| 162 | + |
| 163 | + /// The rate we want to limit to |
| 164 | + int64_t m_maxRate; |
| 165 | + |
| 166 | + /// We need to pretty much lock everything while either setting the rate or applying a cost |
| 167 | + std::recursive_mutex m_accumulatorLock; |
| 168 | + |
| 169 | + /// Tracks how much "rate" we currently have to give; if this drops below zero then we start having to wait in order to perform operations |
| 170 | + /// and maintain the rate Replenishes over time based on m_maxRate |
| 171 | + int64_t m_accumulator; |
| 172 | + |
| 173 | + /// Updates can occur at any time, leading to a fractional accumulation; represents the fraction (m_accumulatorFraction / |
| 174 | + /// m_replenishDenominator) |
| 175 | + int64_t m_accumulatorFraction; |
| 176 | + |
| 177 | + /// Last time point the accumulator was updated |
| 178 | + InternalTimePointType m_accumulatorUpdated; |
| 179 | + |
| 180 | + /// Some helper constants that represents fixed (per m_maxRate) ratios used in the delay and replenishment calculations |
| 181 | + int64_t m_replenishNumerator; |
| 182 | + int64_t m_replenishDenominator; |
| 183 | + int64_t m_delayNumerator; |
| 184 | + int64_t m_delayDenominator; |
| 185 | +}; |
| 186 | + |
| 187 | +} // namespace RateLimits |
| 188 | +} // namespace Utils |
| 189 | +} // namespace Aws |
0 commit comments