Commit f908ed9

backoff per {node,disk} pair instead of just node
Change-Id: I9a2ae648b6f76df1d46bd0a7c992e4ce37f9f4af
1 parent 6e3972c commit f908ed9

File tree: 3 files changed, +152 -76 lines changed


pkg/gce-pd-csi-driver/controller.go (+70 -12)
@@ -39,8 +39,8 @@ import (
 )
 
 const (
-    nodeBackoffInitialDuration = 200 * time.Millisecond
-    nodeBackoffMaxDuration = 5 * time.Minute
+    errorBackoffInitialDuration = 200 * time.Millisecond
+    errorBackoffMaxDuration = 5 * time.Minute
 )
 
 type GCEControllerServer struct {
@@ -58,11 +58,46 @@ type GCEControllerServer struct {
     // Aborted error
     volumeLocks *common.VolumeLocks
 
-    // When the attacher sidecar issues controller publish/unpublish for multiple disks for a given node, the per-instance operation queue in GCE fills up causing attach/detach disk requests to immediately return with an error until the queue drains. nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible. A node is marked with backoff when any error is encountered by the driver during controller publish/unpublish calls.
-    // If the controller eventually allows controller publish/publish requests for volumes (because the backoff time expired), and those requests fail, the next backoff retry time will be updated on every failure and capped at 'nodeBackoffMaxDuration'. Also, any successful controller publish/unpublish call will clear the backoff condition for the node.
-    nodeBackoff *flowcontrol.Backoff
+    // There are several kinds of errors that are immediately retried by either
+    // the CSI sidecars or the k8s control plane. The retries consume GCP API
+    // quota, e.g. by doing ListVolumes, and so backoff needs to be used to
+    // prevent quota exhaustion.
+    //
+    // Examples of these errors are the per-instance GCE operation queue getting
+    // full (typically only 32 operations in flight at a time are allowed), and
+    // disks being deleted out from under a PV, causing unpublish errors.
+    //
+    // While we need to back off, we also need some semblance of fairness. In
+    // particular, volume unpublish retries happen very quickly, and with
+    // a single backoff per node these retries can prevent any other operation
+    // from making progress, even if it would succeed. Hence we track errors on
+    // node and disk pairs, backing off only for calls matching such a
+    // pair.
+    //
+    // An implication is that in the full operation queue situation, requests
+    // for new disks will not back off the first time. This is acceptable as a
+    // single spurious call will not cause problems for quota exhaustion or make
+    // the operation queue problem worse. This is well compensated by giving
+    // disks where no problems are occurring a chance to be processed.
+    //
+    // errorBackoff keeps track of any active backoff condition on a given node,
+    // and the time when retry of controller publish/unpublish is permissible. A
+    // node and disk pair is marked with backoff when any error is encountered
+    // by the driver during controller publish/unpublish calls. If the
+    // controller eventually allows controller publish/unpublish requests for
+    // volumes (because the backoff time expired), and those requests fail, the
+    // next backoff retry time will be updated on every failure and capped at
+    // 'errorBackoffMaxDuration'. Also, any successful controller
+    // publish/unpublish call will clear the backoff condition for a node and
+    // disk.
+    errorBackoff *csiErrorBackoff
 }
 
+type csiErrorBackoff struct {
+    backoff *flowcontrol.Backoff
+}
+type csiErrorBackoffId string
+
 type workItem struct {
     ctx context.Context
     publishReq *csi.ControllerPublishVolumeRequest
@@ -376,17 +411,18 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
     return nil, err
 }
 
-    if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+    backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+    if gceCS.errorBackoff.blocking(backoffId) {
         return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
     }
 
     resp, err := gceCS.executeControllerPublishVolume(ctx, req)
     if err != nil {
-        klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
-        gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+        klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
+        gceCS.errorBackoff.next(backoffId)
     } else {
         klog.Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
-        gceCS.nodeBackoff.Reset(req.NodeId)
+        gceCS.errorBackoff.reset(backoffId)
     }
     return resp, err
 }
@@ -513,17 +549,18 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
     return nil, err
 }
 
-    if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+    backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+    if gceCS.errorBackoff.blocking(backoffId) {
         return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
     }
 
     resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
     if err != nil {
         klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
-        gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+        gceCS.errorBackoff.next(backoffId)
     } else {
         klog.Infof("For node %s clear backoff due to successful unpublish of volume %v", req.NodeId, req.VolumeId)
-        gceCS.nodeBackoff.Reset(req.NodeId)
+        gceCS.errorBackoff.reset(backoffId)
     }
     return resp, err
 }
@@ -1560,3 +1597,24 @@ func pickRandAndConsecutive(slice []string, n int) ([]string, error) {
     }
     return ret, nil
 }
+
+func newCsiErrorBackoff() *csiErrorBackoff {
+    return &csiErrorBackoff{flowcontrol.NewBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration)}
+}
+
+func (_ *csiErrorBackoff) backoffId(nodeId, volumeId string) csiErrorBackoffId {
+    return csiErrorBackoffId(fmt.Sprintf("%s:%s", nodeId, volumeId))
+}
+
+func (b *csiErrorBackoff) blocking(id csiErrorBackoffId) bool {
+    blk := b.backoff.IsInBackOffSinceUpdate(string(id), b.backoff.Clock.Now())
+    return blk
+}
+
+func (b *csiErrorBackoff) next(id csiErrorBackoffId) {
+    b.backoff.Next(string(id), b.backoff.Clock.Now())
+}
+
+func (b *csiErrorBackoff) reset(id csiErrorBackoffId) {
+    b.backoff.Reset(string(id))
+}
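
For illustration, a minimal standalone sketch (not part of this commit) of the per-{node,disk} keying the helpers above implement, using the same k8s.io/client-go/util/flowcontrol calls; the node and disk names are invented:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Same parameters as errorBackoffInitialDuration / errorBackoffMaxDuration.
    b := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)

    // An error on one {node, disk} pair starts a backoff for that key only.
    b.Next("node-1:disk-a", b.Clock.Now())

    // The failing pair is blocked for the next 200ms...
    fmt.Println(b.IsInBackOffSinceUpdate("node-1:disk-a", b.Clock.Now())) // true
    // ...but a different disk on the same node is not.
    fmt.Println(b.IsInBackOffSinceUpdate("node-1:disk-b", b.Clock.Now())) // false

    // A successful publish/unpublish clears the pair's entry entirely.
    b.Reset("node-1:disk-a")
    fmt.Println(b.IsInBackOffSinceUpdate("node-1:disk-a", b.Clock.Now())) // false
}

A success resets the entry rather than letting it decay, which is why the tests below assert that Get returns zero after a successful publish/unpublish.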

pkg/gce-pd-csi-driver/controller_test.go (+81 -61)
@@ -2149,18 +2149,24 @@ type backoffTesterConfig struct {
     mockMissingInstance bool // used by the backoff tester to mock a missing instance scenario
 }
 
+func newFakeCsiErrorBackoff(tc *clock.FakeClock) *csiErrorBackoff {
+    return &csiErrorBackoff{flowcontrol.NewFakeBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration, tc)}
+}
+
 func TestControllerUnpublishBackoff(t *testing.T) {
+    backoffTesterForUnpublish(t, &backoffTesterConfig{})
+}
+
+func TestControllerUnpublishBackoffMissingInstance(t *testing.T) {
     backoffTesterForUnpublish(t, &backoffTesterConfig{
         mockMissingInstance: true,
     })
-    backoffTesterForUnpublish(t, &backoffTesterConfig{})
 }
 
 func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
-    readyToExecute := make(chan chan gce.Signal, 1)
-    disk1 := name + "1"
+    readyToExecute := make(chan chan gce.Signal)
     cloudDisks := []*gce.CloudDisk{
-        createZonalCloudDisk(disk1),
+        createZonalCloudDisk(name),
     }
     fcp, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
     if err != nil {
@@ -2173,7 +2179,7 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
     instance := &compute.Instance{
         Name: node,
         Disks: []*compute.AttachedDisk{
-            {DeviceName: disk1}, // mock attached disks
+            {DeviceName: name}, // mock attached disks
         },
     }
     if !config.mockMissingInstance {
@@ -2187,56 +2193,69 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
         CloudProvider: fcpBlocking,
         seen: map[string]int{},
         volumeLocks: common.NewVolumeLocks(),
-        nodeBackoff: flowcontrol.NewFakeBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration, tc),
+        errorBackoff: newFakeCsiErrorBackoff(tc),
     }
 
-    key := testNodeID
+    backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
     step := 1 * time.Millisecond
-    // Mock an active backoff condition on the node. This will setup a backoff duration of the 'nodeBackoffInitialDuration'.
-    driver.cs.nodeBackoff.Next(key, tc.Now())
+
+    runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest, reportError bool) error {
+        response := make(chan error)
+        go func() {
+            _, err := driver.cs.ControllerUnpublishVolume(context.Background(), req)
+            response <- err
+        }()
+        go func() {
+            <-readyToExecute <- gcecloudprovider.Signal{ReportError: reportError}
+        }()
+        return <-response
+    }
+
+    // Mock an active backoff condition on the node.
+    driver.cs.errorBackoff.next(backoffId)
+
+    tc.Step(step)
+    // A request for a different volume should succeed. This volume is not
+    // mounted on the node, so no GCE call will be made (i.e., runUnpublishRequest
+    // doesn't need to be called, the request can be made directly).
+    differentUnpubReq := &csi.ControllerUnpublishVolumeRequest{
+        VolumeId: testVolumeID + "-different",
+        NodeId: testNodeID,
+    }
+    if _, err := driver.cs.ControllerUnpublishVolume(context.Background(), differentUnpubReq); err != nil {
+        t.Errorf("expected no error on different unpublish, got %v", err)
+    }
+
     unpubreq := &csi.ControllerUnpublishVolumeRequest{
-        VolumeId: testVolumeID + "1",
+        VolumeId: testVolumeID,
         NodeId: testNodeID,
     }
     // For the first 199 ms, the backoff condition is true. All controller publish request will be denied with 'Unavailable' error code.
     for i := 0; i < 199; i++ {
-        tc.Step(step)
         var err error
        _, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq)
         if !isUnavailableError(err) {
             t.Errorf("unexpected error %v", err)
         }
+        tc.Step(step)
     }
 
-    // Mock clock tick for the 200th millisecond. So backoff condition is no longer true.
-    tc.Step(step)
-    runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest) <-chan error {
-        response := make(chan error)
-        go func() {
-            _, err := driver.cs.ControllerUnpublishVolume(context.Background(), req)
-            response <- err
-        }()
-        return response
-    }
-
-    // For a missing instance the driver should return a success code, and the node backoff condition should be cleared.
+    // At the 200th millisecond, the backoff condition is no longer true. The driver should return a success code, and the backoff condition should be cleared.
     if config.mockMissingInstance {
         _, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq)
+        if err != nil {
+            t.Errorf("unexpected error %v", err)
+        }
         // Driver is expected to remove the node key from the backoff map.
-        t1 := driver.cs.nodeBackoff.Get(key)
+        t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
         if t1 != 0 {
             t.Error("unexpected delay")
         }
         return
     }
 
     // mock an error
-    var respUnpublish <-chan error
-    respUnpublish = runUnpublishRequest(unpubreq)
-    execute := <-readyToExecute
-    s1 := gcecloudprovider.Signal{ReportError: true}
-    execute <- s1
-    if err := <-respUnpublish; err == nil {
+    if err := runUnpublishRequest(unpubreq, true); err == nil {
         t.Errorf("expected error")
     }

@@ -2254,33 +2273,31 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
     // Mock clock tick for the 600th millisecond. So backoff condition is no longer true.
     tc.Step(step)
     // Now mock a successful ControllerUnpublish request, where DetachDisk call succeeds.
-    respUnpublish = runUnpublishRequest(unpubreq)
-    execute = <-readyToExecute
-    s1 = gcecloudprovider.Signal{}
-    execute <- s1
-    if err := <-respUnpublish; err != nil {
+    if err := runUnpublishRequest(unpubreq, false); err != nil {
         t.Errorf("unexpected error")
     }
 
     // Driver is expected to remove the node key from the backoff map.
-    t1 := driver.cs.nodeBackoff.Get(key)
+    t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
     if t1 != 0 {
         t.Error("unexpected delay")
     }
 }
 
 func TestControllerPublishBackoff(t *testing.T) {
+    backoffTesterForPublish(t, &backoffTesterConfig{})
+}
+
+func TestControllerPublishBackoffMissingInstance(t *testing.T) {
     backoffTesterForPublish(t, &backoffTesterConfig{
         mockMissingInstance: true,
     })
-    backoffTesterForPublish(t, &backoffTesterConfig{})
 }
 
 func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
-    readyToExecute := make(chan chan gce.Signal, 1)
-    disk1 := name + "1"
+    readyToExecute := make(chan chan gce.Signal)
     cloudDisks := []*gce.CloudDisk{
-        createZonalCloudDisk(disk1),
+        createZonalCloudDisk(name),
     }
     fcp, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
     if err != nil {
@@ -2305,15 +2322,24 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
         CloudProvider: fcpBlocking,
         seen: map[string]int{},
         volumeLocks: common.NewVolumeLocks(),
-        nodeBackoff: flowcontrol.NewFakeBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration, tc),
+        errorBackoff: newFakeCsiErrorBackoff(tc),
     }
 
-    key := testNodeID
+    backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
     step := 1 * time.Millisecond
-    // Mock an active backoff condition on the node. This will setup a backoff duration of the 'nodeBackoffInitialDuration'.
-    driver.cs.nodeBackoff.Next(key, tc.Now())
+    // Mock an active backoff condition on the node.
+    driver.cs.errorBackoff.next(backoffId)
+
+    // A detach request for a different disk should succeed. As this disk is not
+    // on the instance, the detach will succeed without calling the GCE detach
+    // disk API, so we don't have to go through the blocking cloud provider and
+    // can make the request directly.
+    if _, err := driver.cs.ControllerUnpublishVolume(context.Background(), &csi.ControllerUnpublishVolumeRequest{VolumeId: testVolumeID + "different", NodeId: testNodeID}); err != nil {
+        t.Errorf("expected no error on different unpublish, got %v", err)
+    }
+
     pubreq := &csi.ControllerPublishVolumeRequest{
-        VolumeId: testVolumeID + "1",
+        VolumeId: testVolumeID,
         NodeId: testNodeID,
         VolumeCapability: &csi.VolumeCapability{
             AccessType: &csi.VolumeCapability_Mount{
@@ -2336,36 +2362,34 @@
 
     // Mock clock tick for the 200th millisecond. So backoff condition is no longer true.
     tc.Step(step)
-    runPublishRequest := func(req *csi.ControllerPublishVolumeRequest) <-chan error {
+    runPublishRequest := func(req *csi.ControllerPublishVolumeRequest, reportError bool) error {
         response := make(chan error)
         go func() {
            _, err := driver.cs.ControllerPublishVolume(context.Background(), req)
            response <- err
         }()
-        return response
+        go func() {
+            <-readyToExecute <- gcecloudprovider.Signal{ReportError: reportError}
+        }()
+        return <-response
     }
 
-    // For a missing instance the driver should return error code, and the node backoff condition should be set.
+    // For a missing instance the driver should return an error code, and the backoff condition should be set.
     if config.mockMissingInstance {
         _, err = driver.cs.ControllerPublishVolume(context.Background(), pubreq)
         if err == nil {
             t.Errorf("unexpected error %v", err)
         }
 
-        t1 := driver.cs.nodeBackoff.Get(key)
+        t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
         if t1 == 0 {
             t.Error("expected delay, got none")
         }
         return
     }
 
     // mock an error
-    var respPublish <-chan error
-    respPublish = runPublishRequest(pubreq)
-    execute := <-readyToExecute
-    s1 := gcecloudprovider.Signal{ReportError: true}
-    execute <- s1
-    if err := <-respPublish; err == nil {
+    if err := runPublishRequest(pubreq, true); err == nil {
         t.Errorf("expected error")
     }

@@ -2383,16 +2407,12 @@
     // Mock clock tick for the 600th millisecond. So backoff condition is no longer true.
     tc.Step(step)
     // Now mock a successful ControllerUnpublish request, where DetachDisk call succeeds.
-    respPublish = runPublishRequest(pubreq)
-    execute = <-readyToExecute
-    s1 = gcecloudprovider.Signal{}
-    execute <- s1
-    if err := <-respPublish; err != nil {
+    if err := runPublishRequest(pubreq, false); err != nil {
         t.Errorf("unexpected error")
     }
 
     // Driver is expected to remove the node key from the backoff map.
-    t1 := driver.cs.nodeBackoff.Get(key)
+    t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
     if t1 != 0 {
         t.Error("unexpected delay")
     }
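
The tests drive the 200ms initial backoff window deterministically with a fake clock. A minimal sketch (not from the repository) of that pattern; the k8s.io/apimachinery/pkg/util/clock import matches the client-go vintage used here, though newer releases take a FakeClock from k8s.io/utils/clock/testing:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/clock"
    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    tc := clock.NewFakeClock(time.Now())
    b := flowcontrol.NewFakeBackOff(200*time.Millisecond, 5*time.Minute, tc)

    b.Next("node-1:disk-a", tc.Now()) // enter the initial 200ms backoff window

    tc.Step(199 * time.Millisecond) // still inside the window
    fmt.Println(b.IsInBackOffSinceUpdate("node-1:disk-a", tc.Now())) // true

    tc.Step(1 * time.Millisecond) // the window closes at exactly 200ms
    fmt.Println(b.IsInBackOffSinceUpdate("node-1:disk-a", tc.Now())) // false
}

Stepping the clock 199ms keeps the entry inside its 200ms window; one more millisecond and IsInBackOffSinceUpdate reports false, which is exactly the boundary the 199-iteration loops above probe.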
