Skip to content

Commit d32b658

Browse files
author
Stephen Schmitt
committed
Adds support for multi-writer PD
1 parent 132ca97 commit d32b658

File tree

10 files changed

+314
-49
lines changed

10 files changed

+314
-49
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
kind: PersistentVolumeClaim
2+
apiVersion: v1
3+
metadata:
4+
name: podpvc
5+
spec:
6+
accessModes:
7+
- ReadWriteMany
8+
volumeMode: Block
9+
storageClassName: csi-gce-pd
10+
resources:
11+
requests:
12+
storage: 6Gi
13+
14+
---
15+
16+
apiVersion: apps/v1
17+
kind: Deployment
18+
metadata:
19+
name: web-server
20+
spec:
21+
selector:
22+
matchLabels:
23+
app: web
24+
replicas: 2
25+
template:
26+
metadata:
27+
labels:
28+
app: web
29+
spec:
30+
affinity:
31+
podAntiAffinity:
32+
requiredDuringSchedulingIgnoredDuringExecution:
33+
- labelSelector:
34+
matchExpressions:
35+
- key: app
36+
operator: In
37+
values:
38+
- web
39+
topologyKey: "kubernetes.io/hostname"
40+
containers:
41+
- name: web-server
42+
image: nginx
43+
volumeDevices:
44+
- name: mypvc
45+
devicePath: /dev/mypvc
46+
volumes:
47+
- name: mypvc
48+
persistentVolumeClaim:
49+
claimName: podpvc
50+
readOnly: false

pkg/gce-cloud-provider/compute/fake-gce.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *
186186
return nil
187187
}
188188

189-
func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string) error {
189+
func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string, multiWriter bool) error {
190190
if disk, ok := cloud.disks[volKey.Name]; ok {
191191
err := cloud.ValidateExistingDisk(ctx, disk, diskType,
192192
int64(capacityRange.GetRequiredBytes()),

pkg/gce-cloud-provider/compute/gce-compute.go

Lines changed: 170 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
2525
csi "github.com/container-storage-interface/spec/lib/go/csi"
26+
computealpha "google.golang.org/api/compute/v0.alpha"
2627
computev1 "google.golang.org/api/compute/v1"
2728
"google.golang.org/grpc/codes"
2829
"google.golang.org/grpc/status"
@@ -42,7 +43,7 @@ type GCECompute interface {
4243
GetDisk(ctx context.Context, volumeKey *meta.Key) (*CloudDisk, error)
4344
RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error)
4445
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, diskType string, reqBytes, limBytes int64) error
45-
InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string) error
46+
InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string, multiWriter bool) error
4647
DeleteDisk(ctx context.Context, volumeKey *meta.Key) error
4748
AttachDisk(ctx context.Context, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string) error
4849
DetachDisk(ctx context.Context, deviceName string, instanceZone, instanceName string) error
@@ -217,13 +218,21 @@ func (cloud *CloudProvider) ValidateExistingDisk(ctx context.Context, resp *Clou
217218
return nil
218219
}
219220

220-
func (cloud *CloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string) error {
221+
func (cloud *CloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string, multiWriter bool) error {
221222
klog.V(5).Infof("Inserting disk %v", volKey)
222223
switch volKey.Type() {
223224
case meta.Zonal:
224-
return cloud.insertZonalDisk(ctx, volKey, diskType, capBytes, capacityRange, snapshotID, diskEncryptionKmsKey)
225+
if multiWriter {
226+
return cloud.insertZonalAlphaDisk(ctx, volKey, diskType, capBytes, capacityRange, snapshotID, diskEncryptionKmsKey, multiWriter)
227+
} else {
228+
return cloud.insertZonalDisk(ctx, volKey, diskType, capBytes, capacityRange, snapshotID, diskEncryptionKmsKey)
229+
}
225230
case meta.Regional:
226-
return cloud.insertRegionalDisk(ctx, volKey, diskType, capBytes, capacityRange, replicaZones, snapshotID, diskEncryptionKmsKey)
231+
if multiWriter {
232+
return cloud.insertRegionalAlphaDisk(ctx, volKey, diskType, capBytes, capacityRange, replicaZones, snapshotID, diskEncryptionKmsKey, multiWriter)
233+
} else {
234+
return cloud.insertRegionalDisk(ctx, volKey, diskType, capBytes, capacityRange, replicaZones, snapshotID, diskEncryptionKmsKey)
235+
}
227236
default:
228237
return fmt.Errorf("could not insert disk, key was neither zonal nor regional, instead got: %v", volKey.String())
229238
}
@@ -288,6 +297,66 @@ func (cloud *CloudProvider) insertRegionalDisk(ctx context.Context, volKey *meta
288297
return nil
289298
}
290299

300+
func (cloud *CloudProvider) insertRegionalAlphaDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID, diskEncryptionKmsKey string, multiWriter bool) error {
301+
diskToCreateAlpha := &computealpha.Disk{
302+
Name: volKey.Name,
303+
SizeGb: common.BytesToGb(capBytes),
304+
Description: "Regional disk created by GCE-PD CSI Driver",
305+
Type: cloud.GetDiskTypeURI(volKey, diskType),
306+
MultiWriter: multiWriter,
307+
}
308+
if snapshotID != "" {
309+
diskToCreateAlpha.SourceSnapshot = snapshotID
310+
}
311+
if len(replicaZones) != 0 {
312+
diskToCreateAlpha.ReplicaZones = replicaZones
313+
}
314+
if diskEncryptionKmsKey != "" {
315+
diskToCreateAlpha.DiskEncryptionKey = &computealpha.CustomerEncryptionKey{
316+
KmsKeyName: diskEncryptionKmsKey,
317+
}
318+
}
319+
320+
insertOp, err := cloud.alphaService.RegionDisks.Insert(cloud.project, volKey.Region, diskToCreateAlpha).Context(ctx).Do()
321+
if err != nil {
322+
if IsGCEError(err, "alreadyExists") {
323+
disk, err := cloud.GetDisk(ctx, volKey)
324+
if err != nil {
325+
return err
326+
}
327+
err = cloud.ValidateExistingDisk(ctx, disk, diskType,
328+
int64(capacityRange.GetRequiredBytes()),
329+
int64(capacityRange.GetLimitBytes()))
330+
if err != nil {
331+
return err
332+
}
333+
klog.Warningf("GCE PD %s already exists, reusing", volKey.Name)
334+
return nil
335+
}
336+
return status.Error(codes.Internal, fmt.Sprintf("unkown Insert disk error: %v", err))
337+
}
338+
339+
err = cloud.waitForRegionalAlphaOp(ctx, insertOp, volKey.Region)
340+
if err != nil {
341+
if IsGCEError(err, "alreadyExists") {
342+
disk, err := cloud.GetDisk(ctx, volKey)
343+
if err != nil {
344+
return err
345+
}
346+
err = cloud.ValidateExistingDisk(ctx, disk, diskType,
347+
int64(capacityRange.GetRequiredBytes()),
348+
int64(capacityRange.GetLimitBytes()))
349+
if err != nil {
350+
return err
351+
}
352+
klog.Warningf("GCE PD %s already exists after wait, reusing", volKey.Name)
353+
return nil
354+
}
355+
return fmt.Errorf("unkown Insert disk operation error: %v", err)
356+
}
357+
return nil
358+
}
359+
291360
func (cloud *CloudProvider) insertZonalDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, snapshotID, diskEncryptionKmsKey string) error {
292361
diskToCreate := &computev1.Disk{
293362
Name: volKey.Name,
@@ -348,6 +417,67 @@ func (cloud *CloudProvider) insertZonalDisk(ctx context.Context, volKey *meta.Ke
348417
return nil
349418
}
350419

420+
func (cloud *CloudProvider) insertZonalAlphaDisk(ctx context.Context, volKey *meta.Key, diskType string, capBytes int64, capacityRange *csi.CapacityRange, snapshotID, diskEncryptionKmsKey string, multiWriter bool) error {
421+
diskToCreateAlpha := &computealpha.Disk{
422+
Name: volKey.Name,
423+
SizeGb: common.BytesToGb(capBytes),
424+
Description: "Disk created by GCE-PD CSI Driver",
425+
Type: cloud.GetDiskTypeURI(volKey, diskType),
426+
MultiWriter: multiWriter,
427+
}
428+
429+
if snapshotID != "" {
430+
diskToCreateAlpha.SourceSnapshot = snapshotID
431+
}
432+
433+
if diskEncryptionKmsKey != "" {
434+
diskToCreateAlpha.DiskEncryptionKey = &computealpha.CustomerEncryptionKey{
435+
KmsKeyName: diskEncryptionKmsKey,
436+
}
437+
}
438+
439+
op, err := cloud.alphaService.Disks.Insert(cloud.project, volKey.Zone, diskToCreateAlpha).Context(ctx).Do()
440+
441+
if err != nil {
442+
if IsGCEError(err, "alreadyExists") {
443+
disk, err := cloud.GetDisk(ctx, volKey)
444+
if err != nil {
445+
return err
446+
}
447+
err = cloud.ValidateExistingDisk(ctx, disk, diskType,
448+
int64(capacityRange.GetRequiredBytes()),
449+
int64(capacityRange.GetLimitBytes()))
450+
if err != nil {
451+
return err
452+
}
453+
klog.Warningf("GCE PD %s already exists, reusing", volKey.Name)
454+
return nil
455+
}
456+
return fmt.Errorf("unkown Insert disk error: %v", err)
457+
}
458+
459+
err = cloud.waitForZonalAlphaOp(ctx, op, volKey.Zone)
460+
461+
if err != nil {
462+
if IsGCEError(err, "alreadyExists") {
463+
disk, err := cloud.GetDisk(ctx, volKey)
464+
if err != nil {
465+
return err
466+
}
467+
err = cloud.ValidateExistingDisk(ctx, disk, diskType,
468+
int64(capacityRange.GetRequiredBytes()),
469+
int64(capacityRange.GetLimitBytes()))
470+
if err != nil {
471+
return err
472+
}
473+
klog.Warningf("GCE PD %s already exists after wait, reusing", volKey.Name)
474+
return nil
475+
}
476+
return fmt.Errorf("unkown Insert disk operation error: %v", err)
477+
}
478+
return nil
479+
}
480+
351481
func (cloud *CloudProvider) DeleteDisk(ctx context.Context, volKey *meta.Key) error {
352482
klog.V(5).Infof("Deleting disk: %v", volKey)
353483
switch volKey.Type() {
@@ -492,6 +622,20 @@ func (cloud *CloudProvider) waitForZonalOp(ctx context.Context, op *computev1.Op
492622
})
493623
}
494624

625+
func (cloud *CloudProvider) waitForZonalAlphaOp(ctx context.Context, op *computealpha.Operation, zone string) error {
626+
svc := cloud.alphaService
627+
project := cloud.project
628+
return wait.Poll(3*time.Second, 5*time.Minute, func() (bool, error) {
629+
pollOp, err := svc.ZoneOperations.Get(project, zone, op.Name).Context(ctx).Do()
630+
if err != nil {
631+
klog.Errorf("WaitForOp(op: %#v, zone: %#v) failed to poll the operation", op, zone)
632+
return false, err
633+
}
634+
done, err := alphaOpIsDone(pollOp)
635+
return done, err
636+
})
637+
}
638+
495639
func (cloud *CloudProvider) waitForRegionalOp(ctx context.Context, op *computev1.Operation, region string) error {
496640
return wait.Poll(3*time.Second, 5*time.Minute, func() (bool, error) {
497641
pollOp, err := cloud.service.RegionOperations.Get(cloud.project, region, op.Name).Context(ctx).Do()
@@ -504,6 +648,18 @@ func (cloud *CloudProvider) waitForRegionalOp(ctx context.Context, op *computev1
504648
})
505649
}
506650

651+
func (cloud *CloudProvider) waitForRegionalAlphaOp(ctx context.Context, op *computealpha.Operation, region string) error {
652+
return wait.Poll(3*time.Second, 5*time.Minute, func() (bool, error) {
653+
pollOp, err := cloud.alphaService.RegionOperations.Get(cloud.project, region, op.Name).Context(ctx).Do()
654+
if err != nil {
655+
klog.Errorf("WaitForOp(op: %#v, region: %#v) failed to poll the operation", op, region)
656+
return false, err
657+
}
658+
done, err := alphaOpIsDone(pollOp)
659+
return done, err
660+
})
661+
}
662+
507663
func (cloud *CloudProvider) waitForGlobalOp(ctx context.Context, op *computev1.Operation) error {
508664
svc := cloud.service
509665
project := cloud.project
@@ -551,6 +707,16 @@ func opIsDone(op *computev1.Operation) (bool, error) {
551707
return true, nil
552708
}
553709

710+
func alphaOpIsDone(op *computealpha.Operation) (bool, error) {
711+
if op == nil || op.Status != operationStatusDone {
712+
return false, nil
713+
}
714+
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
715+
return true, fmt.Errorf("operation %v failed (%v): %v", op.Name, op.Error.Errors[0].Code, op.Error.Errors[0].Message)
716+
}
717+
return true, nil
718+
}
719+
554720
func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
555721
klog.V(5).Infof("Getting instance %v from zone %v", instanceName, instanceZone)
556722
svc := cloud.service

pkg/gce-cloud-provider/compute/gce.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"cloud.google.com/go/compute/metadata"
2929
"golang.org/x/oauth2"
30+
alpha "google.golang.org/api/compute/v0.alpha"
3031
"google.golang.org/api/compute/v1"
3132
"google.golang.org/api/googleapi"
3233
"k8s.io/apimachinery/pkg/util/wait"
@@ -42,16 +43,18 @@ const (
4243

4344
regionURITemplate = "projects/%s/regions/%s"
4445

45-
GCEComputeAPIEndpoint = "https://www.googleapis.com/compute/v1/"
46-
GCEComputeBetaAPIEndpoint = "https://www.googleapis.com/compute/beta/"
46+
GCEComputeAPIEndpoint = "https://www.googleapis.com/compute/v1/"
47+
GCEComputeBetaAPIEndpoint = "https://www.googleapis.com/compute/beta/"
48+
GCEComputeAlphaAPIEndpoint = "https://www.googleapis.com/compute/alpha/"
4749

4850
replicaZoneURITemplateSingleZone = "%s/zones/%s" // {gce.projectID}/zones/{disk.Zone}
4951
)
5052

5153
type CloudProvider struct {
52-
service *compute.Service
53-
project string
54-
zone string
54+
service *compute.Service
55+
alphaService *alpha.Service
56+
project string
57+
zone string
5558

5659
zonesCache map[string]([]string)
5760
}
@@ -88,16 +91,22 @@ func CreateCloudProvider(vendorVersion string, configPath string) (*CloudProvide
8891
return nil, err
8992
}
9093

94+
alphasvc, err := createAlphaCloudService(vendorVersion, tokenSource)
95+
if err != nil {
96+
return nil, err
97+
}
98+
9199
project, zone, err := getProjectAndZone(configFile)
92100
if err != nil {
93101
return nil, fmt.Errorf("Failed getting Project and Zone: %v", err)
94102
}
95103

96104
return &CloudProvider{
97-
service: svc,
98-
project: project,
99-
zone: zone,
100-
zonesCache: make(map[string]([]string)),
105+
service: svc,
106+
alphaService: alphasvc,
107+
project: project,
108+
zone: zone,
109+
zonesCache: make(map[string]([]string)),
101110
}, nil
102111

103112
}
@@ -149,6 +158,19 @@ func readConfig(configPath string) (*ConfigFile, error) {
149158
return cfg, nil
150159
}
151160

161+
func createAlphaCloudService(vendorVersion string, tokenSource oauth2.TokenSource) (*alpha.Service, error) {
162+
client, err := newOauthClient(tokenSource)
163+
if err != nil {
164+
return nil, err
165+
}
166+
service, err := alpha.New(client)
167+
if err != nil {
168+
return nil, err
169+
}
170+
service.UserAgent = fmt.Sprintf("GCE CSI Driver/%s (%s %s)", vendorVersion, runtime.GOOS, runtime.GOARCH)
171+
return service, nil
172+
}
173+
152174
func createCloudService(vendorVersion string, tokenSource oauth2.TokenSource) (*compute.Service, error) {
153175
svc, err := createCloudServiceWithDefaultServiceAccount(vendorVersion, tokenSource)
154176
return svc, err

0 commit comments

Comments
 (0)