From b2f0a723712421a7f02b1ab745ae784d8752e937 Mon Sep 17 00:00:00 2001 From: Sneha Aradhey Date: Wed, 10 May 2023 17:21:46 +0000 Subject: [PATCH 1/5] Adding new metric pdcsi_operation_errors to fetch error count --- cmd/gce-pd-csi-driver/main.go | 8 +- pkg/common/utils.go | 87 +++++++++++++++++++++ pkg/common/utils_test.go | 112 +++++++++++++++++++++++++++ pkg/gce-pd-csi-driver/controller.go | 108 ++++++++++++++++++-------- pkg/gce-pd-csi-driver/utils.go | 88 +--------------------- pkg/gce-pd-csi-driver/utils_test.go | 113 ---------------------------- pkg/metrics/metrics.go | 52 ++++++++++--- 7 files changed, 323 insertions(+), 245 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 4d45dbe68..e43fdf15e 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -98,10 +98,14 @@ func handle() { } klog.V(2).Infof("Driver vendor version %v", version) - if *runControllerService && *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() { + if *runControllerService && *httpEndpoint != "" { mm := metrics.NewMetricsManager() mm.InitializeHttpHandler(*httpEndpoint, *metricsPath) - mm.EmitGKEComponentVersion() + mm.RegisterHyperdiskMetric() + + if metrics.IsGKEComponentVersionAvailable() { + mm.EmitGKEComponentVersion() + } } if len(*extraVolumeLabelsStr) > 0 && !*runControllerService { diff --git a/pkg/common/utils.go b/pkg/common/utils.go index ae11afac0..af4e5d210 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -17,14 +17,21 @@ limitations under the License. package common import ( + "context" + "errors" "fmt" + "net/http" "regexp" "strings" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/googleapi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" volumehelpers "k8s.io/cloud-provider/volume/helpers" + "k8s.io/klog/v2" ) const ( @@ -288,3 +295,83 @@ func ParseMachineType(machineTypeUrl string) (string, error) { } return machineType[1], nil } + +// CodeForError returns a pointer to the grpc error code that maps to the http +// error code for the passed in user googleapi error or context error. Returns +// codes.Internal if the given error is not a googleapi error caused by the user. +// The following http error codes are considered user errors: +// (1) http 400 Bad Request, returns grpc InvalidArgument, +// (2) http 403 Forbidden, returns grpc PermissionDenied, +// (3) http 404 Not Found, returns grpc NotFound +// (4) http 429 Too Many Requests, returns grpc ResourceExhausted +// The following errors are considered context errors: +// (1) "context deadline exceeded", returns grpc DeadlineExceeded, +// (2) "context canceled", returns grpc Canceled +func CodeForError(err error) *codes.Code { + if err == nil { + return nil + } + + if errCode := existingErrorCode(err); errCode != nil { + return errCode + } + if code := isContextError(err); code != nil { + return code + } + + internalErrorCode := codes.Internal + // Upwrap the error + var apiErr *googleapi.Error + if !errors.As(err, &apiErr) { + return &internalErrorCode + } + + userErrors := map[int]codes.Code{ + http.StatusForbidden: codes.PermissionDenied, + http.StatusBadRequest: codes.InvalidArgument, + http.StatusTooManyRequests: codes.ResourceExhausted, + http.StatusNotFound: codes.NotFound, + } + if code, ok := userErrors[apiErr.Code]; ok { + return &code + } + + return &internalErrorCode +} + +// isContextError returns a pointer to the grpc error code DeadlineExceeded +// if the passed in error contains the "context deadline exceeded" string and returns +// the grpc error code Canceled if the error contains the "context canceled" string. +func isContextError(err error) *codes.Code { + if err == nil { + return nil + } + + errStr := err.Error() + if strings.Contains(errStr, context.DeadlineExceeded.Error()) { + return errCodePtr(codes.DeadlineExceeded) + } + if strings.Contains(errStr, context.Canceled.Error()) { + return errCodePtr(codes.Canceled) + } + return nil +} + +func existingErrorCode(err error) *codes.Code { + if err == nil { + return nil + } + if status, ok := status.FromError(err); ok { + return errCodePtr(status.Code()) + } + return nil +} + +func errCodePtr(code codes.Code) *codes.Code { + return &code +} + +func LoggedError(msg string, err error) error { + klog.Errorf(msg+"%v", err.Error()) + return status.Errorf(*CodeForError(err), msg+"%v", err.Error()) +} diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index a13ea2de0..edf49e28c 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -17,11 +17,17 @@ limitations under the License. package common import ( + "context" + "errors" "fmt" + "net/http" "reflect" "testing" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/googleapi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -853,3 +859,109 @@ func TestParseMachineType(t *testing.T) { }) } } + +func TestCodeForError(t *testing.T) { + internalErrorCode := codes.Internal + userErrorCode := codes.InvalidArgument + testCases := []struct { + name string + inputErr error + expCode *codes.Code + }{ + { + name: "Not googleapi.Error", + inputErr: errors.New("I am not a googleapi.Error"), + expCode: &internalErrorCode, + }, + { + name: "User error", + inputErr: &googleapi.Error{Code: http.StatusBadRequest, Message: "User error with bad request"}, + expCode: &userErrorCode, + }, + { + name: "googleapi.Error but not a user error", + inputErr: &googleapi.Error{Code: http.StatusInternalServerError, Message: "Internal error"}, + expCode: &internalErrorCode, + }, + { + name: "context canceled error", + inputErr: context.Canceled, + expCode: errCodePtr(codes.Canceled), + }, + { + name: "context deadline exceeded error", + inputErr: context.DeadlineExceeded, + expCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "status error with Aborted error code", + inputErr: status.Error(codes.Aborted, "aborted error"), + expCode: errCodePtr(codes.Aborted), + }, + { + name: "nil error", + inputErr: nil, + expCode: nil, + }, + } + + for _, tc := range testCases { + t.Logf("Running test: %v", tc.name) + errCode := CodeForError(tc.inputErr) + if (tc.expCode == nil) != (errCode == nil) { + t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) + } + if tc.expCode != nil && *errCode != *tc.expCode { + t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) + } + } +} + +func TestIsContextError(t *testing.T) { + cases := []struct { + name string + err error + expectedErrCode *codes.Code + }{ + { + name: "deadline exceeded error", + err: context.DeadlineExceeded, + expectedErrCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "contains 'context deadline exceeded'", + err: fmt.Errorf("got error: %w", context.DeadlineExceeded), + expectedErrCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "context canceled error", + err: context.Canceled, + expectedErrCode: errCodePtr(codes.Canceled), + }, + { + name: "contains 'context canceled'", + err: fmt.Errorf("got error: %w", context.Canceled), + expectedErrCode: errCodePtr(codes.Canceled), + }, + { + name: "does not contain 'context canceled' or 'context deadline exceeded'", + err: fmt.Errorf("unknown error"), + expectedErrCode: nil, + }, + { + name: "nil error", + err: nil, + expectedErrCode: nil, + }, + } + + for _, test := range cases { + errCode := isContextError(test.err) + if (test.expectedErrCode == nil) != (errCode == nil) { + t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) + } + if test.expectedErrCode != nil && *errCode != *test.expectedErrCode { + t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) + } + } +} diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 2d535c032..d0e7f74a3 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -38,11 +38,13 @@ import ( "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" ) type GCEControllerServer struct { Driver *GCEDriver CloudProvider gce.GCECompute + Metrics metrics.MetricsManager disks []*compute.Disk seen map[string]int @@ -265,7 +267,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre volumeID, err := common.KeyToVolumeID(volKey, gceCS.CloudProvider.GetDefaultProject()) if err != nil { - return nil, LoggedError("Failed to convert volume key to volume ID: ", err) + return nil, common.LoggedError("Failed to convert volume key to volume ID: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) @@ -276,7 +278,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gceAPIVersion) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume unknown get disk error when validating: ", err) } } if err == nil { @@ -291,7 +293,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre ready, err := isDiskReady(existingDisk) if err != nil { - return nil, LoggedError("CreateVolume disk "+volKey.String()+" had error checking ready status: ", err) + return nil, common.LoggedError("CreateVolume disk "+volKey.String()+" had error checking ready status: ", err) } if !ready { return nil, status.Errorf(codes.Internal, "CreateVolume existing disk %v is not ready", volKey) @@ -312,7 +314,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify that snapshot exists sl, err := gceCS.getSnapshotByID(ctx, snapshotID) if err != nil { - return nil, LoggedError("CreateVolume failed to get snapshot "+snapshotID+": ", err) + return nil, common.LoggedError("CreateVolume failed to get snapshot "+snapshotID+": ", err) } else if len(sl.Entries) == 0 { return nil, status.Errorf(codes.NotFound, "CreateVolume source snapshot %s does not exist", snapshotID) } @@ -332,7 +334,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) } else { - return nil, LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume unknown get disk error when validating: ", err) } } @@ -370,7 +372,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify the source disk is ready. ready, err := isDiskReady(diskFromSourceVolume) if err != nil { - return nil, LoggedError("CreateVolume disk from source volume "+sourceVolKey.String()+" had error checking ready status: ", err) + return nil, common.LoggedError("CreateVolume disk from source volume "+sourceVolKey.String()+" had error checking ready status: ", err) } if !ready { return nil, status.Errorf(codes.Internal, "CreateVolume disk from source volume %v is not ready", sourceVolKey) @@ -391,7 +393,11 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - return nil, LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) + // Emit metric for expected disk type from storage class + if params.DiskType != "" { + gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) + } + return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) } case replicationTypeRegionalPD: if len(zones) != 2 { @@ -399,7 +405,11 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - return nil, LoggedError("CreateVolume failed to create regional disk "+name+": ", err) + // Emit metric for expected disk type from storage class + if params.DiskType != "" { + gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) + } + return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err) } default: return nil, status.Errorf(codes.InvalidArgument, "CreateVolume replication type '%s' is not supported", params.ReplicationType) @@ -407,6 +417,10 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre ready, err := isDiskReady(disk) if err != nil { + // Emit metric for expected disk type from storage class as the disk is not ready and might not have PD type populated + if params.DiskType != "" { + gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) + } return nil, status.Errorf(codes.Internal, "CreateVolume disk %v had error checking ready status: %v", volKey, err.Error()) } if !ready { @@ -439,7 +453,7 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del klog.Warningf("DeleteVolume treating volume as deleted because cannot find volume %v: %v", volumeID, err.Error()) return &csi.DeleteVolumeResponse{}, nil } - return nil, LoggedError("DeleteVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("DeleteVolume error repairing underspecified volume key: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { @@ -449,7 +463,7 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey) if err != nil { - return nil, LoggedError("unknown Delete disk error: ", err) + return nil, common.LoggedError("unknown Delete disk error: ", err) } klog.V(4).Infof("DeleteVolume succeeded for disk %v", volKey) @@ -537,7 +551,7 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()) } - return nil, LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err) } // Acquires the lock for the volume on that node only, because we need to support the ability @@ -547,8 +561,7 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) } defer gceCS.volumeLocks.Release(lockingVolumeID) - - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskToPublish, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()) @@ -599,11 +612,15 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con machineType := parseMachineType(instance.MachineType) return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType) } + // Emit metric for error + gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskToPublish.GetPDType()) return nil, status.Errorf(codes.Internal, "unknown Attach error: %v", err.Error()) } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { + // Emit metric for error + gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskToPublish.GetPDType()) return nil, status.Errorf(codes.Internal, "unknown WaitForAttach error: %v", err.Error()) } klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) @@ -667,7 +684,7 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C klog.Warningf("Treating volume %v as unpublished because it could not be found", volumeID) return &csi.ControllerUnpublishVolumeResponse{}, nil } - return nil, LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err) } // Acquires the lock for the volume on that node only, because we need to support the ability @@ -704,10 +721,17 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v. Already not attached.", volKey, nodeID) return &csi.ControllerUnpublishVolumeResponse{}, nil } - + diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) + if err != nil { + common.LoggedError("Unknown get disk error: ", err) + } err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName) if err != nil { - return nil, LoggedError("unknown detach error: ", err) + //Do not emit metric if disk is unknown + if diskToUnpublish != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskToUnpublish.GetPDType()) + } + return nil, common.LoggedError("unknown detach error: ", err) } klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID) @@ -732,7 +756,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ValidateVolumeCapabilities could not find volume with ID %v: %v", volumeID, err.Error()) } - return nil, LoggedError("ValidateVolumeCapabilities error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ValidateVolumeCapabilities error repairing underspecified volume key: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { @@ -745,7 +769,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error()) } - return nil, LoggedError("Unknown get disk error: ", err) + return nil, common.LoggedError("Unknown get disk error: ", err) } // Check Volume Context is Empty @@ -808,7 +832,7 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListVolumes error with invalid request: %v", err.Error()) } - return nil, LoggedError("Unknown list disk error: ", err) + return nil, common.LoggedError("Unknown list disk error: ", err) } gceCS.disks = diskList gceCS.seen = map[string]int{} @@ -891,7 +915,7 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("CreateSnapshot unknown get disk error: ", err) + return nil, common.LoggedError("CreateSnapshot unknown get disk error: ", err) } snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(req.GetParameters(), gceCS.Driver.name) @@ -924,7 +948,10 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Invalid volume key: %v", volKey) } - + sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) + if err != nil { + common.LoggedError("Unknown get disk error: ", err) + } // Check if PD snapshot already exists var snapshot *compute.Snapshot snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, project, snapshotName) @@ -938,12 +965,20 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("Unknown create snapshot error: ", err) + //Do not emit metric if disk is unknown + if sourceDisk != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, sourceDisk.GetPDType()) + } + return nil, common.LoggedError("Unknown create snapshot error: ", err) } } err = gceCS.validateExistingSnapshot(snapshot, volKey) if err != nil { + //Do not emit metric if disk is unknown + if sourceDisk != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, sourceDisk.GetPDType()) + } return nil, status.Errorf(codes.AlreadyExists, "Error in creating snapshot: %v", err.Error()) } @@ -977,7 +1012,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin image, err = gceCS.CloudProvider.GetImage(ctx, project, imageName) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, LoggedError("Unknown get image error: ", err) + return nil, common.LoggedError("Unknown get image error: ", err) } // create a new image image, err = gceCS.CloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams) @@ -985,7 +1020,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("Unknown create image error: ", err) + return nil, common.LoggedError("Unknown create image error: ", err) } } @@ -1115,12 +1150,12 @@ func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.D case common.DiskSnapshotType: err = gceCS.CloudProvider.DeleteSnapshot(ctx, project, key) if err != nil { - return nil, LoggedError("unknown Delete snapshot error: ", err) + return nil, common.LoggedError("unknown Delete snapshot error: ", err) } case common.DiskImageType: err = gceCS.CloudProvider.DeleteImage(ctx, project, key) if err != nil { - return nil, LoggedError("unknown Delete image error: ", err) + return nil, common.LoggedError("unknown Delete image error: ", err) } default: return nil, status.Errorf(codes.InvalidArgument, "unknown snapshot type %s", snapshotType) @@ -1152,7 +1187,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListSnapshots error with invalid request: %v", err.Error()) } - return nil, LoggedError("Unknown list snapshots error: ", err) + return nil, common.LoggedError("Unknown list snapshots error: ", err) } gceCS.snapshots = snapshotList gceCS.snapshotTokens = map[string]int{} @@ -1201,12 +1236,19 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerExpandVolume could not find volume with ID %v: %v", volumeID, err.Error()) } - return nil, LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) + } + sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + if err != nil { + common.LoggedError("Unknown get disk error: ", err) } - resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { - return nil, LoggedError("ControllerExpandVolume failed to resize disk: ", err) + //Do not emit metric if disk is unknown + if sourceDisk != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, sourceDisk.GetPDType()) + } + return nil, common.LoggedError("ControllerExpandVolume failed to resize disk: ", err) } klog.V(4).Infof("ControllerExpandVolume succeeded for disk %v to size %v", volKey, resizedGb) @@ -1229,7 +1271,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Unknown list snapshot error: ", err) } images, _, err = gceCS.CloudProvider.ListImages(ctx, filter) @@ -1237,7 +1279,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, LoggedError("Unknown list image error: ", err) + return nil, common.LoggedError("Unknown list image error: ", err) } entries := []*csi.ListSnapshotsResponse_Entry{} @@ -1278,7 +1320,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI // return empty list if no snapshot is found return &csi.ListSnapshotsResponse{}, nil } - return nil, LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Unknown list snapshot error: ", err) } e, err := generateDiskSnapshotEntry(snapshot) if err != nil { diff --git a/pkg/gce-pd-csi-driver/utils.go b/pkg/gce-pd-csi-driver/utils.go index cfea8e0b4..d8ef8ce0b 100644 --- a/pkg/gce-pd-csi-driver/utils.go +++ b/pkg/gce-pd-csi-driver/utils.go @@ -17,18 +17,12 @@ limitations under the License. package gceGCEDriver import ( + "context" "errors" "fmt" - "net/http" - "strings" - - "context" csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/api/googleapi" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "k8s.io/klog/v2" ) @@ -218,83 +212,3 @@ func containsZone(zones []string, zone string) bool { return false } - -// CodeForError returns a pointer to the grpc error code that maps to the http -// error code for the passed in user googleapi error or context error. Returns -// codes.Internal if the given error is not a googleapi error caused by the user. -// The following http error codes are considered user errors: -// (1) http 400 Bad Request, returns grpc InvalidArgument, -// (2) http 403 Forbidden, returns grpc PermissionDenied, -// (3) http 404 Not Found, returns grpc NotFound -// (4) http 429 Too Many Requests, returns grpc ResourceExhausted -// The following errors are considered context errors: -// (1) "context deadline exceeded", returns grpc DeadlineExceeded, -// (2) "context canceled", returns grpc Canceled -func CodeForError(err error) *codes.Code { - if err == nil { - return nil - } - - if errCode := existingErrorCode(err); errCode != nil { - return errCode - } - if code := isContextError(err); code != nil { - return code - } - - internalErrorCode := codes.Internal - // Upwrap the error - var apiErr *googleapi.Error - if !errors.As(err, &apiErr) { - return &internalErrorCode - } - - userErrors := map[int]codes.Code{ - http.StatusForbidden: codes.PermissionDenied, - http.StatusBadRequest: codes.InvalidArgument, - http.StatusTooManyRequests: codes.ResourceExhausted, - http.StatusNotFound: codes.NotFound, - } - if code, ok := userErrors[apiErr.Code]; ok { - return &code - } - - return &internalErrorCode -} - -func existingErrorCode(err error) *codes.Code { - if err == nil { - return nil - } - if status, ok := status.FromError(err); ok { - return errCodePtr(status.Code()) - } - return nil -} - -// isContextError returns a pointer to the grpc error code DeadlineExceeded -// if the passed in error contains the "context deadline exceeded" string and returns -// the grpc error code Canceled if the error contains the "context canceled" string. -func isContextError(err error) *codes.Code { - if err == nil { - return nil - } - - errStr := err.Error() - if strings.Contains(errStr, context.DeadlineExceeded.Error()) { - return errCodePtr(codes.DeadlineExceeded) - } - if strings.Contains(errStr, context.Canceled.Error()) { - return errCodePtr(codes.Canceled) - } - return nil -} - -func errCodePtr(code codes.Code) *codes.Code { - return &code -} - -func LoggedError(msg string, err error) error { - klog.Errorf(msg+"%v", err.Error()) - return status.Errorf(*CodeForError(err), msg+"%v", err.Error()) -} diff --git a/pkg/gce-pd-csi-driver/utils_test.go b/pkg/gce-pd-csi-driver/utils_test.go index 048810f2f..2a0fd5f45 100644 --- a/pkg/gce-pd-csi-driver/utils_test.go +++ b/pkg/gce-pd-csi-driver/utils_test.go @@ -18,16 +18,9 @@ limitations under the License. package gceGCEDriver import ( - "context" - "errors" - "fmt" - "net/http" "testing" csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/api/googleapi" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var ( @@ -298,109 +291,3 @@ func TestGetReadOnlyFromCapabilities(t *testing.T) { } } } - -func TestCodeForError(t *testing.T) { - internalErrorCode := codes.Internal - userErrorCode := codes.InvalidArgument - testCases := []struct { - name string - inputErr error - expCode *codes.Code - }{ - { - name: "Not googleapi.Error", - inputErr: errors.New("I am not a googleapi.Error"), - expCode: &internalErrorCode, - }, - { - name: "User error", - inputErr: &googleapi.Error{Code: http.StatusBadRequest, Message: "User error with bad request"}, - expCode: &userErrorCode, - }, - { - name: "googleapi.Error but not a user error", - inputErr: &googleapi.Error{Code: http.StatusInternalServerError, Message: "Internal error"}, - expCode: &internalErrorCode, - }, - { - name: "context canceled error", - inputErr: context.Canceled, - expCode: errCodePtr(codes.Canceled), - }, - { - name: "context deadline exceeded error", - inputErr: context.DeadlineExceeded, - expCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "status error with Aborted error code", - inputErr: status.Error(codes.Aborted, "aborted error"), - expCode: errCodePtr(codes.Aborted), - }, - { - name: "nil error", - inputErr: nil, - expCode: nil, - }, - } - - for _, tc := range testCases { - t.Logf("Running test: %v", tc.name) - errCode := CodeForError(tc.inputErr) - if (tc.expCode == nil) != (errCode == nil) { - t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) - } - if tc.expCode != nil && *errCode != *tc.expCode { - t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) - } - } -} - -func TestIsContextError(t *testing.T) { - cases := []struct { - name string - err error - expectedErrCode *codes.Code - }{ - { - name: "deadline exceeded error", - err: context.DeadlineExceeded, - expectedErrCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "contains 'context deadline exceeded'", - err: fmt.Errorf("got error: %w", context.DeadlineExceeded), - expectedErrCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "context canceled error", - err: context.Canceled, - expectedErrCode: errCodePtr(codes.Canceled), - }, - { - name: "contains 'context canceled'", - err: fmt.Errorf("got error: %w", context.Canceled), - expectedErrCode: errCodePtr(codes.Canceled), - }, - { - name: "does not contain 'context canceled' or 'context deadline exceeded'", - err: fmt.Errorf("unknown error"), - expectedErrCode: nil, - }, - { - name: "nil error", - err: nil, - expectedErrCode: nil, - }, - } - - for _, test := range cases { - errCode := isContextError(test.err) - if (test.expectedErrCode == nil) != (errCode == nil) { - t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) - } - if test.expectedErrCode != nil && *errCode != *test.expectedErrCode { - t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) - } - } -} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index cae379231..e885e310d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -20,15 +20,19 @@ import ( "fmt" "net/http" "os" + "strings" "k8s.io/component-base/metrics" "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" ) const ( // envGKEPDCSIVersion is an environment variable set in the PDCSI controller manifest // with the current version of the GKE component. - envGKEPDCSIVersion = "GKE_PDCSI_VERSION" + envGKEPDCSIVersion = "GKE_PDCSI_VERSION" + hyperdiskDriverName = "hyperdisk.csi.storage.gke.io" + pdcsiDriverName = "pd.csi.storage.gke.io" ) var ( @@ -37,28 +41,42 @@ var ( Name: "component_version", Help: "Metric to expose the version of the PDCSI GKE component.", }, []string{"component_version"}) + + pdcsiOperationErrorsMetric = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: "csidriver", + Name: "pdcsi_operation_errors", + Help: "CSI server side error metrics", + StabilityLevel: metrics.ALPHA, + }, + []string{"driver_name", "method_name", "grpc_status_code", "disk_type"}, + ) ) -type metricsManager struct { +type MetricsManager struct { registry metrics.KubeRegistry } -func NewMetricsManager() metricsManager { - mm := metricsManager{ +func NewMetricsManager() MetricsManager { + mm := MetricsManager{ registry: metrics.NewKubeRegistry(), } return mm } -func (mm *metricsManager) GetRegistry() metrics.KubeRegistry { +func (mm *MetricsManager) GetRegistry() metrics.KubeRegistry { return mm.registry } -func (mm *metricsManager) registerComponentVersionMetric() { +func (mm *MetricsManager) registerComponentVersionMetric() { mm.registry.MustRegister(gkeComponentVersion) } -func (mm *metricsManager) recordComponentVersionMetric() error { +func (mm *MetricsManager) RegisterHyperdiskMetric() { + mm.registry.MustRegister(pdcsiOperationErrorsMetric) +} + +func (mm *MetricsManager) recordComponentVersionMetric() error { v := getEnvVar(envGKEPDCSIVersion) if v == "" { klog.V(2).Info("Skip emitting component version metric") @@ -70,7 +88,21 @@ func (mm *metricsManager) recordComponentVersionMetric() error { return nil } -func (mm *metricsManager) EmitGKEComponentVersion() error { +func (mm *MetricsManager) RecordOperationErrorMetrics( + operationName string, + operationErr error, + diskType string) { + var driverName string + if strings.Contains(diskType, "hyperdisk") { + driverName = hyperdiskDriverName + } + if strings.Contains(diskType, "pd") { + driverName = pdcsiDriverName + } + pdcsiOperationErrorsMetric.WithLabelValues(driverName, "/csi.v1.Controller/"+operationName, common.CodeForError(operationErr).String(), diskType).Set(1.0) +} + +func (mm *MetricsManager) EmitGKEComponentVersion() error { mm.registerComponentVersionMetric() if err := mm.recordComponentVersionMetric(); err != nil { return err @@ -87,7 +119,7 @@ type Server interface { // RegisterToServer registers an HTTP handler for this metrics manager to the // given server at the specified address/path. -func (mm *metricsManager) registerToServer(s Server, metricsPath string) { +func (mm *MetricsManager) registerToServer(s Server, metricsPath string) { s.Handle(metricsPath, metrics.HandlerFor( mm.GetRegistry(), metrics.HandlerOpts{ @@ -95,7 +127,7 @@ func (mm *metricsManager) registerToServer(s Server, metricsPath string) { } // InitializeHttpHandler sets up a server and creates a handler for metrics. -func (mm *metricsManager) InitializeHttpHandler(address, path string) { +func (mm *MetricsManager) InitializeHttpHandler(address, path string) { mux := http.NewServeMux() mm.registerToServer(mux, path) go func() { From dd50754b1bde4a962c0080ab61ffb65edde14c27 Mon Sep 17 00:00:00 2001 From: Sneha Aradhey Date: Mon, 22 May 2023 21:12:01 +0000 Subject: [PATCH 2/5] removed hyperdisk driver override, updated error messages and other pr suggestions --- cmd/gce-pd-csi-driver/main.go | 2 +- pkg/gce-pd-csi-driver/controller.go | 91 +++++++++++++---------------- pkg/metrics/metrics.go | 37 ++++++------ 3 files changed, 59 insertions(+), 71 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index e43fdf15e..0402ac646 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -101,7 +101,7 @@ func handle() { if *runControllerService && *httpEndpoint != "" { mm := metrics.NewMetricsManager() mm.InitializeHttpHandler(*httpEndpoint, *metricsPath) - mm.RegisterHyperdiskMetric() + mm.RegisterPDCSIMetric() if metrics.IsGKEComponentVersionAvailable() { mm.EmitGKEComponentVersion() diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index d0e7f74a3..b66196877 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -123,7 +123,7 @@ const ( replicationTypeNone = "none" replicationTypeRegionalPD = "regional-pd" - + diskNotFound = "" // The maximum number of entries that we can include in the // ListVolumesResposne // In reality, the limit here is 4MB (based on gRPC client response limits), @@ -278,7 +278,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gceAPIVersion) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, common.LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume, failed to getDisk when validating: ", err) } } if err == nil { @@ -334,7 +334,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) } else { - return nil, common.LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume, getDisk error when validating: ", err) } } @@ -394,9 +394,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { // Emit metric for expected disk type from storage class - if params.DiskType != "" { - gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) - } + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) } case replicationTypeRegionalPD: @@ -406,9 +404,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { // Emit metric for expected disk type from storage class - if params.DiskType != "" { - gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) - } + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err) } default: @@ -418,9 +414,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre ready, err := isDiskReady(disk) if err != nil { // Emit metric for expected disk type from storage class as the disk is not ready and might not have PD type populated - if params.DiskType != "" { - gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) - } + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, status.Errorf(codes.Internal, "CreateVolume disk %v had error checking ready status: %v", volKey, err.Error()) } if !ready { @@ -463,7 +457,7 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey) if err != nil { - return nil, common.LoggedError("unknown Delete disk error: ", err) + return nil, common.LoggedError("Failed to delete disk: ", err) } klog.V(4).Infof("DeleteVolume succeeded for disk %v", volKey) @@ -563,10 +557,11 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con defer gceCS.volumeLocks.Release(lockingVolumeID) diskToPublish, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) if err != nil { + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskNotFound) if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()) } - return nil, status.Errorf(codes.Internal, "Unknown get disk error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()) } instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { @@ -577,7 +572,7 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()) } - return nil, status.Errorf(codes.Internal, "Unknown get instance error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()) } readWrite := "READ_WRITE" @@ -613,16 +608,17 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType) } // Emit metric for error - gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskToPublish.GetPDType()) - return nil, status.Errorf(codes.Internal, "unknown Attach error: %v", err.Error()) + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, metrics.GetDiskType(diskToPublish)) + return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()) } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { // Emit metric for error - gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskToPublish.GetPDType()) - return nil, status.Errorf(codes.Internal, "unknown WaitForAttach error: %v", err.Error()) + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, metrics.GetDiskType(diskToPublish)) + return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()) } + klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) return pubVolResp, nil } @@ -723,15 +719,13 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C } diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) if err != nil { - common.LoggedError("Unknown get disk error: ", err) + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskNotFound) + common.LoggedError("Failed to getDisk: ", err) } err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName) if err != nil { - //Do not emit metric if disk is unknown - if diskToUnpublish != nil { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskToUnpublish.GetPDType()) - } - return nil, common.LoggedError("unknown detach error: ", err) + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, metrics.GetDiskType(diskToUnpublish)) + return nil, common.LoggedError("Failed to detach: ", err) } klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID) @@ -769,7 +763,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error()) } - return nil, common.LoggedError("Unknown get disk error: ", err) + return nil, common.LoggedError("Failed to getDisk: ", err) } // Check Volume Context is Empty @@ -832,7 +826,7 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListVolumes error with invalid request: %v", err.Error()) } - return nil, common.LoggedError("Unknown list disk error: ", err) + return nil, common.LoggedError("Failed to list disk: ", err) } gceCS.disks = diskList gceCS.seen = map[string]int{} @@ -915,7 +909,7 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error()) } - return nil, common.LoggedError("CreateSnapshot unknown get disk error: ", err) + return nil, common.LoggedError("CreateSnapshot, failed to getDisk: ", err) } snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(req.GetParameters(), gceCS.Driver.name) @@ -950,35 +944,30 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project } sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) if err != nil { - common.LoggedError("Unknown get disk error: ", err) + common.LoggedError("Failed to getDisk: ", err) } // Check if PD snapshot already exists var snapshot *compute.Snapshot snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, project, snapshotName) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, status.Errorf(codes.Internal, "Unknown get snapshot error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to get snapshot: %v", err.Error()) } // If we could not find the snapshot, we create a new one snapshot, err = gceCS.CloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams) if err != nil { if gce.IsGCEError(err, "notFound") { + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskNotFound) return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - //Do not emit metric if disk is unknown - if sourceDisk != nil { - gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, sourceDisk.GetPDType()) - } - return nil, common.LoggedError("Unknown create snapshot error: ", err) + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, metrics.GetDiskType(sourceDisk)) + return nil, common.LoggedError("Failed to create snapshot: ", err) } } err = gceCS.validateExistingSnapshot(snapshot, volKey) if err != nil { - //Do not emit metric if disk is unknown - if sourceDisk != nil { - gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, sourceDisk.GetPDType()) - } + defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, metrics.GetDiskType(sourceDisk)) return nil, status.Errorf(codes.AlreadyExists, "Error in creating snapshot: %v", err.Error()) } @@ -1012,7 +1001,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin image, err = gceCS.CloudProvider.GetImage(ctx, project, imageName) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, common.LoggedError("Unknown get image error: ", err) + return nil, common.LoggedError("Failed to get image: ", err) } // create a new image image, err = gceCS.CloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams) @@ -1020,7 +1009,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - return nil, common.LoggedError("Unknown create image error: ", err) + return nil, common.LoggedError("Failed to create image: ", err) } } @@ -1150,12 +1139,12 @@ func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.D case common.DiskSnapshotType: err = gceCS.CloudProvider.DeleteSnapshot(ctx, project, key) if err != nil { - return nil, common.LoggedError("unknown Delete snapshot error: ", err) + return nil, common.LoggedError("Failed to DeleteSnapshot: ", err) } case common.DiskImageType: err = gceCS.CloudProvider.DeleteImage(ctx, project, key) if err != nil { - return nil, common.LoggedError("unknown Delete image error: ", err) + return nil, common.LoggedError("Failed to DeleteImage error: ", err) } default: return nil, status.Errorf(codes.InvalidArgument, "unknown snapshot type %s", snapshotType) @@ -1187,7 +1176,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListSnapshots error with invalid request: %v", err.Error()) } - return nil, common.LoggedError("Unknown list snapshots error: ", err) + return nil, common.LoggedError("Failed to list snapshots: ", err) } gceCS.snapshots = snapshotList gceCS.snapshotTokens = map[string]int{} @@ -1233,6 +1222,7 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskNotFound) if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerExpandVolume could not find volume with ID %v: %v", volumeID, err.Error()) } @@ -1240,14 +1230,11 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re } sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) if err != nil { - common.LoggedError("Unknown get disk error: ", err) + common.LoggedError("Failed to getDisk: ", err) } resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { - //Do not emit metric if disk is unknown - if sourceDisk != nil { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, sourceDisk.GetPDType()) - } + defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, metrics.GetDiskType(sourceDisk)) return nil, common.LoggedError("ControllerExpandVolume failed to resize disk: ", err) } @@ -1271,7 +1258,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, common.LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Failed to list snapshot: ", err) } images, _, err = gceCS.CloudProvider.ListImages(ctx, filter) @@ -1279,7 +1266,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, common.LoggedError("Unknown list image error: ", err) + return nil, common.LoggedError("Failed to list image: ", err) } entries := []*csi.ListSnapshotsResponse_Entry{} @@ -1320,7 +1307,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI // return empty list if no snapshot is found return &csi.ListSnapshotsResponse{}, nil } - return nil, common.LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Failed to list snapshot: ", err) } e, err := generateDiskSnapshotEntry(snapshot) if err != nil { diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index e885e310d..2016f8a8d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -20,19 +20,18 @@ import ( "fmt" "net/http" "os" - "strings" "k8s.io/component-base/metrics" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" ) const ( // envGKEPDCSIVersion is an environment variable set in the PDCSI controller manifest // with the current version of the GKE component. - envGKEPDCSIVersion = "GKE_PDCSI_VERSION" - hyperdiskDriverName = "hyperdisk.csi.storage.gke.io" - pdcsiDriverName = "pd.csi.storage.gke.io" + envGKEPDCSIVersion = "GKE_PDCSI_VERSION" + pdcsiDriverName = "pd.csi.storage.gke.io" ) var ( @@ -42,15 +41,14 @@ var ( Help: "Metric to expose the version of the PDCSI GKE component.", }, []string{"component_version"}) - pdcsiOperationErrorsMetric = metrics.NewGaugeVec( - &metrics.GaugeOpts{ + pdcsiOperationErrorsMetric = metrics.NewCounterVec( + &metrics.CounterOpts{ Subsystem: "csidriver", - Name: "pdcsi_operation_errors", + Name: "operation_errors", Help: "CSI server side error metrics", StabilityLevel: metrics.ALPHA, }, - []string{"driver_name", "method_name", "grpc_status_code", "disk_type"}, - ) + []string{"driver_name", "method_name", "grpc_status_code", "disk_type"}) ) type MetricsManager struct { @@ -72,7 +70,7 @@ func (mm *MetricsManager) registerComponentVersionMetric() { mm.registry.MustRegister(gkeComponentVersion) } -func (mm *MetricsManager) RegisterHyperdiskMetric() { +func (mm *MetricsManager) RegisterPDCSIMetric() { mm.registry.MustRegister(pdcsiOperationErrorsMetric) } @@ -92,14 +90,7 @@ func (mm *MetricsManager) RecordOperationErrorMetrics( operationName string, operationErr error, diskType string) { - var driverName string - if strings.Contains(diskType, "hyperdisk") { - driverName = hyperdiskDriverName - } - if strings.Contains(diskType, "pd") { - driverName = pdcsiDriverName - } - pdcsiOperationErrorsMetric.WithLabelValues(driverName, "/csi.v1.Controller/"+operationName, common.CodeForError(operationErr).String(), diskType).Set(1.0) + pdcsiOperationErrorsMetric.WithLabelValues(pdcsiDriverName, "/csi.v1.Controller/"+operationName, common.CodeForError(operationErr).String(), diskType).Inc() } func (mm *MetricsManager) EmitGKEComponentVersion() error { @@ -155,3 +146,13 @@ func IsGKEComponentVersionAvailable() bool { return true } + +func GetDiskType(disk *gce.CloudDisk) string { + var diskType string + if disk != nil { + diskType = disk.GetPDType() + } else { + diskType = "" + } + return diskType +} From 535ce88995489d0dcfebb17ce9ba58df342e3ef5 Mon Sep 17 00:00:00 2001 From: Sneha Aradhey Date: Wed, 24 May 2023 17:32:16 +0000 Subject: [PATCH 3/5] adding metrics for additional method calls --- pkg/gce-pd-csi-driver/controller.go | 113 ++++++++++++++++++---------- 1 file changed, 73 insertions(+), 40 deletions(-) diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index b66196877..34924b8bc 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -195,6 +195,12 @@ func useVolumeCloning(req *csi.CreateVolumeRequest) bool { func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, diskTypeForMetric) + } + }() // Validate arguments volumeCapabilities := req.GetVolumeCapabilities() name := req.GetName() @@ -276,6 +282,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Validate if disk already exists existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gceAPIVersion) + diskTypeForMetric = metrics.GetDiskType(existingDisk) if err != nil { if !gce.IsGCEError(err, "notFound") { return nil, common.LoggedError("CreateVolume, failed to getDisk when validating: ", err) @@ -330,6 +337,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify that the volume in VolumeContentSource exists. diskFromSourceVolume, err := gceCS.CloudProvider.GetDisk(ctx, project, sourceVolKey, gceAPIVersion) + diskTypeForMetric = metrics.GetDiskType(diskFromSourceVolume) if err != nil { if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) @@ -386,6 +394,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Create the disk var disk *gce.CloudDisk + diskTypeForMetric = params.DiskType switch params.ReplicationType { case replicationTypeNone: if len(zones) != 1 { @@ -393,8 +402,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - // Emit metric for expected disk type from storage class - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) } case replicationTypeRegionalPD: @@ -403,8 +410,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - // Emit metric for expected disk type from storage class - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err) } default: @@ -413,8 +418,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre ready, err := isDiskReady(disk) if err != nil { - // Emit metric for expected disk type from storage class as the disk is not ready and might not have PD type populated - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, params.DiskType) return nil, status.Errorf(codes.Internal, "CreateVolume disk %v had error checking ready status: %v", volKey, err.Error()) } if !ready { @@ -427,6 +430,13 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric) + } + }() // Validate arguments volumeID := req.GetVolumeId() if len(volumeID) == 0 { @@ -442,6 +452,8 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del } project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) + disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { klog.Warningf("DeleteVolume treating volume as deleted because cannot find volume %v: %v", volumeID, err.Error()) @@ -465,12 +477,20 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del } func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskTypeForMetric) + } + }() // Only valid requests will be accepted - _, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) + project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { return nil, err } - + diskToPublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(diskToPublish) backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId) @@ -526,7 +546,6 @@ func parseMachineType(machineTypeUrl string) string { func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) - if err != nil { return nil, err } @@ -555,9 +574,8 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) } defer gceCS.volumeLocks.Release(lockingVolumeID) - diskToPublish, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskNotFound) if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()) } @@ -607,15 +625,11 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con machineType := parseMachineType(instance.MachineType) return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType) } - // Emit metric for error - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, metrics.GetDiskType(diskToPublish)) return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()) } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { - // Emit metric for error - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, metrics.GetDiskType(diskToPublish)) return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()) } @@ -624,12 +638,20 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con } func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { - // Only valid requests will be queued - _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric) + } + }() + project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { return nil, err } - + diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(diskToUnpublish) + // Only valid requests will be queued backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId) @@ -717,14 +739,8 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v. Already not attached.", volKey, nodeID) return &csi.ControllerUnpublishVolumeResponse{}, nil } - diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) - if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskNotFound) - common.LoggedError("Failed to getDisk: ", err) - } err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName) if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, metrics.GetDiskType(diskToUnpublish)) return nil, common.LoggedError("Failed to detach: ", err) } @@ -733,6 +749,13 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C } func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ValidateVolumeCapabilities", err, diskTypeForMetric) + } + }() if req.GetVolumeCapabilities() == nil || len(req.GetVolumeCapabilities()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities must be provided") } @@ -744,7 +767,6 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Volume ID is invalid: %v", err.Error()) } - project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { @@ -759,6 +781,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context defer gceCS.volumeLocks.Release(volumeID) disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error()) @@ -885,6 +908,13 @@ func (gceCS *GCEControllerServer) ControllerGetCapabilities(ctx context.Context, } func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskTypeForMetric) + } + }() // Validate arguments volumeID := req.GetSourceVolumeId() if len(req.Name) == 0 { @@ -904,7 +934,8 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C defer gceCS.volumeLocks.Release(volumeID) // Check if volume exists - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error()) @@ -942,10 +973,6 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Invalid volume key: %v", volKey) } - sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gce.GCEAPIVersionV1) - if err != nil { - common.LoggedError("Failed to getDisk: ", err) - } // Check if PD snapshot already exists var snapshot *compute.Snapshot snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, project, snapshotName) @@ -957,17 +984,14 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project snapshot, err = gceCS.CloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams) if err != nil { if gce.IsGCEError(err, "notFound") { - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskNotFound) return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, metrics.GetDiskType(sourceDisk)) return nil, common.LoggedError("Failed to create snapshot: ", err) } } err = gceCS.validateExistingSnapshot(snapshot, volKey) if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, metrics.GetDiskType(sourceDisk)) return nil, status.Errorf(codes.AlreadyExists, "Error in creating snapshot: %v", err.Error()) } @@ -1121,6 +1145,13 @@ func isCSISnapshotReady(status string) (bool, error) { } func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteSnapshot", err, diskTypeForMetric) + } + }() // Validate arguments snapshotID := req.GetSnapshotId() if len(snapshotID) == 0 { @@ -1205,6 +1236,13 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li } func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskTypeForMetric) + } + }() volumeID := req.GetVolumeId() if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided") @@ -1219,22 +1257,17 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re if err != nil { return nil, status.Errorf(codes.InvalidArgument, "ControllerExpandVolume Volume ID is invalid: %v", err.Error()) } - + sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(sourceDisk) project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskNotFound) if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerExpandVolume could not find volume with ID %v: %v", volumeID, err.Error()) } return nil, common.LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) } - sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - if err != nil { - common.LoggedError("Failed to getDisk: ", err) - } resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { - defer gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, metrics.GetDiskType(sourceDisk)) return nil, common.LoggedError("ControllerExpandVolume failed to resize disk: ", err) } From 4bfe1fa5b40c8931b39bdcb807dc5df18523972e Mon Sep 17 00:00:00 2001 From: Sneha Aradhey Date: Thu, 25 May 2023 14:14:01 +0000 Subject: [PATCH 4/5] Refactoring GetDisk calls based on comments --- pkg/gce-pd-csi-driver/controller.go | 64 ++++++++++++++--------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 34924b8bc..1859e378b 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -225,6 +225,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Apply Parameters (case-insensitive). We leave validation of // the values to the cloud provider. params, err := common.ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.name, gceCS.Driver.extraVolumeLabels) + diskTypeForMetric = params.DiskType if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) } @@ -337,7 +338,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify that the volume in VolumeContentSource exists. diskFromSourceVolume, err := gceCS.CloudProvider.GetDisk(ctx, project, sourceVolKey, gceAPIVersion) - diskTypeForMetric = metrics.GetDiskType(diskFromSourceVolume) if err != nil { if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) @@ -394,7 +394,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Create the disk var disk *gce.CloudDisk - diskTypeForMetric = params.DiskType switch params.ReplicationType { case replicationTypeNone: if len(zones) != 1 { @@ -452,8 +451,6 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del } project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) - disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { klog.Warningf("DeleteVolume treating volume as deleted because cannot find volume %v: %v", volumeID, err.Error()) @@ -466,7 +463,8 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) } defer gceCS.volumeLocks.Release(volumeID) - + disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey) if err != nil { return nil, common.LoggedError("Failed to delete disk: ", err) @@ -485,18 +483,17 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r } }() // Only valid requests will be accepted - project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) + _, _, err = gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { return nil, err } - diskToPublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric = metrics.GetDiskType(diskToPublish) + backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId) } - resp, err := gceCS.executeControllerPublishVolume(ctx, req) + resp, err, diskTypeForMetric := gceCS.executeControllerPublishVolume(ctx, req) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err.Error()) gceCS.errorBackoff.next(backoffId) @@ -544,10 +541,11 @@ func parseMachineType(machineTypeUrl string) string { return machineType } -func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { +func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error, string) { + diskToPublish := "" project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { - return nil, err + return nil, err, diskToPublish } volumeID := req.GetVolumeId() @@ -562,35 +560,36 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()) + return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()), diskToPublish } - return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err), diskToPublish } // Acquires the lock for the volume on that node only, because we need to support the ability // to publish the same volume onto different nodes concurrently lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID) if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired { - return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskToPublish } defer gceCS.volumeLocks.Release(lockingVolumeID) - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskToPublish = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()) + return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()), diskToPublish } - return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()), diskToPublish } instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), diskToPublish } instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()) + return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()), diskToPublish } - return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()), diskToPublish } readWrite := "READ_WRITE" @@ -600,21 +599,21 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con deviceName, err := common.GetDeviceName(volKey) if err != nil { - return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskToPublish } attached, err := diskIsAttachedAndCompatible(deviceName, instance, volumeCapability, readWrite) if err != nil { - return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()) + return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()), diskToPublish } if attached { // Volume is attached to node. Success! klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v, already attached.", volKey, nodeID) - return pubVolResp, nil + return pubVolResp, nil, diskToPublish } instanceZone, instanceName, err = common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskToPublish } err = gceCS.CloudProvider.AttachDisk(ctx, project, volKey, readWrite, attachableDiskTypePersistent, instanceZone, instanceName) if err != nil { @@ -623,18 +622,18 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con // If we encountered an UnsupportedDiskError, rewrite the error message to be more user friendly. // The error message from GCE is phrased around disk create on VM creation, not runtime attach. machineType := parseMachineType(instance.MachineType) - return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType) + return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType), diskToPublish } - return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()), diskToPublish } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { - return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()), diskToPublish } klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) - return pubVolResp, nil + return pubVolResp, nil, diskToPublish } func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { @@ -649,14 +648,13 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, if err != nil { return nil, err } - diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric = metrics.GetDiskType(diskToUnpublish) // Only valid requests will be queued backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId) } - + diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(diskToUnpublish) resp, err := gceCS.executeControllerUnpublishVolume(ctx, req) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId) @@ -1257,8 +1255,6 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re if err != nil { return nil, status.Errorf(codes.InvalidArgument, "ControllerExpandVolume Volume ID is invalid: %v", err.Error()) } - sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric = metrics.GetDiskType(sourceDisk) project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { @@ -1266,6 +1262,8 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re } return nil, common.LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) } + sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(sourceDisk) resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { return nil, common.LoggedError("ControllerExpandVolume failed to resize disk: ", err) From 97d5ecee0657736cf0b27bf5ba0a12b1a5262ef4 Mon Sep 17 00:00:00 2001 From: Sneha Aradhey Date: Thu, 25 May 2023 18:02:20 +0000 Subject: [PATCH 5/5] refactoring for GetDisk call --- pkg/gce-pd-csi-driver/controller.go | 73 +++++++++++++++-------------- pkg/metrics/metrics.go | 2 - 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 1859e378b..b2157960f 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -542,10 +542,10 @@ func parseMachineType(machineTypeUrl string) string { } func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error, string) { - diskToPublish := "" + diskType := "" project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { - return nil, err, diskToPublish + return nil, err, diskType } volumeID := req.GetVolumeId() @@ -560,36 +560,36 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()), diskToPublish + return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()), diskType } - return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err), diskToPublish + return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err), diskType } // Acquires the lock for the volume on that node only, because we need to support the ability // to publish the same volume onto different nodes concurrently lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID) if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired { - return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskToPublish + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskType } defer gceCS.volumeLocks.Release(lockingVolumeID) disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskToPublish = metrics.GetDiskType(disk) + diskType = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()), diskToPublish + return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()), diskType } - return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()), diskType } instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), diskType } instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()), diskToPublish + return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()), diskType } - return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()), diskType } readWrite := "READ_WRITE" @@ -599,21 +599,21 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con deviceName, err := common.GetDeviceName(volKey) if err != nil { - return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskType } attached, err := diskIsAttachedAndCompatible(deviceName, instance, volumeCapability, readWrite) if err != nil { - return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()), diskToPublish + return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()), diskType } if attached { // Volume is attached to node. Success! klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v, already attached.", volKey, nodeID) - return pubVolResp, nil, diskToPublish + return pubVolResp, nil, diskType } instanceZone, instanceName, err = common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskType } err = gceCS.CloudProvider.AttachDisk(ctx, project, volKey, readWrite, attachableDiskTypePersistent, instanceZone, instanceName) if err != nil { @@ -622,18 +622,18 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con // If we encountered an UnsupportedDiskError, rewrite the error message to be more user friendly. // The error message from GCE is phrased around disk create on VM creation, not runtime attach. machineType := parseMachineType(instance.MachineType) - return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType), diskToPublish + return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType), diskType } - return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()), diskType } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { - return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()), diskToPublish + return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()), diskType } klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) - return pubVolResp, nil, diskToPublish + return pubVolResp, nil, diskType } func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { @@ -644,18 +644,17 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric) } }() - project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) + _, _, err = gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { return nil, err } + err = status.Errorf(codes.InvalidArgument, "error message") // Only valid requests will be queued backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId) } - diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric = metrics.GetDiskType(diskToUnpublish) - resp, err := gceCS.executeControllerUnpublishVolume(ctx, req) + resp, err, diskTypeForMetric := gceCS.executeControllerUnpublishVolume(ctx, req) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId) gceCS.errorBackoff.next(backoffId) @@ -685,11 +684,12 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c return project, volKey, nil } -func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { +func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error, string) { + var diskType string project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { - return nil, err + return nil, err, diskType } volumeID := req.GetVolumeId() @@ -698,36 +698,37 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C if err != nil { if gce.IsGCENotFoundError(err) { klog.Warningf("Treating volume %v as unpublished because it could not be found", volumeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } - return nil, common.LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err), diskType } // Acquires the lock for the volume on that node only, because we need to support the ability // to unpublish the same volume from different nodes concurrently lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID) if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired { - return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskType } defer gceCS.volumeLocks.Release(lockingVolumeID) - + diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskType = metrics.GetDiskType(diskToUnpublish) instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskType } instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName) if err != nil { if gce.IsGCENotFoundError(err) { // Node not existing on GCE means that disk has been detached klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } - return nil, status.Errorf(codes.Internal, "error getting instance: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting instance: %v", err.Error()), diskType } deviceName, err := common.GetDeviceName(volKey) if err != nil { - return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskType } attached := diskIsAttached(deviceName, instance) @@ -735,15 +736,15 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C if !attached { // Volume is not attached to node. Success! klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v. Already not attached.", volKey, nodeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName) if err != nil { - return nil, common.LoggedError("Failed to detach: ", err) + return nil, common.LoggedError("Failed to detach: ", err), diskType } klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 2016f8a8d..43282ee4a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -151,8 +151,6 @@ func GetDiskType(disk *gce.CloudDisk) string { var diskType string if disk != nil { diskType = disk.GetPDType() - } else { - diskType = "" } return diskType }