Skip to content

Commit bdd9262

Browse files
committed
WIP Update gce operation timeout
Update gce operation timeout to 2 mins instead of 5 mins. The 90% of attach/detach disk is less than 15 second, and create volume is less than 1 min. Reduce the timeout of volume operation so that it can recover from previous operation quicker in case operation is dropped.
1 parent 53e8abd commit bdd9262

File tree

2 files changed

+169
-16
lines changed

2 files changed

+169
-16
lines changed

deploy/kubernetes/base/controller/controller.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ spec:
3232
- "--enable-leader-election"
3333
- "--leader-election-type=leases"
3434
- "--leader-election-namespace=$(PDCSI_NAMESPACE)"
35-
- "--timeout=250s"
35+
- "--timeout=180s"
3636
- "--extra-create-metadata"
3737
# - "--run-controller-service=false" # disable the controller service of the CSI driver
3838
# - "--run-node-service=false" # disable the node service of the CSI driver
@@ -52,7 +52,7 @@ spec:
5252
- "--metrics-address=:22012"
5353
- "--leader-election"
5454
- "--leader-election-namespace=$(PDCSI_NAMESPACE)"
55-
- "--timeout=250s"
55+
- "--timeout=600s"
5656
env:
5757
- name: PDCSI_NAMESPACE
5858
valueFrom:

pkg/gce-cloud-provider/compute/gce-compute.go

+167-14
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,28 @@ const (
4646
GCEAPIVersionV1 GCEAPIVersion = "v1"
4747
// Alpha key type
4848
GCEAPIVersionBeta GCEAPIVersion = "beta"
49+
// OpPollInterval is the ineterval between polling an operation
50+
OpPollInterval = 1 * time.Second
51+
// OpTimeout is the default total timeout for polling on an operation
52+
OpTimeout = 2 * time.Minute
53+
// defaultCallTimeout is the ddefault context timeout for creating/getting on an operation
54+
defaultCallTimeout = 10 * time.Minute
55+
// Time interval for checking on an operation
56+
AttachDiskInitialDelay = 6 * time.Second
57+
58+
InsertDiskInitialDelay = 3 * time.Second
59+
60+
// volumeAttachmentConsecutiveErrorLimit is the number of consecutive errors we will ignore when waiting for a volume to attach/detach
61+
//volumeAttachmentStatusConsecutiveErrorLimit = 10
62+
63+
// Attach typically takes 2-5 seconds (average is 2). Asking before 2 seconds is just waste of API quota.
64+
volumeAttachmentStatusInitialDelay = 2 * time.Second
65+
// Detach typically takes 5-10 seconds (average is 6). Asking before 5 seconds is just waste of API quota.
66+
volumeDetachmentStatusInitialDelay = 5 * time.Second
67+
// After the initial delay, poll attach/detach with exponential backoff (2046 seconds total)
68+
volumeAttachmentStatusPollDelay = 2 * time.Second
69+
volumeAttachmentStatusFactor = 2
70+
volumeAttachmentStatusSteps = 11
4971
)
5072

5173
type GCECompute interface {
@@ -306,6 +328,11 @@ func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error
306328
return nil
307329
}
308330

331+
// ContextWithCallTimeout returns a context with a default timeout, used for generated client calls.
332+
func ContextWithCallTimeout() (context.Context, context.CancelFunc) {
333+
return context.WithTimeout(context.Background(), defaultCallTimeout)
334+
}
335+
309336
func (cloud *CloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, multiWriter bool) error {
310337
klog.V(5).Infof("Inserting disk %v", volKey)
311338

@@ -489,17 +516,20 @@ func (cloud *CloudProvider) insertZonalDisk(
489516
}
490517
}
491518

519+
callCtx, cancel := ContextWithCallTimeout()
520+
defer cancel()
521+
492522
if gceAPIVersion == GCEAPIVersionBeta {
493523
var insertOp *computebeta.Operation
494524
betaDiskToCreate := convertV1DiskToBetaDisk(diskToCreate)
495525
betaDiskToCreate.MultiWriter = multiWriter
496-
insertOp, err = cloud.betaService.Disks.Insert(cloud.project, volKey.Zone, betaDiskToCreate).Context(ctx).Do()
526+
insertOp, err = cloud.betaService.Disks.Insert(cloud.project, volKey.Zone, betaDiskToCreate).Context(callCtx).Do()
497527
if insertOp != nil {
498528
opName = insertOp.Name
499529
}
500530
} else {
501531
var insertOp *computev1.Operation
502-
insertOp, err = cloud.service.Disks.Insert(cloud.project, volKey.Zone, diskToCreate).Context(ctx).Do()
532+
insertOp, err = cloud.service.Disks.Insert(cloud.project, volKey.Zone, diskToCreate).Context(callCtx).Do()
503533
if insertOp != nil {
504534
opName = insertOp.Name
505535
}
@@ -523,7 +553,7 @@ func (cloud *CloudProvider) insertZonalDisk(
523553
}
524554
return fmt.Errorf("unknown Insert disk error: %v", err)
525555
}
526-
556+
klog.Infof("Insert disk %s op id %s", volKey.Name, opName)
527557
err = cloud.waitForZonalOp(ctx, opName, volKey.Zone)
528558

529559
if err != nil {
@@ -568,6 +598,7 @@ func (cloud *CloudProvider) deleteZonalDisk(ctx context.Context, zone, name stri
568598
}
569599
return err
570600
}
601+
klog.Infof("delete disk %s op id %s", name, op.Name)
571602
err = cloud.waitForZonalOp(ctx, op.Name, zone)
572603
if err != nil {
573604
return err
@@ -607,26 +638,53 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, volKey *meta.Key, re
607638
Type: diskType,
608639
}
609640

610-
op, err := cloud.service.Instances.AttachDisk(cloud.project, instanceZone, instanceName, attachedDiskV1).Context(ctx).Do()
641+
callCtx, cancel := ContextWithCallTimeout()
642+
defer cancel()
643+
644+
op, err := cloud.service.Instances.AttachDisk(cloud.project, instanceZone, instanceName, attachedDiskV1).Context(callCtx).Do()
611645
if err != nil {
612-
return fmt.Errorf("failed cloud service attach disk call: %v", err)
646+
return fmt.Errorf("failed cloud service attach disk %v call: %v", volKey, err)
613647
}
614-
err = cloud.waitForZonalOp(ctx, op.Name, instanceZone)
648+
klog.V(5).Infof("Attaching disk %s operation id is %s", volKey, op.Name)
649+
time.Sleep(2 * time.Second)
650+
651+
op2, err2 := cloud.service.Instances.AttachDisk(cloud.project, instanceZone, instanceName, attachedDiskV1).Context(callCtx).Do()
652+
if err2 != nil {
653+
klog.Errorf("failed cloud service attach disk %v call op2 %v: %v", volKey, op2, err2)
654+
//return fmt.Errorf("failed cloud service attach disk %v call op2 %v: %v", volKey, op2, err2)
655+
} else {
656+
klog.V(2).Infof("Attaching disk %s op2 id is %s", volKey, op2.Name)
657+
}
658+
659+
klog.V(5).Infof("Attaching disk %s operation id is %s", volKey, op.Name)
660+
661+
/*err = cloud.waitForZonalOp2(ctx, op.Name, instanceZone)
615662
if err != nil {
616-
return fmt.Errorf("failed when waiting for zonal op: %v", err)
663+
klog.Errorf("failed when waiting for zonal op %s on attaching volume %v: %v", op.Name, volKey, err)
664+
//return fmt.Errorf("failed when waiting for zonal op %s on attaching volume %v: %v", op.Name, volKey, err)
665+
}*/
666+
667+
err = cloud.waitForZonalOp2(ctx, op.Name, op2.Name, instanceName, instanceZone, volKey)
668+
if err != nil {
669+
return fmt.Errorf("failed when waiting for zonal op %s op2 %s on attaching volume %v: %v", op.Name, op2.Name, volKey, err)
617670
}
618671
return nil
619672
}
620673

621674
func (cloud *CloudProvider) DetachDisk(ctx context.Context, deviceName, instanceZone, instanceName string) error {
622-
klog.V(5).Infof("Detaching disk %v from %v", deviceName, instanceName)
623-
op, err := cloud.service.Instances.DetachDisk(cloud.project, instanceZone, instanceName, deviceName).Context(ctx).Do()
675+
676+
callCtx, cancel := ContextWithCallTimeout()
677+
defer cancel()
678+
679+
klog.V(5).Infof("Detaching disk %s from %v", deviceName, instanceName)
680+
op, err := cloud.service.Instances.DetachDisk(cloud.project, instanceZone, instanceName, deviceName).Context(callCtx).Do()
624681
if err != nil {
625682
return err
626683
}
684+
klog.V(5).Infof("Detaching disk %s operation id is %s", deviceName, op.Name)
627685
err = cloud.waitForZonalOp(ctx, op.Name, instanceZone)
628686
if err != nil {
629-
return err
687+
return fmt.Errorf("failed when waiting for zonal op %s on detaching volume %s: %v", op.Name, deviceName, err)
630688
}
631689
return nil
632690
}
@@ -677,21 +735,89 @@ func (cloud *CloudProvider) getRegionalDiskTypeURI(region, diskType string) stri
677735
return cloud.service.BasePath + fmt.Sprintf(diskTypeURITemplateRegional, cloud.project, region, diskType)
678736
}
679737

738+
// SleepWithContext will wait for the timer duration to expire, or the context
739+
// is canceled. Which ever happens first. If the context is canceled the Context's
740+
// error will be returned.
741+
//
742+
// Expects Context to always return a non-nil error if the Done channel is closed.
743+
/*func SleepWithContext(ctx Context, dur time.Duration) error {
744+
t := time.NewTimer(dur)
745+
defer t.Stop()
746+
747+
select {
748+
case <-t.C:
749+
break
750+
case <-ctx.Done():
751+
return ctx.Err()
752+
}
753+
754+
return nil
755+
}*/
756+
680757
func (cloud *CloudProvider) waitForZonalOp(ctx context.Context, opName string, zone string) error {
681758
// The v1 API can query for v1, alpha, or beta operations.
682759
svc := cloud.service
683760
project := cloud.project
684-
return wait.Poll(3*time.Second, 5*time.Minute, func() (bool, error) {
761+
timeout := OpTimeout
762+
if deadline, ok := ctx.Deadline(); ok {
763+
timeout = time.Until(deadline)
764+
klog.Infof("Getting timeout from passed context opname %s %v", opName, timeout)
765+
}
766+
//callCtx, cancel := ContextWithCallTimeout()
767+
//defer cancel()
768+
769+
return wait.Poll(OpPollInterval, timeout, func() (bool, error) {
685770
pollOp, err := svc.ZoneOperations.Get(project, zone, opName).Context(ctx).Do()
686771
if err != nil {
687-
klog.Errorf("WaitForOp(op: %s, zone: %#v) failed to poll the operation", opName, zone)
772+
klog.Errorf("WaitForOp(op: %s, zone: %#v) failed to poll the operation: %v", opName, zone, err)
688773
return false, err
689774
}
690775
done, err := opIsDone(pollOp)
776+
klog.V(4).Infof("checking op %s is done %t", opName, done)
691777
return done, err
692778
})
693779
}
694780

781+
func (cloud *CloudProvider) waitForZonalOp2(ctx context.Context, opName, opName2 string, instance, zone string, volKey *meta.Key) error {
782+
// The v1 API can query for v1, alpha, or beta operations.
783+
svc := cloud.service
784+
project := cloud.project
785+
timeout := OpTimeout
786+
if deadline, ok := ctx.Deadline(); ok {
787+
timeout = time.Until(deadline)
788+
klog.Infof("Getting timeout from passed context opname %s %v", opName, timeout)
789+
}
790+
//callCtx, cancel := ContextWithCallTimeout()
791+
//defer cancel()
792+
793+
return wait.Poll(OpPollInterval, timeout, func() (bool, error) {
794+
pollOp, err := svc.ZoneOperations.Get(project, zone, opName).Context(ctx).Do()
795+
if err != nil {
796+
klog.Errorf("WaitForOp(op: %s, zone: %#v) failed to poll the operation: %v", opName, zone, err)
797+
return false, err
798+
}
799+
done, err := opIsDone(pollOp)
800+
klog.V(4).Infof("checking op %s is done %t %v", opName, done, err)
801+
802+
pollOp, err = svc.ZoneOperations.Get(project, zone, opName2).Context(ctx).Do()
803+
if err != nil {
804+
klog.Errorf("WaitForOp(op: %s, zone: %#v) failed to poll the operation2: %v", opName2, zone, err)
805+
return false, err
806+
}
807+
done2, err2 := opIsDone(pollOp)
808+
klog.V(4).Infof("checking op2 %s is done %t %v", opName2, done2, err2)
809+
810+
if done || done2 {
811+
_, err = cloud.WaitForAttach2(ctx, volKey, zone, instance)
812+
if err != nil {
813+
klog.Errorf("volkey %v unknown WaitForAttach error: %v", volKey, err)
814+
//return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
815+
}
816+
}
817+
return done && done2, err
818+
})
819+
}
820+
695821
func (cloud *CloudProvider) waitForRegionalOp(ctx context.Context, opName string, region string) error {
696822
// The v1 API can query for v1, alpha, or beta operations.
697823
return wait.Poll(3*time.Second, 5*time.Minute, func() (bool, error) {
@@ -723,7 +849,7 @@ func (cloud *CloudProvider) WaitForAttach(ctx context.Context, volKey *meta.Key,
723849
klog.V(5).Infof("Waiting for attach of disk %v to instance %v to complete...", volKey.Name, instanceName)
724850
start := time.Now()
725851
return wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
726-
klog.V(6).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
852+
klog.V(5).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
727853
disk, err := cloud.GetDisk(ctx, volKey, GCEAPIVersionV1)
728854
if err != nil {
729855
return false, fmt.Errorf("GetDisk failed to get disk: %v", err)
@@ -742,8 +868,35 @@ func (cloud *CloudProvider) WaitForAttach(ctx context.Context, volKey *meta.Key,
742868
})
743869
}
744870

871+
func (cloud *CloudProvider) WaitForAttach2(ctx context.Context, volKey *meta.Key, instanceZone, instanceName string) (bool, error) {
872+
klog.V(5).Infof("Waiting for attach of disk %v to instance %v to complete...", volKey.Name, instanceName)
873+
start := time.Now()
874+
//return wait.Poll(1*time.Second, 2*time.Second, func() (bool, error) {
875+
klog.V(5).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
876+
disk, err := cloud.GetDisk(ctx, volKey, GCEAPIVersionV1)
877+
if err != nil {
878+
return false, fmt.Errorf("GetDisk failed to get disk: %v", err)
879+
}
880+
881+
if disk == nil {
882+
return false, fmt.Errorf("Disk %v could not be found", volKey.Name)
883+
}
884+
885+
for _, user := range disk.GetUsers() {
886+
if strings.Contains(user, instanceName) && strings.Contains(user, instanceZone) {
887+
klog.Infof("disk %v is attachd ", volKey)
888+
return true, nil
889+
}
890+
}
891+
return false, nil
892+
//})
893+
}
894+
745895
func opIsDone(op *computev1.Operation) (bool, error) {
746-
if op == nil || op.Status != operationStatusDone {
896+
if op == nil {
897+
return true, fmt.Errorf("operation is nil")
898+
}
899+
if op.Status != operationStatusDone {
747900
return false, nil
748901
}
749902
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {

0 commit comments

Comments
 (0)