Skip to content

Cherry-pick #813 to release-1.3 #817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 4 additions & 36 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
)

const (
Expand Down Expand Up @@ -115,43 +114,12 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([
return []string{cloud.zone, "country-region-fakesecondzone"}, nil
}

func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
// Ignore page tokens for now
var seen sets.String
var ok bool
var count int64 = 0
var newToken string
func (cloud *FakeCloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) {
d := []*computev1.Disk{}

if pageToken != "" {
seen, ok = cloud.pageTokens[pageToken]
if !ok {
return nil, "", invalidError()
}
} else {
seen = sets.NewString()
}

if maxEntries == 0 {
maxEntries = 500
}

for name, cd := range cloud.disks {
// Only return v1 disks for simplicity
if !seen.Has(name) {
d = append(d, cd.disk)
seen.Insert(name)
count++
}

if count >= maxEntries {
newToken = string(uuid.NewUUID())
cloud.pageTokens[newToken] = seen
break
}
for _, cd := range cloud.disks {
d = append(d, cd.disk)
}

return d, newToken, nil
return d, "", nil
}

func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string, maxEntries int64, pageToken string) ([]*computev1.Snapshot, string, error) {
Expand Down
32 changes: 22 additions & 10 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type GCECompute interface {
GetDiskTypeURI(project string, volKey *meta.Key, diskType string) string
WaitForAttach(ctx context.Context, project string, volKey *meta.Key, instanceZone, instanceName string) error
ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error)
ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error)
ListDisks(ctx context.Context) ([]*computev1.Disk, string, error)
// Regional Disk Methods
GetReplicaZoneURI(project string, zone string) string
// Instance Methods
Expand All @@ -89,19 +89,31 @@ func (cloud *CloudProvider) GetDefaultZone() string {

// ListDisks lists disks based on maxEntries and pageToken only in the project
// and zone that the driver is running in.
func (cloud *CloudProvider) ListDisks(ctx context.Context, maxEntries int64, pageToken string) ([]*computev1.Disk, string, error) {
lCall := cloud.service.Disks.List(cloud.project, cloud.zone)
if maxEntries != 0 {
lCall = lCall.MaxResults(maxEntries)
}
if len(pageToken) != 0 {
lCall = lCall.PageToken(pageToken)
func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) {
region, err := common.GetRegionFromZones([]string{cloud.zone})
if err != nil {
return nil, "", fmt.Errorf("failed to get region from zones: %v", err)
}
diskList, err := lCall.Do()
zones, err := cloud.ListZones(ctx, region)
if err != nil {
return nil, "", err
}
return diskList.Items, diskList.NextPageToken, nil
items := []*computev1.Disk{}

for _, zone := range zones {
lCall := cloud.service.Disks.List(cloud.project, zone)
nextPageToken := "pageToken"
for nextPageToken != "" {
diskList, err := lCall.Do()
if err != nil {
return nil, "", err
}
items = append(items, diskList.Items...)
nextPageToken = diskList.NextPageToken
lCall.PageToken(nextPageToken)
}
}
return items, "", nil
}

// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
Expand Down
48 changes: 36 additions & 12 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
Expand All @@ -39,6 +40,9 @@ type GCEControllerServer struct {
Driver *GCEDriver
CloudProvider gce.GCECompute

disks []*compute.Disk
seen map[string]int

// A map storing all volumes with ongoing operations so that additional
// operations for that same volume (as defined by Volume Key) return an
// Aborted error
Expand Down Expand Up @@ -524,22 +528,36 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
// https://cloud.google.com/compute/docs/reference/beta/disks/list
if req.MaxEntries < 0 {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf(
"ListVolumes got max entries request %v. GCE only supports values between 0-500", req.MaxEntries))
}
var maxEntries int64 = int64(req.MaxEntries)
if maxEntries > 500 {
klog.Warningf("ListVolumes requested max entries of %v, GCE only supports values <=500 so defaulting value back to 500", maxEntries)
maxEntries = 500
"ListVolumes got max entries request %v. GCE only supports values >0", req.MaxEntries))
}
diskList, nextToken, err := gceCS.CloudProvider.ListDisks(ctx, maxEntries, req.StartingToken)
if err != nil {
if gce.IsGCEInvalidError(err) {
return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err))

offset := 0
var ok bool
if req.StartingToken == "" {
diskList, _, err := gceCS.CloudProvider.ListDisks(ctx)
if err != nil {
if gce.IsGCEInvalidError(err) {
return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid request: %v", err))
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err))
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list disk error: %v", err))
gceCS.disks = diskList
gceCS.seen = map[string]int{}
} else {
offset, ok = gceCS.seen[req.StartingToken]
if !ok {
return nil, status.Error(codes.Aborted, fmt.Sprintf("ListVolumes error with invalid startingToken: %s", req.StartingToken))
}
}

var maxEntries int = int(req.MaxEntries)
if maxEntries == 0 {
maxEntries = len(gceCS.disks)
}

entries := []*csi.ListVolumesResponse_Entry{}
for _, d := range diskList {
for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ {
d := gceCS.disks[i+offset]
users := []string{}
for _, u := range d.Users {
users = append(users, cleanSelfLink(u))
Expand All @@ -554,6 +572,12 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
})
}

nextToken := ""
if len(entries)+offset < len(gceCS.disks) {
nextToken = string(uuid.NewUUID())
gceCS.seen[nextToken] = len(entries) + offset
}

return &csi.ListVolumesResponse{
Entries: entries,
NextToken: nextToken,
Expand Down
10 changes: 3 additions & 7 deletions pkg/gce-pd-csi-driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ func TestCreateVolumeArguments(t *testing.T) {
}

func TestListVolumeArgs(t *testing.T) {
diskCount := 600
testCases := []struct {
name string
maxEntries int32
Expand All @@ -779,18 +780,13 @@ func TestListVolumeArgs(t *testing.T) {
}{
{
name: "normal",
expectedEntries: 500,
expectedEntries: diskCount,
},
{
name: "fine amount of entries",
maxEntries: 420,
expectedEntries: 420,
},
{
name: "too many entries, but defaults to 500",
maxEntries: 501,
expectedEntries: 500,
},
{
name: "negative entries",
maxEntries: -1,
Expand All @@ -802,7 +798,7 @@ func TestListVolumeArgs(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Setup new driver each time so no interference
var d []*gce.CloudDisk
for i := 0; i < 600; i++ {
for i := 0; i < diskCount; i++ {
// Create 600 dummy disks
d = append(d, gce.CloudDiskFromV1(&compute.Disk{Name: fmt.Sprintf("%v", i)}))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GC
return &GCEControllerServer{
Driver: gceDriver,
CloudProvider: cloudProvider,
seen: map[string]int{},
volumeLocks: common.NewVolumeLocks(),
}
}
Expand Down