Simplify the node backoff logic #960

Merged
merged 1 commit on Apr 20, 2022
15 changes: 13 additions & 2 deletions pkg/gce-cloud-provider/compute/fake-gce.go
@@ -17,6 +17,7 @@ package gcecloudprovider
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"

@@ -419,8 +420,9 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
}

type Signal struct {
ReportError bool
ReportRunning bool
ReportError bool
ReportRunning bool
ReportTooManyRequestsError bool
}

type FakeBlockingCloudProvider struct {
@@ -446,6 +448,9 @@ func (cloud *FakeBlockingCloudProvider) WaitForZonalOp(ctx context.Context, proj
if val.ReportError {
return fmt.Errorf("force mock error of zonal op %s", opName)
}
if val.ReportTooManyRequestsError {
return tooManyRequestsError()
}
return nil
}

@@ -483,6 +488,12 @@ func invalidError() *googleapi.Error {
}
}

func tooManyRequestsError() *googleapi.Error {
return &googleapi.Error{
Code: http.StatusTooManyRequests,
}
}

func (cloud *FakeCloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.Key, readWrite, diskType, project, location, instanceName string) (*computev1.Operation, error) {
source := cloud.GetDiskSourceURI(project, volKey)
attachedDiskV1 := &computev1.AttachedDisk{
7 changes: 5 additions & 2 deletions pkg/gce-cloud-provider/compute/gce-compute.go
@@ -716,7 +716,7 @@ func (cloud *CloudProvider) WaitForAttach(ctx context.Context, project string, v
klog.V(6).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
disk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1)
if err != nil {
return false, fmt.Errorf("GetDisk failed to get disk: %v", err)
return false, err
}

if disk == nil {
@@ -935,6 +935,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.

op, err := cloud.service.Instances.AttachDisk(project, location, instanceName, attachedDiskV1).Context(ctx).Do()
if err != nil {
klog.Errorf("failed to start attach op for disk %s, instance %s, err: %v", deviceName, instanceName, err)
return nil, fmt.Errorf("failed cloud service attach disk call: %v", err)
}
return op, nil
@@ -943,6 +944,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.
func (cloud *CloudProvider) StartDetachDiskOp(ctx context.Context, project, location, deviceName, instanceName string) (*computev1.Operation, error) {
op, err := cloud.service.Instances.DetachDisk(project, location, instanceName, deviceName).Context(ctx).Do()
if err != nil {
klog.Errorf("failed to start detach op for disk %s, instance %s, err %v", deviceName, instanceName, err)
return nil, fmt.Errorf("failed cloud service detach disk call: %v", err)
}
return op, nil
@@ -953,7 +955,8 @@ func (cloud *CloudProvider) CheckZonalOpDoneStatus(ctx context.Context, project,
lastKnownOp, err := cloud.service.ZoneOperations.Get(project, location, opId).Context(ctx).Do()
if err != nil {
if !IsGCENotFoundError(err) {
return false, fmt.Errorf("failed to get operation %s: %v", opId, err)
klog.Errorf("failed to check status for op %s, err: %v", opId, err)
return false, err
}
return true, nil
}
11 changes: 10 additions & 1 deletion pkg/gce-cloud-provider/compute/gce.go
@@ -258,8 +258,17 @@ func IsGCENotFoundError(err error) bool {
return IsGCEError(err, "notFound")
}

// IsInvalidError returns true if the error is a googleapi.Error with
// IsGCEInvalidError returns true if the error is a googleapi.Error with
// invalid reason
func IsGCEInvalidError(err error) bool {
return IsGCEError(err, "invalid")
}

// IsTooManyRequestError returns true if the error is a googleapi.Error with
// resource exhausted error code.
func IsTooManyRequestError(err error) bool {
if apierr, ok := err.(*googleapi.Error); ok && apierr.Code == http.StatusTooManyRequests {
return true
}
return false
}
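
For illustration only (not part of the diff), a minimal sketch of how the new helper classifies a 429 from the compute API; the import path is the one already used in controller.go:

package main

import (
    "fmt"
    "net/http"

    "google.golang.org/api/googleapi"

    gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
)

func main() {
    quotaErr := &googleapi.Error{Code: http.StatusTooManyRequests} // same shape the fake's tooManyRequestsError() returns
    otherErr := &googleapi.Error{Code: http.StatusBadRequest}

    fmt.Println(gce.IsTooManyRequestError(quotaErr)) // true  -> the controller maps this to codes.ResourceExhausted
    fmt.Println(gce.IsTooManyRequestError(otherErr)) // false -> falls through to codes.Internal
}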
150 changes: 60 additions & 90 deletions pkg/gce-pd-csi-driver/controller.go
@@ -30,14 +30,18 @@ import (
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
)

const (
nodeBackoffInitialDuration = 200 * time.Millisecond
nodeBackoffMaxDuration = 5 * time.Minute
)

type GCEControllerServer struct {
Driver *GCEDriver
CloudProvider gce.GCECompute
@@ -50,17 +54,9 @@ type GCEControllerServer struct {
// Aborted error
volumeLocks *common.VolumeLocks

// queue is a rate limited work queue for Controller Publish/Unpublish
// Volume calls
queue workqueue.RateLimitingInterface

// publishErrorsSeenOnNode is a list of nodes with attach/detach
// operation failures so those nodes shall be rate limited for all
// the attach/detach operations until there is an attach / detach
// operation succeeds
publishErrorsSeenOnNode map[string]bool

opsManager *OpsManager
// nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible.
nodeBackoff *flowcontrol.Backoff
opsManager *OpsManager
}
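
The constructor for GCEControllerServer is not part of the visible hunks; presumably the new field is wired up roughly like this (an assumption, shown only to connect the constants above to the field):

// Hypothetical wiring; the actual constructor is outside this diff.
gceCS := &GCEControllerServer{
    // ... other fields ...
    nodeBackoff: flowcontrol.NewBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration),
}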

type workItem struct {
@@ -337,75 +333,34 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
// Run starts the GCEControllerServer.
func (gceCS *GCEControllerServer) Run() {
go gceCS.opsManager.HydrateOpsCache()
go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
}

func (gceCS *GCEControllerServer) worker() {
// Runs until workqueue is shut down
for gceCS.processNextWorkItem() {
}
}

func (gceCS *GCEControllerServer) processNextWorkItem() bool {
item, quit := gceCS.queue.Get()
if quit {
return false
}
defer gceCS.queue.Done(item)

workItem, ok := item.(*workItem)
if !ok {
gceCS.queue.AddRateLimited(item)
return true
}

if workItem.publishReq != nil {
_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)

if err != nil {
klog.Errorf("ControllerPublishVolume failed with error: %v", err)
}
}

if workItem.unpublishReq != nil {
_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)

if err != nil {
klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
}
}

gceCS.queue.Forget(item)
return true
}

func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if !gceCS.opsManager.IsReady() {
return nil, status.Errorf(codes.Aborted, "Cache not ready")
}

// Only valid requests will be queued
// Only valid requests will be accepted
_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}

// If the node is not marked, proceed the request
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
return gceCS.executeControllerPublishVolume(ctx, req)
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff", req.NodeId)
}

// Node is marked so queue up the request. Note the original gRPC context may get canceled,
// so a new one is created here.
//
// Note that the original context probably has a timeout (see csiAttach in external-attacher),
// which is ignored.
gceCS.queue.AddRateLimited(&workItem{
ctx: context.Background(),
publishReq: req,
})
return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
resp, err := gceCS.executeControllerPublishVolume(ctx, req)
backoff := isResourceExhaustedError(err)
if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
klog.V(5).Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
} else if err == nil {
klog.V(5).Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
gceCS.nodeBackoff.Reset(req.NodeId)
}

return resp, err
}
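
As a standalone illustration of the backoff pattern above (a minimal sketch, not the driver's actual wiring; the node ID and the publish stand-in are hypothetical):

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Same initial/max durations as the constants added in this PR.
    nodeBackoff := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)
    nodeID := "zone-a/node-1" // hypothetical node ID

    publish := func(fail bool) error {
        // Fast-fail while the node is still inside its backoff window.
        if nodeBackoff.IsInBackOffSinceUpdate(nodeID, nodeBackoff.Clock.Now()) {
            return errors.New("Unavailable: node is backing off")
        }
        var err error
        if fail {
            err = errors.New("ResourceExhausted: quota exceeded") // stand-in for a 429 surfacing from the attach path
        }
        if err != nil && !nodeBackoff.IsInBackOffSinceUpdate(nodeID, nodeBackoff.Clock.Now()) {
            nodeBackoff.Next(nodeID, nodeBackoff.Clock.Now()) // grow the window: 200ms, 400ms, ... capped at 5m
        } else if err == nil {
            nodeBackoff.Reset(nodeID) // a successful call clears the condition
        }
        return err
    }

    fmt.Println(publish(true))  // first failure starts the backoff
    fmt.Println(publish(false)) // rejected: still inside the 200ms window
    time.Sleep(250 * time.Millisecond)
    fmt.Println(publish(false)) // window has elapsed; the call succeeds and resets the backoff
}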

func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
Expand Down Expand Up @@ -438,7 +393,6 @@ func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx con

func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}
@@ -457,6 +411,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err)
}
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
}
return nil, status.Errorf(codes.Internal, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
}

@@ -473,6 +430,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
if gce.IsGCENotFoundError(err) {
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find disk %v: %v", volKey.String(), err))
}
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "get disk error: %v", err)
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get disk error: %v", err))
}
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
@@ -484,6 +444,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
if gce.IsGCENotFoundError(err) {
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find instance %v: %v", nodeID, err))
}
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "get instance error: %v", err)
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get instance error: %v", err))
}

@@ -523,20 +486,20 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
InstanceName: instanceName,
})
if err != nil {
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute attach operation, error: %v", err)
}
return nil, err
}

err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
if err != nil {
// Mark the node and rate limit all the following attach/detach
// operations for this node
gceCS.publishErrorsSeenOnNode[nodeID] = true
if gce.IsTooManyRequestError(err) {
Review comment (Contributor):
I think we want to back off on any error. With this logic, we'll let the attacher retry until api quota gets exhausted. I think we would rather not even reach that point.

I don't think there's a case where immediate retries are useful?

return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute wait for attach operation, error: %v", err)
}
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
}

// Attach succeeds so unmark the node
delete(gceCS.publishErrorsSeenOnNode, nodeID)

klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
return pubVolResp, nil
}
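
A sketch of the alternative raised in the review comment above (back off on any error from the execute path, not only on 429s); this is not what this commit implements:

// Hypothetical variant of the check in ControllerPublishVolume:
resp, err := gceCS.executeControllerPublishVolume(ctx, req)
backoff := err != nil // instead of backoff := isResourceExhaustedError(err)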
@@ -546,25 +509,25 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, "Cache not ready")
}

// Only valid requests will be queued
// Only valid requests will be accepted.
_, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}

// If the node is not marked, proceed the request
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
return gceCS.executeControllerUnpublishVolume(ctx, req)
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff", req.NodeId)
}

// Node is marked so queue up the request
gceCS.queue.AddRateLimited(&workItem{
ctx: ctx,
unpublishReq: req,
})

return &csi.ControllerUnpublishVolumeResponse{}, nil
resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
backoff := isResourceExhaustedError(err)
Review comment on lines +521 to +522 (Member):
I agree. I think we should back off on all errors from executeControllerUnpublishVolume, but we have also seen a customer case where executeControllerUnpublishVolume returned success ("ControllerUnpublishVolume succeeded for disk. Already not attached."). They still ended up with the same issue, where the finalizer external-attacher/pd-csi-storage-gke-io was still set on the VA. I think the CSI attacher was the one that kept retrying, because the csi-attacher logs repeated:
VA csi-xx for volume projects/xx/zones/us-east4-a/disks/pvc-xx has attached status true but actual state false. Adding back to VA queue for forced reprocessing
which made the kube-controller-manager keep trying to force detach but failing.

Review comment (Member, @amacaskill, May 13, 2022):
To elaborate: in some cases,
instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName)
will still return the instance, but the device will not show up under its attached devices when we look for common.GetDeviceName(volKey), so ControllerUnpublishVolume returns success even though we never called detachDisk (which I am assuming is what makes the VA's actual attached state false). So the csi-attacher keeps re-queuing the VA: "VA csi-xx has attached status true but actual state false. Adding back to VA queue for forced reprocessing".

if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
klog.V(5).Infof("For node %s adding backoff due to unpublish error for volume %s", req.NodeId, req.VolumeId)
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
} else if err == nil {
klog.V(5).Infof("For node %s clear backoff due to succesful unpublish of volume %s", req.NodeId, req.VolumeId)
gceCS.nodeBackoff.Reset(req.NodeId)
}
return resp, err
}

func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -588,7 +551,6 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c

func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}
@@ -600,6 +562,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "ControllerUnpublishVolume could not find volume with ID %v: %v", volumeID, err)
}
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
}
return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
}

@@ -622,6 +587,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
}
return nil, status.Error(codes.Internal, fmt.Sprintf("error getting instance: %v", err))
}

@@ -645,10 +613,12 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
InstanceName: instanceName,
})
if err != nil {
if gce.IsTooManyRequestError(err) {
return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute detach operation, error: %v", err)
}
return nil, err
}

delete(gceCS.publishErrorsSeenOnNode, nodeID)
klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
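
isResourceExhaustedError is called from ControllerPublishVolume and ControllerUnpublishVolume above, but its definition is not in the visible hunks; a plausible minimal implementation (an assumption, not necessarily the PR's exact code) is:

// Assumed helper: true when the gRPC status carried by err is ResourceExhausted,
// i.e. the code the execute paths above return for 429s from the compute API.
func isResourceExhaustedError(err error) bool {
    return status.Code(err) == codes.ResourceExhausted
}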