Skip to content

Commit fabe274

Browse files
authored
ringhash: Delegate subchannel creation to pickfirst (#8047)
1 parent 75c51bf commit fabe274

File tree

9 files changed

+1391
-1098
lines changed

9 files changed

+1391
-1098
lines changed

balancer/weightedroundrobin/weightedroundrobin.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ import (
3434
)
3535

3636
// attributeKey is the type used as the key to store AddrInfo in the
37-
// BalancerAttributes field of resolver.Address.
37+
// BalancerAttributes field of resolver.Address or Attributes field of
38+
// resolver.Endpoint.
3839
type attributeKey struct{}
3940

4041
// AddrInfo will be stored in the BalancerAttributes field of Address in order
@@ -71,6 +72,14 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
7172
return ai
7273
}
7374

75+
// AddrInfoFromEndpoint returns the AddrInfo stored in the Attributes field of
76+
// endpoint.
77+
func AddrInfoFromEndpoint(endpoint resolver.Endpoint) AddrInfo {
78+
v := endpoint.Attributes.Value(attributeKey{})
79+
ai, _ := v.(AddrInfo)
80+
return ai
81+
}
82+
7483
func (a AddrInfo) String() string {
7584
return fmt.Sprintf("Weight: %d", a.Weight)
7685
}

xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go

Lines changed: 646 additions & 99 deletions
Large diffs are not rendered by default.

xds/internal/balancer/ringhash/logging.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,3 @@ var logger = grpclog.Component("xds")
3232
func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
3333
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
3434
}
35-
36-
func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger {
37-
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc))
38-
}

xds/internal/balancer/ringhash/picker.go

Lines changed: 27 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -22,140 +22,44 @@ import (
2222
"fmt"
2323

2424
"google.golang.org/grpc/balancer"
25-
"google.golang.org/grpc/codes"
2625
"google.golang.org/grpc/connectivity"
2726
"google.golang.org/grpc/internal/grpclog"
28-
"google.golang.org/grpc/status"
2927
)
3028

3129
type picker struct {
32-
ring *ring
33-
logger *grpclog.PrefixLogger
34-
subConnStates map[*subConn]connectivity.State
35-
}
36-
37-
func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
38-
states := make(map[*subConn]connectivity.State)
39-
for _, e := range ring.items {
40-
states[e.sc] = e.sc.effectiveState()
41-
}
42-
return &picker{ring: ring, logger: logger, subConnStates: states}
43-
}
44-
45-
// handleRICSResult is the return type of handleRICS. It's needed to wrap the
46-
// returned error from Pick() in a struct. With this, if the return values are
47-
// `balancer.PickResult, error, bool`, linter complains because error is not the
48-
// last return value.
49-
type handleRICSResult struct {
50-
pr balancer.PickResult
51-
err error
52-
}
53-
54-
// handleRICS generates pick result if the entry is in Ready, Idle, Connecting
55-
// or Shutdown. TransientFailure will be handled specifically after this
56-
// function returns.
57-
//
58-
// The first return value indicates if the state is in Ready, Idle, Connecting
59-
// or Shutdown. If it's true, the PickResult and error should be returned from
60-
// Pick() as is.
61-
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
62-
switch state := p.subConnStates[e.sc]; state {
63-
case connectivity.Ready:
64-
return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
65-
case connectivity.Idle:
66-
// Trigger Connect() and queue the pick.
67-
e.sc.queueConnect()
68-
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
69-
case connectivity.Connecting:
70-
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
71-
case connectivity.TransientFailure:
72-
// Return ok==false, so TransientFailure will be handled afterwards.
73-
return handleRICSResult{}, false
74-
case connectivity.Shutdown:
75-
// Shutdown can happen in a race where the old picker is called. A new
76-
// picker should already be sent.
77-
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
78-
default:
79-
// Should never reach this. All the connectivity states are already
80-
// handled in the cases.
81-
p.logger.Errorf("SubConn has undefined connectivity state: %v", state)
82-
return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true
83-
}
30+
ring *ring
31+
logger *grpclog.PrefixLogger
32+
// endpointStates is a cache of endpoint connectivity states and pickers.
33+
// The ringhash balancer stores endpoint states in a `resolver.EndpointMap`,
34+
// with access guarded by `ringhashBalancer.mu`. The `endpointStates` cache
35+
// in the picker helps avoid locking the ringhash balancer's mutex when
36+
// reading the latest state at RPC time.
37+
endpointStates map[string]balancer.State // endpointState.firstAddr -> balancer.State
8438
}
8539

8640
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
8741
e := p.ring.pick(getRequestHash(info.Ctx))
88-
if hr, ok := p.handleRICS(e); ok {
89-
return hr.pr, hr.err
90-
}
91-
// ok was false, the entry is in transient failure.
92-
return p.handleTransientFailure(e)
93-
}
94-
95-
func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) {
96-
// Queue a connect on the first picked SubConn.
97-
e.sc.queueConnect()
98-
99-
// Find next entry in the ring, skipping duplicate SubConns.
100-
e2 := nextSkippingDuplicates(p.ring, e)
101-
if e2 == nil {
102-
// There's no next entry available, fail the pick.
103-
return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure")
104-
}
105-
106-
// For the second SubConn, also check Ready/Idle/Connecting as if it's the
107-
// first entry.
108-
if hr, ok := p.handleRICS(e2); ok {
109-
return hr.pr, hr.err
110-
}
111-
112-
// The second SubConn is also in TransientFailure. Queue a connect on it.
113-
e2.sc.queueConnect()
114-
115-
// If it gets here, this is after the second SubConn, and the second SubConn
116-
// was in TransientFailure.
117-
//
118-
// Loop over all other SubConns:
119-
// - If all SubConns so far are all TransientFailure, trigger Connect() on
120-
// the TransientFailure SubConns, and keep going.
121-
// - If there's one SubConn that's not in TransientFailure, keep checking
122-
// the remaining SubConns (in case there's a Ready, which will be returned),
123-
// but don't not trigger Connect() on the other SubConns.
124-
var firstNonFailedFound bool
125-
for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) {
126-
scState := p.subConnStates[ee.sc]
127-
if scState == connectivity.Ready {
128-
return balancer.PickResult{SubConn: ee.sc.sc}, nil
129-
}
130-
if firstNonFailedFound {
131-
continue
132-
}
133-
if scState == connectivity.TransientFailure {
134-
// This will queue a connect.
135-
ee.sc.queueConnect()
136-
continue
137-
}
138-
// This is a SubConn in a non-failure state. We continue to check the
139-
// other SubConns, but remember that there was a non-failed SubConn
140-
// seen. After this, Pick() will never trigger any SubConn to Connect().
141-
firstNonFailedFound = true
142-
if scState == connectivity.Idle {
143-
// This is the first non-failed SubConn, and it is in a real Idle
144-
// state. Trigger it to Connect().
145-
ee.sc.queueConnect()
42+
ringSize := len(p.ring.items)
43+
// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF,
44+
// we ignore all TF subchannels and find the first ring entry in READY,
45+
// CONNECTING or IDLE. If that entry is in IDLE, we need to initiate a
46+
// connection. The idlePicker returned by the LazyLB or the new Pickfirst
47+
// should do this automatically.
48+
for i := 0; i < ringSize; i++ {
49+
index := (e.idx + i) % ringSize
50+
balState := p.balancerState(p.ring.items[index])
51+
switch balState.ConnectivityState {
52+
case connectivity.Ready, connectivity.Connecting, connectivity.Idle:
53+
return balState.Picker.Pick(info)
54+
case connectivity.TransientFailure:
55+
default:
56+
panic(fmt.Sprintf("Found child balancer in unknown state: %v", balState.ConnectivityState))
14657
}
14758
}
148-
return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
59+
// All children are in transient failure. Return the first failure.
60+
return p.balancerState(e).Picker.Pick(info)
14961
}
15062

151-
// nextSkippingDuplicates finds the next entry in the ring, with a different
152-
// subconn from the given entry.
153-
func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
154-
for next := ring.next(entry); next != entry; next = ring.next(next) {
155-
if next.sc != entry.sc {
156-
return next
157-
}
158-
}
159-
// There's no qualifying next entry.
160-
return nil
63+
func (p *picker) balancerState(e *ringEntry) balancer.State {
64+
return p.endpointStates[e.firstAddr]
16165
}

0 commit comments

Comments
 (0)