Skip to content

Commit 1e5fd8a

Browse files
authored
Replacing retry goroutines with iterators (#2087)
1 parent 7502986 commit 1e5fd8a

File tree

5 files changed

+167
-43
lines changed

5 files changed

+167
-43
lines changed

Diff for: api-bucket-notification.go

+1-8
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,6 @@ func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefi
157157
return
158158
}
159159

160-
// Continuously run and listen on bucket notification.
161-
// Create a done channel to control 'ListObjects' go routine.
162-
retryDoneCh := make(chan struct{}, 1)
163-
164-
// Indicate to our routine to exit cleanly upon return.
165-
defer close(retryDoneCh)
166-
167160
// Prepare urlValues to pass into the request on every loop
168161
urlValues := make(url.Values)
169162
urlValues.Set("ping", "10")
@@ -172,7 +165,7 @@ func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefi
172165
urlValues["events"] = events
173166

174167
// Wait on the jitter retry loop.
175-
for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) {
168+
for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter) {
176169
// Execute GET on bucket to list objects.
177170
resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
178171
bucketName: bucketName,

Diff for: api.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -660,13 +660,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
660660
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
661661
}
662662

663-
// Create cancel context to control 'newRetryTimer' go routine.
664-
retryCtx, cancel := context.WithCancel(ctx)
665-
666-
// Indicate to our routine to exit cleanly upon return.
667-
defer cancel()
668-
669-
for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
663+
for range c.newRetryTimer(ctx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
670664
// Retry executes the following function body if request has an
671665
// error until maxRetries have been exhausted, retry attempts are
672666
// performed after waiting for a given period of time in a
@@ -779,7 +773,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
779773
}
780774

781775
// Return an error when retry is canceled or deadlined
782-
if e := retryCtx.Err(); e != nil {
776+
if e := ctx.Err(); e != nil {
783777
return nil, e
784778
}
785779

Diff for: retry-continous.go

+11-15
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package minio
1919

20-
import "time"
20+
import (
21+
"iter"
22+
"math"
23+
"time"
24+
)
2125

2226
// newRetryTimerContinous creates a timer with exponentially increasing delays forever.
23-
func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
24-
attemptCh := make(chan int)
25-
27+
func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitter float64) iter.Seq[int] {
2628
// normalize jitter to the range [0, 1.0]
2729
if jitter < NoJitter {
2830
jitter = NoJitter
@@ -44,26 +46,20 @@ func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitte
4446
if sleep > maxSleep {
4547
sleep = maxSleep
4648
}
47-
if jitter != NoJitter {
49+
if math.Abs(jitter-NoJitter) > 1e-9 {
4850
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
4951
}
5052
return sleep
5153
}
5254

53-
go func() {
54-
defer close(attemptCh)
55+
return func(yield func(int) bool) {
5556
var nextBackoff int
5657
for {
57-
select {
58-
// Attempts starts.
59-
case attemptCh <- nextBackoff:
60-
nextBackoff++
61-
case <-doneCh:
62-
// Stop the routine.
58+
if !yield(nextBackoff) {
6359
return
6460
}
61+
nextBackoff++
6562
time.Sleep(exponentialBackoffWait(nextBackoff))
6663
}
67-
}()
68-
return attemptCh
64+
}
6965
}

Diff for: retry.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"crypto/x509"
2323
"errors"
24+
"iter"
25+
"math"
2426
"net/http"
2527
"net/url"
2628
"time"
@@ -45,9 +47,7 @@ var DefaultRetryCap = time.Second
4547

4648
// newRetryTimer creates a timer with exponentially increasing
4749
// delays until the maximum retry attempts are reached.
48-
func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, maxSleep time.Duration, jitter float64) <-chan int {
49-
attemptCh := make(chan int)
50-
50+
func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, maxSleep time.Duration, jitter float64) iter.Seq[int] {
5151
// computes the exponential backoff duration according to
5252
// https://www.awsarchitectureblog.com/2015/03/backoff.html
5353
exponentialBackoffWait := func(attempt int) time.Duration {
@@ -64,18 +64,22 @@ func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, max
6464
if sleep > maxSleep {
6565
sleep = maxSleep
6666
}
67-
if jitter != NoJitter {
67+
if math.Abs(jitter-NoJitter) > 1e-9 {
6868
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
6969
}
7070
return sleep
7171
}
7272

73-
go func() {
74-
defer close(attemptCh)
75-
for i := 0; i < maxRetry; i++ {
76-
select {
77-
case attemptCh <- i + 1:
78-
case <-ctx.Done():
73+
return func(yield func(int) bool) {
74+
// if context is already canceled, skip yield
75+
select {
76+
case <-ctx.Done():
77+
return
78+
default:
79+
}
80+
81+
for i := range maxRetry {
82+
if !yield(i) {
7983
return
8084
}
8185

@@ -85,8 +89,7 @@ func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, max
8589
return
8690
}
8791
}
88-
}()
89-
return attemptCh
92+
}
9093
}
9194

9295
// List of AWS S3 error codes which are retryable.

Diff for: retry_test.go

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package minio
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestRetryTimer(t *testing.T) {
11+
t.Run("withLimit", func(t *testing.T) {
12+
t.Parallel()
13+
c := &Client{random: rand.New(rand.NewSource(42))}
14+
ctx := context.Background()
15+
var count int
16+
for range c.newRetryTimer(ctx, 3, time.Millisecond, 10*time.Millisecond, 0.0) {
17+
count++
18+
}
19+
if count != 3 {
20+
t.Errorf("expected exactly 3 yields")
21+
}
22+
})
23+
24+
t.Run("checkDelay", func(t *testing.T) {
25+
t.Parallel()
26+
c := &Client{random: rand.New(rand.NewSource(42))}
27+
ctx := context.Background()
28+
prev := time.Now()
29+
baseSleep := time.Millisecond
30+
maxSleep := 10 * time.Millisecond
31+
for i := range c.newRetryTimer(ctx, 6, baseSleep, maxSleep, 0.0) {
32+
if i == 0 {
33+
// there is no sleep for the first execution
34+
if time.Since(prev) >= time.Millisecond {
35+
t.Errorf("expected to not sleep for the first instance of the loop")
36+
}
37+
prev = time.Now()
38+
continue
39+
}
40+
expect := baseSleep * time.Duration(1<<uint(i-1))
41+
expect = min(expect, maxSleep)
42+
if d := time.Since(prev); d < expect || d > 2*maxSleep {
43+
t.Errorf("expected to sleep for at least %s", expect.String())
44+
}
45+
prev = time.Now()
46+
}
47+
})
48+
49+
t.Run("withBreak", func(t *testing.T) {
50+
t.Parallel()
51+
c := &Client{random: rand.New(rand.NewSource(42))}
52+
ctx := context.Background()
53+
var count int
54+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
55+
count++
56+
if count >= 3 {
57+
break
58+
}
59+
}
60+
if count != 3 {
61+
t.Errorf("expected exactly 3 yields")
62+
}
63+
})
64+
65+
t.Run("withCancelledContext", func(t *testing.T) {
66+
t.Parallel()
67+
c := &Client{random: rand.New(rand.NewSource(42))}
68+
ctx := context.Background()
69+
ctx, cancel := context.WithCancel(ctx)
70+
cancel()
71+
var count int
72+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
73+
count++
74+
}
75+
if count != 0 {
76+
t.Errorf("expected no yields")
77+
}
78+
})
79+
t.Run("whileCancelledContext", func(t *testing.T) {
80+
t.Parallel()
81+
c := &Client{random: rand.New(rand.NewSource(42))}
82+
ctx := context.Background()
83+
ctx, cancel := context.WithCancel(ctx)
84+
var count int
85+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
86+
count++
87+
cancel()
88+
}
89+
cancel()
90+
if count != 1 {
91+
t.Errorf("expected only one yield")
92+
}
93+
})
94+
}
95+
96+
func TestRetryContinuous(t *testing.T) {
97+
t.Run("checkDelay", func(t *testing.T) {
98+
t.Parallel()
99+
c := &Client{random: rand.New(rand.NewSource(42))}
100+
prev := time.Now()
101+
baseSleep := time.Millisecond
102+
maxSleep := 10 * time.Millisecond
103+
for i := range c.newRetryTimerContinous(time.Millisecond, 10*time.Millisecond, 0.0) {
104+
if i == 0 {
105+
// there is no sleep for the first execution
106+
if time.Since(prev) >= time.Millisecond {
107+
t.Errorf("expected to not sleep for the first instance of the loop")
108+
}
109+
prev = time.Now()
110+
continue
111+
}
112+
expect := baseSleep * time.Duration(1<<uint(i-1))
113+
expect = min(expect, maxSleep)
114+
if d := time.Since(prev); d < expect || d > 2*maxSleep {
115+
t.Errorf("expected to sleep for at least %s", expect.String())
116+
}
117+
prev = time.Now()
118+
if i >= 10 {
119+
break
120+
}
121+
}
122+
})
123+
124+
t.Run("withBreak", func(t *testing.T) {
125+
t.Parallel()
126+
c := &Client{random: rand.New(rand.NewSource(42))}
127+
var count int
128+
for range c.newRetryTimerContinous(time.Millisecond, 10*time.Millisecond, 0.5) {
129+
count++
130+
if count >= 3 {
131+
break
132+
}
133+
}
134+
if count != 3 {
135+
t.Errorf("expected exactly 3 yields")
136+
}
137+
})
138+
}

0 commit comments

Comments
 (0)