Skip to content

Commit 75f5cbf

Browse files
committed
Add option to serialize formatAndMount
Change-Id: Ieff9b45a413c6df67d7199de6845132ff3b1cd2f
1 parent d14b457 commit 75f5cbf

File tree

6 files changed

+172
-44
lines changed

6 files changed

+172
-44
lines changed

cmd/gce-pd-csi-driver/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ var (
6363
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
6464
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")
6565

66+
serializeFormatAndMount = flag.Bool("serialize-format-and-mount", false, "If set then format and mount operations are serialized on each node")
67+
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started")
68+
6669
version string
6770
)
6871

@@ -153,6 +156,9 @@ func handle() {
153156
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
154157
}
155158
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
159+
if *serializeFormatAndMount {
160+
nodeServer = nodeServer.WithSerializedFormatAndMount(*serializeFormatAndMountTimeout)
161+
}
156162
}
157163

158164
err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)

pkg/gce-pd-csi-driver/controller.go

+92-36
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sort"
2424
"strings"
2525
"time"
26+
neturl "net/url"
2627

2728
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
2829
csi "github.com/container-storage-interface/spec/lib/go/csi"
@@ -138,6 +139,14 @@ const (
138139

139140
// Keys in the volume context.
140141
contextForceAttach = "force-attach"
142+
143+
resourceApiScheme = "http"
144+
resourceApiService = "compute"
145+
resourceProject = "project"
146+
)
147+
148+
var (
149+
resourceApiHost = regexp.Compile("^www.*apis.com$")
141150
)
142151

143152
func isDiskReady(disk *gce.CloudDisk) (bool, error) {
@@ -434,7 +443,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
434443
}
435444

436445
klog.V(4).Infof("CreateVolume succeeded for disk %v", volKey)
437-
return generateCreateVolumeResponse(disk, zones, params), nil
446+
return generateCreateVolumeResponse(disk, zones, params)
438447

439448
}
440449

@@ -869,13 +878,23 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
869878
entries := []*csi.ListVolumesResponse_Entry{}
870879
for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ {
871880
d := gceCS.disks[i+offset]
881+
diskRsrc, err := getResourceId(d.SelfLink)
882+
if err != nil {
883+
klog.Warning("Bad ListVolumes disk resource %s, skipped: %v", d.SelfLink, err)
884+
continue
885+
}
872886
users := []string{}
873887
for _, u := range d.Users {
874-
users = append(users, cleanSelfLink(u))
888+
rsrc, err := getResourceId(u)
889+
if err != nil {
890+
klog.Warning("Bad ListVolumes user %s, skipped: %v", u, err)
891+
} else {
892+
users = append(users, rsrc)
893+
}
875894
}
876895
entries = append(entries, &csi.ListVolumesResponse_Entry{
877896
Volume: &csi.Volume{
878-
VolumeId: cleanSelfLink(d.SelfLink),
897+
VolumeId: diskRsrc,
879898
},
880899
Status: &csi.ListVolumesResponse_VolumeStatus{
881900
PublishedNodeIds: users,
@@ -988,6 +1007,10 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project
9881007
return nil, common.LoggedError("Failed to create snapshot: ", err)
9891008
}
9901009
}
1010+
snapshotId, err := getResourceId(snapshot.SelfLink)
1011+
if err != nil {
1012+
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", snapshot.SelfLink), err)
1013+
}
9911014

9921015
err = gceCS.validateExistingSnapshot(snapshot, volKey)
9931016
if err != nil {
@@ -1006,7 +1029,7 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project
10061029

10071030
return &csi.Snapshot{
10081031
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
1009-
SnapshotId: cleanSelfLink(snapshot.SelfLink),
1032+
SnapshotId: SnapshotId,
10101033
SourceVolumeId: volumeID,
10111034
CreationTime: timestamp,
10121035
ReadyToUse: ready,
@@ -1035,6 +1058,10 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin
10351058
return nil, common.LoggedError("Failed to create image: ", err)
10361059
}
10371060
}
1061+
imageId, err := getResourceId(image.SelfLink)
1062+
if err != nil {
1063+
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", image.SelfLink), err)
1064+
}
10381065

10391066
err = gceCS.validateExistingImage(image, volKey)
10401067
if err != nil {
@@ -1053,7 +1080,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin
10531080

10541081
return &csi.Snapshot{
10551082
SizeBytes: common.GbToBytes(image.DiskSizeGb),
1056-
SnapshotId: cleanSelfLink(image.SelfLink),
1083+
SnapshotId: imageId,
10571084
SourceVolumeId: volumeID,
10581085
CreationTime: timestamp,
10591086
ReadyToUse: ready,
@@ -1065,9 +1092,13 @@ func (gceCS *GCEControllerServer) validateExistingImage(image *compute.Image, vo
10651092
return fmt.Errorf("disk does not exist")
10661093
}
10671094

1068-
_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(image.SourceDisk))
1095+
sourceId, err := getResourceId(image.SourceDisk)
1096+
if err != nil {
1097+
return fmt.Errorf("failed to get source id from %s: %w", image.SourceDisk, err)
1098+
}
1099+
_, sourceKey, err := common.VolumeIDToKey(sourceId)
10691100
if err != nil {
1070-
return fmt.Errorf("fail to get source disk key %s, %w", image.SourceDisk, err)
1101+
return fmt.Errorf("failed to get source disk key %s: %w", image.SourceDisk, err)
10711102
}
10721103

10731104
if sourceKey.String() != volKey.String() {
@@ -1116,7 +1147,11 @@ func (gceCS *GCEControllerServer) validateExistingSnapshot(snapshot *compute.Sna
11161147
return fmt.Errorf("disk does not exist")
11171148
}
11181149

1119-
_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(snapshot.SourceDisk))
1150+
sourceId, err := getResourceId(snapshot.SourceDisk)
1151+
if err != nil {
1152+
retur fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
1153+
}
1154+
_, sourceKey, err := common.VolumeIDToKey(sourceId)
11201155
if err != nil {
11211156
return fmt.Errorf("fail to get source disk key %s, %w", snapshot.SourceDisk, err)
11221157
}
@@ -1350,7 +1385,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI
13501385
return &csi.ListSnapshotsResponse{}, nil
13511386
}
13521387
}
1353-
e, err := generateImageEntry(image)
1388+
e, err := generateDiskImageEntry(image)
13541389
if err != nil {
13551390
return nil, fmt.Errorf("failed to generate image entry: %w", err)
13561391
}
@@ -1372,6 +1407,15 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
13721407
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
13731408
}
13741409

1410+
snapshotId, err := getResourceId(snapshot.SelfLink)
1411+
if err != nil {
1412+
return nil, fmt.Errorf("failed to get snapshot id from %s: %w", snapshot.SelfLink, err)
1413+
}
1414+
sourceId, err := getResourceId(snapshot.SourceDisk)
1415+
if err != nil {
1416+
return nil, fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
1417+
}
1418+
13751419
// We ignore the error intentionally here since we are just listing snapshots
13761420
// TODO: If the snapshot is in "FAILED" state we need to think through what this
13771421
// should actually look like.
@@ -1380,8 +1424,8 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
13801424
entry := &csi.ListSnapshotsResponse_Entry{
13811425
Snapshot: &csi.Snapshot{
13821426
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
1383-
SnapshotId: cleanSelfLink(snapshot.SelfLink),
1384-
SourceVolumeId: cleanSelfLink(snapshot.SourceDisk),
1427+
SnapshotId: snapshotId,
1428+
SourceVolumeId: sourceId,
13851429
CreationTime: tp,
13861430
ReadyToUse: ready,
13871431
},
@@ -1397,35 +1441,23 @@ func generateDiskImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_En
13971441
return nil, fmt.Errorf("failed to covert creation timestamp: %w", err)
13981442
}
13991443

1400-
ready, _ := isImageReady(image.Status)
1401-
1402-
entry := &csi.ListSnapshotsResponse_Entry{
1403-
Snapshot: &csi.Snapshot{
1404-
SizeBytes: common.GbToBytes(image.DiskSizeGb),
1405-
SnapshotId: cleanSelfLink(image.SelfLink),
1406-
SourceVolumeId: cleanSelfLink(image.SourceDisk),
1407-
CreationTime: tp,
1408-
ReadyToUse: ready,
1409-
},
1444+
imageId, err := getResourceId(image.SelfLink)
1445+
if err != nil {
1446+
return nil, fmt.Errorf("cannot get image id from %s: %w", image.SelfLink, err)
14101447
}
1411-
return entry, nil
1412-
}
1413-
1414-
func generateImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_Entry, error) {
1415-
timestamp, err := parseTimestamp(image.CreationTimestamp)
1448+
sourceId, err := getResourceId(image.SourceDisk)
14161449
if err != nil {
1417-
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
1450+
return nil, fmt.Errorf("cannot get sourcee id from %s: %w", image.SourceLink, err)
14181451
}
14191452

1420-
// ignore the error intentionally here since we are just listing images
14211453
ready, _ := isImageReady(image.Status)
14221454

14231455
entry := &csi.ListSnapshotsResponse_Entry{
14241456
Snapshot: &csi.Snapshot{
14251457
SizeBytes: common.GbToBytes(image.DiskSizeGb),
1426-
SnapshotId: cleanSelfLink(image.SelfLink),
1427-
SourceVolumeId: cleanSelfLink(image.SourceDisk),
1428-
CreationTime: timestamp,
1458+
SnapshotId: imageId,
1459+
SourceVolumeId: sourceId,
1460+
CreationTime: tp,
14291461
ReadyToUse: ready,
14301462
},
14311463
}
@@ -1691,7 +1723,12 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
16911723
return info, nil
16921724
}
16931725

1694-
func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params common.DiskParameters) *csi.CreateVolumeResponse {
1726+
func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params common.DiskParameters) *csi.CreateVolumeResponse, error {
1727+
volumeId, err := getResourceId(disk.GetSelfLink())
1728+
if err != nil {
1729+
return nil, fmt.Errorf("cannot get volume id from %s: %w", disk.GetSelfLink(), err)
1730+
}
1731+
16951732
tops := []*csi.Topology{}
16961733
for _, zone := range zones {
16971734
tops = append(tops, &csi.Topology{
@@ -1702,7 +1739,7 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co
17021739
createResp := &csi.CreateVolumeResponse{
17031740
Volume: &csi.Volume{
17041741
CapacityBytes: realDiskSizeBytes,
1705-
VolumeId: cleanSelfLink(disk.GetSelfLink()),
1742+
VolumeId: volumeId,
17061743
VolumeContext: paramsToVolumeContext(params),
17071744
AccessibleTopology: tops,
17081745
},
@@ -1744,9 +1781,28 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co
17441781
return createResp
17451782
}
17461783

1747-
func cleanSelfLink(selfLink string) string {
1748-
r, _ := regexp.Compile("https:\\/\\/www.*apis.com\\/.*(v1|beta|alpha)\\/")
1749-
return r.ReplaceAllString(selfLink, "")
1784+
func getResourceId(resourceLink string) (string, error) {
1785+
url, err := neturl.Parse(resourceLink)
1786+
if err != nil {
1787+
return fmt.Errorf("Could not parse resource %s: %w", resourceLink, err)
1788+
}
1789+
if url.Scheme != resourceApiScheme {
1790+
return fmt.Errorf("Unexpected API scheme for resource %s", resourceLink)
1791+
}
1792+
if !resourceApiHost.MatchString(url.Host) {
1793+
return fmt.Errorf("Unexpected API host for resource %s", resourceLink)
1794+
}
1795+
elts := strings.Split(url.Path, "/")
1796+
if len(elts) < 3 {
1797+
return fmt.Errorf("Bad resource path %s", resourceLink)
1798+
}
1799+
if elts[0] != resourceApiService {
1800+
return fmt.Errorf("Bad resource service %s in %s", elts[0], resourceLink)
1801+
}
1802+
if elts[2] != resourceProject {
1803+
return fmt.Errorf("Expected %s to start with %s in resource %s", elts[2:], resourceProject, resourceLink)
1804+
}
1805+
return elts[2:]
17501806
}
17511807

17521808
func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) {

pkg/gce-pd-csi-driver/node.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"path/filepath"
2222
"runtime"
23+
"time"
2324

2425
"google.golang.org/grpc/codes"
2526
"google.golang.org/grpc/status"
@@ -46,6 +47,15 @@ type GCENodeServer struct {
4647
// A map storing all volumes with ongoing operations so that additional operations
4748
// for that same volume (as defined by VolumeID) return an Aborted error
4849
volumeLocks *common.VolumeLocks
50+
51+
// If set, this semaphore will be used to serialize formatAndMount. It will be raised
52+
// when the operation starts, and lowered either when finished, or when
53+
// formatAndMountTimeout has expired.
54+
//
55+
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
56+
// been observed).
57+
formatAndMountSemaphore chan any
58+
formatAndMountTimeout time.Duration
4959
}
5060

5161
var _ csi.NodeServer = &GCENodeServer{}
@@ -86,6 +96,12 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
8696
return false
8797
}
8898

99+
func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration) *GCENodeServer {
100+
ns.formatAndMountSemaphore = make(chan any, 1)
101+
ns.formatAndMountTimeout = timeout
102+
return ns
103+
}
104+
89105
func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
90106
// Validate Arguments
91107
targetPath := req.GetTargetPath()
@@ -318,7 +334,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
318334
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
319335
}
320336

321-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
337+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
322338
if err != nil {
323339
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
324340
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
@@ -329,7 +345,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
329345
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")
330346

331347
options = append(options, "noload")
332-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
348+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
333349
if err == nil {
334350
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
335351
return &csi.NodeStageVolumeResponse{}, nil

pkg/gce-pd-csi-driver/node_test.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"path/filepath"
2424
"strings"
2525
"testing"
26+
"time"
2627

2728
"k8s.io/utils/exec"
2829
testingexec "k8s.io/utils/exec/testing"
@@ -58,7 +59,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
5859
return gceDriver
5960
}
6061

61-
func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
62+
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
6263
gceDriver := GetGCEDriver()
6364
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
6465
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
@@ -69,6 +70,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
6970
return gceDriver
7071
}
7172

73+
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
74+
gceDriver := GetGCEDriver()
75+
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
76+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5 * time.Second)
77+
78+
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer)
79+
if err != nil {
80+
t.Fatalf("Failed to setup GCE Driver: %v", err)
81+
}
82+
return gceDriver
83+
}
84+
7285
func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
7386
c := cmd
7487
a := args
@@ -849,9 +862,7 @@ func TestNodeGetCapabilities(t *testing.T) {
849862
}
850863
}
851864

852-
func TestConcurrentNodeOperations(t *testing.T) {
853-
readyToExecute := make(chan chan struct{}, 1)
854-
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
865+
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
855866
ns := gceDriver.ns
856867
tempDir, err := ioutil.TempDir("", "cno")
857868
if err != nil {
@@ -931,3 +942,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
931942
t.Errorf("Unexpected error: %v", err)
932943
}
933944
}
945+
946+
func TestBlockingMount(t *testing.T) {
947+
readyToExecute := make(chan chan struct{}, 1)
948+
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
949+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
950+
}
951+
952+
func TestBlockingFormatAndMount(t *testing.T) {
953+
readyToExecute := make(chan chan struct{}, 1)
954+
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
955+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
956+
}

0 commit comments

Comments
 (0)