Skip to content

Commit c83a26d

Browse files
authored
[Experimental] Signing write requests (#5430)
* Signing write requests Signed-off-by: Alan Protasio <[email protected]> * using tenant package Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> * changelog Signed-off-by: Alan Protasio <[email protected]> * Signing the other fields of the requests Signed-off-by: Alan Protasio <[email protected]> --------- Signed-off-by: Alan Protasio <[email protected]>
1 parent 2a1c3c4 commit c83a26d

File tree

11 files changed

+404
-7
lines changed

11 files changed

+404
-7
lines changed

Diff for: CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397
3636
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
3737
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
38+
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign-write-requests` flag to sign the write requests. #5430
3839
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
3940
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
4041
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

Diff for: docs/configuration/config-file-reference.md

+5
Original file line numberDiff line numberDiff line change
@@ -2196,6 +2196,11 @@ ha_tracker:
21962196
# CLI flag: -distributor.extend-writes
21972197
[extend_writes: <boolean> | default = true]
21982198
2199+
# EXPERIMENTAL: If enabled, sign the write request between distributors and
2200+
# ingesters.
2201+
# CLI flag: -distributor.sign-write-requests
2202+
[sign_write_requests: <boolean> | default = false]
2203+
21992204
ring:
22002205
kvstore:
22012206
# Backend storage to use for the ring. Supported values are: consul, etcd,

Diff for: go.mod

+4-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ require (
7676
sigs.k8s.io/yaml v1.3.0
7777
)
7878

79-
require github.com/google/go-cmp v0.5.9
79+
require (
80+
github.com/cespare/xxhash/v2 v2.2.0
81+
github.com/google/go-cmp v0.5.9
82+
)
8083

8184
require (
8285
cloud.google.com/go v0.110.0 // indirect
@@ -109,7 +112,6 @@ require (
109112
github.com/beorn7/perks v1.0.1 // indirect
110113
github.com/blang/semver/v4 v4.0.0 // indirect
111114
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
112-
github.com/cespare/xxhash/v2 v2.2.0 // indirect
113115
github.com/coreos/go-semver v0.3.0 // indirect
114116
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
115117
github.com/davecgh/go-spew v1.1.1 // indirect

Diff for: pkg/cortex/cortex.go

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"google.golang.org/grpc/health/grpc_health_v1"
2222
"gopkg.in/yaml.v2"
2323

24+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
25+
2426
"github.com/cortexproject/cortex/pkg/alertmanager"
2527
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
2628
"github.com/cortexproject/cortex/pkg/api"
@@ -355,6 +357,7 @@ func New(cfg Config) (*Cortex, error) {
355357

356358
cortex.setupThanosTracing()
357359
cortex.setupGRPCHeaderForwarding()
360+
cortex.setupRequestSigning()
358361

359362
if err := cortex.setupModuleManager(); err != nil {
360363
return nil, err
@@ -379,6 +382,13 @@ func (t *Cortex) setupGRPCHeaderForwarding() {
379382
}
380383
}
381384

385+
func (t *Cortex) setupRequestSigning() {
386+
if t.Cfg.Distributor.SignWriteRequestsEnabled {
387+
util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")
388+
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor)
389+
}
390+
}
391+
382392
// Run starts Cortex running, and blocks until a Cortex stops.
383393
func (t *Cortex) Run() error {
384394
// Register custom process metrics.

Diff for: pkg/cortex/modules.go

+1
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {
203203
func (t *Cortex) initDistributorService() (serv services.Service, err error) {
204204
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
205205
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
206+
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled
206207

207208
// Check whether the distributor can join the distributors ring, which is
208209
// whenever it's not running as an internal dependency (ie. querier or

Diff for: pkg/cortexpb/extensions.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package cortexpb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"sync"
8+
9+
"github.com/cespare/xxhash/v2"
10+
11+
"github.com/cortexproject/cortex/pkg/tenant"
12+
)
13+
14+
const maxBufferSize = 1024
15+
const signVersion = "v1"
16+
17+
var signerPool = sync.Pool{
18+
New: func() interface{} {
19+
return newSigner()
20+
},
21+
}
22+
23+
type signer struct {
24+
h *xxhash.Digest
25+
b []byte
26+
optimized bool
27+
}
28+
29+
func newSigner() *signer {
30+
s := &signer{
31+
h: xxhash.New(),
32+
b: make([]byte, 0, maxBufferSize),
33+
}
34+
s.Reset()
35+
return s
36+
}
37+
38+
func (s *signer) Reset() {
39+
s.h.Reset()
40+
s.b = s.b[:0]
41+
s.optimized = true
42+
}
43+
44+
func (s *signer) WriteString(val string) {
45+
switch {
46+
case !s.optimized:
47+
_, _ = s.h.WriteString(val)
48+
case len(s.b)+len(val) > cap(s.b):
49+
// If labels val does not fit in the []byte we fall back to not allocate the whole entry.
50+
_, _ = s.h.Write(s.b)
51+
_, _ = s.h.WriteString(val)
52+
s.optimized = false
53+
default:
54+
// Use xxhash.Sum64(b) for fast path as it's faster.
55+
s.b = append(s.b, val...)
56+
}
57+
}
58+
59+
func (s *signer) Sum64() uint64 {
60+
if s.optimized {
61+
return xxhash.Sum64(s.b)
62+
}
63+
64+
return s.h.Sum64()
65+
}
66+
67+
func (w *WriteRequest) VerifySign(ctx context.Context, signature string) (bool, error) {
68+
s, err := w.Sign(ctx)
69+
return s == signature, err
70+
}
71+
72+
func (w *WriteRequest) Sign(ctx context.Context) (string, error) {
73+
u, err := tenant.TenantID(ctx)
74+
if err != nil {
75+
return "", err
76+
}
77+
78+
s := signerPool.Get().(*signer)
79+
defer func() {
80+
s.Reset()
81+
signerPool.Put(s)
82+
}()
83+
s.WriteString(u)
84+
85+
for _, md := range w.Metadata {
86+
s.WriteString(strconv.Itoa(int(md.Type)))
87+
s.WriteString(md.MetricFamilyName)
88+
s.WriteString(md.Help)
89+
s.WriteString(md.Unit)
90+
}
91+
92+
for _, ts := range w.Timeseries {
93+
for _, lbl := range ts.Labels {
94+
s.WriteString(lbl.Name)
95+
s.WriteString(lbl.Value)
96+
}
97+
98+
for _, ex := range ts.Exemplars {
99+
for _, lbl := range ex.Labels {
100+
s.WriteString(lbl.Name)
101+
s.WriteString(lbl.Value)
102+
}
103+
}
104+
}
105+
106+
return fmt.Sprintf("%v/%v", signVersion, s.Sum64()), nil
107+
}

Diff for: pkg/cortexpb/extensions_test.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package cortexpb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
"github.com/weaveworks/common/user"
11+
)
12+
13+
func TestWriteRequest_Sign(t *testing.T) {
14+
ctx := context.Background()
15+
ctx = user.InjectOrgID(ctx, "user-1")
16+
17+
tests := map[string]struct {
18+
w *WriteRequest
19+
expectedSign string
20+
}{
21+
"small write with exemplar": {
22+
w: createWriteRequest(10, true, "family1", "help1", "unit"),
23+
expectedSign: "v1/9125893422459502203",
24+
},
25+
"small write with exemplar and changed md": {
26+
w: createWriteRequest(10, true, "family2", "help1", "unit"),
27+
expectedSign: "v1/18044786562323437562",
28+
},
29+
"small write without exemplar": {
30+
w: createWriteRequest(10, false, "family1", "help1", "unit"),
31+
expectedSign: "v1/7697478040597284323",
32+
},
33+
"big write with exemplar": {
34+
w: createWriteRequest(10000, true, "family1", "help1", "unit"),
35+
expectedSign: "v1/18402783317092766507",
36+
},
37+
"big write without exemplar": {
38+
w: createWriteRequest(10000, false, "family1", "help1", "unit"),
39+
expectedSign: "v1/14973071954515615892",
40+
},
41+
}
42+
43+
for name, tc := range tests {
44+
t.Run(name, func(t *testing.T) {
45+
// running multiple times in parallel to make sure no race
46+
itNumber := 1000
47+
wg := sync.WaitGroup{}
48+
wg.Add(itNumber)
49+
for i := 0; i < itNumber; i++ {
50+
go func() {
51+
defer wg.Done()
52+
s, err := tc.w.Sign(ctx)
53+
require.NoError(t, err)
54+
// Make sure this sign doesn't change
55+
require.Equal(t, tc.expectedSign, s)
56+
}()
57+
}
58+
wg.Wait()
59+
})
60+
}
61+
}
62+
63+
func createWriteRequest(numTs int, exemplar bool, family string, help string, unit string) *WriteRequest {
64+
w := &WriteRequest{}
65+
w.Metadata = []*MetricMetadata{
66+
{
67+
MetricFamilyName: family,
68+
Help: help,
69+
Unit: unit,
70+
},
71+
}
72+
73+
for i := 0; i < numTs; i++ {
74+
w.Timeseries = append(w.Timeseries, PreallocTimeseries{
75+
TimeSeries: &TimeSeries{
76+
Labels: []LabelAdapter{
77+
{
78+
Name: fmt.Sprintf("Name-%v", i),
79+
Value: fmt.Sprintf("Value-%v", i),
80+
},
81+
},
82+
},
83+
})
84+
85+
if exemplar {
86+
w.Timeseries[i].Exemplars = []Exemplar{
87+
{
88+
Labels: []LabelAdapter{
89+
{
90+
Name: fmt.Sprintf("Ex-Name-%v", i),
91+
Value: fmt.Sprintf("Ex-Value-%v", i),
92+
},
93+
},
94+
},
95+
}
96+
}
97+
}
98+
99+
return w
100+
}

Diff for: pkg/distributor/distributor.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,10 @@ type Config struct {
127127
RemoteTimeout time.Duration `yaml:"remote_timeout"`
128128
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
129129

130-
ShardingStrategy string `yaml:"sharding_strategy"`
131-
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
132-
ExtendWrites bool `yaml:"extend_writes"`
130+
ShardingStrategy string `yaml:"sharding_strategy"`
131+
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
132+
ExtendWrites bool `yaml:"extend_writes"`
133+
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
133134

134135
// Distributors ring
135136
DistributorRing RingConfig `yaml:"ring"`
@@ -163,6 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
163164
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
164165
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
165166
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
167+
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
166168
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
167169
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
168170

@@ -181,6 +183,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
181183
}
182184

183185
haHATrackerConfig := cfg.HATrackerConfig.ToHATrackerConfig()
186+
184187
return haHATrackerConfig.Validate()
185188
}
186189

Diff for: pkg/util/grpcclient/grpcclient.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ type Config struct {
2929
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
3030
BackoffConfig backoff.Config `yaml:"backoff_config"`
3131

32-
TLSEnabled bool `yaml:"tls_enabled"`
33-
TLS tls.ClientConfig `yaml:",inline"`
32+
TLSEnabled bool `yaml:"tls_enabled"`
33+
TLS tls.ClientConfig `yaml:",inline"`
34+
SignWriteRequestsEnabled bool `yaml:"-"`
3435
}
3536

3637
// RegisterFlags registers flags.
@@ -91,6 +92,10 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
9192
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
9293
}
9394

95+
if cfg.SignWriteRequestsEnabled {
96+
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)
97+
}
98+
9499
return append(
95100
opts,
96101
grpc.WithDefaultCallOptions(cfg.CallOptions()...),

0 commit comments

Comments
 (0)