Skip to content

Commit 5f48ffd

Browse files
committed
Add ListVolumes with PublishedNodes
1 parent d6f0ad8 commit 5f48ffd

File tree

10 files changed

+235
-17
lines changed

10 files changed

+235
-17
lines changed

deploy/kubernetes/overlays/dev/kustomization.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ images:
1010
- name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
1111
newName: gcr.io/dyzz-csi-staging/csi/gce-pd-driver
1212
newTag: "latest"
13+

pkg/gce-cloud-provider/compute/fake-gce.go

+52-8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import (
2828
"google.golang.org/grpc/status"
2929
"k8s.io/klog"
3030
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
31+
32+
"k8s.io/apimachinery/pkg/util/sets"
33+
"k8s.io/apimachinery/pkg/util/uuid"
3134
)
3235

3336
const (
@@ -41,20 +44,22 @@ type FakeCloudProvider struct {
4144
project string
4245
zone string
4346

44-
disks map[string]*CloudDisk
45-
instances map[string]*computev1.Instance
46-
snapshots map[string]*computev1.Snapshot
47+
disks map[string]*CloudDisk
48+
pageTokens map[string]sets.String
49+
instances map[string]*computev1.Instance
50+
snapshots map[string]*computev1.Snapshot
4751
}
4852

4953
var _ GCECompute = &FakeCloudProvider{}
5054

5155
func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
5256
fcp := &FakeCloudProvider{
53-
project: project,
54-
zone: zone,
55-
disks: map[string]*CloudDisk{},
56-
instances: map[string]*computev1.Instance{},
57-
snapshots: map[string]*computev1.Snapshot{},
57+
project: project,
58+
zone: zone,
59+
disks: map[string]*CloudDisk{},
60+
instances: map[string]*computev1.Instance{},
61+
snapshots: map[string]*computev1.Snapshot{},
62+
pageTokens: map[string]sets.String{},
5863
}
5964
for _, d := range cloudDisks {
6065
fcp.disks[d.GetName()] = d
@@ -94,6 +99,45 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([
9499
return []string{cloud.zone, "country-region-fakesecondzone"}, nil
95100
}
96101

102+
func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
103+
// Ignore page tokens for now
104+
var seen sets.String
105+
var ok bool
106+
var count int64 = 0
107+
var newToken string
108+
d := []*computev1.Disk{}
109+
110+
if pageToken != "" {
111+
seen, ok = cloud.pageTokens[pageToken]
112+
if !ok {
113+
return nil, "", invalidError()
114+
}
115+
} else {
116+
seen = sets.NewString()
117+
}
118+
119+
if maxEntries == 0 {
120+
maxEntries = 500
121+
}
122+
123+
for name, cd := range cloud.disks {
124+
// Only return zonal disks for simplicity
125+
if !seen.Has(name) {
126+
d = append(d, cd.ZonalDisk)
127+
seen.Insert(name)
128+
count++
129+
}
130+
131+
if count >= maxEntries {
132+
newToken = string(uuid.NewUUID())
133+
cloud.pageTokens[newToken] = seen
134+
break
135+
}
136+
}
137+
138+
return d, newToken, nil
139+
}
140+
97141
func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string, maxEntries int64, pageToken string) ([]*computev1.Snapshot, string, error) {
98142
var sourceDisk string
99143
snapshots := []*computev1.Snapshot{}

pkg/gce-cloud-provider/compute/gce-compute.go

+18
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type GCECompute interface {
5050
GetDiskTypeURI(volKey *meta.Key, diskType string) string
5151
WaitForAttach(ctx context.Context, volKey *meta.Key, instanceZone, instanceName string) error
5252
ResizeDisk(ctx context.Context, volKey *meta.Key, requestBytes int64) (int64, error)
53+
ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error)
5354
// Regional Disk Methods
5455
GetReplicaZoneURI(zone string) string
5556
// Instance Methods
@@ -62,6 +63,23 @@ type GCECompute interface {
6263
DeleteSnapshot(ctx context.Context, snapshotName string) error
6364
}
6465

66+
// ListDisks lists disks based on maxEntries and pageToken only in the project
67+
// and zone that the driver is running in.
68+
func (cloud *CloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
69+
lCall := cloud.service.Disks.List(cloud.project, cloud.zone)
70+
if maxEntries != 0 {
71+
lCall = lCall.MaxResults(maxEntries)
72+
}
73+
if len(pageToken) != 0 {
74+
lCall = lCall.PageToken(pageToken)
75+
}
76+
diskList, err := lCall.Do()
77+
if err != nil {
78+
return nil, "", err
79+
}
80+
return diskList.Items, diskList.NextPageToken, nil
81+
}
82+
6583
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
6684
// by the volume key and return a volume key with a correct zone
6785
func (cloud *CloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {

pkg/gce-cloud-provider/compute/gce.go

+6
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,9 @@ func IsGCEError(err error, reason string) bool {
228228
func IsGCENotFoundError(err error) bool {
229229
return IsGCEError(err, "notFound")
230230
}
231+
232+
// IsInvalidError returns true if the error is a googleapi.Error with
233+
// invalid reason
234+
func IsGCEInvalidError(err error) bool {
235+
return IsGCEError(err, "invalid")
236+
}

pkg/gce-pd-csi-driver/controller.go

+36-2
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,42 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
510510

511511
func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
512512
// https://cloud.google.com/compute/docs/reference/beta/disks/list
513-
// List volumes in the whole region? In only the zone that this controller is running?
514-
return nil, status.Error(codes.Unimplemented, "")
513+
if req.MaxEntries < 0 {
514+
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf(
515+
"ListVolumes got max entries request %v. GCE only supports values between 0-500", req.MaxEntries))
516+
}
517+
var maxEntries int64 = int64(req.MaxEntries)
518+
if maxEntries > 500 {
519+
klog.Warningf("ListVolumes requested max entries of %v, GCE only supports values <=500 so defaulting value back to 500", maxEntries)
520+
maxEntries = 500
521+
}
522+
diskList, nextToken, err := gceCS.CloudProvider.ListDisks(ctx, maxEntries, req.StartingToken)
523+
if err != nil {
524+
if gce.IsGCEInvalidError(err) {
525+
return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err))
526+
}
527+
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err))
528+
}
529+
entries := []*csi.ListVolumesResponse_Entry{}
530+
for _, d := range diskList {
531+
users := []string{}
532+
for _, u := range d.Users {
533+
users = append(users, cleanSelfLink(u))
534+
}
535+
entries = append(entries, &csi.ListVolumesResponse_Entry{
536+
Volume: &csi.Volume{
537+
VolumeId: cleanSelfLink(d.SelfLink),
538+
},
539+
Status: &csi.ListVolumesResponse_VolumeStatus{
540+
PublishedNodeIds: users,
541+
},
542+
})
543+
}
544+
545+
return &csi.ListVolumesResponse{
546+
Entries: entries,
547+
NextToken: nextToken,
548+
}, nil
515549
}
516550

517551
func (gceCS *GCEControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {

pkg/gce-pd-csi-driver/controller_test.go

+58
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,64 @@ func TestCreateVolumeArguments(t *testing.T) {
714714
}
715715
}
716716

717+
func TestListVolumeArgs(t *testing.T) {
718+
testCases := []struct {
719+
name string
720+
maxEntries int32
721+
expectedEntries int
722+
expectedErr bool
723+
}{
724+
{
725+
name: "normal",
726+
expectedEntries: 500,
727+
},
728+
{
729+
name: "fine amount of entries",
730+
maxEntries: 420,
731+
expectedEntries: 420,
732+
},
733+
{
734+
name: "too many entries, but defaults to 500",
735+
maxEntries: 501,
736+
expectedEntries: 500,
737+
},
738+
{
739+
name: "negative entries",
740+
maxEntries: -1,
741+
expectedErr: true,
742+
},
743+
}
744+
745+
for _, tc := range testCases {
746+
t.Run(tc.name, func(t *testing.T) {
747+
// Setup new driver each time so no interference
748+
d := []*gce.CloudDisk{}
749+
for i := 0; i < 600; i++ {
750+
// Create 600 dummy disks
751+
d = append(d, &gce.CloudDisk{ZonalDisk: &compute.Disk{Name: fmt.Sprintf("%v", i)}})
752+
}
753+
gceDriver := initGCEDriver(t, d)
754+
lvr := &csi.ListVolumesRequest{
755+
MaxEntries: tc.maxEntries,
756+
}
757+
resp, err := gceDriver.cs.ListVolumes(context.TODO(), lvr)
758+
if tc.expectedErr && err == nil {
759+
t.Fatalf("Got no error when expecting an error")
760+
}
761+
if err != nil {
762+
if !tc.expectedErr {
763+
t.Fatalf("Got error %v, expecting none", err)
764+
}
765+
return
766+
}
767+
768+
if len(resp.Entries) != tc.expectedEntries {
769+
t.Fatalf("Got %v entries, expected %v", len(resp.Entries), tc.expectedEntries)
770+
}
771+
})
772+
}
773+
}
774+
717775
func TestCreateVolumeWithVolumeSource(t *testing.T) {
718776
// Define test cases
719777
testCases := []struct {

pkg/gce-pd-csi-driver/gce-pd-driver.go

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (gceDriver *GCEDriver) SetupGCEDriver(cloudProvider gce.GCECompute, mounter
6767
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
6868
csi.ControllerServiceCapability_RPC_PUBLISH_READONLY,
6969
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
70+
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
71+
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
7072
}
7173
gceDriver.AddControllerServiceCapabilities(csc)
7274
ns := []csi.NodeServiceCapability_RPC_Type{

test/e2e/tests/single_zone_e2e_test.go

+47-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"k8s.io/apimachinery/pkg/util/wait"
2525
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
2626
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
27-
remote "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/remote"
27+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/remote"
2828

2929
csi "github.com/container-storage-interface/spec/lib/go/csi"
3030
. "github.com/onsi/ginkgo"
@@ -423,6 +423,36 @@ var _ = Describe("GCE PD CSI Driver", func() {
423423
Expect(err).To(BeNil(), "Failed to go through volume lifecycle after restoring CMEK key")
424424
})
425425

426+
It("Should create disks, attach them places, and verify List returns correct results", func() {
427+
Expect(testContexts).ToNot(BeEmpty())
428+
testContext := getRandomTestContext()
429+
430+
p, z, _ := testContext.Instance.GetIdentity()
431+
client := testContext.Client
432+
433+
nodeID := testContext.Instance.GetNodeID()
434+
435+
_, volID := createAndValidateUniqueZonalDisk(client, p, z)
436+
defer deleteVolumeOrError(client, volID, p)
437+
438+
_, secondVolID := createAndValidateUniqueZonalDisk(client, p, z)
439+
defer deleteVolumeOrError(client, secondVolID, p)
440+
441+
// Attach volID to current instance
442+
err := client.ControllerPublishVolume(volID, nodeID)
443+
Expect(err).To(BeNil(), "Failed ControllerPublishVolume")
444+
defer client.ControllerUnpublishVolume(volID, nodeID)
445+
446+
// List Volumes
447+
volsToNodes, err := client.ListVolumes()
448+
Expect(err).To(BeNil(), "Failed ListVolumes")
449+
450+
// Verify
451+
Expect(volsToNodes[volID]).ToNot(BeNil(), "Couldn't find attached nodes for vol")
452+
Expect(volsToNodes[volID]).To(ContainElement(nodeID), "Couldn't find node in attached nodes for vol")
453+
Expect(volsToNodes[secondVolID]).To(BeNil(), "Second vol ID attached nodes not nil")
454+
})
455+
426456
It("Should create and delete snapshot for RePD in two zones ", func() {
427457
Expect(testContexts).ToNot(BeEmpty())
428458
testContext := getRandomTestContext()
@@ -580,10 +610,11 @@ func equalWithinEpsilon(a, b, epsiolon int64) bool {
580610
return b-a < epsiolon
581611
}
582612

583-
func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone string) (string, string) {
613+
func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone string) (volName, volID string) {
584614
// Create Disk
585-
volName := testNamePrefix + string(uuid.NewUUID())
586-
volID, err := client.CreateVolume(volName, nil, defaultSizeGb,
615+
var err error
616+
volName = testNamePrefix + string(uuid.NewUUID())
617+
volID, err = client.CreateVolume(volName, nil, defaultSizeGb,
587618
&csi.TopologyRequirement{
588619
Requisite: []*csi.Topology{
589620
{
@@ -600,6 +631,17 @@ func createAndValidateUniqueZonalDisk(client *remote.CsiClient, project, zone st
600631
Expect(cloudDisk.Status).To(Equal(readyState))
601632
Expect(cloudDisk.SizeGb).To(Equal(defaultSizeGb))
602633
Expect(cloudDisk.Name).To(Equal(volName))
634+
return
635+
}
636+
637+
func deleteVolumeOrError(client *remote.CsiClient, volID, project string) {
638+
// Delete Disk
639+
err := client.DeleteVolume(volID)
640+
Expect(err).To(BeNil(), "DeleteVolume failed")
603641

604-
return volName, volID
642+
// Validate Disk Deleted
643+
key, err := common.VolumeIDToKey(volID)
644+
Expect(err).To(BeNil(), "Failed to conver volume ID To key")
645+
_, err = computeService.Disks.Get(project, key.Zone, key.Name).Do()
646+
Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found")
605647
}

test/remote/client-wrappers.go

+12
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,18 @@ func (c *CsiClient) ControllerPublishVolume(volId, nodeId string) error {
138138
return err
139139
}
140140

141+
func (c *CsiClient) ListVolumes() (map[string]([]string), error) {
142+
resp, err := c.ctrlClient.ListVolumes(context.Background(), &csipb.ListVolumesRequest{})
143+
if err != nil {
144+
return nil, err
145+
}
146+
vols := map[string]([]string){}
147+
for _, e := range resp.Entries {
148+
vols[e.Volume.VolumeId] = e.Status.PublishedNodeIds
149+
}
150+
return vols, nil
151+
}
152+
141153
func (c *CsiClient) ControllerUnpublishVolume(volId, nodeId string) error {
142154
cupreq := &csipb.ControllerUnpublishVolumeRequest{
143155
VolumeId: volId,

test/remote/instance.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package remote
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io/ioutil"
2324
"net/http"
@@ -136,9 +137,9 @@ func (i *InstanceInfo) CreateOrGetInstance(serviceAccount string) error {
136137
if err != nil {
137138
ret := fmt.Sprintf("could not create instance %s: API error: %v", i.name, err)
138139
if op != nil {
139-
ret = fmt.Sprintf("%s: %v", ret, op.Error)
140+
ret = fmt.Sprintf("%s. op error: %v", ret, op.Error)
140141
}
141-
return fmt.Errorf(ret)
142+
return errors.New(ret)
142143
} else if op.Error != nil {
143144
return fmt.Errorf("could not create instance %s: %+v", i.name, op.Error)
144145
}

0 commit comments

Comments
 (0)