Skip to content

Commit ff3f3e3

Browse files
authored
Fix bug where adding/removing instance with RF > len(zones) will introduce more than one member change in Get replication set (#5429)
* Fix bug where adding/removing instance with RF > len(zones) will introduce more than one member change in Get replication set Signed-off-by: Justin Jung <[email protected]> * Add tests Signed-off-by: Justin Jung <[email protected]> * Add changelog Signed-off-by: Justin Jung <[email protected]> * Add more tests Signed-off-by: Justin Jung <[email protected]> * Update variable name Signed-off-by: Justin Jung <[email protected]> * Add extended replica set test for ring.Get Signed-off-by: Justin Jung <[email protected]> * Fix batch benchmark tests Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]>
1 parent 4dd7c34 commit ff3f3e3

File tree

7 files changed

+454
-24
lines changed

7 files changed

+454
-24
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [BUGFIX] Ring: Allow RF greater than number of zones to select more than one instance per zone #5411
4444
* [BUGFIX] Distributor: Fix potential data corruption in cases of timeout between distributors and ingesters. #5422
4545
* [BUGFIX] Store Gateway: Fix bug in store gateway ring comparison logic. #5426
46+
* [BUGFIX] Ring: Fix bug in consistency of Get func in a scaling zone-aware ring. #5429
4647

4748
## 1.15.1 2023-04-26
4849

pkg/compactor/shuffle_sharding_grouper_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}
751751

752752
func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}
753753

754-
func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts, bufZones []string) (ring.ReplicationSet, error) {
754+
func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts []string, bufZones map[string]int) (ring.ReplicationSet, error) {
755755
args := r.Called(key, op, bufDescs, bufHosts, bufZones)
756756
return args.Get(0).(ring.ReplicationSet), args.Error(1)
757757
}

pkg/ring/batch.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
7878
var (
7979
bufDescs [GetBufferSize]InstanceDesc
8080
bufHosts [GetBufferSize]string
81-
bufZones [GetBufferSize]string
81+
bufZones = make(map[string]int, GetZoneSize)
8282
)
8383
for i, key := range keys {
84-
replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
84+
replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones)
8585
if err != nil {
8686
cleanup()
8787
return err

pkg/ring/ring.go

+34-15
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const (
3131
// GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on
3232
// a typical replication factor 3, plus extra room for a JOINING + LEAVING instance.
3333
GetBufferSize = 5
34+
35+
// GetZoneSize is the suggested size of zone map passed to Ring.Get(). It's based on
36+
// a typical replication factor 3.
37+
GetZoneSize = 3
3438
)
3539

3640
// ReadRing represents the read interface to the ring.
@@ -39,7 +43,7 @@ type ReadRing interface {
3943
// Get returns n (or more) instances which form the replicas for the given key.
4044
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
4145
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
42-
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)
46+
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones map[string]int) (ReplicationSet, error)
4347

4448
// GetAllHealthy returns all healthy instances in the ring, for the given operation.
4549
// This function doesn't check if the quorum is honored, so doesn't fail if the number
@@ -340,24 +344,30 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
340344
}
341345

342346
// Get returns n (or more) instances which form the replicas for the given key.
343-
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
347+
// This implementation guarantees:
348+
// - Stability: given the same ring, two invocations returns the same set for same operation.
349+
// - Consistency: adding/removing 1 instance from the ring returns set with no more than 1 difference for same operation.
350+
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones map[string]int) (ReplicationSet, error) {
344351
r.mtx.RLock()
345352
defer r.mtx.RUnlock()
346353
if r.ringDesc == nil || len(r.ringTokens) == 0 {
347354
return ReplicationSet{}, ErrEmptyRing
348355
}
349356

350357
var (
351-
replicationFactor = r.cfg.ReplicationFactor
352-
instances = bufDescs[:0]
353-
start = searchToken(r.ringTokens, key)
354-
iterations = 0
358+
replicationFactor = r.cfg.ReplicationFactor
359+
instances = bufDescs[:0]
360+
start = searchToken(r.ringTokens, key)
361+
iterations = 0
362+
maxInstancePerZone = replicationFactor / len(r.ringZones)
363+
zonesWithExtraInstance = replicationFactor % len(r.ringZones)
355364

356365
// We use a slice instead of a map because it's faster to search within a
357366
// slice than lookup a map for a very low number of items.
358-
distinctHosts = bufHosts[:0]
359-
distinctZones = bufZones[:0]
367+
distinctHosts = bufHosts[:0]
368+
numOfInstanceByZone = resetZoneMap(bufZones)
360369
)
370+
361371
for i := start; len(distinctHosts) < replicationFactor && iterations < len(r.ringTokens); i++ {
362372
iterations++
363373
// Wrap i around in the ring.
@@ -370,14 +380,20 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
370380
return ReplicationSet{}, ErrInconsistentTokensInfo
371381
}
372382

373-
// We want n *distinct* instances && distinct zones.
383+
// We want n *distinct* instances.
374384
if util.StringsContain(distinctHosts, info.InstanceID) {
375385
continue
376386
}
377387

378388
// Ignore if the instances don't have a zone set.
379389
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
380-
if util.StringsContain(distinctZones, info.Zone) {
390+
maxNumOfInstance := maxInstancePerZone
391+
// If we still have room for zones with extra instance, increase the instance threshold by 1
392+
if zonesWithExtraInstance > 0 {
393+
maxNumOfInstance++
394+
}
395+
396+
if numOfInstanceByZone[info.Zone] >= maxNumOfInstance {
381397
continue
382398
}
383399
}
@@ -392,11 +408,14 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
392408
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
393409
// We should only add the zone if we are not going to extend,
394410
// as we want to extend the instance in the same AZ.
395-
distinctZones = append(distinctZones, info.Zone)
396-
397-
if len(distinctZones) == len(r.ringZones) {
398-
// reset the zones to repeatedly get hosts from distinct zones
399-
distinctZones = distinctZones[:0]
411+
if numOfInstance, ok := numOfInstanceByZone[info.Zone]; !ok {
412+
numOfInstanceByZone[info.Zone] = 1
413+
} else if numOfInstance < maxInstancePerZone {
414+
numOfInstanceByZone[info.Zone]++
415+
} else {
416+
// This zone will have an extra instance
417+
numOfInstanceByZone[info.Zone]++
418+
zonesWithExtraInstance--
400419
}
401420
}
402421

0 commit comments

Comments
 (0)