Skip to content

Commit 81fcf88

Browse files
authored
fix retry on markers (#5441)
* fix retry on markers Signed-off-by: Alan Protasio <[email protected]> * lint + changelog Signed-off-by: Alan Protasio <[email protected]> --------- Signed-off-by: Alan Protasio <[email protected]>
1 parent e2e5bcf commit 81fcf88

File tree

5 files changed

+122
-16
lines changed

5 files changed

+122
-16
lines changed

Diff for: pkg/storage/bucket/s3/bucket_client.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
5959
}, nil
6060
}
6161

62+
// NewBucketWithRetries wraps the original bucket into the BucketWithRetries
63+
func NewBucketWithRetries(bucket objstore.Bucket, operationRetries int, retryMinBackoff time.Duration, retryMaxBackoff time.Duration, logger log.Logger) (objstore.Bucket, error) {
64+
return &BucketWithRetries{
65+
logger: logger,
66+
bucket: bucket,
67+
operationRetries: operationRetries,
68+
retryMinBackoff: retryMinBackoff,
69+
retryMaxBackoff: retryMaxBackoff,
70+
}, nil
71+
}
72+
6273
func newS3Config(cfg Config) (s3.Config, error) {
6374
sseCfg, err := cfg.SSE.BuildThanosConfig()
6475
if err != nil {
@@ -166,7 +177,11 @@ func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader
166177
if !ok {
167178
// Skip retry if incoming Reader is not seekable to avoid
168179
// loading entire content into memory
169-
return b.bucket.Upload(ctx, name, r)
180+
err := b.bucket.Upload(ctx, name, r)
181+
if err != nil {
182+
level.Warn(b.logger).Log("msg", "skip upload retry as reader is not seekable", "file", name, "err", err)
183+
}
184+
return err
170185
}
171186
return b.retry(ctx, func() error {
172187
if _, err := rs.Seek(0, io.SeekStart); err != nil {

Diff for: pkg/storage/bucket/s3/bucket_client_test.go

+38-13
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,46 @@ func TestBucketWithRetries_ShouldRetry(t *testing.T) {
6060
func TestBucketWithRetries_UploadSeekable(t *testing.T) {
6161
t.Parallel()
6262

63-
m := mockBucket{
64-
FailCount: 3,
65-
}
66-
b := BucketWithRetries{
67-
logger: log.NewNopLogger(),
68-
bucket: &m,
69-
operationRetries: 5,
70-
retryMinBackoff: 10 * time.Millisecond,
71-
retryMaxBackoff: time.Second,
63+
cases := map[string]struct {
64+
readerFactory func(i string) io.Reader
65+
input string
66+
err error
67+
}{
68+
"should retry when seekable": {
69+
err: nil,
70+
input: "test input",
71+
readerFactory: func(i string) io.Reader {
72+
return bytes.NewReader([]byte(i))
73+
},
74+
},
75+
76+
"should not retry when seekable": {
77+
err: fmt.Errorf("failed upload: 2"),
78+
input: "test input",
79+
readerFactory: func(i string) io.Reader {
80+
return bytes.NewBuffer([]byte(i))
81+
},
82+
},
7283
}
7384

74-
input := []byte("test input")
75-
err := b.Upload(context.Background(), "dummy", bytes.NewReader(input))
76-
require.NoError(t, err)
77-
require.Equal(t, input, m.uploadedContent)
85+
for name, tc := range cases {
86+
t.Run(name, func(*testing.T) {
87+
m := mockBucket{
88+
FailCount: 3,
89+
}
90+
b := BucketWithRetries{
91+
logger: log.NewNopLogger(),
92+
bucket: &m,
93+
operationRetries: 5,
94+
retryMinBackoff: 10 * time.Millisecond,
95+
retryMaxBackoff: time.Second,
96+
}
97+
98+
err := b.Upload(context.Background(), "dummy", tc.readerFactory(tc.input))
99+
require.Equal(t, tc.err, err)
100+
require.Equal(t, tc.input, string(m.uploadedContent))
101+
})
102+
}
78103
}
79104

80105
func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {

Diff for: pkg/storage/tsdb/bucketindex/markers_bucket_client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read
3838
}
3939

4040
// Upload it to the global marker's location.
41-
if err := b.parent.Upload(ctx, globalMarkPath, bytes.NewBuffer(body)); err != nil {
41+
if err := b.parent.Upload(ctx, globalMarkPath, bytes.NewReader(body)); err != nil {
4242
return err
4343
}
4444

4545
// Upload it to the original location too.
46-
return b.parent.Upload(ctx, name, bytes.NewBuffer(body))
46+
return b.parent.Upload(ctx, name, bytes.NewReader(body))
4747
}
4848

4949
// Delete implements objstore.Bucket.

Diff for: pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package bucketindex
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"strings"
78
"testing"
89

10+
"github.com/go-kit/log"
911
"github.com/oklog/ulid"
1012
"github.com/prometheus/client_golang/prometheus"
1113
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -14,6 +16,8 @@ import (
1416
"github.com/thanos-io/objstore"
1517
"github.com/thanos-io/thanos/pkg/block/metadata"
1618

19+
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
20+
1721
"github.com/cortexproject/cortex/pkg/storage/bucket"
1822
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
1923
)
@@ -161,6 +165,48 @@ func TestGlobalMarkersBucket_isMark(t *testing.T) {
161165
}
162166
}
163167

168+
func TestBucketWithGlobalMarkers_ShouldRetryUpload(t *testing.T) {
169+
ctx := context.Background()
170+
block1 := ulid.MustNew(1, nil)
171+
172+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
173+
174+
// Fail the global markers and the non-global marker
175+
prefixErrors := []string{"", "marker"}
176+
177+
tests := []struct {
178+
mark string
179+
globalpath string
180+
}{
181+
{
182+
mark: metadata.DeletionMarkFilename,
183+
globalpath: "markers/" + block1.String() + "-deletion-mark.json",
184+
},
185+
{
186+
mark: metadata.NoCompactMarkFilename,
187+
globalpath: "markers/" + block1.String() + "-no-compact-mark.json",
188+
},
189+
}
190+
191+
for _, tc := range tests {
192+
for _, p := range prefixErrors {
193+
t.Run(tc.mark+"/"+p, func(t *testing.T) {
194+
mBucket := &cortex_testutil.MockBucketFailure{
195+
Bucket: bkt,
196+
UploadFailures: map[string]error{p: errors.New("test")},
197+
}
198+
bkt, _ = s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger())
199+
bkt = BucketWithGlobalMarkers(bkt)
200+
originalPath := block1.String() + "/" + tc.mark
201+
err := bkt.Upload(ctx, originalPath, strings.NewReader("{}"))
202+
require.Equal(t, errors.New("test"), err)
203+
require.Equal(t, mBucket.UploadCalls.Load(), int32(5))
204+
})
205+
}
206+
207+
}
208+
}
209+
164210
func TestBucketWithGlobalMarkers_ShouldWorkCorrectlyWithBucketMetrics(t *testing.T) {
165211
reg := prometheus.NewPedanticRegistry()
166212
ctx := context.Background()

Diff for: pkg/storage/tsdb/testutil/objstore.go

+20
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/pkg/errors"
1111
"github.com/stretchr/testify/require"
1212
"github.com/thanos-io/objstore"
13+
"go.uber.org/atomic"
1314

1415
"github.com/cortexproject/cortex/pkg/util"
1516

@@ -37,6 +38,10 @@ type MockBucketFailure struct {
3738

3839
DeleteFailures []string
3940
GetFailures map[string]error
41+
UploadFailures map[string]error
42+
43+
UploadCalls atomic.Int32
44+
GetCalls atomic.Int32
4045
}
4146

4247
func (m *MockBucketFailure) Delete(ctx context.Context, name string) error {
@@ -47,6 +52,7 @@ func (m *MockBucketFailure) Delete(ctx context.Context, name string) error {
4752
}
4853

4954
func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) {
55+
m.GetCalls.Add(1)
5056
for prefix, err := range m.GetFailures {
5157
if strings.HasPrefix(name, prefix) {
5258
return nil, err
@@ -59,6 +65,20 @@ func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser
5965
return m.Bucket.Get(ctx, name)
6066
}
6167

68+
func (m *MockBucketFailure) Upload(ctx context.Context, name string, r io.Reader) error {
69+
m.UploadCalls.Add(1)
70+
for prefix, err := range m.UploadFailures {
71+
if strings.HasPrefix(name, prefix) {
72+
return err
73+
}
74+
}
75+
if e, ok := m.GetFailures[name]; ok {
76+
return e
77+
}
78+
79+
return m.Bucket.Upload(ctx, name, r)
80+
}
81+
6282
func (m *MockBucketFailure) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket {
6383
if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok {
6484
return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures}

0 commit comments

Comments
 (0)