Skip to content

Volume cloning feature #854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/gce-cloud-provider/compute/cloud-disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,17 @@ func (d *CloudDisk) GetSnapshotId() string {
}
}

func (d *CloudDisk) GetSourceDiskId() string {
switch {
case d.disk != nil:
return d.disk.SourceDiskId
case d.betaDisk != nil:
return d.betaDisk.SourceDiskId
default:
return ""
}
}

func (d *CloudDisk) GetKMSKeyName() string {
switch {
case d.disk != nil:
Expand Down
3 changes: 2 additions & 1 deletion pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *
return ValidateDiskParameters(resp, params)
}

func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, multiWriter bool) error {
func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error {
if disk, ok := cloud.disks[volKey.Name]; ok {
err := cloud.ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
Expand All @@ -237,6 +237,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
Description: "Disk created by GCE-PD CSI Driver",
Type: cloud.GetDiskTypeURI(project, volKey, params.DiskType),
SourceSnapshotId: snapshotID,
SourceDiskId: volumeContentSourceVolumeID,
Status: cloud.mockDiskStatus,
Labels: params.Labels,
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type GCECompute interface {
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error)
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error
DeleteDisk(ctx context.Context, project string, volumeKey *meta.Key) error
AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string) error
DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error
Expand Down Expand Up @@ -315,7 +315,7 @@ func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error
return nil
}

func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, multiWriter bool) error {
func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error {
klog.V(5).Infof("Inserting disk %v", volKey)

description, err := encodeDiskTags(params.Tags)
Expand All @@ -328,12 +328,12 @@ func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volK
if description == "" {
description = "Disk created by GCE-PD CSI Driver"
}
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, description, multiWriter)
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter)
case meta.Regional:
if description == "" {
description = "Regional disk created by GCE-PD CSI Driver"
}
return cloud.insertRegionalDisk(ctx, project, volKey, params, capBytes, capacityRange, replicaZones, snapshotID, description, multiWriter)
return cloud.insertRegionalDisk(ctx, project, volKey, params, capBytes, capacityRange, replicaZones, snapshotID, volumeContentSourceVolumeID, description, multiWriter)
default:
return fmt.Errorf("could not insert disk, key was neither zonal nor regional, instead got: %v", volKey.String())
}
Expand Down Expand Up @@ -377,6 +377,7 @@ func (cloud *CloudProvider) insertRegionalDisk(
capacityRange *csi.CapacityRange,
replicaZones []string,
snapshotID string,
volumeContentSourceVolumeID string,
description string,
multiWriter bool) error {
var (
Expand All @@ -399,6 +400,9 @@ func (cloud *CloudProvider) insertRegionalDisk(
if snapshotID != "" {
diskToCreate.SourceSnapshot = snapshotID
}
if volumeContentSourceVolumeID != "" {
diskToCreate.SourceDisk = volumeContentSourceVolumeID
}
if len(replicaZones) != 0 {
diskToCreate.ReplicaZones = replicaZones
}
Expand Down Expand Up @@ -472,6 +476,7 @@ func (cloud *CloudProvider) insertZonalDisk(
capBytes int64,
capacityRange *csi.CapacityRange,
snapshotID string,
volumeContentSourceVolumeID string,
description string,
multiWriter bool) error {
var (
Expand All @@ -495,6 +500,9 @@ func (cloud *CloudProvider) insertZonalDisk(
if snapshotID != "" {
diskToCreate.SourceSnapshot = snapshotID
}
if volumeContentSourceVolumeID != "" {
diskToCreate.SourceDisk = volumeContentSourceVolumeID
}

if params.DiskEncryptionKMSKey != "" {
diskToCreate.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
Expand Down
76 changes: 59 additions & 17 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v had error checking ready status: %v", volKey, err))
}

if !ready {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v is not ready", volKey))
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume existing disk %v is not ready", volKey))
}

// If there is no validation error, immediately return success
Expand All @@ -190,10 +189,10 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
}

snapshotID := ""
volumeContentSourceVolumeID := ""
content := req.GetVolumeContentSource()
if content != nil {
if content.GetSnapshot() != nil {
// TODO(#161): Add support for Volume Source (cloning) introduced in CSI v1.0.0
snapshotID = content.GetSnapshot().GetSnapshotId()

// Verify that snapshot exists
Expand All @@ -204,24 +203,54 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
return nil, status.Errorf(codes.NotFound, "CreateVolume source snapshot %s does not exist", snapshotID)
}
}
}

if content.GetVolume() != nil {
volumeContentSourceVolumeID = content.GetVolume().GetVolumeId()
// Verify that the source VolumeID is in the correct format.
project, sourceVolKey, err := common.VolumeIDToKey(volumeContentSourceVolumeID)
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume source volume id is invalid: %v", err))
}

// Verify that the volume in VolumeContentSource exists.
diskFromSourceVolume, err := gceCS.CloudProvider.GetDisk(ctx, project, sourceVolKey, gceAPIVersion)
if err != nil {
if gce.IsGCEError(err, "notFound") {
return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID)
} else {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume unknown get disk error when validating: %v", err))
}
}
// Verify the zone, region, and disk type of the clone must be the same as that of the source disk.
if err := gce.ValidateDiskParameters(diskFromSourceVolume, params); err != nil {
return nil, status.Errorf(codes.InvalidArgument, `CreateVolume source volume parameters do not match CreateVolumeRequest Parameters: %v`, err)
}
// Verify the source disk is ready.
ready, err := isDiskReady(diskFromSourceVolume)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk from source volume %v had error checking ready status: %v", sourceVolKey, err))
}
if !ready {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk from source volume %v is not ready", sourceVolKey))
}
}
}
// Create the disk
var disk *gce.CloudDisk
switch params.ReplicationType {
case replicationTypeNone:
if len(zones) != 1 {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume failed to get a single zone for creating zonal disk, instead got: %v", zones))
}
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, multiWriter)
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume failed to create single zonal disk %#v: %v", name, err))
}
case replicationTypeRegionalPD:
if len(zones) != 2 {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("CreateVolume failed to get a 2 zones for creating regional disk, instead got: %v", zones))
}
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, multiWriter)
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume failed to create regional disk %#v: %v", name, err))
}
Expand Down Expand Up @@ -1053,15 +1082,28 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string) *csi.Crea
},
}
snapshotID := disk.GetSnapshotId()
if snapshotID != "" {
source := &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: &csi.VolumeContentSource_SnapshotSource{
SnapshotId: snapshotID,
diskID := disk.GetSourceDiskId()
if diskID != "" || snapshotID != "" {
contentSource := &csi.VolumeContentSource{}
if snapshotID != "" {
contentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: &csi.VolumeContentSource_SnapshotSource{
SnapshotId: snapshotID,
},
},
},
}
}
if diskID != "" {
contentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: diskID,
},
},
}
}
createResp.Volume.ContentSource = source
createResp.Volume.ContentSource = contentSource
}
return createResp
}
Expand All @@ -1072,7 +1114,7 @@ func cleanSelfLink(selfLink string) string {
return strings.TrimPrefix(temp, gce.GCEComputeAlphaAPIEndpoint)
}

func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, multiWriter bool) (*gce.CloudDisk, error) {
func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
region, err := common.GetRegionFromZones(zones)
if err != nil {
Expand All @@ -1085,7 +1127,7 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
fullyQualifiedReplicaZones, cloudProvider.GetReplicaZoneURI(project, replicaZone))
}

err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, multiWriter)
err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter)
if err != nil {
return nil, fmt.Errorf("failed to insert regional disk: %v", err)
}
Expand All @@ -1102,13 +1144,13 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
return disk, nil
}

func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, multiWriter bool) (*gce.CloudDisk, error) {
func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
if len(zones) != 1 {
return nil, fmt.Errorf("got wrong number of zones for zonal create volume: %v", len(zones))
}
diskZone := zones[0]
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, multiWriter)
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter)
if err != nil {
return nil, fmt.Errorf("failed to insert zonal disk: %v", err)
}
Expand Down
Loading