Skip to content

Cherry-pick #1304 to release-1.9 #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
switch volKey.Type() {
case meta.Zonal:
computeDisk.Zone = volKey.Zone
computeDisk.SelfLink = fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, volKey.Zone, volKey.Name)
computeDisk.SelfLink = fmt.Sprintf("%sprojects/%s/zones/%s/disks/%s", BasePath, project, volKey.Zone, volKey.Name)
case meta.Regional:
computeDisk.Region = volKey.Region
computeDisk.SelfLink = fmt.Sprintf("projects/%s/regions/%s/disks/%s", project, volKey.Region, volKey.Name)
computeDisk.SelfLink = fmt.Sprintf("%sprojects/%s/regions/%s/disks/%s", BasePath, project, volKey.Region, volKey.Name)
default:
return fmt.Errorf("could not create disk, key was neither zonal nor regional, instead got: %v", volKey.String())
}
Expand Down
143 changes: 103 additions & 40 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"errors"
"fmt"
"math/rand"
"regexp"
neturl "net/url"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -130,6 +130,17 @@ const (
// but 500 is a good proxy (gives ~8KB of data per ListVolumesResponse#Entry)
// See https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h#L503)
maxListVolumesResponseEntries = 500

// Keys in the volume context.
contextForceAttach = "force-attach"

resourceApiScheme = "https"
resourceApiService = "compute"
resourceProject = "projects"
)

var (
validResourceApiVersions = map[string]bool{"v1": true, "alpha": true, "beta": true}
)

func isDiskReady(disk *gce.CloudDisk) (bool, error) {
Expand Down Expand Up @@ -306,7 +317,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre

// If there is no validation error, immediately return success
klog.V(4).Infof("CreateVolume succeeded for disk %v, it already exists and was compatible", volKey)
return generateCreateVolumeResponse(existingDisk, zones), nil
return generateCreateVolumeResponse(existingDisk, zones)
}

snapshotID := ""
Expand Down Expand Up @@ -421,7 +432,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
}

klog.V(4).Infof("CreateVolume succeeded for disk %v", volKey)
return generateCreateVolumeResponse(disk, zones), nil
return generateCreateVolumeResponse(disk, zones)

}

Expand Down Expand Up @@ -856,13 +867,23 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
entries := []*csi.ListVolumesResponse_Entry{}
for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ {
d := gceCS.disks[i+offset]
diskRsrc, err := getResourceId(d.SelfLink)
if err != nil {
klog.Warningf("Bad ListVolumes disk resource %s, skipped: %v (%+v)", d.SelfLink, err, d)
continue
}
users := []string{}
for _, u := range d.Users {
users = append(users, cleanSelfLink(u))
rsrc, err := getResourceId(u)
if err != nil {
klog.Warningf("Bad ListVolumes user %s, skipped: %v", u, err)
} else {
users = append(users, rsrc)
}
}
entries = append(entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: cleanSelfLink(d.SelfLink),
VolumeId: diskRsrc,
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: users,
Expand Down Expand Up @@ -975,6 +996,10 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project
return nil, common.LoggedError("Failed to create snapshot: ", err)
}
}
snapshotId, err := getResourceId(snapshot.SelfLink)
if err != nil {
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", snapshot.SelfLink), err)
}

err = gceCS.validateExistingSnapshot(snapshot, volKey)
if err != nil {
Expand All @@ -993,7 +1018,7 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project

return &csi.Snapshot{
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
SnapshotId: cleanSelfLink(snapshot.SelfLink),
SnapshotId: snapshotId,
SourceVolumeId: volumeID,
CreationTime: timestamp,
ReadyToUse: ready,
Expand Down Expand Up @@ -1022,6 +1047,10 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin
return nil, common.LoggedError("Failed to create image: ", err)
}
}
imageId, err := getResourceId(image.SelfLink)
if err != nil {
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", image.SelfLink), err)
}

err = gceCS.validateExistingImage(image, volKey)
if err != nil {
Expand All @@ -1040,7 +1069,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin

return &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SnapshotId: imageId,
SourceVolumeId: volumeID,
CreationTime: timestamp,
ReadyToUse: ready,
Expand All @@ -1052,9 +1081,13 @@ func (gceCS *GCEControllerServer) validateExistingImage(image *compute.Image, vo
return fmt.Errorf("disk does not exist")
}

_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(image.SourceDisk))
sourceId, err := getResourceId(image.SourceDisk)
if err != nil {
return fmt.Errorf("fail to get source disk key %s, %w", image.SourceDisk, err)
return fmt.Errorf("failed to get source id from %s: %w", image.SourceDisk, err)
}
_, sourceKey, err := common.VolumeIDToKey(sourceId)
if err != nil {
return fmt.Errorf("failed to get source disk key %s: %w", image.SourceDisk, err)
}

if sourceKey.String() != volKey.String() {
Expand Down Expand Up @@ -1103,7 +1136,11 @@ func (gceCS *GCEControllerServer) validateExistingSnapshot(snapshot *compute.Sna
return fmt.Errorf("disk does not exist")
}

_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(snapshot.SourceDisk))
sourceId, err := getResourceId(snapshot.SourceDisk)
if err != nil {
return fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
}
_, sourceKey, err := common.VolumeIDToKey(sourceId)
if err != nil {
return fmt.Errorf("fail to get source disk key %s, %w", snapshot.SourceDisk, err)
}
Expand Down Expand Up @@ -1146,7 +1183,7 @@ func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.D
if err != nil {
// Cannot get snapshot ID from the passing request
// This is a success according to the spec
klog.Warningf("Snapshot id does not have the correct format %s", snapshotID)
klog.Warningf("Snapshot id does not have the correct format %s: %v", snapshotID, err)
return &csi.DeleteSnapshotResponse{}, nil
}

Expand Down Expand Up @@ -1337,7 +1374,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI
return &csi.ListSnapshotsResponse{}, nil
}
}
e, err := generateImageEntry(image)
e, err := generateDiskImageEntry(image)
if err != nil {
return nil, fmt.Errorf("failed to generate image entry: %w", err)
}
Expand All @@ -1359,6 +1396,15 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
}

snapshotId, err := getResourceId(snapshot.SelfLink)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot id from %s: %w", snapshot.SelfLink, err)
}
sourceId, err := getResourceId(snapshot.SourceDisk)
if err != nil {
return nil, fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
}

// We ignore the error intentionally here since we are just listing snapshots
// TODO: If the snapshot is in "FAILED" state we need to think through what this
// should actually look like.
Expand All @@ -1367,8 +1413,8 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
SnapshotId: cleanSelfLink(snapshot.SelfLink),
SourceVolumeId: cleanSelfLink(snapshot.SourceDisk),
SnapshotId: snapshotId,
SourceVolumeId: sourceId,
CreationTime: tp,
ReadyToUse: ready,
},
Expand All @@ -1384,35 +1430,23 @@ func generateDiskImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_En
return nil, fmt.Errorf("failed to covert creation timestamp: %w", err)
}

ready, _ := isImageReady(image.Status)

entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SourceVolumeId: cleanSelfLink(image.SourceDisk),
CreationTime: tp,
ReadyToUse: ready,
},
imageId, err := getResourceId(image.SelfLink)
if err != nil {
return nil, fmt.Errorf("cannot get image id from %s: %w", image.SelfLink, err)
}
return entry, nil
}

func generateImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_Entry, error) {
timestamp, err := parseTimestamp(image.CreationTimestamp)
sourceId, err := getResourceId(image.SourceDisk)
if err != nil {
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
return nil, fmt.Errorf("cannot get source id from %s: %w", image.SourceDisk, err)
}

// ignore the error intentionally here since we are just listing images
ready, _ := isImageReady(image.Status)

entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SourceVolumeId: cleanSelfLink(image.SourceDisk),
CreationTime: timestamp,
SnapshotId: imageId,
SourceVolumeId: sourceId,
CreationTime: tp,
ReadyToUse: ready,
},
}
Expand Down Expand Up @@ -1650,7 +1684,12 @@ func getDefaultZonesInRegion(ctx context.Context, gceCS *GCEControllerServer, ex
return ret, nil
}

func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string) *csi.CreateVolumeResponse {
func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string) (*csi.CreateVolumeResponse, error) {
volumeId, err := getResourceId(disk.GetSelfLink())
if err != nil {
return nil, fmt.Errorf("cannot get volume id from %s: %w", disk.GetSelfLink(), err)
}

tops := []*csi.Topology{}
for _, zone := range zones {
tops = append(tops, &csi.Topology{
Expand All @@ -1661,7 +1700,7 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string) *csi.Crea
createResp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: realDiskSizeBytes,
VolumeId: cleanSelfLink(disk.GetSelfLink()),
VolumeId: volumeId,
VolumeContext: nil,
AccessibleTopology: tops,
},
Expand Down Expand Up @@ -1700,12 +1739,36 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string) *csi.Crea
}
createResp.Volume.ContentSource = contentSource
}
return createResp
return createResp, nil
}

func cleanSelfLink(selfLink string) string {
r, _ := regexp.Compile("https:\\/\\/www.*apis.com\\/.*(v1|beta|alpha)\\/")
return r.ReplaceAllString(selfLink, "")
func getResourceId(resourceLink string) (string, error) {
url, err := neturl.Parse(resourceLink)
if err != nil {
return "", fmt.Errorf("Could not parse resource %s: %w", resourceLink, err)
}
if url.Scheme != resourceApiScheme {
return "", fmt.Errorf("Unexpected API scheme for resource %s", resourceLink)
}

// Note that the resource host can basically be anything, if we are running in
// a distributed cloud or trusted partner environment.

// The path should be /compute/VERSION/project/....
elts := strings.Split(url.Path, "/")
if len(elts) < 4 {
return "", fmt.Errorf("Short resource path %s", resourceLink)
}
if elts[1] != resourceApiService {
return "", fmt.Errorf("Bad resource service %s in %s", elts[1], resourceLink)
}
if _, ok := validResourceApiVersions[elts[2]]; !ok {
return "", fmt.Errorf("Bad version %s in %s", elts[2], resourceLink)
}
if elts[3] != resourceProject {
return "", fmt.Errorf("Expected %v to start with %s in resource %s", elts[3:], resourceProject, resourceLink)
}
return strings.Join(elts[3:], "/"), nil
}

func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) {
Expand Down
Loading