
Commit 3d24367

Add percentage based sharding to rulers (#6680)
* Add support for percentage based shard size
* Refactor implementation and add unit tests
* Update CHANGELOG
* Add support for percentage based shard size
* Refactor implementation and add unit tests
* Update configuration doc
* Remove wrong rebase change
* Refactor shard size logic and add more unit tests
* Minor refactoring
* Update CHANGELOG to trigger test re-run
* Remove default shardsize value
* Addressing PR comments
* Minor refactor
* Minor comment update
* Update documentation
* Update docs

Signed-off-by: Wilbert Guo <[email protected]>
1 parent 3109c34 commit 3d24367

6 files changed (+168 -12 lines)

Diff for: CHANGELOG.md (+1)
@@ -6,6 +6,7 @@
 * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
 * [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
 * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
+* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
 * [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
 * [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659

Diff for: docs/configuration/config-file-reference.md (+3 -2)
@@ -3635,9 +3635,10 @@ query_rejection:
 
 # The default tenant's shard size when the shuffle-sharding strategy is used by
 # ruler. When this setting is specified in the per-tenant overrides, a value of
-# 0 disables shuffle sharding for the tenant.
+# 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size
+# will be a percentage of the total rulers.
 # CLI flag: -ruler.tenant-shard-size
-[ruler_tenant_shard_size: <int> | default = 0]
+[ruler_tenant_shard_size: <float> | default = 0]
 
 # Maximum number of rules per rule group per-tenant. 0 to disable.
 # CLI flag: -ruler.max-rules-per-rule-group
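To illustrate the new semantics (a sketch, not part of the commit): a `ruler_tenant_shard_size` below 1 is treated as a fraction of the rulers in the ring, while a value of 1 or more keeps the existing absolute-count behaviour. The per-tenant overrides layout and tenant names below are assumed for the example; only the `ruler_tenant_shard_size` field comes from this change.

# Hypothetical per-tenant overrides snippet (tenant names and file layout assumed)
overrides:
  tenant-a:
    ruler_tenant_shard_size: 0.25   # with 20 rulers in the ring, resolves to 5 rulers
  tenant-b:
    ruler_tenant_shard_size: 4      # fixed shard of 4 rulers, unchanged behaviour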

Diff for: pkg/ruler/compat.go (+1 -1)
@@ -155,7 +155,7 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
 // RulesLimits defines limits used by Ruler.
 type RulesLimits interface {
     MaxQueryLength(userID string) time.Duration
-    RulerTenantShardSize(userID string) int
+    RulerTenantShardSize(userID string) float64
     RulerMaxRuleGroupsPerTenant(userID string) int
     RulerMaxRulesPerRuleGroup(userID string) int
     RulerQueryOffset(userID string) time.Duration

Diff for: pkg/ruler/ruler.go (+9 -2)
@@ -857,7 +857,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
     userRings := map[string]ring.ReadRing{}
     for _, u := range users {
         if shardSize := r.limits.RulerTenantShardSize(u); shardSize > 0 {
-            subRing := r.ring.ShuffleShard(u, shardSize)
+            subRing := r.ring.ShuffleShard(u, r.getShardSizeForUser(u))
 
             // Include the user only if it belongs to this ruler shard.
             if subRing.HasInstance(r.lifecycler.GetInstanceID()) {
@@ -1325,11 +1325,18 @@ func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulesp
     return groupDescs, nil
 }
 
+func (r *Ruler) getShardSizeForUser(userID string) int {
+    newShardSize := util.DynamicShardSize(r.limits.RulerTenantShardSize(userID), r.ring.InstancesCount())
+
+    // We want to guarantee that shard size will be at least replication factor
+    return max(newShardSize, r.cfg.Ring.ReplicationFactor)
+}
+
 func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest RulesRequest) (*RulesResponse, error) {
     ring := ring.ReadRing(r.ring)
 
     if shardSize := r.limits.RulerTenantShardSize(userID); shardSize > 0 && r.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
-        ring = r.ring.ShuffleShard(userID, shardSize)
+        ring = r.ring.ShuffleShard(userID, r.getShardSizeForUser(userID))
     }
 
     rulers, failedZones, err := GetReplicationSetForListRule(ring, &r.cfg.Ring)
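The util.DynamicShardSize helper is not part of this diff; the sketch below is a minimal, assumed reconstruction of the behaviour getShardSizeForUser relies on, inferred from the TestGetShardSizeForUser cases further down: values below 1 become a rounded-up fraction of the ring size, values of 1 or more are used as-is, and the result is then clamped to at least the replication factor.

// Assumed sketch, not the actual util.DynamicShardSize source.
package main

import (
    "fmt"
    "math"
)

// dynamicShardSize mirrors the semantics implied by the unit tests: a value
// below 1 is a fraction of the ring size (rounded up), otherwise an absolute
// shard size.
func dynamicShardSize(value float64, numInstances int) int {
    if value < 1 {
        return int(math.Ceil(value * float64(numInstances)))
    }
    return int(value)
}

// getShardSizeForUser, as in the diff above, then guarantees the shard is at
// least the replication factor.
func getShardSizeForUser(tenantShardSize float64, instances, replicationFactor int) int {
    return max(dynamicShardSize(tenantShardSize, instances), replicationFactor)
}

func main() {
    fmt.Println(getShardSizeForUser(0.25, 80, 1)) // 20: 25% of 80 rulers
    fmt.Println(getShardSizeForUser(0.1, 10, 3))  // 3: clamped to replication factor
    fmt.Println(getShardSizeForUser(30, 50, 1))   // 30: fixed shard size
}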

Diff for: pkg/ruler/ruler_test.go (+151 -4)
@@ -87,7 +87,7 @@ func defaultRulerConfig(t testing.TB) Config {
 
 type ruleLimits struct {
     mtx                  sync.RWMutex
-    tenantShard          int
+    tenantShard          float64
     maxRulesPerRuleGroup int
     maxRuleGroups        int
     disabledRuleGroups   validation.DisabledRuleGroups
@@ -102,7 +102,7 @@ func (r *ruleLimits) setRulerExternalLabels(lset labels.Labels) {
     r.mtx.Unlock()
 }
 
-func (r *ruleLimits) RulerTenantShardSize(_ string) int {
+func (r *ruleLimits) RulerTenantShardSize(_ string) float64 {
     r.mtx.RLock()
     defer r.mtx.RUnlock()
     return r.tenantShard
@@ -630,7 +630,7 @@ func TestGetRules(t *testing.T) {
     type testCase struct {
         sharding                bool
         shardingStrategy        string
-        shuffleShardSize        int
+        shuffleShardSize        float64
         rulesRequest            RulesRequest
         expectedCount           map[string]int
         expectedClientCallCount int
@@ -1887,7 +1887,7 @@ func TestSharding(t *testing.T) {
         sharding          bool
         shardingStrategy  string
         replicationFactor int
-        shuffleShardSize  int
+        shuffleShardSize  float64
         setupRing         func(*ring.Desc)
         enabledUsers      []string
         disabledUsers     []string
@@ -3104,3 +3104,150 @@ func TestRuler_QueryOffset(t *testing.T) {
     gotOffset = rg.GetGroup().QueryOffset
     require.Equal(t, time.Minute*2, *gotOffset)
 }
+
+func TestGetShardSizeForUser(t *testing.T) {
+    tests := []struct {
+        name               string
+        userID             string
+        replicationFactor  int
+        rulerInstanceCount int
+        tenantShardSize    float64
+        expectedShardSize  int
+    }{
+        {
+            name:               "User with fixed shard size with 10 ruler instances",
+            userID:             "user1",
+            rulerInstanceCount: 10,
+            replicationFactor:  1,
+            tenantShardSize:    2,
+            expectedShardSize:  2,
+        },
+        {
+            name:               "User with fixed shard size with 50 ruler instances",
+            userID:             "user1",
+            rulerInstanceCount: 50,
+            replicationFactor:  1,
+            tenantShardSize:    30,
+            expectedShardSize:  30,
+        },
+        {
+            name:               "User with percentage shard size with 10 ruler instances",
+            userID:             "user1",
+            rulerInstanceCount: 10,
+            replicationFactor:  1,
+            tenantShardSize:    0.6,
+            expectedShardSize:  6,
+        },
+        {
+            name:               "User with percentage shard size with 80 ruler instances",
+            userID:             "user1",
+            rulerInstanceCount: 80,
+            replicationFactor:  1,
+            tenantShardSize:    0.25,
+            expectedShardSize:  20,
+        },
+        {
+            name:               "Ensure shard size is at least replication factor",
+            userID:             "user1",
+            rulerInstanceCount: 10,
+            replicationFactor:  3,
+            tenantShardSize:    0.1,
+            expectedShardSize:  3,
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+
+            rulerStateMap := make(map[string]ring.InstanceState)
+            rulerAZEvenSpread := make(map[string]string)
+            rulerIDs := make([]string, tc.rulerInstanceCount)
+
+            for i := 0; i < tc.rulerInstanceCount; i++ {
+                rulerID := fmt.Sprintf("ruler%d", i+1)
+                rulerIDs[i] = rulerID
+                rulerStateMap[rulerID] = ring.ACTIVE
+                rulerAZEvenSpread[rulerID] = string(rune('a' + i%3))
+            }
+
+            kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+            t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
+            allRulesByUser := map[string]rulespb.RuleGroupList{}
+            allTokensByRuler := map[string][]uint32{}
+            rulerAddrMap := map[string]*Ruler{}
+
+            createRuler := func(id string) *Ruler {
+                store := newMockRuleStore(allRulesByUser, nil)
+                cfg := defaultRulerConfig(t)
+
+                cfg.ShardingStrategy = util.ShardingStrategyShuffle
+                cfg.EnableSharding = true
+                cfg.EnableHAEvaluation = false
+                cfg.EvaluationInterval = 5 * time.Minute
+
+                cfg.Ring = RingConfig{
+                    InstanceID:   id,
+                    InstanceAddr: id,
+                    KVStore: kv.Config{
+                        Mock: kvStore,
+                    },
+                    ReplicationFactor:    tc.replicationFactor,
+                    ZoneAwarenessEnabled: true,
+                    InstanceZone:         rulerAZEvenSpread[id],
+                }
+
+                r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap)
+                r.limits = &ruleLimits{tenantShard: tc.tenantShardSize}
+                rulerAddrMap[id] = r
+                if r.ring != nil {
+                    require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
+                    t.Cleanup(r.ring.StopAsync)
+                }
+                return r
+            }
+
+            var testRuler *Ruler
+            // Create rulers and ensure they join the ring
+            for _, rID := range rulerIDs {
+                r := createRuler(rID)
+                testRuler = r
+                require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.lifecycler))
+            }
+
+            err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+                d, _ := in.(*ring.Desc)
+                if d == nil {
+                    d = ring.NewDesc()
+                }
+                for rID, tokens := range allTokensByRuler {
+                    d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now())
+                }
+                return d, true, nil
+            })
+            require.NoError(t, err)
+            // Wait a bit to make sure ruler's ring is updated.
+            time.Sleep(100 * time.Millisecond)
+
+            // Check the ring state
+            ringDesc, err := kvStore.Get(context.Background(), ringKey)
+            require.NoError(t, err)
+            require.NotNil(t, ringDesc)
+            desc := ringDesc.(*ring.Desc)
+            require.Equal(t, tc.rulerInstanceCount, len(desc.Ingesters))
+
+            forEachRuler := func(f func(rID string, r *Ruler)) {
+                for rID, r := range rulerAddrMap {
+                    f(rID, r)
+                }
+            }
+
+            // Sync Rules
+            forEachRuler(func(_ string, r *Ruler) {
+                r.syncRules(context.Background(), rulerSyncReasonInitial)
+            })
+
+            result := testRuler.getShardSizeForUser(tc.userID)
+            assert.Equal(t, tc.expectedShardSize, result)
+        })
+    }
+}

Diff for: pkg/util/validation/limits.go (+3 -3)
@@ -183,7 +183,7 @@ type Limits struct {
 
     // Ruler defaults and limits.
     RulerEvaluationDelay        model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
-    RulerTenantShardSize        int            `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
+    RulerTenantShardSize        float64        `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
     RulerMaxRulesPerRuleGroup   int            `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
    RulerMaxRuleGroupsPerTenant int            `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
     RulerQueryOffset            model.Duration `yaml:"ruler_query_offset" json:"ruler_query_offset"`
@@ -283,7 +283,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.")
 
     f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Deprecated(use ruler.query-offset instead) and will be removed in v1.19.0: Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.")
-    f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
+    f.Float64Var(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size will be a percentage of the total rulers.")
     f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.")
     f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.")
     f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.")
@@ -838,7 +838,7 @@ func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config {
 }
 
 // RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
-func (o *Overrides) RulerTenantShardSize(userID string) int {
+func (o *Overrides) RulerTenantShardSize(userID string) float64 {
     return o.GetOverridesForUser(userID).RulerTenantShardSize
 }
 