Skip to content

Update driver to support compute staging #1586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 16, 2024
3 changes: 2 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (

maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
computeEnvironment = flag.String("compute-environment", "prod", "Sets the compute environment")

fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")

Expand Down Expand Up @@ -156,7 +157,7 @@ func handle() {
// Initialize requirements for the controller service
var controllerServer *driver.GCEControllerServer
if *runControllerService {
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, *computeEndpoint)
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, *computeEndpoint, *computeEnvironment)
if err != nil {
klog.Fatalf("Failed to get cloud provider: %v", err.Error())
}
Expand Down
92 changes: 57 additions & 35 deletions pkg/gce-cloud-provider/compute/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"runtime"
"time"
Expand Down Expand Up @@ -47,8 +48,27 @@ const (
regionURITemplate = "projects/%s/regions/%s"

replicaZoneURITemplateSingleZone = "projects/%s/zones/%s" // {gce.projectID}/zones/{disk.Zone}
versionV1 = "v1"
versionBeta = "beta"
versionAlpha = "alpha"
googleEnv = "googleapis"
)

var computeVersionMap = map[string]map[string]map[string]string{
googleEnv: {
"prod": {
versionV1: "compute/v1/",
versionBeta: "compute/beta/",
versionAlpha: "compute/alpha/",
},
"staging": {
versionV1: "compute/staging_v1/",
versionBeta: "compute/staging_beta/",
versionAlpha: "compute/staging_alpha/",
},
},
}

type CloudProvider struct {
service *compute.Service
betaService *computebeta.Service
Expand All @@ -72,7 +92,7 @@ type ConfigGlobal struct {
Zone string `gcfg:"zone"`
}

func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint string) (*CloudProvider, error) {
func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint string, computeEnvironment string) (*CloudProvider, error) {
configFile, err := readConfig(configPath)
if err != nil {
return nil, err
Expand All @@ -87,20 +107,23 @@ func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath s
return nil, err
}

svc, err := createCloudService(ctx, vendorVersion, tokenSource, computeEndpoint)
svc, err := createCloudService(ctx, vendorVersion, tokenSource, computeEndpoint, computeEnvironment)
if err != nil {
return nil, err
}
klog.Infof("Compute endpoint for V1 version: %s", svc.BasePath)

betasvc, err := createBetaCloudService(ctx, vendorVersion, tokenSource, computeEndpoint)
betasvc, err := createBetaCloudService(ctx, vendorVersion, tokenSource, computeEndpoint, computeEnvironment)
if err != nil {
return nil, err
}
klog.Infof("Compute endpoint for Beta version: %s", betasvc.BasePath)

alphasvc, err := createAlphaCloudService(ctx, vendorVersion, tokenSource, computeEndpoint)
alphasvc, err := createAlphaCloudService(ctx, vendorVersion, tokenSource, computeEndpoint, computeEnvironment)
if err != nil {
return nil, err
}
klog.Infof("Compute endpoint for Alpha version: %s", alphasvc.BasePath)

project, zone, err := getProjectAndZone(configFile)
if err != nil {
Expand Down Expand Up @@ -164,16 +187,23 @@ func readConfig(configPath string) (*ConfigFile, error) {
return cfg, nil
}

func createBetaCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string) (*computebeta.Service, error) {
client, err := newOauthClient(ctx, tokenSource)
func createAlphaCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string, computeEnvironment string) (*computealpha.Service, error) {
computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, versionAlpha)
if err != nil {
klog.Errorf("Failed to get compute endpoint: %s", err)
}
service, err := computealpha.NewService(ctx, computeOpts...)
if err != nil {
return nil, err
}
service.UserAgent = fmt.Sprintf("GCE CSI Driver/%s (%s %s)", vendorVersion, runtime.GOOS, runtime.GOARCH)
return service, nil
}

computeOpts := []option.ClientOption{option.WithHTTPClient(client)}
if computeEndpoint != "" {
betaEndpoint := fmt.Sprintf("%s/compute/beta/", computeEndpoint)
computeOpts = append(computeOpts, option.WithEndpoint(betaEndpoint))
func createBetaCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string, computeEnvironment string) (*computebeta.Service, error) {
computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, versionBeta)
if err != nil {
klog.Errorf("Failed to get compute endpoint: %s", err)
}
service, err := computebeta.NewService(ctx, computeOpts...)
if err != nil {
Expand All @@ -183,47 +213,39 @@ func createBetaCloudService(ctx context.Context, vendorVersion string, tokenSour
return service, nil
}

func createAlphaCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string) (*computealpha.Service, error) {
client, err := newOauthClient(ctx, tokenSource)
func createCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string, computeEnvironment string) (*compute.Service, error) {
computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, versionV1)
if err != nil {
return nil, err
}

computeOpts := []option.ClientOption{option.WithHTTPClient(client)}
if computeEndpoint != "" {
alphaEndpoint := fmt.Sprintf("%s/compute/alpha/", computeEndpoint)
computeOpts = append(computeOpts, option.WithEndpoint(alphaEndpoint))
klog.Errorf("Failed to get compute endpoint: %s", err)
}
service, err := computealpha.NewService(ctx, computeOpts...)
service, err := compute.NewService(ctx, computeOpts...)
if err != nil {
return nil, err
}
service.UserAgent = fmt.Sprintf("GCE CSI Driver/%s (%s %s)", vendorVersion, runtime.GOOS, runtime.GOARCH)
return service, nil
}

func createCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string) (*compute.Service, error) {
svc, err := createCloudServiceWithDefaultServiceAccount(ctx, vendorVersion, tokenSource, computeEndpoint)
return svc, err
}

func createCloudServiceWithDefaultServiceAccount(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint string) (*compute.Service, error) {
func getComputeVersion(ctx context.Context, tokenSource oauth2.TokenSource, computeEndpoint string, computeEnvironment string, computeVersion string) ([]option.ClientOption, error) {
client, err := newOauthClient(ctx, tokenSource)
if err != nil {
return nil, err
}

computeEnvironmentSuffix, ok := computeVersionMap[googleEnv][computeEnvironment][computeVersion]
if !ok {
return nil, errors.New("Unable to fetch compute endpoint")
}
computeOpts := []option.ClientOption{option.WithHTTPClient(client)}
if computeEndpoint != "" {
v1Endpoint := fmt.Sprintf("%s/compute/v1/", computeEndpoint)
computeOpts = append(computeOpts, option.WithEndpoint(v1Endpoint))
}
service, err := compute.NewService(ctx, computeOpts...)
if err != nil {
return nil, err
endpoint := fmt.Sprintf("%s%s", computeEndpoint, computeEnvironmentSuffix)
klog.Infof("Got compute endpoint %s", endpoint)
_, err := url.ParseRequestURI(endpoint)
if err != nil {
klog.Fatalf("Error parsing compute endpoint %s", endpoint)
}
computeOpts = append(computeOpts, option.WithEndpoint(endpoint))
}
service.UserAgent = fmt.Sprintf("GCE CSI Driver/%s (%s %s)", vendorVersion, runtime.GOOS, runtime.GOARCH)
return service, nil
return computeOpts, nil
}

func newOauthClient(ctx context.Context, tokenSource oauth2.TokenSource) (*http.Client, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ const (
)

var (
validResourceApiVersions = map[string]bool{"v1": true, "alpha": true, "beta": true}
validResourceApiVersions = map[string]bool{"v1": true, "alpha": true, "beta": true, "staging_v1": true, "staging_beta": true, "staging_alpha": true}
)

func isDiskReady(disk *gce.CloudDisk) (bool, error) {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver")

endpoint := fmt.Sprintf("tcp://localhost:%s", port)
klog.Infof("ENDPOINT: %s", endpoint)
extra_flags := []string{
fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue),
"--max-concurrent-format-and-mount=20", // otherwise the serialization times out the e2e test.
Expand All @@ -65,7 +66,7 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
// useful to see what's happening when debugging tests.
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'",
workspace, endpoint, strings.Join(extra_flags, " "), workspace)

klog.Infof("DRIVER COMMAND %s", driverRunCmd)
config := &remote.ClientConfig{
PkgPath: pkgPath,
BinPath: binPath,
Expand Down
2 changes: 2 additions & 0 deletions test/remote/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s

klog.V(4).Infof("Starting driver on %q", i.name)
// When the process is killed the driver should close the TCP endpoint, then we want to download the logs
klog.Infof("DRIVER RUN COMMAND: %s", driverRunCmd)
output, err := i.SSH(driverRunCmd)
if err != nil {
// Exit failure with the error
Expand All @@ -75,6 +76,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s
//`awk "{print \$2}"`,
)
driverPIDString, err := i.SSHNoSudo("sh", "-c", driverPIDCmd)
klog.Infof("DRIVER PID STRING %s: COMMAND: %s", driverPIDString, driverPIDCmd)
if err != nil {
// Exit failure with the error
return -1, fmt.Errorf("failed to get PID of driver, got output: %v, error: %v", output, err.Error())
Expand Down