@@ -20,6 +20,7 @@ import (
20
20
"math/rand"
21
21
"regexp"
22
22
"sort"
23
+ "strings"
23
24
"time"
24
25
25
26
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
@@ -32,6 +33,7 @@ import (
32
33
"k8s.io/apimachinery/pkg/util/uuid"
33
34
"k8s.io/client-go/util/flowcontrol"
34
35
"k8s.io/klog/v2"
36
+ "k8s.io/utils/strings/slices"
35
37
36
38
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
37
39
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -98,6 +100,14 @@ type workItem struct {
98
100
unpublishReq * csi.ControllerUnpublishVolumeRequest
99
101
}
100
102
103
+ // locationRequirements are additional location topology requirements that must be respected when creating a volume.
104
+ type locationRequirements struct {
105
+ srcVolRegion string
106
+ srcVolZone string
107
+ srcReplicationType string
108
+ cloneReplicationType string
109
+ }
110
+
101
111
var _ csi.ControllerServer = & GCEControllerServer {}
102
112
103
113
const (
@@ -142,6 +152,44 @@ func isDiskReady(disk *gce.CloudDisk) (bool, error) {
142
152
return false , nil
143
153
}
144
154
155
+ // cloningLocationRequirements returns additional location requirements to be applied to the given create volume requests topology.
156
+ // If the CreateVolumeRequest will use volume cloning, location requirements in compliance with the volume cloning limitations
157
+ // will be returned: https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/volume-cloning#limitations.
158
+ func cloningLocationRequirements (req * csi.CreateVolumeRequest , cloneReplicationType string ) (* locationRequirements , error ) {
159
+ if ! useVolumeCloning (req ) {
160
+ return nil , nil
161
+ }
162
+ // If we are using volume cloning, this will be set.
163
+ volSrc := req .VolumeContentSource .GetVolume ()
164
+ volSrcVolID := volSrc .GetVolumeId ()
165
+
166
+ _ , sourceVolKey , err := common .VolumeIDToKey (volSrcVolID )
167
+ if err != nil {
168
+ return nil , fmt .Errorf ("volume ID is invalid: %w" , err )
169
+ }
170
+
171
+ isZonalSrcVol := sourceVolKey .Type () == meta .Zonal
172
+ if isZonalSrcVol {
173
+ region , err := common .GetRegionFromZones ([]string {sourceVolKey .Zone })
174
+ if err != nil {
175
+ return nil , fmt .Errorf ("failed to get region from zones: %w" , err )
176
+ }
177
+ sourceVolKey .Region = region
178
+ }
179
+
180
+ srcReplicationType := replicationTypeNone
181
+ if ! isZonalSrcVol {
182
+ srcReplicationType = replicationTypeRegionalPD
183
+ }
184
+
185
+ return & locationRequirements {srcVolZone : sourceVolKey .Zone , srcVolRegion : sourceVolKey .Region , srcReplicationType : srcReplicationType , cloneReplicationType : cloneReplicationType }, nil
186
+ }
187
+
188
+ // useVolumeCloning returns true if the create volume request should be created with volume cloning.
189
+ func useVolumeCloning (req * csi.CreateVolumeRequest ) bool {
190
+ return req .VolumeContentSource != nil && req .VolumeContentSource .GetVolume () != nil
191
+ }
192
+
145
193
func (gceCS * GCEControllerServer ) CreateVolume (ctx context.Context , req * csi.CreateVolumeRequest ) (* csi.CreateVolumeResponse , error ) {
146
194
var err error
147
195
// Validate arguments
@@ -177,12 +225,21 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
177
225
if multiWriter {
178
226
gceAPIVersion = gce .GCEAPIVersionBeta
179
227
}
228
+
229
+ var locationTopReq * locationRequirements
230
+ if useVolumeCloning (req ) {
231
+ locationTopReq , err = cloningLocationRequirements (req , params .ReplicationType )
232
+ if err != nil {
233
+ return nil , status .Errorf (codes .InvalidArgument , "failed to get location requirements: %v" , err .Error ())
234
+ }
235
+ }
236
+
180
237
// Determine the zone or zones+region of the disk
181
238
var zones []string
182
239
var volKey * meta.Key
183
240
switch params .ReplicationType {
184
241
case replicationTypeNone :
185
- zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 1 )
242
+ zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 1 , locationTopReq )
186
243
if err != nil {
187
244
return nil , status .Errorf (codes .InvalidArgument , "CreateVolume failed to pick zones for disk: %v" , err .Error ())
188
245
}
@@ -192,7 +249,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
192
249
volKey = meta .ZonalKey (name , zones [0 ])
193
250
194
251
case replicationTypeRegionalPD :
195
- zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 2 )
252
+ zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 2 , locationTopReq )
196
253
if err != nil {
197
254
return nil , status .Errorf (codes .InvalidArgument , "CreateVolume failed to pick zones for disk: %v" , err .Error ())
198
255
}
@@ -1358,7 +1415,29 @@ func diskIsAttachedAndCompatible(deviceName string, instance *compute.Instance,
1358
1415
return false , nil
1359
1416
}
1360
1417
1361
- func pickZonesFromTopology (top * csi.TopologyRequirement , numZones int ) ([]string , error ) {
1418
+ // pickZonesInRegion will remove any zones that are not in the given region.
1419
+ func pickZonesInRegion (region string , zones []string ) []string {
1420
+ refinedZones := []string {}
1421
+ for _ , zone := range zones {
1422
+ if strings .Contains (zone , region ) {
1423
+ refinedZones = append (refinedZones , zone )
1424
+ }
1425
+ }
1426
+ return refinedZones
1427
+ }
1428
+
1429
+ func prependZone (zone string , zones []string ) []string {
1430
+ newZones := []string {zone }
1431
+ for i := 0 ; i < len (zones ); i ++ {
1432
+ // Do not add a zone if it is equal to the zone that is already prepended to newZones.
1433
+ if zones [i ] != zone {
1434
+ newZones = append (newZones , zones [i ])
1435
+ }
1436
+ }
1437
+ return newZones
1438
+ }
1439
+
1440
+ func pickZonesFromTopology (top * csi.TopologyRequirement , numZones int , locationTopReq * locationRequirements ) ([]string , error ) {
1362
1441
reqZones , err := getZonesFromTopology (top .GetRequisite ())
1363
1442
if err != nil {
1364
1443
return nil , fmt .Errorf ("could not get zones from requisite topology: %w" , err )
@@ -1368,6 +1447,39 @@ func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int) ([]string
1368
1447
return nil , fmt .Errorf ("could not get zones from preferred topology: %w" , err )
1369
1448
}
1370
1449
1450
+ if locationTopReq != nil {
1451
+ srcVolZone := locationTopReq .srcVolZone
1452
+ switch locationTopReq .cloneReplicationType {
1453
+ // For zonal -> zonal cloning, the source disk zone must match the destination disk zone.
1454
+ case replicationTypeNone :
1455
+ // If the source volume zone is not in the topology requirement, we return an error.
1456
+ if ! slices .Contains (prefZones , srcVolZone ) && ! slices .Contains (reqZones , srcVolZone ) {
1457
+ volumeCloningReq := fmt .Sprintf ("clone zone must match source disk zone: %s" , srcVolZone )
1458
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1459
+ }
1460
+ return []string {srcVolZone }, nil
1461
+ // For zonal or regional -> regional disk cloning, the source disk region must match the destination disk region.
1462
+ case replicationTypeRegionalPD :
1463
+ srcVolRegion := locationTopReq .srcVolRegion
1464
+ prefZones = pickZonesInRegion (srcVolRegion , prefZones )
1465
+ reqZones = pickZonesInRegion (srcVolRegion , reqZones )
1466
+
1467
+ if len (prefZones ) == 0 && len (reqZones ) == 0 {
1468
+ volumeCloningReq := fmt .Sprintf ("clone zone must reside in source disk region %s" , srcVolRegion )
1469
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1470
+ }
1471
+
1472
+ // For zonal -> regional disk cloning, one of the replicated zones must match the source zone.
1473
+ if locationTopReq .srcReplicationType == replicationTypeNone {
1474
+ if ! slices .Contains (prefZones , srcVolZone ) && ! slices .Contains (reqZones , srcVolZone ) {
1475
+ volumeCloningReq := fmt .Sprintf ("one of the replica zones of the clone must match the source disk zone: %s" , srcVolZone )
1476
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1477
+ }
1478
+ prefZones = prependZone (srcVolZone , prefZones )
1479
+ }
1480
+ }
1481
+ }
1482
+
1371
1483
if numZones <= len (prefZones ) {
1372
1484
return prefZones [0 :numZones ], nil
1373
1485
} else {
@@ -1426,16 +1538,25 @@ func getZoneFromSegment(seg map[string]string) (string, error) {
1426
1538
return zone , nil
1427
1539
}
1428
1540
1429
- func pickZones (ctx context.Context , gceCS * GCEControllerServer , top * csi.TopologyRequirement , numZones int ) ([]string , error ) {
1541
+ func pickZones (ctx context.Context , gceCS * GCEControllerServer , top * csi.TopologyRequirement , numZones int , locationTopReq * locationRequirements ) ([]string , error ) {
1430
1542
var zones []string
1431
1543
var err error
1432
1544
if top != nil {
1433
- zones , err = pickZonesFromTopology (top , numZones )
1545
+ zones , err = pickZonesFromTopology (top , numZones , locationTopReq )
1434
1546
if err != nil {
1435
1547
return nil , fmt .Errorf ("failed to pick zones from topology: %w" , err )
1436
1548
}
1437
1549
} else {
1438
- zones , err = getDefaultZonesInRegion (ctx , gceCS , []string {gceCS .CloudProvider .GetDefaultZone ()}, numZones )
1550
+ existingZones := []string {gceCS .CloudProvider .GetDefaultZone ()}
1551
+ // We set existingZones to the source volume zone so that for zonal -> zonal cloning, the clone is provisioned
1552
+ // in the same zone as the source volume, and for zonal -> regional, one of the replicated zones will always
1553
+ // be the zone of the source volume. For regional -> regional cloning, the srcVolZone will not be set, so we
1554
+ // just use the default zone.
1555
+ if locationTopReq != nil && locationTopReq .srcReplicationType == replicationTypeNone {
1556
+ existingZones = []string {locationTopReq .srcVolZone }
1557
+ }
1558
+ // If topology is nil, then the Immediate binding mode was used without setting allowedTopologies in the storageclass.
1559
+ zones , err = getDefaultZonesInRegion (ctx , gceCS , existingZones , numZones )
1439
1560
if err != nil {
1440
1561
return nil , fmt .Errorf ("failed to get default %v zones in region: %w" , numZones , err )
1441
1562
}
0 commit comments