Skip to content

Commit 27638ea

Browse files
minjea.leesamber
minjea.lee
authored andcommitted
feat: adding NewThrottle (#396)
1 parent d587677 commit 27638ea

File tree

3 files changed

+164
-1
lines changed

3 files changed

+164
-1
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ Concurrency helpers:
297297
- [AttemptWhileWithDelay](#attemptwhilewithdelay)
298298
- [Debounce](#debounce)
299299
- [DebounceBy](#debounceby)
300+
- [Throttle](#throttle)
300301
- [Synchronize](#synchronize)
301302
- [Async](#async)
302303
- [Transaction](#transaction)
@@ -3417,6 +3418,47 @@ cancel("second key")
34173418

34183419
[[play](https://go.dev/play/p/d3Vpt6pxhY8)]
34193420

3421+
### Throttle
3422+
`NewThrottle` creates a throttled instance that invokes given functions only once in every interval.
3423+
This returns 2 functions, First one is throttled function and Second one is a function to reset interval.
3424+
3425+
```go
3426+
3427+
f := func() {
3428+
println("Called once in every 100ms")
3429+
}
3430+
3431+
throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)
3432+
3433+
for j := 0; j < 10; j++ {
3434+
throttle()
3435+
time.Sleep(30 * time.Millisecond)
3436+
}
3437+
3438+
reset()
3439+
throttle()
3440+
3441+
```
3442+
3443+
`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.
3444+
```go
3445+
3446+
f := func() {
3447+
println("Called three times in every 100ms")
3448+
}
3449+
3450+
throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)
3451+
3452+
for j := 0; j < 10; j++ {
3453+
throttle()
3454+
time.Sleep(30 * time.Millisecond)
3455+
}
3456+
3457+
reset()
3458+
throttle()
3459+
3460+
```
3461+
34203462
### Synchronize
34213463

34223464
Wraps the underlying callback in a mutex. It receives an optional mutex.

retry.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,62 @@ func (t *Transaction[T]) Process(state T) (T, error) {
287287
return state, err
288288
}
289289

290-
// throttle ?
290+
type throttle struct {
291+
mu *sync.Mutex
292+
timer *time.Timer
293+
interval time.Duration
294+
callbacks []func()
295+
countLimit int
296+
count int
297+
}
298+
299+
func (th *throttle) throttledFunc() {
300+
th.mu.Lock()
301+
defer th.mu.Unlock()
302+
if th.count < th.countLimit {
303+
th.count++
304+
305+
for _, f := range th.callbacks {
306+
f()
307+
}
308+
309+
}
310+
if th.timer == nil {
311+
th.timer = time.AfterFunc(th.interval, func() {
312+
th.reset()
313+
})
314+
}
315+
}
316+
317+
func (th *throttle) reset() {
318+
th.mu.Lock()
319+
defer th.mu.Unlock()
320+
321+
if th.timer != nil {
322+
th.timer.Stop()
323+
}
324+
325+
th.count = 0
326+
th.timer = nil
327+
328+
}
329+
330+
// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
331+
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
332+
func NewThrottle(interval time.Duration, f ...func()) (func(), func()) {
333+
return NewThrottleWithCount(interval, 1, f...)
334+
}
335+
336+
// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
337+
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (func(), func()) {
338+
if count <= 0 {
339+
count = 1
340+
}
341+
th := &throttle{
342+
mu: new(sync.Mutex),
343+
interval: interval,
344+
callbacks: f,
345+
countLimit: count,
346+
}
347+
return th.throttledFunc, th.reset
348+
}

retry_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,3 +498,66 @@ func TestTransaction(t *testing.T) {
498498
is.Equal(assert.AnError, err)
499499
}
500500
}
501+
502+
func TestNewThrottle(t *testing.T) {
503+
t.Parallel()
504+
is := assert.New(t)
505+
callCount := 0
506+
f1 := func() {
507+
callCount++
508+
}
509+
th, reset := NewThrottle(10*time.Millisecond, f1)
510+
511+
is.Equal(0, callCount)
512+
for i := 0; i < 9; i++ {
513+
var wg sync.WaitGroup
514+
for j := 0; j < 100; j++ {
515+
wg.Add(1)
516+
go func() {
517+
defer wg.Done()
518+
th()
519+
}()
520+
}
521+
wg.Wait()
522+
time.Sleep(3 * time.Millisecond)
523+
}
524+
// 35 ms passed
525+
is.Equal(3, callCount)
526+
527+
// reset counter
528+
reset()
529+
th()
530+
is.Equal(4, callCount)
531+
532+
}
533+
534+
func TestNewThrottleWithCount(t *testing.T) {
535+
t.Parallel()
536+
is := assert.New(t)
537+
callCount := 0
538+
f1 := func() {
539+
callCount++
540+
}
541+
th, reset := NewThrottleWithCount(10*time.Millisecond, 3, f1)
542+
543+
// the function does not throttle for initial count number
544+
for i := 0; i < 20; i++ {
545+
th()
546+
}
547+
is.Equal(3, callCount)
548+
549+
time.Sleep(11 * time.Millisecond)
550+
551+
for i := 0; i < 20; i++ {
552+
th()
553+
}
554+
555+
is.Equal(6, callCount)
556+
557+
reset()
558+
for i := 0; i < 20; i++ {
559+
th()
560+
}
561+
562+
is.Equal(9, callCount)
563+
}

0 commit comments

Comments
 (0)