Skip to content

WIP Mock up: Add rpc-specific backoff #1089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"reflect"
"regexp"
"sort"
"time"
Expand Down Expand Up @@ -410,7 +411,7 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
return nil, err
}

backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId, reflect.TypeOf(req).String())
if gceCS.errorBackoff.blocking(backoffId) {
return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
}
Expand Down Expand Up @@ -517,7 +518,7 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con

attached, err := diskIsAttachedAndCompatible(deviceName, instance, volumeCapability, readWrite)
if err != nil {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Disk %v already published to node %v but incompatbile: %v", volKey.Name, nodeID, err))
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err))
}
if attached {
// Volume is attached to node. Success!
Expand Down Expand Up @@ -548,7 +549,7 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
return nil, err
}

backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId, reflect.TypeOf(req).Name())
if gceCS.errorBackoff.blocking(backoffId) {
return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
}
Expand Down Expand Up @@ -1600,8 +1601,8 @@ func newCsiErrorBackoff() *csiErrorBackoff {
return &csiErrorBackoff{flowcontrol.NewBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration)}
}

// backoffId builds the key under which backoff state is tracked for a request.
//
// This span is a rendered diff, so two versions appear back to back: the first
// signature/return pair is the pre-change form (key "nodeId:volumeId"); the
// second is the post-change form, which appends reqType so that the key becomes
// "nodeId:volumeId:reqType".
//
// NOTE(review): the callers visible in this diff do not derive reqType the same
// way. ControllerPublishVolume passes reflect.TypeOf(req).String(),
// ControllerUnpublishVolume passes reflect.TypeOf(req).Name(), and the tests
// pass reflect.TypeOf(<request struct>{}).String(). reflect.Type.Name() returns
// "" for pointer types, and String() differs between T and *T, so if req is a
// pointer these keys will not match each other — confirm and pick one form.
func (_ *csiErrorBackoff) backoffId(nodeId, volumeId string) csiErrorBackoffId {
return csiErrorBackoffId(fmt.Sprintf("%s:%s", nodeId, volumeId))
func (_ *csiErrorBackoff) backoffId(nodeId, volumeId, reqType string) csiErrorBackoffId {
return csiErrorBackoffId(fmt.Sprintf("%s:%s:%s", nodeId, volumeId, reqType))
}

func (b *csiErrorBackoff) blocking(id csiErrorBackoffId) bool {
Expand Down
26 changes: 19 additions & 7 deletions pkg/gce-pd-csi-driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2195,7 +2195,7 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
errorBackoff: newFakeCsiErrorBackoff(tc),
}

backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID, reflect.TypeOf(csi.ControllerUnpublishVolumeRequest{}).String())
step := 1 * time.Millisecond

runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest, reportError bool) error {
Expand Down Expand Up @@ -2390,10 +2390,14 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
errorBackoff: newFakeCsiErrorBackoff(tc),
}

backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
backoffUnpublishId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID, reflect.TypeOf(csi.ControllerUnpublishVolumeRequest{}).String())
step := 1 * time.Millisecond
// Mock an active backoff condition for unpublish command on the node.
driver.cs.errorBackoff.next(backoffUnpublishId)

backoffPublishId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID, reflect.TypeOf(csi.ControllerPublishVolumeRequest{}).String())
// Mock an active backoff condition for publish command on the node.
driver.cs.errorBackoff.next(backoffId)
driver.cs.errorBackoff.next(backoffPublishId)

// A detach request for a different disk should succeed. As this disk is not
// on the instance, the detach will succeed without calling the gce detach
Expand Down Expand Up @@ -2447,9 +2451,13 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
t.Errorf("unexpected error %v", err)
}

t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
t1 := driver.cs.errorBackoff.backoff.Get(string(backoffUnpublishId))
if t1 == 0 {
t.Error("expected delay for unpublish backoff, got none")
}
t1 = driver.cs.errorBackoff.backoff.Get(string(backoffPublishId))
if t1 == 0 {
t.Error("expected delay, got none")
t.Error("expected delay for publish backoff, got none")
}
return
}
Expand Down Expand Up @@ -2478,9 +2486,13 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
}

// Driver is expected to remove the backoff keys for this node and volume from the backoff map.
t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
t1 := driver.cs.errorBackoff.backoff.Get(string(backoffUnpublishId))
if t1 != 0 {
t.Error("unexpected delay")
t.Error("unexpected unpublish backoff delay")
}
t1 = driver.cs.errorBackoff.backoff.Get(string(backoffPublishId))
if t1 != 0 {
t.Error("unexpected publish backoff delay")
}
}

Expand Down