diff --git a/pkg/gce-cloud-provider/compute/fake-gce.go b/pkg/gce-cloud-provider/compute/fake-gce.go index 6a6a8f5d0..f59b63c06 100644 --- a/pkg/gce-cloud-provider/compute/fake-gce.go +++ b/pkg/gce-cloud-provider/compute/fake-gce.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/uuid" ) const ( @@ -115,43 +114,12 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([ return []string{cloud.zone, "country-region-fakesecondzone"}, nil } -func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) { - // Ignore page tokens for now - var seen sets.String - var ok bool - var count int64 = 0 - var newToken string +func (cloud *FakeCloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) { d := []*computev1.Disk{} - - if pageToken != "" { - seen, ok = cloud.pageTokens[pageToken] - if !ok { - return nil, "", invalidError() - } - } else { - seen = sets.NewString() - } - - if maxEntries == 0 { - maxEntries = 500 - } - - for name, cd := range cloud.disks { - // Only return v1 disks for simplicity - if !seen.Has(name) { - d = append(d, cd.disk) - seen.Insert(name) - count++ - } - - if count >= maxEntries { - newToken = string(uuid.NewUUID()) - cloud.pageTokens[newToken] = seen - break - } + for _, cd := range cloud.disks { + d = append(d, cd.disk) } - - return d, newToken, nil + return d, "", nil } func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string, maxEntries int64, pageToken string) ([]*computev1.Snapshot, string, error) { diff --git a/pkg/gce-cloud-provider/compute/gce-compute.go b/pkg/gce-cloud-provider/compute/gce-compute.go index 5d75acd34..73787c679 100644 --- a/pkg/gce-cloud-provider/compute/gce-compute.go +++ b/pkg/gce-cloud-provider/compute/gce-compute.go @@ -64,7 +64,7 @@ type GCECompute interface { GetDiskTypeURI(project string, volKey *meta.Key, diskType string) string WaitForAttach(ctx context.Context, project string, volKey *meta.Key, instanceZone, instanceName string) error ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error) - ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) + ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) // Regional Disk Methods GetReplicaZoneURI(project string, zone string) string // Instance Methods @@ -89,19 +89,31 @@ func (cloud *CloudProvider) GetDefaultZone() string { // ListDisks lists disks based on maxEntries and pageToken only in the project // and zone that the driver is running in. -func (cloud *CloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) { - lCall := cloud.service.Disks.List(cloud.project, cloud.zone) - if maxEntries != 0 { - lCall = lCall.MaxResults(maxEntries) - } - if len(pageToken) != 0 { - lCall = lCall.PageToken(pageToken) +func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) { + region, err := common.GetRegionFromZones([]string{cloud.zone}) + if err != nil { + return nil, "", fmt.Errorf("failed to get region from zones: %v", err) } - diskList, err := lCall.Do() + zones, err := cloud.ListZones(ctx, region) if err != nil { return nil, "", err } - return diskList.Items, diskList.NextPageToken, nil + items := []*computev1.Disk{} + + for _, zone := range zones { + lCall := cloud.service.Disks.List(cloud.project, zone) + nextPageToken := "pageToken" + for nextPageToken != "" { + diskList, err := lCall.Do() + if err != nil { + return nil, "", err + } + items = append(items, diskList.Items...) + nextPageToken = diskList.NextPageToken + lCall.PageToken(nextPageToken) + } + } + return items, "", nil } // RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index bc2323461..5bd22299f 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" @@ -39,6 +40,9 @@ type GCEControllerServer struct { Driver *GCEDriver CloudProvider gce.GCECompute + disks []*compute.Disk + seen map[string]int + // A map storing all volumes with ongoing operations so that additional // operations for that same volume (as defined by Volume Key) return an // Aborted error @@ -524,22 +528,36 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List // https://cloud.google.com/compute/docs/reference/beta/disks/list if req.MaxEntries < 0 { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf( - "ListVolumes got max entries request %v. GCE only supports values between 0-500", req.MaxEntries)) - } - var maxEntries int64 = int64(req.MaxEntries) - if maxEntries > 500 { - klog.Warningf("ListVolumes requested max entries of %v, GCE only supports values <=500 so defaulting value back to 500", maxEntries) - maxEntries = 500 + "ListVolumes got max entries request %v. GCE only supports values >0", req.MaxEntries)) } - diskList, nextToken, err := gceCS.CloudProvider.ListDisks(ctx, maxEntries, req.StartingToken) - if err != nil { - if gce.IsGCEInvalidError(err) { - return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err)) + + offset := 0 + var ok bool + if req.StartingToken == "" { + diskList, _, err := gceCS.CloudProvider.ListDisks(ctx) + if err != nil { + if gce.IsGCEInvalidError(err) { + return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err)) + } + return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err)) } - return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err)) + gceCS.disks = diskList + gceCS.seen = map[string]int{} + } else { + offset, ok = gceCS.seen[req.StartingToken] + if !ok { + return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid startingToken: %s", req.StartingToken)) + } + } + + var maxEntries int = int(req.MaxEntries) + if maxEntries == 0 { + maxEntries = len(gceCS.disks) } + entries := []*csi.ListVolumesResponse_Entry{} - for _, d := range diskList { + for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ { + d := gceCS.disks[i+offset] users := []string{} for _, u := range d.Users { users = append(users, cleanSelfLink(u)) @@ -554,6 +572,12 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List }) } + nextToken := "" + if len(entries)+offset < len(gceCS.disks) { + nextToken = string(uuid.NewUUID()) + gceCS.seen[nextToken] = len(entries) + offset + } + return &csi.ListVolumesResponse{ Entries: entries, NextToken: nextToken, diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go index dff1c4128..fe671ba78 100644 --- a/pkg/gce-pd-csi-driver/controller_test.go +++ b/pkg/gce-pd-csi-driver/controller_test.go @@ -771,6 +771,7 @@ func TestCreateVolumeArguments(t *testing.T) { } func TestListVolumeArgs(t *testing.T) { + diskCount := 600 testCases := []struct { name string maxEntries int32 @@ -779,18 +780,13 @@ func TestListVolumeArgs(t *testing.T) { }{ { name: "normal", - expectedEntries: 500, + expectedEntries: diskCount, }, { name: "fine amount of entries", maxEntries: 420, expectedEntries: 420, }, - { - name: "too many entries, but defaults to 500", - maxEntries: 501, - expectedEntries: 500, - }, { name: "negative entries", maxEntries: -1, @@ -802,7 +798,7 @@ func TestListVolumeArgs(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // Setup new driver each time so no interference var d []*gce.CloudDisk - for i := 0; i < 600; i++ { + for i := 0; i < diskCount; i++ { // Create 600 dummy disks d = append(d, gce.CloudDiskFromV1(&compute.Disk{Name: fmt.Sprintf("%v", i)})) } diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 603383688..62189861a 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -151,6 +151,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GC return &GCEControllerServer{ Driver: gceDriver, CloudProvider: cloudProvider, + seen: map[string]int{}, volumeLocks: common.NewVolumeLocks(), } }