-
Notifications
You must be signed in to change notification settings - Fork 817
/
Copy pathclient_test.go
133 lines (118 loc) · 3.66 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package client
import (
"context"
"net/http/httptest"
"strconv"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
)
// TestMarshall is useful to try out various optimisation on the unmarshalling code.
func TestMarshall(t *testing.T) {
const numSeries = 10
recorder := httptest.NewRecorder()
{
req := cortexpb.WriteRequest{}
for i := 0; i < numSeries; i++ {
req.Timeseries = append(req.Timeseries, cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: strconv.Itoa(i)},
},
Samples: []cortexpb.Sample{
{TimestampMs: int64(i), Value: float64(i)},
},
},
})
}
err := util.SerializeProtoResponse(recorder, &req, util.RawSnappy)
require.NoError(t, err)
}
{
const (
tooSmallSize = 1
plentySize = 1024 * 1024
)
req := cortexpb.WriteRequest{}
_, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
require.Error(t, err)
_, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
require.NoError(t, err)
require.Equal(t, numSeries, len(req.Timeseries))
}
}
func TestClosableHealthAndIngesterClient_MaxInflightPushRequests(t *testing.T) {
t.Parallel()
tests := map[string]struct {
inflightPushRequests int64
maxInflightPushRequests int64
expectThrottle bool
}{
"no limit": {
inflightPushRequests: 1000,
maxInflightPushRequests: 0,
expectThrottle: false,
},
"inflight request is under limit": {
inflightPushRequests: 99,
maxInflightPushRequests: 100,
expectThrottle: false,
},
"inflight request hits limit": {
inflightPushRequests: 100,
maxInflightPushRequests: 100,
expectThrottle: true,
},
}
ctx := context.Background()
for testName, testData := range tests {
tData := testData
t.Run(testName, func(t *testing.T) {
t.Parallel()
client1 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
_, err := client1.Push(ctx, nil)
if tData.expectThrottle {
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
} else {
assert.NoError(t, err)
}
client2 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
_, err = client2.PushPreAlloc(ctx, nil)
if tData.expectThrottle {
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
} else {
assert.NoError(t, err)
}
})
}
}
func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequests int64) *closableHealthAndIngesterClient {
client := &closableHealthAndIngesterClient{
IngesterClient: &mockIngester{},
conn: &mockClientConn{},
addr: "dummy_addr",
maxInflightPushRequests: maxInflightPushRequests,
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
}
client.inflightRequests.Add(currentInflightRequests)
return client
}
type mockIngester struct {
IngesterClient
}
func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
type mockClientConn struct {
ClosableClientConn
}
func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ...grpc.CallOption) error {
return nil
}
func (m *mockClientConn) Close() error {
return nil
}