Skip to content

Add ListVolumes with PublishedNodes #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-test"
version = "2.2.0"
version = "2.4.0-rc1"

[[constraint]]
branch = "master"
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/overlays/dev/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ images:
- name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
newName: gcr.io/dyzz-csi-staging/csi/gce-pd-driver
newTag: "latest"

60 changes: 52 additions & 8 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"google.golang.org/grpc/status"
"k8s.io/klog"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
)

const (
Expand All @@ -41,20 +44,22 @@ type FakeCloudProvider struct {
project string
zone string

disks map[string]*CloudDisk
instances map[string]*computev1.Instance
snapshots map[string]*computev1.Snapshot
disks map[string]*CloudDisk
pageTokens map[string]sets.String
instances map[string]*computev1.Instance
snapshots map[string]*computev1.Snapshot
}

var _ GCECompute = &FakeCloudProvider{}

func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
fcp := &FakeCloudProvider{
project: project,
zone: zone,
disks: map[string]*CloudDisk{},
instances: map[string]*computev1.Instance{},
snapshots: map[string]*computev1.Snapshot{},
project: project,
zone: zone,
disks: map[string]*CloudDisk{},
instances: map[string]*computev1.Instance{},
snapshots: map[string]*computev1.Snapshot{},
pageTokens: map[string]sets.String{},
}
for _, d := range cloudDisks {
fcp.disks[d.GetName()] = d
Expand Down Expand Up @@ -94,6 +99,45 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([
return []string{cloud.zone, "country-region-fakesecondzone"}, nil
}

func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
// Ignore page tokens for now
var seen sets.String
var ok bool
var count int64 = 0
var newToken string
d := []*computev1.Disk{}

if pageToken != "" {
seen, ok = cloud.pageTokens[pageToken]
if !ok {
return nil, "", invalidError()
}
} else {
seen = sets.NewString()
}

if maxEntries == 0 {
maxEntries = 500
}

for name, cd := range cloud.disks {
// Only return zonal disks for simplicity
if !seen.Has(name) {
d = append(d, cd.ZonalDisk)
seen.Insert(name)
count++
}

if count >= maxEntries {
newToken = string(uuid.NewUUID())
cloud.pageTokens[newToken] = seen
break
}
}

return d, newToken, nil
}

func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string, maxEntries int64, pageToken string) ([]*computev1.Snapshot, string, error) {
var sourceDisk string
snapshots := []*computev1.Snapshot{}
Expand Down
18 changes: 18 additions & 0 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type GCECompute interface {
GetDiskTypeURI(volKey *meta.Key, diskType string) string
WaitForAttach(ctx context.Context, volKey *meta.Key, instanceZone, instanceName string) error
ResizeDisk(ctx context.Context, volKey *meta.Key, requestBytes int64) (int64, error)
ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error)
// Regional Disk Methods
GetReplicaZoneURI(zone string) string
// Instance Methods
Expand All @@ -62,6 +63,23 @@ type GCECompute interface {
DeleteSnapshot(ctx context.Context, snapshotName string) error
}

// ListDisks lists disks based on maxEntries and pageToken only in the project
// and zone that the driver is running in.
func (cloud *CloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
lCall := cloud.service.Disks.List(cloud.project, cloud.zone)
if maxEntries != 0 {
lCall = lCall.MaxResults(maxEntries)
}
if len(pageToken) != 0 {
lCall = lCall.PageToken(pageToken)
}
diskList, err := lCall.Do()
if err != nil {
return nil, "", err
}
return diskList.Items, diskList.NextPageToken, nil
}

// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
// by the volume key and return a volume key with a correct zone
func (cloud *CloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/gce-cloud-provider/compute/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,9 @@ func IsGCEError(err error, reason string) bool {
func IsGCENotFoundError(err error) bool {
return IsGCEError(err, "notFound")
}

// IsInvalidError returns true if the error is a googleapi.Error with
// invalid reason
func IsGCEInvalidError(err error) bool {
return IsGCEError(err, "invalid")
}
38 changes: 36 additions & 2 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,42 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context

func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
// https://cloud.google.com/compute/docs/reference/beta/disks/list
// List volumes in the whole region? In only the zone that this controller is running?
return nil, status.Error(codes.Unimplemented, "")
if req.MaxEntries < 0 {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf(
"ListVolumes got max entries request %v. GCE only supports values between 0-500", req.MaxEntries))
}
var maxEntries int64 = int64(req.MaxEntries)
if maxEntries > 500 {
klog.Warningf("ListVolumes requested max entries of %v, GCE only supports values <=500 so defaulting value back to 500", maxEntries)
maxEntries = 500
}
diskList, nextToken, err := gceCS.CloudProvider.ListDisks(ctx, maxEntries, req.StartingToken)
if err != nil {
if gce.IsGCEInvalidError(err) {
return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err))
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err))
}
entries := []*csi.ListVolumesResponse_Entry{}
for _, d := range diskList {
users := []string{}
for _, u := range d.Users {
users = append(users, cleanSelfLink(u))
}
entries = append(entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: cleanSelfLink(d.SelfLink),
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: users,
},
})
}

return &csi.ListVolumesResponse{
Entries: entries,
NextToken: nextToken,
}, nil
}

func (gceCS *GCEControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
Expand Down
58 changes: 58 additions & 0 deletions pkg/gce-pd-csi-driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,64 @@ func TestCreateVolumeArguments(t *testing.T) {
}
}

func TestListVolumeArgs(t *testing.T) {
testCases := []struct {
name string
maxEntries int32
expectedEntries int
expectedErr bool
}{
{
name: "normal",
expectedEntries: 500,
},
{
name: "fine amount of entries",
maxEntries: 420,
expectedEntries: 420,
},
{
name: "too many entries, but defaults to 500",
maxEntries: 501,
expectedEntries: 500,
},
{
name: "negative entries",
maxEntries: -1,
expectedErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup new driver each time so no interference
d := []*gce.CloudDisk{}
for i := 0; i < 600; i++ {
// Create 600 dummy disks
d = append(d, &gce.CloudDisk{ZonalDisk: &compute.Disk{Name: fmt.Sprintf("%v", i)}})
}
gceDriver := initGCEDriver(t, d)
lvr := &csi.ListVolumesRequest{
MaxEntries: tc.maxEntries,
}
resp, err := gceDriver.cs.ListVolumes(context.TODO(), lvr)
if tc.expectedErr && err == nil {
t.Fatalf("Got no error when expecting an error")
}
if err != nil {
if !tc.expectedErr {
t.Fatalf("Got error %v, expecting none", err)
}
return
}

if len(resp.Entries) != tc.expectedEntries {
t.Fatalf("Got %v entries, expected %v", len(resp.Entries), tc.expectedEntries)
}
})
}
}

func TestCreateVolumeWithVolumeSource(t *testing.T) {
// Define test cases
testCases := []struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (gceDriver *GCEDriver) SetupGCEDriver(cloudProvider gce.GCECompute, mounter
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_PUBLISH_READONLY,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
}
gceDriver.AddControllerServiceCapabilities(csc)
ns := []csi.NodeServiceCapability_RPC_Type{
Expand Down
52 changes: 47 additions & 5 deletions test/e2e/tests/single_zone_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
remote "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/remote"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/remote"

csi "github.com/container-storage-interface/spec/lib/go/csi"
. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -423,6 +423,36 @@ var _ = Describe("GCE PD CSI Driver", func() {
Expect(err).To(BeNil(), "Failed to go through volume lifecycle after restoring CMEK key")
})

It("Should create disks, attach them places, and verify List returns correct results", func() {
Expect(testContexts).ToNot(BeEmpty())
testContext := getRandomTestContext()

p, z, _ := testContext.Instance.GetIdentity()
client := testContext.Client

nodeID := testContext.Instance.GetNodeID()

_, volID := createAndValidateUniqueZonalDisk(client, p, z)
defer deleteVolumeOrError(client, volID, p)

_, secondVolID := createAndValidateUniqueZonalDisk(client, p, z)
defer deleteVolumeOrError(client, secondVolID, p)

// Attach volID to current instance
err := client.ControllerPublishVolume(volID, nodeID)
Expect(err).To(BeNil(), "Failed ControllerPublishVolume")
defer client.ControllerUnpublishVolume(volID, nodeID)

// List Volumes
volsToNodes, err := client.ListVolumes()
Expect(err).To(BeNil(), "Failed ListVolumes")

// Verify
Expect(volsToNodes[volID]).ToNot(BeNil(), "Couldn't find attached nodes for vol")
Expect(volsToNodes[volID]).To(ContainElement(nodeID), "Couldn't find node in attached nodes for vol")
Expect(volsToNodes[secondVolID]).To(BeNil(), "Second vol ID attached nodes not nil")
})

It("Should create and delete snapshot for RePD in two zones ", func() {
Expect(testContexts).ToNot(BeEmpty())
testContext := getRandomTestContext()
Expand Down Expand Up @@ -580,10 +610,11 @@ func equalWithinEpsilon(a, b, epsiolon int64) bool {
return b-a < epsiolon
}

func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone string) (string, string) {
func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone string) (volName, volID string) {
// Create Disk
volName := testNamePrefix + string(uuid.NewUUID())
volID, err := client.CreateVolume(volName, nil, defaultSizeGb,
var err error
volName = testNamePrefix + string(uuid.NewUUID())
volID, err = client.CreateVolume(volName, nil, defaultSizeGb,
&csi.TopologyRequirement{
Requisite: []*csi.Topology{
{
Expand All @@ -600,6 +631,17 @@ func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone st
Expect(cloudDisk.Status).To(Equal(readyState))
Expect(cloudDisk.SizeGb).To(Equal(defaultSizeGb))
Expect(cloudDisk.Name).To(Equal(volName))
return
}

func deleteVolumeOrError(client *remote.CsiClient, volID, project string) {
// Delete Disk
err := client.DeleteVolume(volID)
Expect(err).To(BeNil(), "DeleteVolume failed")

return volName, volID
// Validate Disk Deleted
key, err := common.VolumeIDToKey(volID)
Expect(err).To(BeNil(), "Failed to conver volume ID To key")
_, err = computeService.Disks.Get(project, key.Zone, key.Name).Do()
Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found")
}
12 changes: 12 additions & 0 deletions test/remote/client-wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ func (c *CsiClient) ControllerPublishVolume(volId, nodeId string) error {
return err
}

func (c *CsiClient) ListVolumes() (map[string]([]string), error) {
resp, err := c.ctrlClient.ListVolumes(context.Background(), &csipb.ListVolumesRequest{})
if err != nil {
return nil, err
}
vols := map[string]([]string){}
for _, e := range resp.Entries {
vols[e.Volume.VolumeId] = e.Status.PublishedNodeIds
}
return vols, nil
}

func (c *CsiClient) ControllerUnpublishVolume(volId, nodeId string) error {
cupreq := &csipb.ControllerUnpublishVolumeRequest{
VolumeId: volId,
Expand Down
Loading