diff --git a/Dockerfile b/Dockerfile index 47f6313dc..20cd5aa6d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,7 +27,7 @@ FROM gke.gcr.io/debian-base:bookworm-v1.0.4-gke.2 AS debian # Install necessary dependencies # google_nvme_id script depends on the following packages: nvme-cli, xxd, bash -RUN clean-install util-linux e2fsprogs mount ca-certificates udev xfsprogs nvme-cli xxd bash +RUN clean-install util-linux e2fsprogs mount ca-certificates udev xfsprogs nvme-cli xxd bash kmod lvm2 mdadm # Since we're leveraging apt to pull in dependencies, we use `gcr.io/distroless/base` because it includes glibc. FROM gcr.io/distroless/base-debian12 AS distroless-base @@ -56,6 +56,35 @@ COPY --from=debian /sbin/e2fsck /sbin/e2fsck COPY --from=debian /sbin/fsck /sbin/fsck COPY --from=debian /sbin/fsck* /sbin/ COPY --from=debian /sbin/fsck.xfs /sbin/fsck.xfs +# Add dependencies for LVM +COPY --from=debian /etc/lvm /lvm-tmp/lvm +COPY --from=debian /lib/systemd/system/blk-availability.service /lib/systemd/system/blk-availability.service +COPY --from=debian /lib/systemd/system/lvm2-lvmpolld.service /lib/systemd/system/lvm2-lvmpolld.service +COPY --from=debian /lib/systemd/system/lvm2-lvmpolld.socket /lib/systemd/system/lvm2-lvmpolld.socket +COPY --from=debian /lib/systemd/system/lvm2-monitor.service /lib/systemd/system/lvm2-monitor.service +COPY --from=debian /lib/udev/rules.d/56-lvm.rules /lib/udev/rules.d/56-lvm.rules +COPY --from=debian /sbin/fsadm /sbin/fsadm +COPY --from=debian /sbin/lvm /sbin/lvm +COPY --from=debian /sbin/lvmdump /sbin/lvmdump +COPY --from=debian /sbin/lvmpolld /sbin/lvmpolld +COPY --from=debian /usr/lib/tmpfiles.d /usr/lib/tmpfiles.d +COPY --from=debian /usr/lib/tmpfiles.d/lvm2.conf /usr/lib/tmpfiles.d/lvm2.conf +COPY --from=debian /sbin/lv* /sbin/ +COPY --from=debian /sbin/pv* /sbin/ +COPY --from=debian /sbin/vg* /sbin/ +COPY --from=debian /bin/lsblk /bin/lsblk +COPY --from=debian /sbin/modprobe /sbin/modprobe +COPY --from=debian /lib/udev /lib/udev +COPY --from=debian /lib/udev/rules.d /lib/udev/rules.d +COPY --from=debian /lib/udev/rules.d/55-dm.rules /lib/udev/rules.d/55-dm.rules +COPY --from=debian /lib/udev/rules.d/60-persistent-storage-dm.rules /lib/udev/rules.d/60-persistent-storage-dm.rules +COPY --from=debian /lib/udev/rules.d/95-dm-notify.rules /lib/udev/rules.d/95-dm-notify.rules +COPY --from=debian /sbin/blkdeactivate /sbin/blkdeactivate +COPY --from=debian /sbin/dmsetup /sbin/dmsetup +COPY --from=debian /sbin/dmstats /sbin/dmstats +COPY --from=debian /bin/ls /bin/ls +# End of dependencies for LVM +COPY --from=debian /sbin/mdadm /sbin/mdadm COPY --from=debian /sbin/mke2fs /sbin/mke2fs COPY --from=debian /sbin/mkfs* /sbin/ COPY --from=debian /sbin/resize2fs /sbin/resize2fs @@ -71,14 +100,20 @@ COPY --from=debian /bin/date /bin/date COPY --from=debian /bin/grep /bin/grep COPY --from=debian /bin/sed /bin/sed COPY --from=debian /bin/ln /bin/ln +COPY --from=debian /bin/cp /bin/cp COPY --from=debian /bin/udevadm /bin/udevadm # Copy shared libraries into distroless base. 
COPY --from=debian /lib/${LIB_DIR_PREFIX}-linux-gnu/libselinux.so.1 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libdl.so.2 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libpthread.so.0 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libtinfo.so.6 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libe2p.so.2 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libcom_err.so.2 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libdevmapper.so.1.02.1 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libm.so.6 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libc.so.6 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libdevmapper-event.so.1.02.1 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libext2fs.so.2 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libgcc_s.so.1 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/liblzma.so.5 \ @@ -99,11 +134,17 @@ COPY --from=debian /lib/${LIB_DIR_PREFIX}-linux-gnu/libselinux.so.1 \ /lib/${LIB_DIR_PREFIX}-linux-gnu/libzstd.so.1 /lib/${LIB_DIR_PREFIX}-linux-gnu/ COPY --from=debian /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libblkid.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libsmartcols.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libbsd.so.0 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libinih.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libmount.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libudev.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libuuid.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libzstd.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libaio.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libgcrypt.so.20 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libsystemd.so.0 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/liblz4.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libacl.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libattr.so.1 \ /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libedit.so.2 \ @@ -118,4 +159,5 @@ COPY --from=debian /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libblkid.so.1 \ # Copy NVME support required script and rules into distroless base. 
COPY deploy/kubernetes/udev/google_nvme_id /lib/udev_containerized/google_nvme_id -ENTRYPOINT ["/gce-pd-csi-driver"] +COPY --from=builder /go/src/sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/initialize-driver.sh /initialize-driver.sh +ENTRYPOINT ["/initialize-driver.sh"] diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 2447572bc..458b45619 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -27,9 +27,12 @@ import ( "strings" "time" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" @@ -76,6 +79,8 @@ var ( fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk") enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools") enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks") + enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration") + nodeName = flag.String("node-name", "", "The node this driver is running on") multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. 
Used only if --multi-zone-volume-handle-enable") multiZoneVolumeHandleEnableFlag = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled") @@ -97,7 +102,9 @@ var ( ) const ( - driverName = "pd.csi.storage.gke.io" + driverName = "pd.csi.storage.gke.io" + dataCacheLabel = "datacache-storage-gke-io" + dataCacheLabelValue = "enabled" ) func init() { @@ -226,7 +233,7 @@ func handle() { } initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond - controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag) + controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag) } else if *cloudConfigFilePath != "" { klog.Warningf("controller service is disabled but cloud config given - it has no effect") } @@ -247,6 +254,7 @@ func handle() { nsArgs := driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, DeviceInUseTimeout: *deviceInUseTimeout, + EnableDataCache: *enableDataCacheFlag, } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) if *maxConcurrentFormatAndMount > 0 { @@ -254,6 +262,15 @@ func handle() { } } + if *enableDataCacheFlag { + if nodeName == nil || *nodeName == "" { + klog.Errorf("Data cache enabled, but --node-name not passed") + } + if err := setupDataCache(ctx, *nodeName); err != nil { + klog.Errorf("DataCache setup failed: %v", err) + } + } + err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) if err != nil { klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error()) @@ -332,3 +349,33 @@ func urlFlag(target **url.URL, name string, usage string) { return err }) } + +func setupDataCache(ctx context.Context, nodeName string) error { + klog.V(2).Infof("Setting up data cache for node %s", nodeName) + if nodeName != common.TestNode { + cfg, err := rest.InClusterConfig() + if err != nil { + return err + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return err + } + node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any. 
+ return err + } + if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue { + klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue) + return nil + } + } + klog.V(2).Info("Raiding local ssds to setup data cache") + if err := driver.RaidLocalSsds(); err != nil { + return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err) + } + + klog.V(2).Infof("Datacache enabled for node %s", nodeName) + return nil +} diff --git a/deploy/kubernetes/base/controller/cluster_setup.yaml b/deploy/kubernetes/base/controller/cluster_setup.yaml index a6941a119..f92b640bc 100644 --- a/deploy/kubernetes/base/controller/cluster_setup.yaml +++ b/deploy/kubernetes/base/controller/cluster_setup.yaml @@ -205,6 +205,9 @@ rules: verbs: ['use'] resourceNames: - csi-gce-pd-node-psp + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list"] --- kind: ClusterRole diff --git a/deploy/kubernetes/base/node_linux/node.yaml b/deploy/kubernetes/base/node_linux/node.yaml index ebf7779cf..b191b2881 100644 --- a/deploy/kubernetes/base/node_linux/node.yaml +++ b/deploy/kubernetes/base/node_linux/node.yaml @@ -46,8 +46,15 @@ spec: - "--v=5" - "--endpoint=unix:/csi/csi.sock" - "--run-controller-service=false" + - "--enable-data-cache" + - "--node-name=$(KUBE_NODE_NAME)" securityContext: privileged: true + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName volumeMounts: - name: kubelet-dir mountPath: /var/lib/kubelet @@ -66,6 +73,8 @@ spec: mountPath: /run/udev - name: sys mountPath: /sys + - name: lib-modules + mountPath: /lib/modules volumes: - name: registration-dir hostPath: @@ -101,6 +110,10 @@ spec: hostPath: path: /sys type: Directory + - name: lib-modules + hostPath: + path: /lib/modules + type: Directory # https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ # See "special case". This will tolerate everything. Node component should # be scheduled on all nodes. 
diff --git a/initialize-driver.sh b/initialize-driver.sh new file mode 100755 index 000000000..fe5a615c8 --- /dev/null +++ b/initialize-driver.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +/bin/cp -r /lvm-tmp/lvm /etc/ +/bin/sed -i -e "s/.*allow_mixed_block_sizes = 0.*/ allow_mixed_block_sizes = 1/" /etc/lvm/lvm.conf +/bin/sed -i -e "s/.*udev_sync = 1.*/ udev_sync = 0/" /etc/lvm/lvm.conf +/bin/sed -i -e "s/.*udev_rules = 1.*/ udev_rules = 0/" /etc/lvm/lvm.conf +/bin/sed -i -e "s/.*locking_dir = .*/ locking_dir = \"\/tmp\"/" /etc/lvm/lvm.conf + +/gce-pd-csi-driver "$@" \ No newline at end of file diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 9851b11d1..a9f070f82 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -32,4 +32,16 @@ const ( // Label that is set on a disk when it is used by a 'multi-zone' VolumeHandle MultiZoneLabel = "goog-gke-multi-zone" + + // Data cache mode + DataCacheModeWriteBack = "writeback" + DataCacheModeWriteThrough = "writethrough" + + ContextDataCacheSize = "data-cache-size" + ContextDataCacheMode = "data-cache-mode" + + // Keys in the publish context + ContexLocalSsdCacheSize = "local-ssd-cache-size" + // Node name for E2E tests + TestNode = "test-node-csi-e2e" ) diff --git a/pkg/common/parameters.go b/pkg/common/parameters.go index e88d876b8..7f59e69d4 100644 --- a/pkg/common/parameters.go +++ b/pkg/common/parameters.go @@ -18,6 +18,7 @@ package common import ( "fmt" + "strconv" "strings" ) @@ -35,9 +36,13 @@ const ( ParameterAvailabilityClass = "availability-class" ParameterKeyEnableConfidentialCompute = "enable-confidential-storage" ParameterKeyStoragePools = "storage-pools" - ParameterKeyResourceTags = "resource-tags" - ParameterKeyEnableMultiZoneProvisioning = "enable-multi-zone-provisioning" - ParameterHdHADiskType = "hyperdisk-balanced-high-availability" + + // Parameters for Data Cache + ParameterKeyDataCacheSize = "data-cache-size" + ParameterKeyDataCacheMode = "data-cache-mode" + ParameterKeyResourceTags = "resource-tags" + ParameterKeyEnableMultiZoneProvisioning = "enable-multi-zone-provisioning" + ParameterHdHADiskType = "hyperdisk-balanced-high-availability" // Parameters for VolumeSnapshotClass ParameterKeyStorageLocations = "storage-locations" @@ -73,6 +78,15 @@ const ( tagKeyCreatedForSnapshotContentName = "kubernetes.io/created-for/volumesnapshotcontent/name" ) +type DataCacheParameters struct { + // Values: {string} in int64 form + // Default: "" + DataCacheSize string + // Values: writethrough, writeback + // Default: writethrough + DataCacheMode string +} + // DiskParameters contains normalized and defaulted disk parameters type DiskParameters struct { // Values: pd-standard, pd-balanced, pd-ssd, or any other PD disk type. Not validated. @@ -152,7 +166,8 @@ type ModifyVolumeParameters struct { // put them into a well defined struct making sure to default unspecified fields. // extraVolumeLabels are added as labels; if there are also labels specified in // parameters, any matching extraVolumeLabels will be overridden. 
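The new data-cache-size and data-cache-mode keys are only honored when the controller runs with --enable-data-cache; the size is normalized through ConvertGiStringToInt64 (so "100Gi" comes back as the plain string "100"), and an unset mode defaults to writethrough. A small usage sketch of the updated signature, with illustrative parameter values:

```go
package main

import (
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	pp := common.ParameterProcessor{DriverName: "pd.csi.storage.gke.io"}
	diskParams, cacheParams, err := pp.ExtractAndDefaultParameters(
		map[string]string{
			common.ParameterKeyType:          "hyperdisk-balanced",
			common.ParameterKeyDataCacheSize: "100Gi",
			common.ParameterKeyDataCacheMode: common.DataCacheModeWriteBack,
		},
		nil,  // extraVolumeLabels
		true, // enableDataCache
		nil,  // extraTags
	)
	if err != nil {
		panic(err) // e.g. cache keys supplied while --enable-data-cache is off
	}
	fmt.Println(diskParams.DiskType)       // "hyperdisk-balanced"
	fmt.Println(cacheParams.DataCacheSize) // "100" (GiB, normalized by ConvertGiStringToInt64)
	fmt.Println(cacheParams.DataCacheMode) // "writeback"
}
```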
-func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]string, extraVolumeLabels map[string]string, extraTags map[string]string) (DiskParameters, error) { +func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]string, extraVolumeLabels map[string]string, enableDataCache bool, extraTags map[string]string) (DiskParameters, DataCacheParameters, error) { + p := DiskParameters{ DiskType: "pd-standard", // Default ReplicationType: replicationTypeNone, // Default @@ -162,6 +177,12 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] ResourceTags: make(map[string]string), // Default } + // Set data cache mode default + d := DataCacheParameters{} + if enableDataCache && parameters[ParameterKeyDataCacheSize] != "" { + d.DataCacheMode = DataCacheModeWriteThrough + } + for k, v := range extraVolumeLabels { p.Labels[k] = v } @@ -180,7 +201,7 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] if v != "" { p.DiskType = strings.ToLower(v) if !pp.EnableHdHA && p.DiskType == ParameterHdHADiskType { - return p, fmt.Errorf("parameters contain invalid disk type %s", ParameterHdHADiskType) + return p, d, fmt.Errorf("parameters contain invalid disk type %s", ParameterHdHADiskType) } } case ParameterKeyReplicationType: @@ -199,7 +220,7 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] case ParameterKeyLabels: paramLabels, err := ConvertLabelsStringToMap(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid labels parameter: %w", err) + return p, d, fmt.Errorf("parameters contain invalid labels parameter: %w", err) } // Override any existing labels with those from this parameter. for labelKey, labelValue := range paramLabels { @@ -208,22 +229,22 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] case ParameterKeyProvisionedIOPSOnCreate: paramProvisionedIOPSOnCreate, err := ConvertStringToInt64(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid provisionedIOPSOnCreate parameter: %w", err) + return p, d, fmt.Errorf("parameters contain invalid provisionedIOPSOnCreate parameter: %w", err) } p.ProvisionedIOPSOnCreate = paramProvisionedIOPSOnCreate case ParameterKeyProvisionedThroughputOnCreate: paramProvisionedThroughputOnCreate, err := ConvertMiStringToInt64(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid provisionedThroughputOnCreate parameter: %w", err) + return p, d, fmt.Errorf("parameters contain invalid provisionedThroughputOnCreate parameter: %w", err) } if paramProvisionedThroughputOnCreate < 0 { - return p, fmt.Errorf("parameter provisionedThroughputOnCreate cannot be negative") + return p, d, fmt.Errorf("parameter provisionedThroughputOnCreate cannot be negative") } p.ProvisionedThroughputOnCreate = paramProvisionedThroughputOnCreate case ParameterAvailabilityClass: paramAvailabilityClass, err := ConvertStringToAvailabilityClass(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid availability class parameter: %w", err) + return p, d, fmt.Errorf("parameters contain invalid availability class parameter: %w", err) } if paramAvailabilityClass == ParameterRegionalHardFailoverClass { p.ForceAttach = true @@ -231,37 +252,56 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] case ParameterKeyEnableConfidentialCompute: paramEnableConfidentialCompute, err := ConvertStringToBool(v) if err != nil { - return p, fmt.Errorf("parameters 
contain invalid value for enable-confidential-storage parameter: %w", err) + return p, d, fmt.Errorf("parameters contain invalid value for enable-confidential-storage parameter: %w", err) } if paramEnableConfidentialCompute { // DiskEncryptionKmsKey is needed to enable confidentialStorage if val, ok := parameters[ParameterKeyDiskEncryptionKmsKey]; !ok || !isValidDiskEncryptionKmsKey(val) { - return p, fmt.Errorf("Valid %v is required to enable ConfidentialStorage", ParameterKeyDiskEncryptionKmsKey) + return p, d, fmt.Errorf("Valid %v is required to enable ConfidentialStorage", ParameterKeyDiskEncryptionKmsKey) } } p.EnableConfidentialCompute = paramEnableConfidentialCompute case ParameterKeyStoragePools: if !pp.EnableStoragePools { - return p, fmt.Errorf("parameters contains invalid option %q", ParameterKeyStoragePools) + return p, d, fmt.Errorf("parameters contains invalid option %q", ParameterKeyStoragePools) } storagePools, err := ParseStoragePools(v) if err != nil { - return p, fmt.Errorf("parameters contains invalid value for %s parameter %q: %w", ParameterKeyStoragePools, v, err) + return p, d, fmt.Errorf("parameters contains invalid value for %s parameter %q: %w", ParameterKeyStoragePools, v, err) } p.StoragePools = storagePools + case ParameterKeyDataCacheSize: + if !enableDataCache { + return p, d, fmt.Errorf("data caching enabled: %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize) + } + // TODO: need to parse or validate the string + + paramDataCacheSize, err := ConvertGiStringToInt64(v) + if err != nil { + return p, d, fmt.Errorf("parameters contain invalid dataCacheSize parameter: %w", err) + } + d.DataCacheSize = strconv.FormatInt(paramDataCacheSize, 10) + case ParameterKeyDataCacheMode: + if !enableDataCache { + return p, d, fmt.Errorf("data caching enabled %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize) + } + if err := ValidateDataCacheMode(v); err != nil { + return p, d, fmt.Errorf("parameters contains invalid option: %w", err) + } + d.DataCacheMode = v case ParameterKeyResourceTags: if err := extractResourceTagsParameter(v, p.ResourceTags); err != nil { - return p, err + return p, d, err } case ParameterKeyEnableMultiZoneProvisioning: if !pp.EnableMultiZone { - return p, fmt.Errorf("parameters contains invalid option %q", ParameterKeyEnableMultiZoneProvisioning) + return p, d, fmt.Errorf("parameters contains invalid option %q", ParameterKeyEnableMultiZoneProvisioning) } paramEnableMultiZoneProvisioning, err := ConvertStringToBool(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid value for %s parameter: %w", ParameterKeyEnableMultiZoneProvisioning, err) + return p, d, fmt.Errorf("parameters contain invalid value for %s parameter: %w", ParameterKeyEnableMultiZoneProvisioning, err) } p.MultiZoneProvisioning = paramEnableMultiZoneProvisioning @@ -273,13 +313,13 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string] p.AccessMode = v } default: - return p, fmt.Errorf("parameters contains invalid option %q", k) + return p, d, fmt.Errorf("parameters contains invalid option %q", k) } } if len(p.Tags) > 0 { p.Tags[tagKeyCreatedBy] = pp.DriverName } - return p, nil + return p, d, nil } func ExtractAndDefaultSnapshotParameters(parameters map[string]string, driverName string, extraTags map[string]string) (SnapshotParameters, error) { diff --git a/pkg/common/parameters_test.go b/pkg/common/parameters_test.go index e83e59cfb..33434ec46 100644 --- 
a/pkg/common/parameters_test.go +++ b/pkg/common/parameters_test.go @@ -25,15 +25,17 @@ import ( func TestExtractAndDefaultParameters(t *testing.T) { tests := []struct { - name string - parameters map[string]string - labels map[string]string - enableStoragePools bool - enableMultiZone bool - enableHdHA bool - extraTags map[string]string - expectParams DiskParameters - expectErr bool + name string + parameters map[string]string + labels map[string]string + enableStoragePools bool + enableDataCache bool + enableMultiZone bool + enableHdHA bool + extraTags map[string]string + expectParams DiskParameters + expectDataCacheParams DataCacheParameters + expectErr bool }{ { name: "defaults", @@ -359,6 +361,55 @@ func TestExtractAndDefaultParameters(t *testing.T) { labels: map[string]string{}, expectErr: true, }, + { + name: "data cache parameters - set default cache mode", + enableDataCache: true, + parameters: map[string]string{ParameterKeyType: "pd-balanced", ParameterKeyReplicationType: "none", ParameterKeyDiskEncryptionKmsKey: "foo/key", ParameterKeyLabels: "key1=value1,key2=value2", ParameterKeyDataCacheSize: "1234Gi"}, + labels: map[string]string{}, + expectParams: DiskParameters{ + DiskType: "pd-balanced", + ReplicationType: "none", + DiskEncryptionKMSKey: "foo/key", + Tags: map[string]string{}, + Labels: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + ResourceTags: map[string]string{}, + }, + expectDataCacheParams: DataCacheParameters{ + DataCacheMode: DataCacheModeWriteThrough, + DataCacheSize: "1234", + }, + }, + { + name: "data cache parameters", + enableDataCache: true, + parameters: map[string]string{ParameterKeyType: "pd-balanced", ParameterKeyReplicationType: "none", ParameterKeyDiskEncryptionKmsKey: "foo/key", ParameterKeyLabels: "key1=value1,key2=value2", ParameterKeyDataCacheSize: "1234Gi", ParameterKeyDataCacheMode: DataCacheModeWriteBack}, + labels: map[string]string{}, + expectParams: DiskParameters{ + DiskType: "pd-balanced", + ReplicationType: "none", + DiskEncryptionKMSKey: "foo/key", + Tags: map[string]string{}, + Labels: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + ResourceTags: map[string]string{}, + }, + expectDataCacheParams: DataCacheParameters{ + DataCacheMode: DataCacheModeWriteBack, + DataCacheSize: "1234", + }, + }, + { + name: "data cache parameters - enableDataCache is false", + enableDataCache: false, + parameters: map[string]string{ParameterKeyType: "pd-balanced", ParameterKeyReplicationType: "none", ParameterKeyDiskEncryptionKmsKey: "foo/key", ParameterKeyLabels: "key1=value1,key2=value2", ParameterKeyDataCacheSize: "1234Gi", ParameterKeyDataCacheMode: DataCacheModeWriteBack}, + labels: map[string]string{}, + expectErr: true, + }, { name: "multi-zone-enable parameters, multi-zone label is set, multi-zone feature enabled", parameters: map[string]string{ParameterKeyType: "hyperdisk-ml", ParameterKeyEnableMultiZoneProvisioning: "true"}, @@ -423,7 +474,7 @@ func TestExtractAndDefaultParameters(t *testing.T) { EnableMultiZone: tc.enableMultiZone, EnableHdHA: tc.enableHdHA, } - p, err := pp.ExtractAndDefaultParameters(tc.parameters, tc.labels, tc.extraTags) + p, d, err := pp.ExtractAndDefaultParameters(tc.parameters, tc.labels, tc.enableDataCache, tc.extraTags) if gotErr := err != nil; gotErr != tc.expectErr { t.Fatalf("ExtractAndDefaultParameters(%+v) = %v; expectedErr: %v", tc.parameters, err, tc.expectErr) } @@ -434,6 +485,10 @@ func TestExtractAndDefaultParameters(t *testing.T) { if diff := cmp.Diff(tc.expectParams, p); 
diff != "" { t.Errorf("ExtractAndDefaultParameters(%+v): -want, +got \n%s", tc.parameters, diff) } + + if diff := cmp.Diff(tc.expectDataCacheParams, d); diff != "" { + t.Errorf("ExtractAndDefaultParameters(%+v) for data cache params: -want, +got \n%s", tc.parameters, diff) + } }) } } diff --git a/pkg/common/runcmd.go b/pkg/common/runcmd.go new file mode 100644 index 000000000..71240d2a9 --- /dev/null +++ b/pkg/common/runcmd.go @@ -0,0 +1,71 @@ +package common + +import ( + "fmt" + "os/exec" + "strings" + + "k8s.io/klog/v2" +) + +const ( + // Error thrown by exec cmd.Run() when process spawned by cmd.Start() completes before cmd.Wait() is called (see - k/k issue #103753) + errNoChildProcesses = "wait: no child processes" +) + +// RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput. +// On error, the output is included so callers don't need to echo it again. + +func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) { + execCmd1 := exec.Command(cmd1, execCmdArgs...) + + if pipeCmd != "" { + output, err := execPipeCommand(pipeCmd, pipeCmdArg, execCmd1) + if err != nil { + return nil, fmt.Errorf("%s %s failed here: %w; output: %s", pipeCmd, pipeCmdArg, err, string(output)) + } + return output, nil + } + output, err := execCmd1.CombinedOutput() + if err != nil { + err = checkError(err, *execCmd1) + return nil, fmt.Errorf("%s %s failed here 2: %w; output: %s", cmd1, strings.Join(execCmdArgs, " "), err, string(output)) + } + + return output, nil +} + +func checkError(err error, execCmd exec.Cmd) error { + if err.Error() == errNoChildProcesses { + if execCmd.ProcessState.Success() { + // If the process succeeded, this can be ignored, see k/k issue #103753 + return nil + } + // Get actual error + klog.Infof("Errored here") + err = &exec.ExitError{ProcessState: execCmd.ProcessState} + } + return err +} +func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) { + + execPipeCmd := exec.Command(pipeCmd, pipeCmdArg) + stdoutPipe, err := execCmd1.StdoutPipe() + if err != nil { + klog.Errorf("failed command %v: got error:%v", execCmd1, err) + } + err = execCmd1.Start() + if err != nil { + klog.Infof("errored running command %v; error %v; ", execCmd1, err) + } + defer stdoutPipe.Close() + + execPipeCmd.Stdin = stdoutPipe + output, err := execPipeCmd.CombinedOutput() + if err != nil { + err = checkError(err, *execPipeCmd) + return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output)) + } + + return output, nil +} diff --git a/pkg/common/utils.go b/pkg/common/utils.go index a69d30453..222207456 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "hash/fnv" "net/http" "regexp" "slices" @@ -78,6 +79,7 @@ const ( // Full or partial URL of the zone resource, in the format: // projects/{project}/zones/{zone} zoneURIPattern = "projects/[^/]+/zones/([^/]+)$" + alphanums = "bcdfghjklmnpqrstvwxz2456789" ) var ( @@ -101,6 +103,8 @@ var ( http.StatusConflict: codes.FailedPrecondition, } + validDataCacheMode = []string{DataCacheModeWriteBack, DataCacheModeWriteThrough} + // Regular expressions for validating parent_id, key and value of a resource tag. 
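RunCommand, added above in pkg/common/runcmd.go, wraps exec so callers can optionally pipe the first command's stdout into a second command, and it folds the "wait: no child processes" quirk into a success when the process actually exited cleanly. Roughly how the cache code invokes it, with an illustrative volume group name:

```go
package main

import (
	"k8s.io/klog/v2"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// Plain invocation: lvs --select vg_name=csi-vg-abc123 -o lv_name
	out, err := common.RunCommand("" /* pipeCmd */, "" /* pipeCmdArg */,
		"lvs", "--select", "vg_name=csi-vg-abc123", "-o", "lv_name")
	if err != nil {
		klog.Errorf("lvs failed: %v", err) // command output is already folded into err
	}
	klog.Infof("lvs output: %s", out)

	// Piped invocation, equivalent to: ls /dev/md/ | grep csi-driver-data-cache
	out, err = common.RunCommand("grep", "csi-driver-data-cache", "ls", "/dev/md/")
	if err != nil {
		klog.Errorf("no RAIDed local-SSD device found: %v", err)
	}
	klog.Infof("raided device: %s", out)
}
```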
regexParent = regexp.MustCompile(`(^[1-9][0-9]{0,31}$)|(^[a-z][a-z0-9-]{4,28}[a-z0-9]$)`) regexKey = regexp.MustCompile(`^[a-zA-Z0-9]([0-9A-Za-z_.-]{0,61}[a-zA-Z0-9])?$`) @@ -392,6 +396,15 @@ func ConvertMiStringToInt64(str string) (int64, error) { return volumehelpers.RoundUpToMiB(quantity) } +// ConvertGiStringToInt64 converts a GiB string to int64 +func ConvertGiStringToInt64(str string) (int64, error) { + quantity, err := resource.ParseQuantity(str) + if err != nil { + return -1, err + } + return volumehelpers.RoundUpToGiB(quantity) +} + // ConvertStringToBool converts a string to a boolean. func ConvertStringToBool(str string) (bool, error) { switch strings.ToLower(str) { @@ -684,6 +697,22 @@ func VolumeIdAsMultiZone(volumeId string) (string, error) { return strings.Join(splitId, "/"), nil } +func StringInSlice(s string, list []string) bool { + for _, v := range list { + if v == s { + return true + } + } + return false +} + +func ValidateDataCacheMode(s string) error { + if StringInSlice(s, validDataCacheMode) { + return nil + } + return fmt.Errorf("invalid data-cache-mode %s. Only \"writeback\" and \"writethrough\" is a valid input", s) +} + // NewLimiter returns a token bucket based request rate limiter after initializing // the passed values for limit, burst (or token bucket) size. If opted for emptyBucket // all initial tokens are reserved for the first burst. @@ -696,3 +725,16 @@ func NewLimiter(limit, burst int, emptyBucket bool) *rate.Limiter { return limiter } + +// shortString is inspired by k8s.io/apimachinery/pkg/util/rand.SafeEncodeString, but takes data from a hash. +func ShortString(s string) string { + hasher := fnv.New128a() + hasher.Write([]byte(s)) + sum := hasher.Sum([]byte{}) + const sz = 8 + short := make([]byte, sz) + for i := 0; i < sz; i++ { + short[i] = alphanums[int(sum[i])%len(alphanums)] + } + return string(short) +} diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index c1307e734..9f9dd58b3 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -1044,6 +1044,108 @@ func TestConvertMiStringToInt64(t *testing.T) { } } +func TestConvertGiStringToInt64(t *testing.T) { + tests := []struct { + desc string + inputStr string + expInt64 int64 + expectError bool + }{ + { + desc: "valid number string", + inputStr: "10000", + expInt64: 1, + expectError: false, + }, + { + desc: "round Ki to GiB", + inputStr: "1000000Ki", + expInt64: 1, + expectError: false, + }, + { + desc: "round k to GiB", + inputStr: "1000000k", + expInt64: 1, + expectError: false, + }, + { + desc: "round Mi to GiB", + inputStr: "1000Mi", + expInt64: 1, + expectError: false, + }, + { + desc: "round M to GiB", + inputStr: "1000M", + expInt64: 1, + expectError: false, + }, + { + desc: "round G to GiB", + inputStr: "1000G", + expInt64: 932, + expectError: false, + }, + { + desc: "round Gi to GiB - most common case", + inputStr: "1234Gi", + expInt64: 1234, + expectError: false, + }, + { + desc: "round decimal to GiB", + inputStr: "1.2Gi", + expInt64: 2, + expectError: false, + }, + { + desc: "round big value to GiB", + inputStr: "8191Pi", + expInt64: 8588886016, + expectError: false, + }, + { + desc: "invalid empty string", + inputStr: "", + expInt64: 0, + expectError: true, + }, + { + desc: "invalid KiB string", + inputStr: "10KiB", + expInt64: 10000, + expectError: true, + }, + { + desc: "invalid GB string", + inputStr: "10GB", + expInt64: 0, + expectError: true, + }, + { + desc: "invalid string", + inputStr: "ew%65", + expInt64: 0, + expectError: true, + }, + } + for 
_, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + actualInt64, err := ConvertGiStringToInt64(tc.inputStr) + if err != nil && !tc.expectError { + t.Errorf("Got error %v converting string to int64 %s; expect no error", err, tc.inputStr) + } + if err == nil && tc.expectError { + t.Errorf("Got no error converting string to int64 %s; expect an error", tc.inputStr) + } + if err == nil && actualInt64 != tc.expInt64 { + t.Errorf("Got %d for converting string to int64; expect %d", actualInt64, tc.expInt64) + } + }) + } +} + func TestConvertStringToBool(t *testing.T) { tests := []struct { desc string @@ -1657,6 +1759,70 @@ func TestUnorderedSlicesEqual(t *testing.T) { } } +func TestStringInSlice(t *testing.T) { + testCases := []struct { + name string + inputStr string + inputSlice []string + expectedInSlice bool + }{ + { + name: "string is in the slice", + inputStr: "in slice", + inputSlice: []string{"in slice", "other string"}, + expectedInSlice: true, + }, + { + name: "string is NOT in the slice", + inputStr: "not in slice", + inputSlice: []string{"other string"}, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + actualResult := StringInSlice(tc.inputStr, tc.inputSlice) + if actualResult != tc.expectedInSlice { + t.Errorf("Expect value is %v but got %v. inputStr is %s, inputSlice is %v", tc.expectedInSlice, actualResult, tc.inputStr, tc.inputSlice) + } + } +} + +func TestValidateDataCacheMode(t *testing.T) { + testCases := []struct { + name string + inputStr string + expectError bool + }{ + { + name: "valid input - writethrough", + inputStr: "writethrough", + }, + { + name: "valid input - writeback", + inputStr: "writeback", + }, + { + name: "invalid input", + inputStr: "write-back not valid", + expectError: true, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + err := ValidateDataCacheMode(tc.inputStr) + if err != nil && !tc.expectError { + t.Errorf("Got error %v validate data cache mode %s; expect no error", err, tc.inputStr) + } + + if err == nil && tc.expectError { + t.Errorf("Got no error validate data cache mode %s; expect an error", tc.inputStr) + } + } + +} + func TestParseZoneFromURI(t *testing.T) { testcases := []struct { name string diff --git a/pkg/deviceutils/device-utils.go b/pkg/deviceutils/device-utils.go index 24a6e8c93..1f7a8fde3 100644 --- a/pkg/deviceutils/device-utils.go +++ b/pkg/deviceutils/device-utils.go @@ -231,7 +231,7 @@ func (m *deviceUtils) VerifyDevicePath(devicePaths []string, deviceName string) devicePath, innerErr = existingDevicePath(devicePaths) if innerErr != nil { e := fmt.Errorf("for disk %s failed to check for existing device path: %w", deviceName, innerErr) - klog.Errorf(e.Error()) + klog.Errorf("Error: %s", e.Error()) return false, e } @@ -243,7 +243,7 @@ func (m *deviceUtils) VerifyDevicePath(devicePaths []string, deviceName string) innerErr := udevadmTriggerForDiskIfExists(deviceName) if innerErr != nil { e := fmt.Errorf("for disk %s failed to trigger udevadm fix of non existent device path: %w", deviceName, innerErr) - klog.Errorf(e.Error()) + klog.Errorf("Error: %s", e.Error()) return false, e } // Go to next retry loop to get the deviceName again after @@ -256,7 +256,7 @@ func (m *deviceUtils) VerifyDevicePath(devicePaths []string, deviceName string) devFsPath, innerErr := filepath.EvalSymlinks(devicePath) if innerErr != nil { e := fmt.Errorf("filepath.EvalSymlinks(%q) failed: %w", devicePath, innerErr) - klog.Errorf(e.Error()) + klog.Errorf("Error: %s", e.Error()) return 
false, e } klog.V(4).Infof("For disk %s the /dev/* path is %s for disk/by-id path %s", deviceName, devFsPath, devicePath) @@ -264,7 +264,7 @@ func (m *deviceUtils) VerifyDevicePath(devicePaths []string, deviceName string) devFsSerial, innerErr := getDevFsSerial(devFsPath) if innerErr != nil { e := fmt.Errorf("couldn't get serial number for disk %s at device path %s: %w", deviceName, devFsPath, innerErr) - klog.Errorf(e.Error()) + klog.Errorf("Error: %s", e.Error()) return false, e } klog.V(4).Infof("For disk %s, device path %s, found serial number %s", deviceName, devFsPath, devFsSerial) @@ -281,7 +281,7 @@ func (m *deviceUtils) VerifyDevicePath(devicePaths []string, deviceName string) innerErr = udevadmTriggerForDiskIfExists(deviceName) if innerErr != nil { e := fmt.Errorf("failed to trigger udevadm fix of misconfigured disk for %q: %w", deviceName, innerErr) - klog.Errorf(e.Error()) + klog.Errorf("Error: %s", e.Error()) return false, e } // Go to next retry loop to get the deviceName again after diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go new file mode 100644 index 000000000..d9bd5454f --- /dev/null +++ b/pkg/gce-pd-csi-driver/cache.go @@ -0,0 +1,368 @@ +package gceGCEDriver + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + csi "github.com/container-storage-interface/spec/lib/go/csi" + + "k8s.io/klog/v2" + + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" +) + +const ( + cacheSuffix = "csi-fast" + mainLvSuffix = "csi-main" + raidedLocalSsdName = "csi-driver-data-cache" + raidMode = "0" + raidedLssdPrefix = "/dev/md/" +) + +var raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName + +func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) { + volumeId := req.GetVolumeId() + volumeGroupName := getVolumeGroupName(nodeId) + mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId) + mainLvName := getLvName(mainLvSuffix, volumeId) + klog.V(2).Infof("Volume group available on node %v ", volumeGroupName) + + info, err := common.RunCommand("grep", raidedLocalSsdName, "ls", raidedLssdPrefix) + if err != nil { + klog.Errorf("failed while listing raided devices, err: %v, output:%v", err, info) + } + infoString := strings.TrimSpace(string(info)) + raidedLocalSsdPath = raidedLssdPrefix + infoString + + vgExists := checkVgExists(volumeGroupName) + if vgExists { + // Clean up Volume Group before adding the PD + reduceVolumeGroup(volumeGroupName, true) + } else { + err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath) + if err != nil { + return mainDevicePath, err + } + } + + // Check if the Physical Volume(PV) is part of some other volume group + args := []string{ + "--select", + "pv_name=" + devicePath, + "-o", + "vg_name", + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "pvs", args...) 
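As an aside on the naming used throughout setupCaching: getVolumeGroupName (defined further down) takes the last path segment of the node ID, hashes it with common.ShortString (FNV-128a mapped onto the reduced alphabet declared in utils.go), and prefixes it with csi-vg-, so each node gets one deterministic, length-bounded volume group name. A quick illustration, with a made-up node ID:

```go
package main

import (
	"fmt"
	"strings"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	nodeID := "projects/my-project/zones/us-central1-a/instances/gke-pool-1-node-abc" // illustrative
	parts := strings.Split(nodeID, "/")
	vg := fmt.Sprintf("csi-vg-%s", common.ShortString(parts[len(parts)-1]))
	// Always an 8-character suffix drawn from "bcdfghjklmnpqrstvwxz2456789",
	// and the same value every time this node name is hashed.
	fmt.Println(vg)
}
```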
+ if err != nil { + klog.Errorf("errored while checking physical volume details %v: %s", err, info) + // On error info contains the error message which we cannot use for further steps + info = nil + } + + infoString = strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " ")) + infoString = strings.ReplaceAll(infoString, ".", "") + infoString = strings.ReplaceAll(infoString, "\"", "") + infoSlice := strings.Split(strings.TrimSpace(infoString), " ") + vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)]) + if vgNameForPv == volumeGroupName { + klog.V(2).Infof("Physical Volume(PV) already exists in the Volume Group %v", volumeGroupName) + } else if vgNameForPv != "VG" && vgNameForPv != "" { + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...) + if err != nil { + klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info) + } + // Clean up volume group to remove any dangling PV references + reduceVolumeGroup(vgNameForPv, false) + _, isCached := isCachingSetup(mainLvName) + // We will continue to uncache even if it errors to check caching as it is not a terminal issue. + if isCached { + // Uncache LV + args = []string{ + "--uncache", + vgNameForPv + "/" + mainLvName, + "--force", + "-y", // force remove cache without flushing data + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...) + if err != nil { + return "", fmt.Errorf("errored while uncaching main LV. %v: %s", err, info) + } + // Clean up volume group to remove any dangling PV references + reduceVolumeGroup(vgNameForPv, false) + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgmerge", []string{volumeGroupName, vgNameForPv}...) + if err != nil { + return "", fmt.Errorf("Errored while merging the PV Volume group %s into %s %v: %s", vgNameForPv, volumeGroupName, err, info) + } + + } else { + info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", []string{volumeGroupName, devicePath}...) + if err != nil { + return "", fmt.Errorf("Errored while extending Volume group to add PV %v, error: %v: %s", devicePath, err, info) + } + } + + // Create LV if not already created + args = []string{ + "--select", + "vg_name=" + volumeGroupName, + "-o", + "lv_name", + } + lvList, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvs", args...) + if err != nil { + return mainDevicePath, fmt.Errorf("Errored while checking logical volume for the device %s %w: %s", devicePath, err, info) + } + if !strings.Contains(string(lvList), mainLvName) { + args = []string{ + "--yes", + "-n", + mainLvName, + "-l", + "100%PVS", // Use 100% of the PV + volumeGroupName, + devicePath, + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...)
+ if err != nil { + return mainDevicePath, fmt.Errorf("Errored setting up logical volume for the volume %s %w: %s", devicePath, err, info) + } + + } + err, isCached := isCachingSetup(mainLvName) + if err != nil { + klog.Errorf("failed to check if caching is setup for LV, continuing to set up caching.") + } + cacheLvName := getLvName(cacheSuffix, volumeId) + if isCached { + // Validate that cache is setup for required size + klog.V(2).Infof("Assuming valid data cache size and mode, resizing cache is not supported") + } else { + fastCacheSize := req.GetPublishContext()[common.ContexLocalSsdCacheSize] + chunkSize := "960" // Cannot use default chunk size (64KiB) as it errors on maxChunksAllowed. Unit - KiB + args = []string{ + "--yes", + "-n", + cacheLvName, + "-L", + // ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB. + fastCacheSize + "g", + volumeGroupName, + raidedLocalSsdPath, + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...) + if err != nil { + return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info) + } + + // Once caching is set up, link the PD to cache + args = []string{ + "--type", + "cache", + "--cachevol", + cacheLvName, + "--zero", + "y", + "--cachemode", + req.GetPublishContext()[common.ContextDataCacheMode], + volumeGroupName + "/" + mainLvName, + "--chunksize", + string(chunkSize), + "--force", + "-y", + } + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...)
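As an aside: once the lvconvert above succeeds, the PD-backed main LV is fronted by the local-SSD cache LV, and the same lvs fields used by isCachingSetup further down can confirm the binding. A minimal sketch, with made-up volume group and PVC names:

```go
package main

import (
	"fmt"
	"strings"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	vg := "csi-vg-abc123"         // illustrative volume group name
	mainLv := "csi-main-pvc-1234" // illustrative main LV name (mainLvSuffix + PVC name)

	// lvs --select lv_name=csi-main-pvc-1234 -o lv_name,pool_lv
	out, err := common.RunCommand("", "", "lvs",
		"--select", "lv_name="+mainLv, "-o", "lv_name,pool_lv")
	if err != nil {
		panic(err)
	}
	// When caching is wired up, pool_lv reports the cache LV (a csi-fast-... name).
	fmt.Println("in", vg+":", strings.TrimSpace(string(out)))
}
```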
+ if err != nil { + return fmt.Errorf("Failed to uncache volume %s %w: %s", volumeId, err, info) + } + return nil +} + +func getVolumeGroupName(nodePath string) string { + nodeSlice := strings.Split(nodePath, "/") + nodeId := nodeSlice[len(nodeSlice)-1] + nodeHash := common.ShortString(nodeId) + return fmt.Sprintf("csi-vg-%s", nodeHash) +} + +func getLvName(suffix string, volumeId string) string { + pvcNameStringSlice := strings.Split(volumeId, "/") + pvcName := pvcNameStringSlice[len(pvcNameStringSlice)-1] + return fmt.Sprintf("%s-%s", suffix, pvcName) +} + +func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error { + args := []string{ + "--zero", + "y", + volumeGroupName, + raidedLocalSsds, + "-v", + } + info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgcreate", args...) + if err != nil { + return fmt.Errorf("Volume group creation failed %w: %s", err, info) + } + klog.Infof("Volume group creation succeeded for %v", volumeGroupName) + + args = []string{} + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...) + if err != nil { + klog.Errorf("Failed to scan for volume group post creation, continuing: %v: %s", err, info) + } + return nil +} + +func reduceVolumeGroup(volumeGroupName string, force bool) { + args := []string{ + "--removemissing", + volumeGroupName, + } + if force { + args = append(args, "--force") + } + info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgreduce", args...) + if err != nil { + klog.Errorf("Errored while cleaning up volume group %v: %s", err, info) + } +} + +func RaidLocalSsds() error { + isAlreadyRaided, err := isRaided() + if err != nil { + klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err) + } else if isAlreadyRaided { + klog.V(2).Infof("Local SSDs are already RAIDed, no further action needed here") + return nil + } + diskList := []string{} + info, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "lsblk", []string{"-o", "NAME,MODEL", "-p", "-d", "-n"}...) + if err != nil { + return fmt.Errorf("Failed to fetch LSSD info: %v; err:%v", info, err) + } + infoList := strings.Split(strings.TrimSpace(string(info)), "\n") + re, err := regexp.Compile("nvme_card([0-9]+)?$") + if err != nil { + return fmt.Errorf("Errored while compiling to check PD or LSSD %s", err) + } + for _, ssd := range infoList { + ssd = strings.TrimSpace(ssd) + if strings.HasPrefix(ssd, "/dev/nvme") { + ssdDetails := strings.Split(ssd, " ") + lssd := re.MatchString(ssdDetails[1]) + if lssd { + diskList = append(diskList, strings.TrimSpace(ssdDetails[0])) + } + } + } + nvmeDiskCount := len(diskList) + if nvmeDiskCount == 0 { + return fmt.Errorf("No local SSDs found for raiding") + } + args := []string{ + "--create", + raidedLssdPrefix + raidedLocalSsdName, + "-l" + raidMode, + // Force RAIDing as sometime it might fail for caution if there is just 1 LSSD present as 1 LSSD need not be RAIDed + "--force", + "-n", + strconv.Itoa(nvmeDiskCount), + } + args = append(args, diskList...) + info, err = common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...) 
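As an aside on the device filtering in RaidLocalSsds above: the lsblk output is matched against nvme_card([0-9]+)?$ so that only local-SSD NVMe devices are RAIDed; on GCE, persistent-disk NVMe controllers typically report a model like "nvme_card-pd" (an assumption about common GCE setups), which the end-anchored pattern rejects, while local SSDs report "nvme_card", "nvme_card1", and so on. A quick check of that behavior:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	re := regexp.MustCompile("nvme_card([0-9]+)?$")
	for _, model := range []string{"nvme_card", "nvme_card3", "nvme_card-pd"} {
		// Only the local-SSD style models match; "nvme_card-pd" does not.
		fmt.Printf("%-14s matches: %v\n", model, re.MatchString(model))
	}
}
```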
+ if err != nil { + return fmt.Errorf("errored while RAIDing LSSDs info: %v; err:%v", info, err) + } + // Validate if Raided successfully + isAlreadyRaided, err = isRaided() + if err != nil { + klog.V(2).Infof("Errored while scanning for available raided LocalSSDs err:%v=", err) + } + if !isAlreadyRaided { + return fmt.Errorf("failed raiding, raided device not found on scanning") + } + return nil +} + +func isRaided() (bool, error) { + args := []string{ + "--detail", + "--scan", + } + info, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...) + if err != nil { + return false, fmt.Errorf("errored while scanning for raided LSSD %v: %s", err, info) + } + if info != nil && strings.Contains(string(info), raidedLocalSsdName) { + return true, nil + } + return false, nil +} + +func isCachingSetup(mainLvName string) (error, bool) { + // Verify caching is setup for PD + args := []string{ + "--select", + "lv_name=" + mainLvName, + "-o", + "pool_lv", + } + poolName, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "lvs", args...) + if err != nil { + return fmt.Errorf("Failed to check if caching is setup %w", err), false + } + if strings.Contains(string(poolName), "csi-fast") { + return nil, true + } + return nil, false +} diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 3bcee864a..2688efa4d 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -108,6 +108,9 @@ type GCEControllerServer struct { // to be provisioned. enableHdHA bool + // If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration + enableDataCache bool + multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig listVolumesConfig ListVolumesConfig @@ -304,7 +307,7 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req var err error // Apply Parameters (case-insensitive). We leave validation of // the values to the cloud provider. - params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) + params, dataCacheParams, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.enableDataCache, gceCS.Driver.extraTags) metrics.UpdateRequestMetadataFromParams(ctx, params) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) @@ -391,11 +394,11 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req if gceCS.multiZoneVolumeHandleConfig.Enable && params.MultiZoneProvisioning { // Create multi-zone disk, that may have up to N disks. 
- return gceCS.createMultiZoneDisk(ctx, req, params) + return gceCS.createMultiZoneDisk(ctx, req, params, dataCacheParams, gceCS.enableDataCache) } // Create single device zonal or regional disk - return gceCS.createSingleDeviceDisk(ctx, req, params) + return gceCS.createSingleDeviceDisk(ctx, req, params, dataCacheParams, gceCS.enableDataCache) } func (gceCS *GCEControllerServer) getSupportedZonesForPDType(ctx context.Context, zones []string, diskType string) ([]string, error) { @@ -450,7 +453,7 @@ func (gceCS *GCEControllerServer) getMultiZoneProvisioningZones(ctx context.Cont return combinedZones.List(), nil } -func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters) (*csi.CreateVolumeResponse, error) { +func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool) (*csi.CreateVolumeResponse, error) { // Determine the zones that are needed. var err error @@ -496,7 +499,7 @@ func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req * // Use the first response as a template volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName()) klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey) - return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, volumeId), nil + return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil } func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) { @@ -540,7 +543,7 @@ func (gceCS *GCEControllerServer) updateAccessModeIfNecessary(ctx context.Contex return gceCS.CloudProvider.SetDiskAccessMode(ctx, project, volKey, gceReadOnlyManyAccessMode) } -func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters) (*csi.CreateVolumeResponse, error) { +func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool) (*csi.CreateVolumeResponse, error) { var err error var locationTopReq *locationRequirements if useVolumeCloning(req) { @@ -590,7 +593,7 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re return nil, common.LoggedError("CreateVolume failed: %v", err) } - return generateCreateVolumeResponseWithVolumeId(disk, zones, params, volumeID), err + return generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err } func (gceCS *GCEControllerServer) createSingleDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, volKey *meta.Key, zones []string) (*gce.CloudDisk, error) { @@ -1056,6 +1059,15 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con PublishContext: nil, } + // Set data cache publish context + if gceCS.enableDataCache && req.GetVolumeContext() != nil { + if req.GetVolumeContext()[common.ContextDataCacheSize] != "" { + pubVolResp.PublishContext = map[string]string{} + pubVolResp.PublishContext[common.ContexLocalSsdCacheSize] = req.GetVolumeContext()[common.ContextDataCacheSize] + 
pubVolResp.PublishContext[common.ContextDataCacheMode] = req.GetVolumeContext()[common.ContextDataCacheMode] + } + } + instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), nil @@ -1315,10 +1327,11 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context } // Validate the disk parameters match the disk we GET - params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) + params, _, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.enableDataCache, gceCS.Driver.extraTags) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) } + if err := gce.ValidateDiskParameters(disk, params); err != nil { return generateFailedValidationMessage("Parameters %v do not match given disk %s: %v", req.GetParameters(), disk.GetName(), err.Error()), nil } @@ -2340,7 +2353,7 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) { return info, nil } -func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, volumeId string) *csi.CreateVolumeResponse { +func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse { tops := []*csi.Topology{} for _, zone := range zones { tops = append(tops, &csi.Topology{ @@ -2356,6 +2369,14 @@ func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []strin AccessibleTopology: tops, }, } + // Set data cache volume context + if enableDataCache && dataCacheParams != (common.DataCacheParameters{}) { + if createResp.Volume.VolumeContext == nil { + createResp.Volume.VolumeContext = map[string]string{} + } + createResp.Volume.VolumeContext[common.ContextDataCacheMode] = dataCacheParams.DataCacheMode + createResp.Volume.VolumeContext[common.ContextDataCacheSize] = dataCacheParams.DataCacheSize + } snapshotID := disk.GetSnapshotId() imageID := disk.GetImageId() diskID := disk.GetSourceDiskId() diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 9133ce035..3f7e9e970 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -155,10 +155,11 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi VolumeStatter: statter, enableDeviceInUseCheck: args.EnableDeviceInUseCheck, deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout), + EnableDataCache: args.EnableDataCache, } } -func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, errorBackoffInitialDuration, errorBackoffMaxDuration time.Duration, fallbackRequisiteZones []string, enableStoragePools bool, multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig, listVolumesConfig ListVolumesConfig, provisionableDisksConfig ProvisionableDisksConfig, enableHdHA bool) *GCEControllerServer { +func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, errorBackoffInitialDuration, errorBackoffMaxDuration time.Duration, fallbackRequisiteZones []string, enableStoragePools bool, enableDataCache bool, multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig, listVolumesConfig ListVolumesConfig, 
provisionableDisksConfig ProvisionableDisksConfig, enableHdHA bool) *GCEControllerServer { return &GCEControllerServer{ Driver: gceDriver, CloudProvider: cloudProvider, @@ -167,6 +168,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err errorBackoff: newCsiErrorBackoff(errorBackoffInitialDuration, errorBackoffMaxDuration), fallbackRequisiteZones: fallbackRequisiteZones, enableStoragePools: enableStoragePools, + enableDataCache: enableDataCache, multiZoneVolumeHandleConfig: multiZoneVolumeHandleConfig, listVolumesConfig: listVolumesConfig, provisionableDisksConfig: provisionableDisksConfig, diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go index fb73c33fb..cf3ac38ce 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go @@ -47,6 +47,7 @@ func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer errorBackoffMaxDuration := 5 * time.Minute fallbackRequisiteZones := []string{} enableStoragePools := false + enableDataCache := false multiZoneVolumeHandleConfig := MultiZoneVolumeHandleConfig{} listVolumesConfig := ListVolumesConfig{} provisionableDisksConfig := ProvisionableDisksConfig{ @@ -54,7 +55,7 @@ func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer SupportsThroughputChange: []string{"hyperdisk-balanced", "hyperdisk-throughput", "hyperdisk-ml"}, } - return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true /* enableHdHA */) + return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true /* enableHdHA */) } func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver { diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 7bc934036..9be2c7f6a 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -46,6 +46,7 @@ type GCENodeServer struct { DeviceUtils deviceutils.DeviceUtils VolumeStatter mountmanager.Statter MetadataService metadataservice.MetadataService + EnableDataCache bool // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error @@ -78,6 +79,8 @@ type NodeServerArgs struct { EnableDeviceInUseCheck bool DeviceInUseTimeout time.Duration + + EnableDataCache bool } var _ csi.NodeServer = &GCENodeServer{} @@ -294,6 +297,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage volumeID := req.GetVolumeId() stagingTargetPath := req.GetStagingTargetPath() volumeCapability := req.GetVolumeCapability() + nodeId := ns.MetadataService.GetName() if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume ID must be provided") } @@ -327,12 +331,25 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage partition = part } devicePath, err := getDevicePath(ns, volumeID, partition) - if err != nil { return nil, status.Error(codes.Internal, fmt.Sprintf("Error when getting device path: %v", err.Error())) } - klog.V(4).Infof("Successfully found attached GCE PD %q at device path %s.", volumeKey.Name, 
devicePath) + klog.Infof("Successfully found attached GCE PD %q at device path %s.", volumeKey.Name, devicePath) + + if ns.EnableDataCache && req.GetPublishContext()[common.ContexLocalSsdCacheSize] != "" { + if len(nodeId) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Node ID must be provided") + } + devFsPath, err := filepath.EvalSymlinks(devicePath) + if err != nil { + klog.Errorf("filepath.EvalSymlinks(%q) failed when trying to create volume group: %v", devicePath, err) + } + devicePath, err = setupCaching(devFsPath, req, nodeId) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("Error setting up cache: %v", err.Error())) + } + } // Part 2: Check if mount already exists at stagingTargetPath if ns.isVolumePathMounted(stagingTargetPath) { @@ -486,6 +503,15 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns ns.deviceInUseErrors.deleteDevice(volumeID) } + // The NodeUnstageVolume does not have any volume or publish context, we need to get the info from LVM locally + // Check if cache group cache-{volumeID} exist in LVM + if ns.EnableDataCache { + nodeId := ns.MetadataService.GetName() + err := cleanupCache(volumeID, nodeId) + if err != nil { + klog.Errorf("Failed to cleanup cache for volume %s: %v", volumeID, err) + } + } klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil } diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index d1d0920bf..16a015c29 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -51,7 +51,8 @@ func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAn func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver { gceDriver := GetGCEDriver() - nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}) + enableDataCache := false + nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, enableDataCache}) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) @@ -62,7 +63,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { gceDriver := GetGCEDriver() mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) - nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}) + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, true}) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) @@ -72,8 +73,9 @@ func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { gceDriver := GetGCEDriver() + enableDataCache := true mounter := 
mountmanager.NewFakeSafeBlockingMounter(readyToExecute) - nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}).WithSerializedFormatAndMount(5*time.Second, 1) + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, enableDataCache}).WithSerializedFormatAndMount(5*time.Second, 1) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { diff --git a/test/e2e/tests/multi_zone_e2e_test.go b/test/e2e/tests/multi_zone_e2e_test.go index 492cb7cac..09ad3ef6a 100644 --- a/test/e2e/tests/multi_zone_e2e_test.go +++ b/test/e2e/tests/multi_zone_e2e_test.go @@ -332,7 +332,7 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { }() // Attach Disk - err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") // Create Snapshot @@ -413,11 +413,11 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) // Attach Disk to node1 and validate contents - err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") // Attach Disk to node1 and validate contents - err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() @@ -479,7 +479,7 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { }() // Attach Disk - err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") // Create Snapshot @@ -551,11 +551,11 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) // Attach Disk to node1 and validate contents - err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") // Attach Disk to node1 and validate contents - err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) 
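The NodeStageVolume and NodeUnstageVolume hunks above only wire in calls to setupCaching and cleanupCache; the LVM plumbing itself lives in helpers that are not part of these hunks. A rough, self-contained sketch of what such helpers could look like when shelling out to the stock LVM tools follows; the volume-group layout, LV names, and flag choices are assumptions for illustration, not the driver's actual cache implementation.

package cachesketch

import (
    "fmt"
    "os/exec"
    "strings"
)

// run executes one LVM command and returns its combined output.
func run(name string, args ...string) (string, error) {
    out, err := exec.Command(name, args...).CombinedOutput()
    if err != nil {
        return "", fmt.Errorf("%s %s failed: %v, output: %s", name, strings.Join(args, " "), err, out)
    }
    return string(out), nil
}

// setupCachingSketch builds a dm-cache stack: the PD becomes the origin LV,
// the local SSD provides the cache pool, and both live in a per-volume VG.
// cacheSize is an LVM size string such as "200g"; cacheMode is "writethrough"
// or "writeback". The "cache-<id>" VG name mirrors the "cache-{volumeID}"
// convention mentioned in the NodeUnstageVolume comment above; a real
// implementation would need a name-safe identifier.
func setupCachingSketch(pdDev, ssdDev, volumeName, cacheSize, cacheMode string) (string, error) {
    vg := "cache-" + volumeName
    if _, err := run("vgcreate", vg, ssdDev, pdDev); err != nil {
        return "", err
    }
    // Origin LV spans the whole PD physical volume.
    if _, err := run("lvcreate", "-n", "origin", "-l", "100%PVS", vg, pdDev); err != nil {
        return "", err
    }
    // Cache pool carved out of the local SSD.
    if _, err := run("lvcreate", "--type", "cache-pool", "-n", "pool", "-L", cacheSize, vg, ssdDev); err != nil {
        return "", err
    }
    // Attach the pool to the origin; the PD is now accessed through dm-cache.
    if _, err := run("lvconvert", "--type", "cache", "--cachepool", vg+"/pool", "--cachemode", cacheMode, "-y", vg+"/origin"); err != nil {
        return "", err
    }
    return "/dev/" + vg + "/origin", nil
}

// cleanupCacheSketch tears the stack down on NodeUnstageVolume.
func cleanupCacheSketch(volumeName string) error {
    vg := "cache-" + volumeName
    if _, err := run("lvremove", "-y", vg); err != nil {
        return err
    }
    _, err := run("vgremove", "-y", vg)
    return err
}

With a stack shaped like this, NodeStageVolume would format and mount the returned /dev/&lt;vg&gt;/origin device rather than the raw PD path, which matches how the hunk above replaces devicePath with the value returned by setupCaching.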
Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() @@ -617,7 +617,7 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { }() // Attach Disk - err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") // Create Disk Image @@ -707,11 +707,11 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) // Attach Disk to node1 - err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") // Attach Disk to node1 - err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") }) @@ -806,18 +806,18 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { volID0 := fmt.Sprintf("projects/%s/zones/%s/disks/%s", p, zones[0], volName) volID1 := fmt.Sprintf("projects/%s/zones/%s/disks/%s", p, zones[1], volName) - err = testAttachWriteReadDetach(volID0, volName, tc0.Instance, tc0.Client, false /* readonly */) + err = testAttachWriteReadDetach(volID0, volName, tc0.Instance, tc0.Client, false /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/write/read/detach on vol1") - err = testAttachWriteReadDetach(volID1, volName, tc1.Instance, tc1.Client, false /* readonly */) + err = testAttachWriteReadDetach(volID1, volName, tc1.Instance, tc1.Client, false /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/write/read/detach on vol2") // Validate disks can be used in multi-zone mode on both nodes volIDMultiZone := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) - err = testAttachWriteReadDetach(volIDMultiZone, volName, tc0.Instance, tc0.Client, true /* readonly */) + err = testAttachWriteReadDetach(volIDMultiZone, volName, tc0.Instance, tc0.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") - err = testAttachWriteReadDetach(volIDMultiZone, volName, tc1.Instance, tc1.Client, true /* readonly */) + err = testAttachWriteReadDetach(volIDMultiZone, volName, tc1.Instance, tc1.Client, true /* readonly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") // Validate disks are ROX now @@ -910,7 +910,7 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { if i >= 1 { readOnly = true } - err = testAttachWriteReadDetach(volume.VolumeId, volName, testContext.Instance, testContext.Client, readOnly) + err = testAttachWriteReadDetach(volume.VolumeId, volName, testContext.Instance, testContext.Client, readOnly, false /* detachAndReattach */, false /* 
setupDataCache */) Expect(err).To(BeNil(), "failed volume lifecycle checks") i = i + 1 } @@ -998,9 +998,10 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { // Attach disk to instance in the first zone. tc0 := zoneToContext[zones[0]] err, detacher, args := testAttachAndMount(volume.VolumeId, volName, tc0.Instance, tc0.Client, attachAndMountArgs{ - readOnly: false, - useBlock: false, - forceAttach: false, + readOnly: false, + useBlock: false, + forceAttach: false, + setupDataCache: false, }) detachers = append(detachers, detacher) Expect(err).To(BeNil(), "failed attach in zone 0") @@ -1018,9 +1019,10 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { // Now force attach to the second instance without detaching. tc1 := zoneToContext[zones[1]] err, detacher, _ = testAttachAndMount(volume.VolumeId, volName, tc1.Instance, tc1.Client, attachAndMountArgs{ - readOnly: false, - useBlock: false, - forceAttach: true, + readOnly: false, + useBlock: false, + forceAttach: true, + setupDataCache: false, }) detachers = append(detachers, detacher) Expect(err).To(BeNil(), "failed force attach in zone 1") @@ -1104,7 +1106,7 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { // For each of the two instances i := 0 for _, testContext := range zoneToContext { - err = testAttachWriteReadDetach(volume.VolumeId, volName, testContext.Instance, testContext.Client, false) + err = testAttachWriteReadDetach(volume.VolumeId, volName, testContext.Instance, testContext.Client, false, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "failed volume lifecycle checks") i = i + 1 } @@ -1212,9 +1214,10 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { // Now force attach to the second instance without detaching. 
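The cache size travels through the CSI publish context as a Kubernetes-style quantity string (the test client wrapper later in this patch defaults it to "200Gi"), while LVM wants an explicit size argument. That conversion is not shown in these hunks; one plausible helper, assuming apimachinery's resource.Quantity is used for parsing:

package cachesketch

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// parseCacheSize converts a quantity such as "200Gi" into an LVM -L size
// argument expressed in MiB (LVM's lowercase "m" suffix means mebibytes).
func parseCacheSize(raw string) (string, error) {
    q, err := resource.ParseQuantity(raw)
    if err != nil {
        return "", fmt.Errorf("invalid cache size %q: %w", raw, err)
    }
    bytes, ok := q.AsInt64()
    if !ok || bytes <= 0 {
        return "", fmt.Errorf("cache size %q does not fit in int64 or is not positive", raw)
    }
    return fmt.Sprintf("%dm", bytes/(1<<20)), nil
}

For example, parseCacheSize("200Gi") yields "204800m", which can be handed straight to lvcreate -L.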
tc1 := zoneToContext[zones[1]] err, detacher, _ = testAttachAndMount(volume.VolumeId, volName, tc1.Instance, tc1.Client, attachAndMountArgs{ - readOnly: false, - useBlock: false, - forceAttach: true, + readOnly: false, + useBlock: false, + forceAttach: true, + setupDataCache: false, }) detachers = append(detachers, detacher) Expect(err).To(BeNil(), "failed force attach in zone 1") @@ -1234,7 +1237,12 @@ func deleteDisk(controllerClient *remote.CsiClient, p, zone, volID, volName stri Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found") } -func testAttachWriteReadDetach(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, readOnly bool) error { +func testAttachWriteReadDetach(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, readOnly bool, detachAndReattach bool, setupDataCache bool) error { + writeFile, verifyReadFile := testWriteAndReadFile(instance, readOnly) + return testLifecycleWithVerify(volID, volName, instance, client, readOnly, false /* fs */, writeFile, verifyReadFile, detachAndReattach, setupDataCache) +} + +func testWriteAndReadFile(instance *remote.InstanceInfo, readOnly bool) (verifyFunc, verifyFunc) { var testFileContents = "test" writeFile := func(a *verifyArgs) error { if !readOnly { @@ -1260,46 +1268,61 @@ func testAttachWriteReadDetach(volID string, volName string, instance *remote.In } return nil } - return testLifecycleWithVerify(volID, volName, instance, client, readOnly, false /* fs */, writeFile, verifyReadFile) + return writeFile, verifyReadFile } type attachAndMountArgs struct { - readOnly bool - useBlock bool - forceAttach bool + readOnly bool + useBlock bool + forceAttach bool + setupDataCache bool } func testAttachAndMount(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, args attachAndMountArgs) (error, func(), *verifyArgs) { + klog.Infof("Starting testAttachAndMount with volume %v node %v \n", volID, instance.GetNodeID()) + err, unstageAndDetach, stageDir := testAttach(volID, volName, instance, client, args) + if err != nil { + return err, nil, nil + } + // Mount Disk + err, unpublish, returnArgs := testMount(volID, volName, instance, client, args, stageDir) + if err != nil { + unstageAndDetach() + return err, nil, nil + } + unpublishUnstageAndDetach := func() { + unpublish() + unstageAndDetach() + } + return nil, unpublishUnstageAndDetach, returnArgs +} + +func testAttach(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, args attachAndMountArgs) (error, func(), string) { + klog.Infof("Starting testAttach with volume %v node %v \n", volID, instance.GetNodeID()) // Attach Disk var err error + var stageDir string if args.readOnly { err = client.ControllerPublishVolumeReadOnly(volID, instance.GetNodeID()) } else { err = client.ControllerPublishVolumeReadWrite(volID, instance.GetNodeID(), args.forceAttach) } if err != nil { - return fmt.Errorf("ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err.Error()), nil, nil - } - - detach := func() { - // Detach Disk - err = client.ControllerUnpublishVolume(volID, instance.GetNodeID()) - if err != nil { - klog.Errorf("Failed to detach disk: %v", err) - } + return fmt.Errorf("ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err.Error()), nil, stageDir } // Stage Disk - stageDir := filepath.Join("/tmp/", volName, "stage") + stageDir = 
filepath.Join("/tmp/", volName, "stage") if args.useBlock { - err = client.NodeStageBlockVolume(volID, stageDir) + err = client.NodeStageBlockVolume(volID, stageDir, args.setupDataCache) + } else { - err = client.NodeStageExt4Volume(volID, stageDir) + err = client.NodeStageExt4Volume(volID, stageDir, args.setupDataCache) } if err != nil { - detach() - return fmt.Errorf("NodeStageExt4Volume failed with error: %w", err), nil, nil + _ = detach(volID, instance, client) + return fmt.Errorf("NodeStageExt4Volume failed with error: %w for node: %v", err, instance.GetNodeID()), nil, stageDir } unstageAndDetach := func() { @@ -1314,9 +1337,23 @@ func testAttachAndMount(volID string, volName string, instance *remote.InstanceI klog.Errorf("Failed to rm file path %s: %v", fp, err) } - detach() + detach(volID, instance, client) } + return nil, unstageAndDetach, stageDir +} + +func detach(volID string, instance *remote.InstanceInfo, client *remote.CsiClient) error { + // Detach Disk + err := client.ControllerUnpublishVolume(volID, instance.GetNodeID()) + if err != nil { + klog.Errorf("Failed to detach disk %v", err) + return fmt.Errorf("Failed to detach disk: %v", err) + } + return nil +} +func testMount(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, args attachAndMountArgs, stageDir string) (error, func(), *verifyArgs) { + var err error // Mount Disk publishDir := filepath.Join("/tmp/", volName, "mount") @@ -1327,7 +1364,6 @@ func testAttachAndMount(volID string, volName string, instance *remote.InstanceI } if err != nil { - unstageAndDetach() return fmt.Errorf("NodePublishVolume failed with error: %v", err.Error()), nil, nil } @@ -1338,14 +1374,10 @@ func testAttachAndMount(volID string, volName string, instance *remote.InstanceI klog.Errorf("Failed to unpublish volume: %v", err) } } - unpublishUnstageAndDetach := func() { - unpublish() - unstageAndDetach() - } err = testutils.ForceChmod(instance, filepath.Join("/tmp/", volName), "777", !args.readOnly /* recursive */) if err != nil { - unpublishUnstageAndDetach() + unpublish() return fmt.Errorf("Chmod failed with error: %v", err.Error()), nil, nil } @@ -1354,16 +1386,18 @@ func testAttachAndMount(volID string, volName string, instance *remote.InstanceI stageDir: stageDir, } - return nil, unpublishUnstageAndDetach, returnArgs + return nil, unpublish, returnArgs } -func testLifecycleWithVerify(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, readOnly, useBlock bool, firstMountVerify, secondMountVerify verifyFunc) error { +func testLifecycleWithVerify(volID string, volName string, instance *remote.InstanceInfo, client *remote.CsiClient, readOnly, useBlock bool, firstMountVerify, secondMountVerify verifyFunc, detachAndReattach bool, setupDataCache bool) error { klog.Infof("Starting testAttachWriteReadDetach with volume %v node %v with readonly %v\n", volID, instance.GetNodeID(), readOnly) - err, detacher, args := testAttachAndMount(volID, volName, instance, client, attachAndMountArgs{ - readOnly: readOnly, - useBlock: useBlock, - forceAttach: false, - }) + attachArgs := attachAndMountArgs{ + readOnly: readOnly, + useBlock: useBlock, + forceAttach: false, + setupDataCache: setupDataCache, + } + err, detacher, args := testAttachAndMount(volID, volName, instance, client, attachArgs) if err != nil { return fmt.Errorf("failed to attach and mount: %w", err) } @@ -1380,13 +1414,28 @@ func testLifecycleWithVerify(volID string, volName string, instance *remote.Inst return 
fmt.Errorf("NodeUnpublishVolume failed with error: %v", err.Error()) } + stageDir := args.stageDir + if detachAndReattach { + // Unstage and detach + err = client.NodeUnstageVolume(volID, stageDir) + if err != nil { + klog.Errorf("Failed to unstage volume: %v", err) + } + detach(volID, instance, client) + // Reattach the volume + err, _, stageDir = testAttach(volID, volName, instance, client, attachArgs) + if err != nil { + return err + } + } + if secondMountVerify != nil { // Mount disk somewhere else secondPublishDir := filepath.Join("/tmp/", volName, "secondmount") if useBlock { - err = client.NodePublishBlockVolume(volID, args.stageDir, secondPublishDir) + err = client.NodePublishBlockVolume(volID, stageDir, secondPublishDir) } else { - err = client.NodePublishVolume(volID, args.stageDir, secondPublishDir) + err = client.NodePublishVolume(volID, stageDir, secondPublishDir) } if err != nil { return fmt.Errorf("NodePublishVolume failed with error: %v", err.Error()) diff --git a/test/e2e/tests/resize_e2e_test.go b/test/e2e/tests/resize_e2e_test.go index fbe8af868..fc4d87558 100644 --- a/test/e2e/tests/resize_e2e_test.go +++ b/test/e2e/tests/resize_e2e_test.go @@ -80,7 +80,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volume.VolumeId, stageDir) + err = client.NodeStageExt4Volume(volume.VolumeId, stageDir, false /* setupDataCache */) Expect(err).To(BeNil(), "Node Stage volume failed") defer func() { @@ -174,7 +174,7 @@ var _ = Describe("GCE PD CSI Driver", func() { }() // Volume should be attached/formatted/mounted/unmounted/detached - err = testAttachWriteReadDetach(volume.VolumeId, volName, instance, client, false /* readOnly */) + err = testAttachWriteReadDetach(volume.VolumeId, volName, instance, client, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") // Resize controller @@ -203,7 +203,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volume.VolumeId, stageDir) + err = client.NodeStageExt4Volume(volume.VolumeId, stageDir, false /* setupDataCache */) Expect(err).To(BeNil(), "Node Stage volume failed") defer func() { @@ -295,7 +295,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageBlockVolume(volume.VolumeId, stageDir) + err = client.NodeStageBlockVolume(volume.VolumeId, stageDir, false /* setupDataCache */) Expect(err).To(BeNil(), "Node Stage volume failed") defer func() { diff --git a/test/e2e/tests/setup_e2e_test.go b/test/e2e/tests/setup_e2e_test.go index 3bc738a2b..5222cc663 100644 --- a/test/e2e/tests/setup_e2e_test.go +++ b/test/e2e/tests/setup_e2e_test.go @@ -19,7 +19,9 @@ import ( "flag" "fmt" "math/rand" + "strconv" "strings" + "sync" "testing" "time" @@ -63,6 +65,8 @@ var ( kmsClient *cloudkms.KeyManagementClient ) +const localSSDCount int64 = 2 + func init() { klog.InitFlags(flag.CommandLine) } @@ -75,13 +79,13 @@ func TestE2E(t *testing.T) { var _ = BeforeSuite(func() { var err error - tcc := make(chan *remote.TestContext) - hdtcc := make(chan *remote.TestContext) + numberOfInstancesPerZone := 2 + zones := strings.Split(*zones, ",") + tcc := make(chan *remote.TestContext, len(zones)*numberOfInstancesPerZone) + hdtcc := make(chan *remote.TestContext, len(zones)) defer close(tcc) 
defer close(hdtcc) - zones := strings.Split(*zones, ",") - rand.Seed(time.Now().UnixNano()) computeService, err = remote.GetComputeClient() @@ -106,22 +110,37 @@ var _ = BeforeSuite(func() { klog.Infof("Running in project %v with service account %v", *project, *serviceAccount) - for _, zone := range zones { - go func(curZone string) { - defer GinkgoRecover() - tcc <- NewDefaultTestContext(curZone) - }(zone) + setupContext := func(zone string) { + var wg sync.WaitGroup + // Create 2 instances for each zone as we need 2 instances each zone for certain test cases + for j := 0; j < numberOfInstancesPerZone; j++ { + wg.Add(1) + go func(curZone string, randInt int) { + defer GinkgoRecover() + defer wg.Done() + tcc <- NewDefaultTestContext(curZone, strconv.Itoa(randInt)) + }(zone, j) + } go func(curZone string) { + wg.Add(1) defer GinkgoRecover() - hdtcc <- NewTestContext(curZone, *hdMinCpuPlatform, *hdMachineType) + defer wg.Done() + hdtcc <- NewTestContext(curZone, *hdMinCpuPlatform, *hdMachineType, "0") }(zone) + wg.Wait() } - for i := 0; i < len(zones); i++ { + for _, zone := range zones { + setupContext(zone) + } + + for i := 0; i < len(zones)*numberOfInstancesPerZone; i++ { tc := <-tcc testContexts = append(testContexts, tc) klog.Infof("Added TestContext for node %s", tc.Instance.GetName()) - tc = <-hdtcc + } + for i := 0; i < len(zones); i++ { + tc := <-hdtcc hyperdiskTestContexts = append(hyperdiskTestContexts, tc) klog.Infof("Added TestContext for node %s", tc.Instance.GetName()) } @@ -155,12 +174,12 @@ func getDriverConfig() testutils.DriverConfig { } } -func NewDefaultTestContext(zone string) *remote.TestContext { - return NewTestContext(zone, *minCpuPlatform, *machineType) +func NewDefaultTestContext(zone string, instanceNumber string) *remote.TestContext { + return NewTestContext(zone, *minCpuPlatform, *machineType, instanceNumber) } -func NewTestContext(zone, minCpuPlatform, machineType string) *remote.TestContext { - nodeID := fmt.Sprintf("%s-%s-%s", *vmNamePrefix, zone, machineType) +func NewTestContext(zone, minCpuPlatform, machineType string, instanceNumber string) *remote.TestContext { + nodeID := fmt.Sprintf("%s-%s-%s-%s", *vmNamePrefix, zone, machineType, instanceNumber) klog.Infof("Setting up node %s", nodeID) instanceConfig := remote.InstanceConfig{ @@ -175,6 +194,12 @@ func NewTestContext(zone, minCpuPlatform, machineType string) *remote.TestContex CloudtopHost: *cloudtopHost, EnableConfidentialCompute: *enableConfidentialCompute, ComputeService: computeService, + LocalSSDCount: localSSDCount, + } + + if machineType == *hdMachineType { + // Machine type is defaulted to c3-standard-2 which doesn't support LSSD and we don't need LSSD for HdHA test context + instanceConfig.LocalSSDCount = 0 } i, err := remote.SetupInstance(instanceConfig) if err != nil { @@ -195,7 +220,16 @@ func NewTestContext(zone, minCpuPlatform, machineType string) *remote.TestContex if err != nil { klog.Fatalf("Failed to copy google_nvme_id to containerized directory: %v", err) } + pkgs := []string{"lvm2", "mdadm", "grep", "coreutils"} + err = testutils.InstallDependencies(i, pkgs) + if err != nil { + klog.Fatalf("Failed to install dependency package on node %v: error : %v", i.GetNodeID(), err) + } + err = testutils.SetupDataCachingConfig(i) + if err != nil { + klog.Fatalf("Failed to setup data cache required config error %v", err) + } klog.Infof("Creating new driver and client for node %s", i.GetName()) tc, err := testutils.GCEClientAndDriverSetup(i, getDriverConfig()) if err != nil { diff --git 
a/test/e2e/tests/single_zone_e2e_test.go b/test/e2e/tests/single_zone_e2e_test.go index 42041afb4..e3749d8f8 100644 --- a/test/e2e/tests/single_zone_e2e_test.go +++ b/test/e2e/tests/single_zone_e2e_test.go @@ -109,7 +109,7 @@ var _ = Describe("GCE PD CSI Driver", func() { }() // Attach Disk - err := testAttachWriteReadDetach(volID, volName, instance, client, false /* readOnly */) + err := testAttachWriteReadDetach(volID, volName, instance, client, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") }) @@ -157,7 +157,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volID, stageDir) + err = client.NodeStageExt4Volume(volID, stageDir, false /* setupDataCache */) Expect(err).To(BeNil(), "failed to repair /dev/by-id symlink and stage volume") // Validate that the link is correct @@ -227,7 +227,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volID, stageDir) + err = client.NodeStageExt4Volume(volID, stageDir, false /* setupDataCache */) Expect(err).To(BeNil(), "failed to repair /dev/by-id symlink and stage volume") // Validate that the link is correct @@ -334,7 +334,7 @@ var _ = Describe("GCE PD CSI Driver", func() { }() // Attach Disk - err := testAttachWriteReadDetach(underSpecifiedID, volName, instance, client, false /* readOnly */) + err := testAttachWriteReadDetach(underSpecifiedID, volName, instance, client, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache*/) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") }, Entry("on pd-standard", standardDiskType), @@ -591,7 +591,14 @@ var _ = Describe("GCE PD CSI Driver", func() { defer func() { // Delete Disk - err := client.DeleteVolume(volID) + err = wait.Poll(5*time.Second, 1*time.Minute, func() (bool, error) { + err := client.DeleteVolume(volID) + if err == nil { + return true, err + } + return false, nil + }) + Expect(err).To(BeNil(), "DeleteVolume failed") // Validate Disk Deleted @@ -678,7 +685,7 @@ var _ = Describe("GCE PD CSI Driver", func() { }() // Test disk works - err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */) + err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle before revoking CMEK key") // Revoke CMEK key @@ -699,7 +706,7 @@ var _ = Describe("GCE PD CSI Driver", func() { } // Make sure attach of PD fails - err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */) + err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).ToNot(BeNil(), "Volume lifecycle should have failed, but succeeded") // Restore CMEK key @@ -720,7 +727,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // The controller publish failure in above step would set a backoff condition on the node. Wait suffcient amount of time for the driver to accept new controller publish requests.
time.Sleep(time.Second) // Make sure attach of PD succeeds - err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */) + err = testAttachWriteReadDetach(volume.VolumeId, volName, controllerInstance, controllerClient, false /* readOnly */, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle after restoring CMEK key") }, Entry("on pd-standard", standardDiskType), @@ -863,7 +870,7 @@ var _ = Describe("GCE PD CSI Driver", func() { } // Attach Disk - err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, true /* block */, verifyVolumeStats, nil) + err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, true /* block */, verifyVolumeStats, nil, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") }) @@ -900,7 +907,7 @@ var _ = Describe("GCE PD CSI Driver", func() { } // Attach Disk - err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, false /* fs */, verifyVolumeStats, nil) + err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, false /* fs */, verifyVolumeStats, nil, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") }) @@ -963,7 +970,7 @@ var _ = Describe("GCE PD CSI Driver", func() { } return nil } - err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, true /* block */, writeFunc, verifyReadFunc) + err := testLifecycleWithVerify(volID, volName, instance, client, false /* readOnly */, true /* block */, writeFunc, verifyReadFunc, false /* detachAndReattach */, false /* setupDataCache */) Expect(err).To(BeNil(), "Failed to go through volume lifecycle") }) @@ -1393,7 +1400,7 @@ var _ = Describe("GCE PD CSI Driver", func() { Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, }, } - err = client.NodeStageVolume(volID, stageDir, volCap) + err = client.NodeStageVolume(volID, stageDir, volCap, false /* setupDataCache */) Expect(err).To(BeNil(), "failed to stage volume: %v", err) // Validate that the link is correct @@ -1430,81 +1437,111 @@ var _ = Describe("GCE PD CSI Driver", func() { } }() }) - - It("Should unpublish if there is an error unpublishing and device has been in use longer than timeout", func() { + It("Should create disks, attach them to instance with local ssd, setup caching between LSSD->detach->reattach to same instance", func() { + Expect(testContexts).ToNot(BeEmpty()) testContext := getRandomTestContext() p, z, _ := testContext.Instance.GetIdentity() client := testContext.Client instance := testContext.Instance - - // Create Disk volName, volID := createAndValidateUniqueZonalDisk(client, p, z, standardDiskType) - - defer func() { - // Delete Disk - err := client.DeleteVolume(volID) - Expect(err).To(BeNil(), "DeleteVolume failed") - - // Validate Disk Deleted - _, err = computeService.Disks.Get(p, z, volName).Do() - Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found") - }() + defer deleteVolumeOrError(client, volID) // Attach Disk - err := client.ControllerPublishVolumeReadWrite(volID, instance.GetNodeID(), false /* forceAttach */) - Expect(err).To(BeNil(), "ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err) + err := testAttachWriteReadDetach(volID, volName, 
instance, client, false /* readOnly */, true /* detachAndReattach */, true /* setupDataCache */) + Expect(err).To(BeNil(), "Failed to go through volume lifecycle") - defer func() { - // Detach Disk - err = client.ControllerUnpublishVolume(volID, instance.GetNodeID()) - if err != nil { - klog.Errorf("Failed to detach disk: %v", err) + }) + It("Should create->attach->setup caching->write->detach->attach to different node->mount->read", func() { + Expect(testContexts).ToNot(BeEmpty()) + zoneToContext := map[string][]*remote.TestContext{} + testZoneContexts := []*remote.TestContext{} + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + zoneToContext[z] = []*remote.TestContext{tc} + } else { + zoneToContext[z] = append(zoneToContext[z], tc) } - }() + if len(zoneToContext[z]) >= 2 { + testZoneContexts = zoneToContext[z] + break + } + } + if len(testZoneContexts) < 2 { + klog.Fatalf("No test contexts setup %v", testZoneContexts) + } + testContextForVm1 := testZoneContexts[0] + p, z, _ := testContextForVm1.Instance.GetIdentity() - // Stage Disk - stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volID, stageDir) - Expect(err).To(BeNil(), "failed to stage volume: %v", err) + client := testContextForVm1.Client + firstInstance := testContextForVm1.Instance - // Create private bind mount to keep the device's state as "in use" - boundMountStageDir := filepath.Join("/tmp/bindmount", volName, "bindmount") - boundMountStageMkdirOutput, err := instance.SSH("mkdir", "-p", boundMountStageDir) - Expect(err).To(BeNil(), "mkdir failed on instance %v: output: %v: %v", instance.GetNodeID(), boundMountStageMkdirOutput, err) - bindMountOutput, err := instance.SSH("mount", "--rbind", "--make-private", stageDir, boundMountStageDir) - Expect(err).To(BeNil(), "Bind mount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindMountOutput, err) + volName, volID := createAndValidateUniqueZonalDisk(client, p, z, standardDiskType) + defer deleteVolumeOrError(client, volID) - privateBindMountRemoved := false - unmountAndRmPrivateBindMount := func() { - if !privateBindMountRemoved { - // Umount and delete private mount staging directory - bindUmountOutput, err := instance.SSH("umount", boundMountStageDir) - Expect(err).To(BeNil(), "Bind mount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindUmountOutput, err) - err = testutils.RmAll(instance, boundMountStageDir) - Expect(err).To(BeNil(), "Failed to rm mount stage dir %s: %v", boundMountStageDir, err) + testContextForVm2 := testZoneContexts[1] + secondClient := testContextForVm2.Client + secondInstance := testContextForVm2.Instance + unmountDisk := func(client *remote.CsiClient, volID string, args *verifyArgs) { + err := client.NodeUnpublishVolume(volID, args.publishDir) + if err != nil { + klog.Errorf("NodeUnpublishVolume failed with error: %v", err) } - privateBindMountRemoved = true } + attachMountArgs := attachAndMountArgs{ + readOnly: false, + useBlock: false, + forceAttach: false, + setupDataCache: true, + } + // Controller Publish (Attach) - Node Stage - Node Publish(Mount) Volume + err, _, args := testAttachAndMount(volID, volName, firstInstance, client, attachMountArgs) + if err != nil { + klog.Errorf("Failed to attach and mount: %v", err.Error()) + } + // Write file in the volume + firstMountVerify, _ := testWriteAndReadFile(firstInstance, false /* readOnly */) + err = firstMountVerify(args) + if err != nil 
{ + klog.Errorf("failed to verify after first mount to %s: %v", args, err) + } + Expect(err).To(BeNil(), "Failed to write data to volume %s on instance %s", volName, firstInstance.GetName()) + // Unmount Disk + unmountDisk(client, volID, args) + // Node Unstage + err = client.NodeUnstageVolume(volID, args.stageDir) + if err != nil { + klog.Errorf("Failed to unstage volume: %v", err) + } + detach(volID, firstInstance, client) + + // Attach Disk to secondInstance + err, detacher, stageDir := testAttach(volID, volName, secondInstance, secondClient, attachMountArgs) + if err != nil { + klog.Errorf("Failed to attach disk %v", err) + } defer func() { - unmountAndRmPrivateBindMount() + detacher() + deleteVolumeOrError(secondClient, volID) }() - // Unstage Disk. This should record a "deviceInUse" error - err = client.NodeUnstageVolume(volID, stageDir) - Expect(err).ToNot(BeNil(), "Expected failure during unstage") - Expect(err).To(MatchError(ContainSubstring(("is still in use")))) - - // Wait 12s (10s timeout specified in CLI + 2s buffer) and try again - time.Sleep(12 * time.Second) - err = client.NodeUnstageVolume(volID, stageDir) - Expect(err).To(BeNil(), "Failed to unpublish after 10s in-use timeout for volume: %s, stageDir: %s, unexpected err: %s", volID, stageDir, err) + // Mount disk + err, _, args = testMount(volID, volName, secondInstance, secondClient, attachMountArgs, stageDir) + if err != nil { + klog.Fatalf("Failed to mount disk %v", err) + } + _, secondMountRead := testWriteAndReadFile(secondInstance, false /* readOnly */) + err = secondMountRead(args) + if err != nil { + klog.Errorf("failed to verify after second mount to %s: %v", args, err) + } + // Unmount disk for cleanup + unmountDisk(secondClient, volID, args) + Expect(err).To(BeNil(), "Failed to read data from volume %s on instance %s", volName, secondInstance.GetName()) - // Unstage Disk - fp := filepath.Join("/tmp/", volName) - err = testutils.RmAll(instance, fp) - Expect(err).To(BeNil(), "Failed to rm file path %s: %v", fp, err) }) It("Should block unstage if filesystem mounted", func() { @@ -1541,7 +1578,7 @@ var _ = Describe("GCE PD CSI Driver", func() { // Stage Disk stageDir := filepath.Join("/tmp/", volName, "stage") - err = client.NodeStageExt4Volume(volID, stageDir) + err = client.NodeStageExt4Volume(volID, stageDir, false) Expect(err).To(BeNil(), "failed to stage volume: %v", err) // Create private bind mount diff --git a/test/e2e/utils/utils.go b/test/e2e/utils/utils.go index 804619323..b4eb8020a 100644 --- a/test/e2e/utils/utils.go +++ b/test/e2e/utils/utils.go @@ -71,6 +71,8 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC "--allow-hdha-provisioning", "--device-in-use-timeout=10s", // Set lower than the usual value to expedite tests fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")), + "--enable-data-cache", + fmt.Sprintf("--node-name=%s", utilcommon.TestNode), } extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint)) extra_flags = append(extra_flags, driverConfig.ExtraFlags...) @@ -278,6 +280,33 @@ func CopyFile(instance *remote.InstanceInfo, src, dest string) error { return nil } +func InstallDependencies(instance *remote.InstanceInfo, pkgs []string) error { + _, _ = instance.SSH("apt-get", "update") + for _, pkg := range pkgs { + output, err := instance.SSH("apt-get", "install", "-y", pkg) + if err != nil { + return fmt.Errorf("failed to install package %s. 
Output: %v, error: %v", pkg, output, err.Error()) + } + } + return nil +} + +func SetupDataCachingConfig(instance *remote.InstanceInfo) error { + output, err := instance.SSH("/bin/sed", "-i", "-e", "\"s/.*allow_mixed_block_sizes = 0.*/ allow_mixed_block_sizes = 1/\"", "/etc/lvm/lvm.conf") + if err != nil { + return fmt.Errorf("failed to update field allow_mixed_block_sizes, error:%v; output: %v", err, output) + } + output, err = instance.SSH("/bin/sed", "-i", "-e", "\"s/.*udev_sync = 1.*/ udev_sync = 0/\"", "/etc/lvm/lvm.conf") + if err != nil { + return fmt.Errorf("failed to update field udev_sync, error:%v; output: %v", err, output) + } + output, err = instance.SSH("/bin/sed", "-i", "-e", "\"s/.*udev_rules = 1.*/ udev_rules = 0/\"", "/etc/lvm/lvm.conf") + if err != nil { + return fmt.Errorf("failed to update field udev_rules, error:%v; output: %v", err, output) + } + return nil +} + // ValidateLogicalLinkIsDisk takes a symlink location at "link" and finds the // link location - it then finds the backing PD using either scsi_id or // google_nvme_id (depending on the /dev path) and validates that it is the diff --git a/test/k8s-integration/config/data-cache-sc.yaml b/test/k8s-integration/config/data-cache-sc.yaml new file mode 100644 index 000000000..7e911eb36 --- /dev/null +++ b/test/k8s-integration/config/data-cache-sc.yaml @@ -0,0 +1,11 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-gcepd-balanced +provisioner: pd.csi.storage.gke.io +parameters: + type: pd-balanced + data-cache-mode: writeback + data-cache-size: "50Gi" +volumeBindingMode: WaitForFirstConsumer +allowVolumeExpansion: true \ No newline at end of file diff --git a/test/remote/client-wrappers.go b/test/remote/client-wrappers.go index 15c8ba5a6..28b9887cf 100644 --- a/test/remote/client-wrappers.go +++ b/test/remote/client-wrappers.go @@ -52,6 +52,19 @@ var ( } ) +const ( + // Keys in the volume context.
+ contextForceAttach = "force-attach" + contextDataCacheSize = "data-cache-size" + contextDataCacheMode = "data-cache-mode" + + // Keys in the publish context + contexLocalSsdCacheSize = "local-ssd-cache-size" + + defaultLocalSsdCacheSize = "200Gi" + defaultDataCacheMode = common.DataCacheModeWriteThrough +) + type CsiClient struct { conn *grpc.ClientConn idClient csipb.IdentityClient @@ -179,19 +192,25 @@ func (c *CsiClient) ControllerUnpublishVolume(volId, nodeId string) error { return err } -func (c *CsiClient) NodeStageExt4Volume(volId, stageDir string) error { - return c.NodeStageVolume(volId, stageDir, stdVolCap) +func (c *CsiClient) NodeStageExt4Volume(volId, stageDir string, setupDataCache bool) error { + return c.NodeStageVolume(volId, stageDir, stdVolCap, setupDataCache) } -func (c *CsiClient) NodeStageBlockVolume(volId, stageDir string) error { - return c.NodeStageVolume(volId, stageDir, blockVolCap) +func (c *CsiClient) NodeStageBlockVolume(volId, stageDir string, setupDataCache bool) error { + return c.NodeStageVolume(volId, stageDir, blockVolCap, setupDataCache) } -func (c *CsiClient) NodeStageVolume(volId, stageDir string, volumeCap *csipb.VolumeCapability) error { +func (c *CsiClient) NodeStageVolume(volId string, stageDir string, volumeCap *csipb.VolumeCapability, setupDataCache bool) error { + publishContext := map[string]string{} + if setupDataCache { + publishContext[contexLocalSsdCacheSize] = defaultLocalSsdCacheSize + publishContext[contextDataCacheMode] = defaultDataCacheMode + } nodeStageReq := &csipb.NodeStageVolumeRequest{ VolumeId: volId, StagingTargetPath: stageDir, VolumeCapability: volumeCap, + PublishContext: publishContext, } _, err := c.nodeClient.NodeStageVolume(context.Background(), nodeStageReq) return err diff --git a/test/remote/instance.go b/test/remote/instance.go index ac42a6d12..c9c3dd6d5 100644 --- a/test/remote/instance.go +++ b/test/remote/instance.go @@ -58,6 +58,8 @@ type InstanceConfig struct { MinCpuPlatform string ComputeService *compute.Service EnableConfidentialCompute bool + LocalSSDCount int64 + EnableDataCache bool } type InstanceInfo struct { @@ -103,7 +105,7 @@ func machineTypeMismatch(curInst *compute.Instance, newInst *compute.Instance) b } // Provision a gce instance using image -func (i *InstanceInfo) CreateOrGetInstance() error { +func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { var err error var instance *compute.Instance klog.V(4).Infof("Creating instance: %v", i.cfg.Name) @@ -147,6 +149,18 @@ func (i *InstanceInfo) CreateOrGetInstance() error { } } + localSSDConfig := &compute.AttachedDisk{ + Type: "SCRATCH", + InitializeParams: &compute.AttachedDiskInitializeParams{ + DiskType: fmt.Sprintf("zones/%s/diskTypes/local-ssd", i.cfg.Zone), + }, + AutoDelete: true, + Interface: "NVME", + } + + for i := 0; i < localSSDCount; i++ { + newInst.Disks = append(newInst.Disks, localSSDConfig) + } saObj := &compute.ServiceAccount{ Email: i.cfg.ServiceAccount, Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"}, @@ -301,7 +315,7 @@ func getexternalIP(instance *compute.Instance) string { } func getTimestamp() string { - return fmt.Sprintf(time.Now().Format(timestampFormat)) + return fmt.Sprintf("%s", time.Now().Format(timestampFormat)) } // Create default SSH filewall rule if it does not exist diff --git a/test/remote/setup-teardown.go b/test/remote/setup-teardown.go index 046d4da2b..3026b28b0 100644 --- a/test/remote/setup-teardown.go +++ b/test/remote/setup-teardown.go @@ -59,7 +59,7 @@ func 
SetupInstance(cfg InstanceConfig) (*InstanceInfo, error) { cfg: cfg, } - err := instance.CreateOrGetInstance() + err := instance.CreateOrGetInstance(int(cfg.LocalSSDCount)) if err != nil { return nil, err } @@ -82,6 +82,13 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes } }() + // Copy dependencies + _, _ = instance.SSH("apt-get", "update") + output, err := instance.SSH("apt-get", "install", "-y", "mdadm", "lvm2") + if err != nil { + return nil, fmt.Errorf("failed to install dependencies. Output: %v, error: %v", output, err.Error()) + } + // Upload archive to instance and run binaries driverPID, err := instance.UploadAndRun(archivePath, config.WorkspaceDir, config.RunDriverCmd) if err != nil { diff --git a/test/run-e2e.sh b/test/run-e2e.sh index 3f42f9d05..e86931c0d 100755 --- a/test/run-e2e.sh +++ b/test/run-e2e.sh @@ -5,7 +5,7 @@ set -x readonly PKGDIR=sigs.k8s.io/gcp-compute-persistent-disk-csi-driver -TIMEOUT=40m +TIMEOUT=50m if [ "$RUN_CONTROLLER_MODIFY_VOLUME_TESTS" = true ]; then TIMEOUT=45m fi diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 1ff33a41b..f3ff5e59a 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -63,6 +63,7 @@ func TestSanity(t *testing.T) { fallbackRequisiteZones := []string{} enableStoragePools := false + enableDataCache := true multiZoneVolumeHandleConfig := driver.MultiZoneVolumeHandleConfig{} listVolumesConfig := driver.ListVolumesConfig{} provisionableDisksConfig := driver.ProvisionableDisksConfig{ @@ -72,12 +73,16 @@ mounter := mountmanager.NewFakeSafeMounter() deviceUtils := deviceutils.NewFakeDeviceUtils(true) + args := driver.NodeServerArgs{ + EnableDeviceInUseCheck: true, + DeviceInUseTimeout: 0, + EnableDataCache: true} //Initialize GCE Driver identityServer := driver.NewIdentityServer(gceDriver) - controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true) + controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true) fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false}) - nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0}) + nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, args) err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer) if err != nil { t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
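Finally, the data-cache StorageClass added in this patch surfaces data-cache-mode and data-cache-size as ordinary parameters, and ExtractAndDefaultParameters now returns them alongside the disk parameters. The exact validation is not part of these hunks; a plausible sketch of the checks involved, where the accepted mode strings and error wording are assumptions:

package cachesketch

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

const (
    dataCacheModeWriteThrough = "writethrough"
    dataCacheModeWriteBack    = "writeback"
)

// validateDataCacheParams checks the two StorageClass parameters introduced
// for data caching: the dm-cache mode and the per-volume cache size.
func validateDataCacheParams(mode, size string) error {
    if mode != dataCacheModeWriteThrough && mode != dataCacheModeWriteBack {
        return fmt.Errorf("invalid data-cache-mode %q: must be %q or %q", mode, dataCacheModeWriteThrough, dataCacheModeWriteBack)
    }
    q, err := resource.ParseQuantity(size)
    if err != nil {
        return fmt.Errorf("invalid data-cache-size %q: %w", size, err)
    }
    if q.Sign() <= 0 {
        return fmt.Errorf("data-cache-size %q must be positive", size)
    }
    return nil
}

Under these assumptions, the sample StorageClass above (data-cache-mode: writeback, data-cache-size: "50Gi") would pass, while a typo such as "writeback-only" or a zero size would be rejected at CreateVolume time.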