Skip to content

Attach/Detach back off #847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 160 additions & 17 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
Expand All @@ -47,6 +49,22 @@ type GCEControllerServer struct {
// operations for that same volume (as defined by Volume Key) return an
// Aborted error
volumeLocks *common.VolumeLocks

// queue is a rate limited work queue for Controller Publish/Unpublish
// Volume calls
queue workqueue.RateLimitingInterface

// publishErrorsSeenOnNode is the set of nodes on which an attach/detach
// operation has failed. All attach/detach operations for such a node are
// rate limited until an attach or detach operation on it succeeds.
publishErrorsSeenOnNode map[string]bool
}

// workItem is one Controller Publish/Unpublish Volume call waiting in the
// rate limited work queue. When the item is processed, each non-nil request
// field is executed with ctx.
type workItem struct {
	// ctx is the context passed to the queued operation when it runs.
	ctx context.Context

	// publishReq, when non-nil, is executed as a ControllerPublishVolume call.
	publishReq *csi.ControllerPublishVolumeRequest

	// unpublishReq, when non-nil, is executed as a ControllerUnpublishVolume call.
	unpublishReq *csi.ControllerUnpublishVolumeRequest
}

// Compile-time check that *GCEControllerServer satisfies csi.ControllerServer.
var _ csi.ControllerServer = (*GCEControllerServer)(nil)
Expand Down Expand Up @@ -280,25 +298,113 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
return &csi.DeleteVolumeResponse{}, nil
}

// Run starts the GCEControllerServer's background queue processing. It
// launches a goroutine that repeatedly invokes worker with a one second
// period; the goroutine is never asked to stop (wait.NeverStop).
func (gceCS *GCEControllerServer) Run() {
	const workerPeriod = time.Second
	go wait.Until(gceCS.worker, workerPeriod, wait.NeverStop)
}

// worker handles queued items one after another and returns once
// processNextWorkItem reports false, i.e. once the work queue has been shut
// down.
func (gceCS *GCEControllerServer) worker() {
	for {
		if more := gceCS.processNextWorkItem(); !more {
			return
		}
	}
}

// processNextWorkItem takes one item off the rate limited work queue and
// executes the publish and/or unpublish request it carries.
//
// It returns false only when the queue reports that it is shutting down,
// which tells the worker loop to exit. In every other case it returns true
// so the loop keeps going. A failure of the executed request is logged but
// not retried: the item is forgotten after a single attempt.
func (gceCS *GCEControllerServer) processNextWorkItem() bool {
	item, quit := gceCS.queue.Get()
	if quit {
		return false
	}
	// Every item obtained from Get must be marked Done once handled.
	defer gceCS.queue.Done(item)

	work, ok := item.(*workItem)
	if !ok {
		// An item of the wrong type can never become processable. Re-adding
		// it would make the worker pick it up again and again, so drop it
		// and clear its rate limiter history instead.
		klog.Errorf("Dropping unexpected item of type %T from the attach/detach work queue", item)
		gceCS.queue.Forget(item)
		return true
	}

	// NOTE(review): work.ctx is the context of the RPC that enqueued the item,
	// and that RPC has already returned by the time the item is processed.
	// The context may therefore already be cancelled here - confirm.
	if work.publishReq != nil {
		if _, err := gceCS.executeControllerPublishVolume(work.ctx, work.publishReq); err != nil {
			klog.Errorf("ControllerPublishVolume failed with error: %v", err)
		}
	}

	if work.unpublishReq != nil {
		if _, err := gceCS.executeControllerUnpublishVolume(work.ctx, work.unpublishReq); err != nil {
			klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
		}
	}

	gceCS.queue.Forget(item)
	return true
}

// ControllerPublishVolume validates the request and then either executes it
// right away or, when the target node is present in publishErrorsSeenOnNode,
// hands it to the rate limited work queue.
//
// An invalid request is rejected with the validation error. For a queued
// request an empty response and a nil error are returned immediately.
// NOTE(review): that empty response is returned before the queued request has
// been executed - confirm that callers expect this.
func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	// Reject invalid requests up front so that only valid ones are queued.
	if _, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req); err != nil {
		return nil, err
	}

	if _, marked := gceCS.publishErrorsSeenOnNode[req.NodeId]; marked {
		// The node is marked, so the request goes through the rate limited queue.
		queued := &workItem{
			ctx:        ctx,
			publishReq: req,
		}
		gceCS.queue.AddRateLimited(queued)
		return &csi.ControllerPublishVolumeResponse{}, nil
	}

	// The node is not marked: serve the request directly.
	return gceCS.executeControllerPublishVolume(ctx, req)
}

// validateControllerPublishVolumeRequest checks the arguments of a
// ControllerPublishVolume request.
//
// The volume ID, node ID and volume capability must all be present, the
// volume ID must be parseable by common.VolumeIDToKey, and the volume
// capability must pass validateVolumeCapability. ctx is accepted but not
// used by the validation itself.
//
// On success it returns the project and volume key parsed from the volume
// ID. On failure it returns an empty project, a nil key and a gRPC status
// error with code InvalidArgument.
func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
	// Validate arguments
	volumeID := req.GetVolumeId()
	nodeID := req.GetNodeId()
	volumeCapability := req.GetVolumeCapability()
	if len(volumeID) == 0 {
		return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
	}
	if len(nodeID) == 0 {
		return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided")
	}
	if volumeCapability == nil {
		return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume capability must be provided")
	}

	project, volKey, err := common.VolumeIDToKey(volumeID)
	if err != nil {
		return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerPublishVolume volume ID is invalid: %v", err))
	}

	// TODO(#253): Check volume capability matches for ALREADY_EXISTS
	if err = validateVolumeCapability(volumeCapability); err != nil {
		return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))
	}

	return project, volKey, nil
}

func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}

volumeID := req.GetVolumeId()
readOnly := req.GetReadonly()
nodeID := req.GetNodeId()
volumeCapability := req.GetVolumeCapability()

pubVolResp := &csi.ControllerPublishVolumeResponse{
PublishContext: nil,
}

project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
Expand All @@ -317,15 +423,6 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
}
defer gceCS.volumeLocks.Release(lockingVolumeID)

// TODO(#253): Check volume capability matches for ALREADY_EXISTS
if err = validateVolumeCapability(volumeCapability); err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))
}

pubVolResp := &csi.ControllerPublishVolumeResponse{
PublishContext: nil,
}

_, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
if err != nil {
if gce.IsGCENotFoundError(err) {
Expand Down Expand Up @@ -375,29 +472,69 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r

err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
if err != nil {
// Mark the node and rate limit all the following attach/detach
// operations for this node
gceCS.publishErrorsSeenOnNode[nodeID] = true
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
}

// Attach succeeds so unmark the node
delete(gceCS.publishErrorsSeenOnNode, nodeID)

klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
return pubVolResp, nil
}

// ControllerUnpublishVolume validates the request and then either executes it
// right away or, when the target node is present in publishErrorsSeenOnNode,
// hands it to the rate limited work queue.
//
// An invalid request is rejected with the validation error. For a queued
// request an empty response and a nil error are returned immediately.
// NOTE(review): that empty response is returned before the queued request has
// been executed - confirm that callers expect this.
func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
	// Reject invalid requests up front so that only valid ones are queued.
	if _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req); err != nil {
		return nil, err
	}

	if _, marked := gceCS.publishErrorsSeenOnNode[req.NodeId]; marked {
		// The node is marked, so the request goes through the rate limited queue.
		queued := &workItem{
			ctx:          ctx,
			unpublishReq: req,
		}
		gceCS.queue.AddRateLimited(queued)
		return &csi.ControllerUnpublishVolumeResponse{}, nil
	}

	// The node is not marked: serve the request directly.
	return gceCS.executeControllerUnpublishVolume(ctx, req)
}

// validateControllerUnpublishVolumeRequest checks the arguments of a
// ControllerUnpublishVolume request.
//
// The volume ID and node ID must both be present and the volume ID must be
// parseable by common.VolumeIDToKey. ctx is accepted but not used by the
// validation itself.
//
// On success it returns the project and volume key parsed from the volume
// ID. On failure it returns an empty project, a nil key and a gRPC status
// error with code InvalidArgument.
func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
	// Validate arguments
	volumeID := req.GetVolumeId()
	nodeID := req.GetNodeId()
	if len(volumeID) == 0 {
		return "", nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Volume ID must be provided")
	}
	if len(nodeID) == 0 {
		return "", nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Node ID must be provided")
	}

	project, volKey, err := common.VolumeIDToKey(volumeID)
	if err != nil {
		return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerUnpublishVolume Volume ID is invalid: %v", err))
	}

	return project, volKey, nil
}

func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)

if err != nil {
return nil, err
}

volumeID := req.GetVolumeId()
nodeID := req.GetNodeId()
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
if err != nil {
if gce.IsGCENotFoundError(err) {
Expand Down Expand Up @@ -443,9 +580,15 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,

err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
if err != nil {
// Mark the node and rate limit all the following attach/detach
// operations for this node
gceCS.publishErrorsSeenOnNode[nodeID] = true
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown detach error: %v", err))
}

// Detach succeeds so unmark the node
delete(gceCS.publishErrorsSeenOnNode, nodeID)

klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
Expand Down
Loading