diff --git a/pkg/gce-cloud-provider/fake-gce.go b/pkg/gce-cloud-provider/fake-gce.go index 00c00471c..2a24c8be8 100644 --- a/pkg/gce-cloud-provider/fake-gce.go +++ b/pkg/gce-cloud-provider/fake-gce.go @@ -34,6 +34,8 @@ type FakeCloudProvider struct { instances map[string]*compute.Instance } +var _ GCECompute = &FakeCloudProvider{} + func FakeCreateCloudProvider(project, zone string) (*FakeCloudProvider, error) { return &FakeCloudProvider{ project: project, @@ -156,6 +158,10 @@ func (cloud *FakeCloudProvider) GetDiskTypeURI(zone, diskType string) string { return "" } +func (cloud *FakeCloudProvider) WaitForAttach(ctx context.Context, zone, diskName, instanceName string) error { + return nil +} + // Instance Methods func (cloud *FakeCloudProvider) InsertInstance(instance *compute.Instance, instanceName string) { cloud.instances[instanceName] = instance diff --git a/pkg/gce-cloud-provider/gce-compute.go b/pkg/gce-cloud-provider/gce-compute.go index 19b442139..4d74c3eda 100644 --- a/pkg/gce-cloud-provider/gce-compute.go +++ b/pkg/gce-cloud-provider/gce-compute.go @@ -41,6 +41,7 @@ type GCECompute interface { DetachDisk(ctx context.Context, volumeZone, instanceName, volumeName string) (*compute.Operation, error) GetDiskSourceURI(disk *compute.Disk, zone string) string GetDiskTypeURI(zone, diskType string) string + WaitForAttach(ctx context.Context, zone, diskName, instanceName string) error // Instance Methods GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*compute.Instance, error) // Operation Methods @@ -158,6 +159,27 @@ func (cloud *CloudProvider) WaitForOp(ctx context.Context, op *compute.Operation }) } +func (cloud *CloudProvider) WaitForAttach(ctx context.Context, zone, diskName, instanceName string) error { + return wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) { + disk, err := cloud.GetDiskOrError(ctx, zone, diskName) + if err != nil { + glog.Errorf("GetDiskOrError failed to get disk: %v", err) + return false, err + } + + if disk == nil { + return false, fmt.Errorf("Disk %v could not be found in zone %v", diskName, zone) + } + + for _, user := range disk.Users { + if strings.Contains(user, instanceName) && strings.Contains(user, zone) { + return true, nil + } + } + return false, nil + }) +} + func opIsDone(op *compute.Operation) bool { return op != nil && op.Status == "DONE" } diff --git a/pkg/gce-cloud-provider/gce.go b/pkg/gce-cloud-provider/gce.go index f97c5ba30..c53db1923 100644 --- a/pkg/gce-cloud-provider/gce.go +++ b/pkg/gce-cloud-provider/gce.go @@ -44,6 +44,8 @@ type CloudProvider struct { zone string } +var _ GCECompute = &CloudProvider{} + func CreateCloudProvider() (*CloudProvider, error) { svc, err := createCloudService() if err != nil { diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 372e406f3..d85ddbe59 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -296,6 +296,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r return nil, status.Error(codes.Internal, fmt.Sprintf("unknown Attach operation error: %v", err)) } + err = gceCS.CloudProvider.WaitForAttach(ctx, volumeZone, disk.Name, nodeID) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err)) + } + glog.Infof("Disk %v attached to instance %v successfully", disk.Name, nodeID) return pubVolResp, nil } diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go index 0c14a262b..c0a4f31b7 100644 --- a/pkg/gce-pd-csi-driver/controller_test.go +++ b/pkg/gce-pd-csi-driver/controller_test.go @@ -255,6 +255,7 @@ func TestCreateVolumeArguments(t *testing.T) { } // Test volume already exists + // Test volume with op pending // Test DeleteVolume