Skip to content

Commit 55a1533

Browse files
committed
node backoff
1 parent 11e2389 commit 55a1533

File tree

18 files changed

+917
-271
lines changed

18 files changed

+917
-271
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ require (
3434
github.com/Microsoft/go-winio v0.4.16 // indirect
3535
github.com/beorn7/perks v1.0.1 // indirect
3636
github.com/blang/semver v3.5.1+incompatible // indirect
37+
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
3738
github.com/cespare/xxhash/v2 v2.1.1 // indirect
3839
github.com/davecgh/go-spew v1.1.1 // indirect
3940
github.com/fsnotify/fsnotify v1.4.9 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
117117
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
118118
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
119119
github.com/bwmarrin/snowflake v0.0.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
120+
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
121+
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
120122
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
121123
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
122124
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=

pkg/gce-pd-csi-driver/controller.go

+127-115
Large diffs are not rendered by default.

pkg/gce-pd-csi-driver/controller_test.go

+108-148
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import (
2222
"testing"
2323
"time"
2424

25+
"context"
26+
2527
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
2628
"github.com/golang/protobuf/ptypes"
2729

28-
"context"
29-
3030
compute "google.golang.org/api/compute/v1"
3131
"google.golang.org/grpc/codes"
3232
"google.golang.org/grpc/status"
@@ -1872,20 +1872,16 @@ func TestCreateVolumeDiskReady(t *testing.T) {
18721872
}
18731873
}
18741874

1875-
func TestControllerPublishUnpublishVolume(t *testing.T) {
1875+
func TestControllerPublishUnpublishBackoff(t *testing.T) {
18761876
testCases := []struct {
1877-
name string
1878-
seedDisks []*gce.CloudDisk
1879-
pubReq *csi.ControllerPublishVolumeRequest
1880-
unpubReq *csi.ControllerUnpublishVolumeRequest
1881-
errorSeenOnNode bool
1882-
fakeCloudProvider bool
1877+
name string
1878+
node string
1879+
pubReq *csi.ControllerPublishVolumeRequest
1880+
unpubReq *csi.ControllerUnpublishVolumeRequest
18831881
}{
18841882
{
1885-
name: "queue up publish requests if node has publish error",
1886-
seedDisks: []*gce.CloudDisk{
1887-
createZonalCloudDisk(name),
1888-
},
1883+
name: "controller publish backoff",
1884+
node: testNodeID,
18891885
pubReq: &csi.ControllerPublishVolumeRequest{
18901886
VolumeId: testVolumeID,
18911887
NodeId: testNodeID,
@@ -1898,159 +1894,58 @@ func TestControllerPublishUnpublishVolume(t *testing.T) {
18981894
},
18991895
},
19001896
},
1901-
errorSeenOnNode: true,
1902-
fakeCloudProvider: false,
19031897
},
19041898
{
1905-
name: "queue up and process publish requests if node has publish error",
1906-
seedDisks: []*gce.CloudDisk{
1907-
createZonalCloudDisk(name),
1908-
},
1909-
pubReq: &csi.ControllerPublishVolumeRequest{
1910-
VolumeId: testVolumeID,
1911-
NodeId: testNodeID,
1912-
VolumeCapability: &csi.VolumeCapability{
1913-
AccessType: &csi.VolumeCapability_Mount{
1914-
Mount: &csi.VolumeCapability_MountVolume{},
1915-
},
1916-
AccessMode: &csi.VolumeCapability_AccessMode{
1917-
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1918-
},
1919-
},
1920-
},
1921-
errorSeenOnNode: true,
1922-
fakeCloudProvider: true,
1923-
},
1924-
{
1925-
name: "do not queue up publish requests if node doesn't have publish error",
1926-
seedDisks: []*gce.CloudDisk{
1927-
createZonalCloudDisk(name),
1928-
},
1929-
pubReq: &csi.ControllerPublishVolumeRequest{
1930-
VolumeId: testVolumeID,
1931-
NodeId: testNodeID,
1932-
VolumeCapability: &csi.VolumeCapability{
1933-
AccessType: &csi.VolumeCapability_Mount{
1934-
Mount: &csi.VolumeCapability_MountVolume{},
1935-
},
1936-
AccessMode: &csi.VolumeCapability_AccessMode{
1937-
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1938-
},
1939-
},
1940-
},
1941-
errorSeenOnNode: false,
1942-
fakeCloudProvider: false,
1943-
},
1944-
{
1945-
name: "queue up unpublish requests if node has publish error",
1946-
seedDisks: []*gce.CloudDisk{
1947-
createZonalCloudDisk(name),
1948-
},
1949-
unpubReq: &csi.ControllerUnpublishVolumeRequest{
1950-
VolumeId: testVolumeID,
1951-
NodeId: testNodeID,
1952-
},
1953-
errorSeenOnNode: true,
1954-
fakeCloudProvider: false,
1955-
},
1956-
{
1957-
name: "queue up and process unpublish requests if node has publish error",
1958-
seedDisks: []*gce.CloudDisk{
1959-
createZonalCloudDisk(name),
1960-
},
1899+
name: "controller unpublish backoff",
1900+
node: testNodeID,
19611901
unpubReq: &csi.ControllerUnpublishVolumeRequest{
19621902
VolumeId: testVolumeID,
19631903
NodeId: testNodeID,
19641904
},
1965-
errorSeenOnNode: true,
1966-
fakeCloudProvider: true,
1967-
},
1968-
{
1969-
name: "do not queue up unpublish requests if node doesn't have publish error",
1970-
seedDisks: []*gce.CloudDisk{
1971-
createZonalCloudDisk(name),
1972-
},
1973-
unpubReq: &csi.ControllerUnpublishVolumeRequest{
1974-
VolumeId: testVolumeID,
1975-
NodeId: testNodeID,
1976-
},
1977-
errorSeenOnNode: false,
1978-
fakeCloudProvider: false,
19791905
},
19801906
}
19811907
for _, tc := range testCases {
1982-
t.Logf("test case: %s", tc.name)
1983-
1984-
var gceDriver *GCEDriver
1985-
1986-
if tc.fakeCloudProvider {
1987-
fcp, err := gce.CreateFakeCloudProvider(project, zone, tc.seedDisks)
1908+
t.Run(tc.name, func(t *testing.T) {
1909+
fcp, err := gce.CreateFakeCloudProvider(project, zone, nil)
19881910
if err != nil {
1989-
t.Fatalf("Failed to create fake cloud provider: %v", err)
1990-
}
1991-
1992-
instance := &compute.Instance{
1993-
Name: node,
1994-
Disks: []*compute.AttachedDisk{},
1911+
t.Errorf("Failed to create fake cloud provider: %v", err)
19951912
}
1996-
fcp.InsertInstance(instance, zone, node)
1997-
1998-
// Setup new driver each time so no interference
1999-
gceDriver = initGCEDriverWithCloudProvider(t, fcp)
2000-
} else {
2001-
gceDriver = initGCEDriver(t, tc.seedDisks)
2002-
}
2003-
gceDriver.cs.opsManager.ready = true
2004-
// mark the node in the map
2005-
if tc.errorSeenOnNode {
2006-
gceDriver.cs.publishErrorsSeenOnNode[testNodeID] = true
2007-
}
2008-
2009-
requestCount := 50
2010-
for i := 0; i < requestCount; i++ {
1913+
driver := initGCEDriverWithCloudProvider(t, fcp)
1914+
driver.cs.opsManager.ready = true
1915+
// Simulate node with active backoff condition.
1916+
driver.cs.nodeBackoff.AddOrUpdateNodeBackoffEntry(tc.node)
1917+
boff := driver.cs.nodeBackoff.backoff[tc.node]
1918+
if boff.expBackoff.MaxElapsedTime != 0 {
1919+
t.Errorf("unexpected backoff object %+v", boff)
1920+
}
1921+
// Setup a large next retry time, to allow sufficient time for the controller publish/unpublish retries in this test case.
1922+
mockNextRetryTime := time.Now().Add(time.Minute)
1923+
boff.nextRetryTime = mockNextRetryTime
1924+
driver.cs.nodeBackoff.backoff[tc.node].nextRetryTime = mockNextRetryTime
20111925
if tc.pubReq != nil {
2012-
gceDriver.cs.ControllerPublishVolume(context.Background(), tc.pubReq)
1926+
for i := 0; i < 10; i++ {
1927+
_, err := driver.cs.ControllerPublishVolume(context.Background(), tc.pubReq)
1928+
if err == nil {
1929+
t.Errorf("expected error but found none")
1930+
}
1931+
if !isUnavailableError(err) {
1932+
t.Errorf("got error %v found, want Unavailable", err)
1933+
}
1934+
}
20131935
}
20141936

20151937
if tc.unpubReq != nil {
2016-
gceDriver.cs.ControllerUnpublishVolume(context.Background(), tc.unpubReq)
2017-
}
2018-
}
2019-
2020-
queued := false
2021-
2022-
if tc.errorSeenOnNode {
2023-
if err := wait.Poll(10*time.Nanosecond, 1*time.Second, func() (bool, error) {
2024-
if gceDriver.cs.queue.Len() > 0 {
2025-
queued = true
2026-
2027-
if tc.fakeCloudProvider {
2028-
gceDriver.cs.Run()
1938+
for i := 0; i < 10; i++ {
1939+
_, err := driver.cs.ControllerUnpublishVolume(context.Background(), tc.unpubReq)
1940+
if err == nil {
1941+
t.Errorf("expected error but found none")
1942+
}
1943+
if !isUnavailableError(err) {
1944+
t.Errorf("got error %v found, want Unavailable", err)
20291945
}
2030-
}
2031-
2032-
// Items are queued up and eventually all processed
2033-
if tc.fakeCloudProvider {
2034-
return queued && gceDriver.cs.queue.Len() == 0, nil
2035-
}
2036-
2037-
return gceDriver.cs.queue.Len() == requestCount, nil
2038-
}); err != nil {
2039-
if tc.fakeCloudProvider {
2040-
t.Fatalf("%v requests not processed for node has seen error", gceDriver.cs.queue.Len())
2041-
} else {
2042-
t.Fatalf("Only %v requests queued up for node has seen error", gceDriver.cs.queue.Len())
20431946
}
20441947
}
2045-
}
2046-
2047-
if !tc.errorSeenOnNode {
2048-
if err := wait.Poll(10*time.Nanosecond, 10*time.Millisecond, func() (bool, error) {
2049-
return gceDriver.cs.queue.Len() != 0, nil
2050-
}); err == nil {
2051-
t.Fatalf("%v requests queued up for node hasn't seen error", gceDriver.cs.queue.Len())
2052-
}
2053-
}
1948+
})
20541949
}
20551950
}
20561951

@@ -3112,3 +3007,68 @@ func TestHydrateCache(t *testing.T) {
31123007
})
31133008
}
31143009
}
3010+
3011+
func TestBackoff(t *testing.T) {
3012+
driver := initGCEDriver(t, nil)
3013+
cs := driver.cs
3014+
testNodeID = "testnode"
3015+
// testcase1: Add a new node key to the backoff map and check that retry time is not updated on failures within the next retry time.
3016+
// Empty backoff map.
3017+
safe, _ := cs.nodeBackoff.IsSafetoRetry(testNodeID)
3018+
if !safe {
3019+
t.Errorf("unexpected retry %v", safe)
3020+
}
3021+
3022+
cs.nodeBackoff.AddOrUpdateNodeBackoffEntry(testNodeID)
3023+
boff := cs.nodeBackoff.backoff[testNodeID]
3024+
if boff.expBackoff.MaxElapsedTime != 0 {
3025+
t.Errorf("unexpected backoff object %+v", boff)
3026+
}
3027+
3028+
// Setup a large next retry time, and try to update the backoff object. this should be a no-op.
3029+
mockNextRetryTime := time.Now().Add(time.Minute)
3030+
boff.nextRetryTime = mockNextRetryTime
3031+
cs.nodeBackoff.backoff[testNodeID] = boff
3032+
// Simulate a scenario where after setting a retry time due to the first failure, all subsequent failures within the next retry time should not update the retry time again.
3033+
for i := 0; i < 20; i++ {
3034+
cs.nodeBackoff.AddOrUpdateNodeBackoffEntry(testNodeID)
3035+
}
3036+
if !boff.nextRetryTime.Equal(mockNextRetryTime) {
3037+
t.Errorf("unexpected retry time, got %s, want %s", boff.nextRetryTime.String(), mockNextRetryTime.String())
3038+
}
3039+
3040+
// Simulate success and remove the node key from the backoff map.
3041+
cs.nodeBackoff.DeleteNodeBackoffEntry(testNodeID)
3042+
if _, ok := cs.nodeBackoff.backoff[testNodeID]; ok {
3043+
t.Errorf("unexpected key %s", testNodeID)
3044+
}
3045+
3046+
// testcase 2: Retry updating the backoff map after the next retry time has elapsed.
3047+
cs.nodeBackoff.AddOrUpdateNodeBackoffEntry(testNodeID)
3048+
initialRetryTime := cs.nodeBackoff.backoff[testNodeID].nextRetryTime
3049+
// Mock a failure after the retry time has elapsed. This should update the retry time.
3050+
time.Sleep(2 * time.Second)
3051+
cs.nodeBackoff.AddOrUpdateNodeBackoffEntry(testNodeID)
3052+
newRetryTime := cs.nodeBackoff.backoff[testNodeID].nextRetryTime
3053+
if !newRetryTime.After(initialRetryTime) {
3054+
t.Errorf("unexpected retry time: boff1 %s, boff2 %s", initialRetryTime.String(), newRetryTime.String())
3055+
}
3056+
// Simulate success and remove the node key from the backoff map.
3057+
cs.nodeBackoff.DeleteNodeBackoffEntry(testNodeID)
3058+
if _, ok := cs.nodeBackoff.backoff[testNodeID]; ok {
3059+
t.Errorf("unexpected key %s", testNodeID)
3060+
}
3061+
}
3062+
3063+
func isUnavailableError(err error) bool {
3064+
if err == nil {
3065+
return false
3066+
}
3067+
3068+
st, ok := status.FromError(err)
3069+
if !ok {
3070+
return false
3071+
}
3072+
3073+
return st.Code().String() == "Unavailable"
3074+
}

pkg/gce-pd-csi-driver/gce-pd-driver.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
csi "github.com/container-storage-interface/spec/lib/go/csi"
2121
"google.golang.org/grpc/codes"
2222
"google.golang.org/grpc/status"
23-
"k8s.io/client-go/util/workqueue"
2423
"k8s.io/klog"
2524
"k8s.io/mount-utils"
2625
common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -151,13 +150,12 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
151150

152151
func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GCEControllerServer {
153152
return &GCEControllerServer{
154-
Driver: gceDriver,
155-
CloudProvider: cloudProvider,
156-
seen: map[string]int{},
157-
volumeLocks: common.NewVolumeLocks(),
158-
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controllerserver"),
159-
publishErrorsSeenOnNode: map[string]bool{},
160-
opsManager: NewOpsManager(cloudProvider),
153+
Driver: gceDriver,
154+
CloudProvider: cloudProvider,
155+
seen: map[string]int{},
156+
volumeLocks: common.NewVolumeLocks(),
157+
nodeBackoff: NewNodeBackoffInfo(),
158+
opsManager: NewOpsManager(cloudProvider),
161159
}
162160
}
163161

pkg/gce-pd-csi-driver/utils.go

+15
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"context"
2424

2525
csi "github.com/container-storage-interface/spec/lib/go/csi"
26+
2627
"google.golang.org/grpc"
28+
"google.golang.org/grpc/status"
2729
"k8s.io/klog"
2830
)
2931

@@ -199,3 +201,16 @@ func collectMountOptions(fsType string, mntFlags []string) []string {
199201
}
200202
return options
201203
}
204+
205+
func isInternalError(err error) bool {
206+
if err == nil {
207+
return false
208+
}
209+
210+
st, ok := status.FromError(err)
211+
if !ok {
212+
return false
213+
}
214+
215+
return st.Code().String() == "Internal"
216+
}

0 commit comments

Comments (0)