diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 4d45dbe68..0402ac646 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -98,10 +98,14 @@ func handle() { } klog.V(2).Infof("Driver vendor version %v", version) - if *runControllerService && *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() { + if *runControllerService && *httpEndpoint != "" { mm := metrics.NewMetricsManager() mm.InitializeHttpHandler(*httpEndpoint, *metricsPath) - mm.EmitGKEComponentVersion() + mm.RegisterPDCSIMetric() + + if metrics.IsGKEComponentVersionAvailable() { + mm.EmitGKEComponentVersion() + } } if len(*extraVolumeLabelsStr) > 0 && !*runControllerService { diff --git a/pkg/common/utils.go b/pkg/common/utils.go index ae11afac0..af4e5d210 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -17,14 +17,21 @@ limitations under the License. package common import ( + "context" + "errors" "fmt" + "net/http" "regexp" "strings" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/googleapi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" volumehelpers "k8s.io/cloud-provider/volume/helpers" + "k8s.io/klog/v2" ) const ( @@ -288,3 +295,83 @@ func ParseMachineType(machineTypeUrl string) (string, error) { } return machineType[1], nil } + +// CodeForError returns a pointer to the grpc error code that maps to the http +// error code for the passed in user googleapi error or context error. Returns +// codes.Internal if the given error is not a googleapi error caused by the user. +// The following http error codes are considered user errors: +// (1) http 400 Bad Request, returns grpc InvalidArgument, +// (2) http 403 Forbidden, returns grpc PermissionDenied, +// (3) http 404 Not Found, returns grpc NotFound +// (4) http 429 Too Many Requests, returns grpc ResourceExhausted +// The following errors are considered context errors: +// (1) "context deadline exceeded", returns grpc DeadlineExceeded, +// (2) "context canceled", returns grpc Canceled +func CodeForError(err error) *codes.Code { + if err == nil { + return nil + } + + if errCode := existingErrorCode(err); errCode != nil { + return errCode + } + if code := isContextError(err); code != nil { + return code + } + + internalErrorCode := codes.Internal + // Upwrap the error + var apiErr *googleapi.Error + if !errors.As(err, &apiErr) { + return &internalErrorCode + } + + userErrors := map[int]codes.Code{ + http.StatusForbidden: codes.PermissionDenied, + http.StatusBadRequest: codes.InvalidArgument, + http.StatusTooManyRequests: codes.ResourceExhausted, + http.StatusNotFound: codes.NotFound, + } + if code, ok := userErrors[apiErr.Code]; ok { + return &code + } + + return &internalErrorCode +} + +// isContextError returns a pointer to the grpc error code DeadlineExceeded +// if the passed in error contains the "context deadline exceeded" string and returns +// the grpc error code Canceled if the error contains the "context canceled" string. +func isContextError(err error) *codes.Code { + if err == nil { + return nil + } + + errStr := err.Error() + if strings.Contains(errStr, context.DeadlineExceeded.Error()) { + return errCodePtr(codes.DeadlineExceeded) + } + if strings.Contains(errStr, context.Canceled.Error()) { + return errCodePtr(codes.Canceled) + } + return nil +} + +func existingErrorCode(err error) *codes.Code { + if err == nil { + return nil + } + if status, ok := status.FromError(err); ok { + return errCodePtr(status.Code()) + } + return nil +} + +func errCodePtr(code codes.Code) *codes.Code { + return &code +} + +func LoggedError(msg string, err error) error { + klog.Errorf(msg+"%v", err.Error()) + return status.Errorf(*CodeForError(err), msg+"%v", err.Error()) +} diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index a13ea2de0..edf49e28c 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -17,11 +17,17 @@ limitations under the License. package common import ( + "context" + "errors" "fmt" + "net/http" "reflect" "testing" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/googleapi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -853,3 +859,109 @@ func TestParseMachineType(t *testing.T) { }) } } + +func TestCodeForError(t *testing.T) { + internalErrorCode := codes.Internal + userErrorCode := codes.InvalidArgument + testCases := []struct { + name string + inputErr error + expCode *codes.Code + }{ + { + name: "Not googleapi.Error", + inputErr: errors.New("I am not a googleapi.Error"), + expCode: &internalErrorCode, + }, + { + name: "User error", + inputErr: &googleapi.Error{Code: http.StatusBadRequest, Message: "User error with bad request"}, + expCode: &userErrorCode, + }, + { + name: "googleapi.Error but not a user error", + inputErr: &googleapi.Error{Code: http.StatusInternalServerError, Message: "Internal error"}, + expCode: &internalErrorCode, + }, + { + name: "context canceled error", + inputErr: context.Canceled, + expCode: errCodePtr(codes.Canceled), + }, + { + name: "context deadline exceeded error", + inputErr: context.DeadlineExceeded, + expCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "status error with Aborted error code", + inputErr: status.Error(codes.Aborted, "aborted error"), + expCode: errCodePtr(codes.Aborted), + }, + { + name: "nil error", + inputErr: nil, + expCode: nil, + }, + } + + for _, tc := range testCases { + t.Logf("Running test: %v", tc.name) + errCode := CodeForError(tc.inputErr) + if (tc.expCode == nil) != (errCode == nil) { + t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) + } + if tc.expCode != nil && *errCode != *tc.expCode { + t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) + } + } +} + +func TestIsContextError(t *testing.T) { + cases := []struct { + name string + err error + expectedErrCode *codes.Code + }{ + { + name: "deadline exceeded error", + err: context.DeadlineExceeded, + expectedErrCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "contains 'context deadline exceeded'", + err: fmt.Errorf("got error: %w", context.DeadlineExceeded), + expectedErrCode: errCodePtr(codes.DeadlineExceeded), + }, + { + name: "context canceled error", + err: context.Canceled, + expectedErrCode: errCodePtr(codes.Canceled), + }, + { + name: "contains 'context canceled'", + err: fmt.Errorf("got error: %w", context.Canceled), + expectedErrCode: errCodePtr(codes.Canceled), + }, + { + name: "does not contain 'context canceled' or 'context deadline exceeded'", + err: fmt.Errorf("unknown error"), + expectedErrCode: nil, + }, + { + name: "nil error", + err: nil, + expectedErrCode: nil, + }, + } + + for _, test := range cases { + errCode := isContextError(test.err) + if (test.expectedErrCode == nil) != (errCode == nil) { + t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) + } + if test.expectedErrCode != nil && *errCode != *test.expectedErrCode { + t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) + } + } +} diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 2d535c032..b2157960f 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -38,11 +38,13 @@ import ( "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" ) type GCEControllerServer struct { Driver *GCEDriver CloudProvider gce.GCECompute + Metrics metrics.MetricsManager disks []*compute.Disk seen map[string]int @@ -121,7 +123,7 @@ const ( replicationTypeNone = "none" replicationTypeRegionalPD = "regional-pd" - + diskNotFound = "" // The maximum number of entries that we can include in the // ListVolumesResposne // In reality, the limit here is 4MB (based on gRPC client response limits), @@ -193,6 +195,12 @@ func useVolumeCloning(req *csi.CreateVolumeRequest) bool { func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, diskTypeForMetric) + } + }() // Validate arguments volumeCapabilities := req.GetVolumeCapabilities() name := req.GetName() @@ -217,6 +225,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Apply Parameters (case-insensitive). We leave validation of // the values to the cloud provider. params, err := common.ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.name, gceCS.Driver.extraVolumeLabels) + diskTypeForMetric = params.DiskType if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) } @@ -265,7 +274,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre volumeID, err := common.KeyToVolumeID(volKey, gceCS.CloudProvider.GetDefaultProject()) if err != nil { - return nil, LoggedError("Failed to convert volume key to volume ID: ", err) + return nil, common.LoggedError("Failed to convert volume key to volume ID: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) @@ -274,9 +283,10 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Validate if disk already exists existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gceAPIVersion) + diskTypeForMetric = metrics.GetDiskType(existingDisk) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume, failed to getDisk when validating: ", err) } } if err == nil { @@ -291,7 +301,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre ready, err := isDiskReady(existingDisk) if err != nil { - return nil, LoggedError("CreateVolume disk "+volKey.String()+" had error checking ready status: ", err) + return nil, common.LoggedError("CreateVolume disk "+volKey.String()+" had error checking ready status: ", err) } if !ready { return nil, status.Errorf(codes.Internal, "CreateVolume existing disk %v is not ready", volKey) @@ -312,7 +322,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify that snapshot exists sl, err := gceCS.getSnapshotByID(ctx, snapshotID) if err != nil { - return nil, LoggedError("CreateVolume failed to get snapshot "+snapshotID+": ", err) + return nil, common.LoggedError("CreateVolume failed to get snapshot "+snapshotID+": ", err) } else if len(sl.Entries) == 0 { return nil, status.Errorf(codes.NotFound, "CreateVolume source snapshot %s does not exist", snapshotID) } @@ -332,7 +342,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) } else { - return nil, LoggedError("CreateVolume unknown get disk error when validating: ", err) + return nil, common.LoggedError("CreateVolume, getDisk error when validating: ", err) } } @@ -370,7 +380,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Verify the source disk is ready. ready, err := isDiskReady(diskFromSourceVolume) if err != nil { - return nil, LoggedError("CreateVolume disk from source volume "+sourceVolKey.String()+" had error checking ready status: ", err) + return nil, common.LoggedError("CreateVolume disk from source volume "+sourceVolKey.String()+" had error checking ready status: ", err) } if !ready { return nil, status.Errorf(codes.Internal, "CreateVolume disk from source volume %v is not ready", sourceVolKey) @@ -391,7 +401,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - return nil, LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) + return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) } case replicationTypeRegionalPD: if len(zones) != 2 { @@ -399,7 +409,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) if err != nil { - return nil, LoggedError("CreateVolume failed to create regional disk "+name+": ", err) + return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err) } default: return nil, status.Errorf(codes.InvalidArgument, "CreateVolume replication type '%s' is not supported", params.ReplicationType) @@ -419,6 +429,13 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric) + } + }() // Validate arguments volumeID := req.GetVolumeId() if len(volumeID) == 0 { @@ -439,17 +456,18 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del klog.Warningf("DeleteVolume treating volume as deleted because cannot find volume %v: %v", volumeID, err.Error()) return &csi.DeleteVolumeResponse{}, nil } - return nil, LoggedError("DeleteVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("DeleteVolume error repairing underspecified volume key: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) } defer gceCS.volumeLocks.Release(volumeID) - + disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey) if err != nil { - return nil, LoggedError("unknown Delete disk error: ", err) + return nil, common.LoggedError("Failed to delete disk: ", err) } klog.V(4).Infof("DeleteVolume succeeded for disk %v", volKey) @@ -457,8 +475,15 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del } func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskTypeForMetric) + } + }() // Only valid requests will be accepted - _, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) + _, _, err = gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { return nil, err } @@ -468,7 +493,7 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId) } - resp, err := gceCS.executeControllerPublishVolume(ctx, req) + resp, err, diskTypeForMetric := gceCS.executeControllerPublishVolume(ctx, req) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err.Error()) gceCS.errorBackoff.next(backoffId) @@ -516,11 +541,11 @@ func parseMachineType(machineTypeUrl string) string { return machineType } -func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { +func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error, string) { + diskType := "" project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) - if err != nil { - return nil, err + return nil, err, diskType } volumeID := req.GetVolumeId() @@ -535,36 +560,36 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()) + return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()), diskType } - return nil, LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err), diskType } // Acquires the lock for the volume on that node only, because we need to support the ability // to publish the same volume onto different nodes concurrently lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID) if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired { - return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskType } defer gceCS.volumeLocks.Release(lockingVolumeID) - - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskType = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()) + return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.String(), err.Error()), diskType } - return nil, status.Errorf(codes.Internal, "Unknown get disk error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to getDisk: %v", err.Error()), diskType } instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), diskType } instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName) if err != nil { if gce.IsGCENotFoundError(err) { - return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()) + return nil, status.Errorf(codes.NotFound, "Could not find instance %v: %v", nodeID, err.Error()), diskType } - return nil, status.Errorf(codes.Internal, "Unknown get instance error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to get instance: %v", err.Error()), diskType } readWrite := "READ_WRITE" @@ -574,21 +599,21 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con deviceName, err := common.GetDeviceName(volKey) if err != nil { - return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskType } attached, err := diskIsAttachedAndCompatible(deviceName, instance, volumeCapability, readWrite) if err != nil { - return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()) + return nil, status.Errorf(codes.AlreadyExists, "Disk %v already published to node %v but incompatible: %v", volKey.Name, nodeID, err.Error()), diskType } if attached { // Volume is attached to node. Success! klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v, already attached.", volKey, nodeID) - return pubVolResp, nil + return pubVolResp, nil, diskType } instanceZone, instanceName, err = common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskType } err = gceCS.CloudProvider.AttachDisk(ctx, project, volKey, readWrite, attachableDiskTypePersistent, instanceZone, instanceName) if err != nil { @@ -597,32 +622,39 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con // If we encountered an UnsupportedDiskError, rewrite the error message to be more user friendly. // The error message from GCE is phrased around disk create on VM creation, not runtime attach. machineType := parseMachineType(instance.MachineType) - return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType) + return nil, status.Errorf(codes.InvalidArgument, "'%s' is not a compatible disk type with the machine type %s, please review the GCP online documentation for available persistent disk options", udErr.DiskType, machineType), diskType } - return nil, status.Errorf(codes.Internal, "unknown Attach error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to Attach: %v", err.Error()), diskType } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { - return nil, status.Errorf(codes.Internal, "unknown WaitForAttach error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Errored during WaitForAttach: %v", err.Error()), diskType } + klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) - return pubVolResp, nil + return pubVolResp, nil, diskType } func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { - // Only valid requests will be queued - _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric) + } + }() + _, _, err = gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { return nil, err } - + err = status.Errorf(codes.InvalidArgument, "error message") + // Only valid requests will be queued backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId) if gceCS.errorBackoff.blocking(backoffId) { return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId) } - - resp, err := gceCS.executeControllerUnpublishVolume(ctx, req) + resp, err, diskTypeForMetric := gceCS.executeControllerUnpublishVolume(ctx, req) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId) gceCS.errorBackoff.next(backoffId) @@ -652,11 +684,12 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c return project, volKey, nil } -func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { +func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error, string) { + var diskType string project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { - return nil, err + return nil, err, diskType } volumeID := req.GetVolumeId() @@ -665,36 +698,37 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C if err != nil { if gce.IsGCENotFoundError(err) { klog.Warningf("Treating volume %v as unpublished because it could not be found", volumeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } - return nil, LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerUnpublishVolume error repairing underspecified volume key: ", err), diskType } // Acquires the lock for the volume on that node only, because we need to support the ability // to unpublish the same volume from different nodes concurrently lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID) if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired { - return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID) + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID), diskType } defer gceCS.volumeLocks.Release(lockingVolumeID) - + diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskType = metrics.GetDiskType(diskToUnpublish) instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()) + return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskType } instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName) if err != nil { if gce.IsGCENotFoundError(err) { // Node not existing on GCE means that disk has been detached klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } - return nil, status.Errorf(codes.Internal, "error getting instance: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting instance: %v", err.Error()), diskType } deviceName, err := common.GetDeviceName(volKey) if err != nil { - return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "error getting device name: %v", err.Error()), diskType } attached := diskIsAttached(deviceName, instance) @@ -702,19 +736,25 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C if !attached { // Volume is not attached to node. Success! klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v. Already not attached.", volKey, nodeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } - err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName) if err != nil { - return nil, LoggedError("unknown detach error: ", err) + return nil, common.LoggedError("Failed to detach: ", err), diskType } klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID) - return &csi.ControllerUnpublishVolumeResponse{}, nil + return &csi.ControllerUnpublishVolumeResponse{}, nil, diskType } func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ValidateVolumeCapabilities", err, diskTypeForMetric) + } + }() if req.GetVolumeCapabilities() == nil || len(req.GetVolumeCapabilities()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities must be provided") } @@ -726,13 +766,12 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Volume ID is invalid: %v", err.Error()) } - project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ValidateVolumeCapabilities could not find volume with ID %v: %v", volumeID, err.Error()) } - return nil, LoggedError("ValidateVolumeCapabilities error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ValidateVolumeCapabilities error repairing underspecified volume key: ", err) } if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { @@ -741,11 +780,12 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context defer gceCS.volumeLocks.Release(volumeID) disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error()) } - return nil, LoggedError("Unknown get disk error: ", err) + return nil, common.LoggedError("Failed to getDisk: ", err) } // Check Volume Context is Empty @@ -808,7 +848,7 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListVolumes error with invalid request: %v", err.Error()) } - return nil, LoggedError("Unknown list disk error: ", err) + return nil, common.LoggedError("Failed to list disk: ", err) } gceCS.disks = diskList gceCS.seen = map[string]int{} @@ -867,6 +907,13 @@ func (gceCS *GCEControllerServer) ControllerGetCapabilities(ctx context.Context, } func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskTypeForMetric) + } + }() // Validate arguments volumeID := req.GetSourceVolumeId() if len(req.Name) == 0 { @@ -886,12 +933,13 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C defer gceCS.volumeLocks.Release(volumeID) // Check if volume exists - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("CreateSnapshot unknown get disk error: ", err) + return nil, common.LoggedError("CreateSnapshot, failed to getDisk: ", err) } snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(req.GetParameters(), gceCS.Driver.name) @@ -924,13 +972,12 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Invalid volume key: %v", volKey) } - // Check if PD snapshot already exists var snapshot *compute.Snapshot snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, project, snapshotName) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, status.Errorf(codes.Internal, "Unknown get snapshot error: %v", err.Error()) + return nil, status.Errorf(codes.Internal, "Failed to get snapshot: %v", err.Error()) } // If we could not find the snapshot, we create a new one snapshot, err = gceCS.CloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams) @@ -938,7 +985,7 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("Unknown create snapshot error: ", err) + return nil, common.LoggedError("Failed to create snapshot: ", err) } } @@ -977,7 +1024,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin image, err = gceCS.CloudProvider.GetImage(ctx, project, imageName) if err != nil { if !gce.IsGCEError(err, "notFound") { - return nil, LoggedError("Unknown get image error: ", err) + return nil, common.LoggedError("Failed to get image: ", err) } // create a new image image, err = gceCS.CloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams) @@ -985,7 +1032,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volKey.String(), err.Error()) } - return nil, LoggedError("Unknown create image error: ", err) + return nil, common.LoggedError("Failed to create image: ", err) } } @@ -1097,6 +1144,13 @@ func isCSISnapshotReady(status string) (bool, error) { } func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteSnapshot", err, diskTypeForMetric) + } + }() // Validate arguments snapshotID := req.GetSnapshotId() if len(snapshotID) == 0 { @@ -1115,12 +1169,12 @@ func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.D case common.DiskSnapshotType: err = gceCS.CloudProvider.DeleteSnapshot(ctx, project, key) if err != nil { - return nil, LoggedError("unknown Delete snapshot error: ", err) + return nil, common.LoggedError("Failed to DeleteSnapshot: ", err) } case common.DiskImageType: err = gceCS.CloudProvider.DeleteImage(ctx, project, key) if err != nil { - return nil, LoggedError("unknown Delete image error: ", err) + return nil, common.LoggedError("Failed to DeleteImage error: ", err) } default: return nil, status.Errorf(codes.InvalidArgument, "unknown snapshot type %s", snapshotType) @@ -1152,7 +1206,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li if gce.IsGCEInvalidError(err) { return nil, status.Errorf(codes.Aborted, "ListSnapshots error with invalid request: %v", err.Error()) } - return nil, LoggedError("Unknown list snapshots error: ", err) + return nil, common.LoggedError("Failed to list snapshots: ", err) } gceCS.snapshots = snapshotList gceCS.snapshotTokens = map[string]int{} @@ -1181,6 +1235,13 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li } func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + var err error + diskTypeForMetric := "" + defer func() { + if err != nil { + gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskTypeForMetric) + } + }() volumeID := req.GetVolumeId() if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided") @@ -1195,18 +1256,18 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re if err != nil { return nil, status.Errorf(codes.InvalidArgument, "ControllerExpandVolume Volume ID is invalid: %v", err.Error()) } - project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerExpandVolume could not find volume with ID %v: %v", volumeID, err.Error()) } - return nil, LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) + return nil, common.LoggedError("ControllerExpandVolume error repairing underspecified volume key: ", err) } - + sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) + diskTypeForMetric = metrics.GetDiskType(sourceDisk) resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { - return nil, LoggedError("ControllerExpandVolume failed to resize disk: ", err) + return nil, common.LoggedError("ControllerExpandVolume failed to resize disk: ", err) } klog.V(4).Infof("ControllerExpandVolume succeeded for disk %v to size %v", volKey, resizedGb) @@ -1229,7 +1290,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Failed to list snapshot: ", err) } images, _, err = gceCS.CloudProvider.ListImages(ctx, filter) @@ -1237,7 +1298,7 @@ func (gceCS *GCEControllerServer) getSnapshots(ctx context.Context, req *csi.Lis if gce.IsGCEError(err, "invalid") { return nil, status.Errorf(codes.Aborted, "Invalid error: %v", err.Error()) } - return nil, LoggedError("Unknown list image error: ", err) + return nil, common.LoggedError("Failed to list image: ", err) } entries := []*csi.ListSnapshotsResponse_Entry{} @@ -1278,7 +1339,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI // return empty list if no snapshot is found return &csi.ListSnapshotsResponse{}, nil } - return nil, LoggedError("Unknown list snapshot error: ", err) + return nil, common.LoggedError("Failed to list snapshot: ", err) } e, err := generateDiskSnapshotEntry(snapshot) if err != nil { diff --git a/pkg/gce-pd-csi-driver/utils.go b/pkg/gce-pd-csi-driver/utils.go index cfea8e0b4..d8ef8ce0b 100644 --- a/pkg/gce-pd-csi-driver/utils.go +++ b/pkg/gce-pd-csi-driver/utils.go @@ -17,18 +17,12 @@ limitations under the License. package gceGCEDriver import ( + "context" "errors" "fmt" - "net/http" - "strings" - - "context" csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/api/googleapi" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "k8s.io/klog/v2" ) @@ -218,83 +212,3 @@ func containsZone(zones []string, zone string) bool { return false } - -// CodeForError returns a pointer to the grpc error code that maps to the http -// error code for the passed in user googleapi error or context error. Returns -// codes.Internal if the given error is not a googleapi error caused by the user. -// The following http error codes are considered user errors: -// (1) http 400 Bad Request, returns grpc InvalidArgument, -// (2) http 403 Forbidden, returns grpc PermissionDenied, -// (3) http 404 Not Found, returns grpc NotFound -// (4) http 429 Too Many Requests, returns grpc ResourceExhausted -// The following errors are considered context errors: -// (1) "context deadline exceeded", returns grpc DeadlineExceeded, -// (2) "context canceled", returns grpc Canceled -func CodeForError(err error) *codes.Code { - if err == nil { - return nil - } - - if errCode := existingErrorCode(err); errCode != nil { - return errCode - } - if code := isContextError(err); code != nil { - return code - } - - internalErrorCode := codes.Internal - // Upwrap the error - var apiErr *googleapi.Error - if !errors.As(err, &apiErr) { - return &internalErrorCode - } - - userErrors := map[int]codes.Code{ - http.StatusForbidden: codes.PermissionDenied, - http.StatusBadRequest: codes.InvalidArgument, - http.StatusTooManyRequests: codes.ResourceExhausted, - http.StatusNotFound: codes.NotFound, - } - if code, ok := userErrors[apiErr.Code]; ok { - return &code - } - - return &internalErrorCode -} - -func existingErrorCode(err error) *codes.Code { - if err == nil { - return nil - } - if status, ok := status.FromError(err); ok { - return errCodePtr(status.Code()) - } - return nil -} - -// isContextError returns a pointer to the grpc error code DeadlineExceeded -// if the passed in error contains the "context deadline exceeded" string and returns -// the grpc error code Canceled if the error contains the "context canceled" string. -func isContextError(err error) *codes.Code { - if err == nil { - return nil - } - - errStr := err.Error() - if strings.Contains(errStr, context.DeadlineExceeded.Error()) { - return errCodePtr(codes.DeadlineExceeded) - } - if strings.Contains(errStr, context.Canceled.Error()) { - return errCodePtr(codes.Canceled) - } - return nil -} - -func errCodePtr(code codes.Code) *codes.Code { - return &code -} - -func LoggedError(msg string, err error) error { - klog.Errorf(msg+"%v", err.Error()) - return status.Errorf(*CodeForError(err), msg+"%v", err.Error()) -} diff --git a/pkg/gce-pd-csi-driver/utils_test.go b/pkg/gce-pd-csi-driver/utils_test.go index 048810f2f..2a0fd5f45 100644 --- a/pkg/gce-pd-csi-driver/utils_test.go +++ b/pkg/gce-pd-csi-driver/utils_test.go @@ -18,16 +18,9 @@ limitations under the License. package gceGCEDriver import ( - "context" - "errors" - "fmt" - "net/http" "testing" csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/api/googleapi" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var ( @@ -298,109 +291,3 @@ func TestGetReadOnlyFromCapabilities(t *testing.T) { } } } - -func TestCodeForError(t *testing.T) { - internalErrorCode := codes.Internal - userErrorCode := codes.InvalidArgument - testCases := []struct { - name string - inputErr error - expCode *codes.Code - }{ - { - name: "Not googleapi.Error", - inputErr: errors.New("I am not a googleapi.Error"), - expCode: &internalErrorCode, - }, - { - name: "User error", - inputErr: &googleapi.Error{Code: http.StatusBadRequest, Message: "User error with bad request"}, - expCode: &userErrorCode, - }, - { - name: "googleapi.Error but not a user error", - inputErr: &googleapi.Error{Code: http.StatusInternalServerError, Message: "Internal error"}, - expCode: &internalErrorCode, - }, - { - name: "context canceled error", - inputErr: context.Canceled, - expCode: errCodePtr(codes.Canceled), - }, - { - name: "context deadline exceeded error", - inputErr: context.DeadlineExceeded, - expCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "status error with Aborted error code", - inputErr: status.Error(codes.Aborted, "aborted error"), - expCode: errCodePtr(codes.Aborted), - }, - { - name: "nil error", - inputErr: nil, - expCode: nil, - }, - } - - for _, tc := range testCases { - t.Logf("Running test: %v", tc.name) - errCode := CodeForError(tc.inputErr) - if (tc.expCode == nil) != (errCode == nil) { - t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) - } - if tc.expCode != nil && *errCode != *tc.expCode { - t.Errorf("test %v failed: got %v, expected %v", tc.name, errCode, tc.expCode) - } - } -} - -func TestIsContextError(t *testing.T) { - cases := []struct { - name string - err error - expectedErrCode *codes.Code - }{ - { - name: "deadline exceeded error", - err: context.DeadlineExceeded, - expectedErrCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "contains 'context deadline exceeded'", - err: fmt.Errorf("got error: %w", context.DeadlineExceeded), - expectedErrCode: errCodePtr(codes.DeadlineExceeded), - }, - { - name: "context canceled error", - err: context.Canceled, - expectedErrCode: errCodePtr(codes.Canceled), - }, - { - name: "contains 'context canceled'", - err: fmt.Errorf("got error: %w", context.Canceled), - expectedErrCode: errCodePtr(codes.Canceled), - }, - { - name: "does not contain 'context canceled' or 'context deadline exceeded'", - err: fmt.Errorf("unknown error"), - expectedErrCode: nil, - }, - { - name: "nil error", - err: nil, - expectedErrCode: nil, - }, - } - - for _, test := range cases { - errCode := isContextError(test.err) - if (test.expectedErrCode == nil) != (errCode == nil) { - t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) - } - if test.expectedErrCode != nil && *errCode != *test.expectedErrCode { - t.Errorf("test %v failed: got %v, expected %v", test.name, errCode, test.expectedErrCode) - } - } -} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index cae379231..43282ee4a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -23,12 +23,15 @@ import ( "k8s.io/component-base/metrics" "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" ) const ( // envGKEPDCSIVersion is an environment variable set in the PDCSI controller manifest // with the current version of the GKE component. envGKEPDCSIVersion = "GKE_PDCSI_VERSION" + pdcsiDriverName = "pd.csi.storage.gke.io" ) var ( @@ -37,28 +40,41 @@ var ( Name: "component_version", Help: "Metric to expose the version of the PDCSI GKE component.", }, []string{"component_version"}) + + pdcsiOperationErrorsMetric = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: "csidriver", + Name: "operation_errors", + Help: "CSI server side error metrics", + StabilityLevel: metrics.ALPHA, + }, + []string{"driver_name", "method_name", "grpc_status_code", "disk_type"}) ) -type metricsManager struct { +type MetricsManager struct { registry metrics.KubeRegistry } -func NewMetricsManager() metricsManager { - mm := metricsManager{ +func NewMetricsManager() MetricsManager { + mm := MetricsManager{ registry: metrics.NewKubeRegistry(), } return mm } -func (mm *metricsManager) GetRegistry() metrics.KubeRegistry { +func (mm *MetricsManager) GetRegistry() metrics.KubeRegistry { return mm.registry } -func (mm *metricsManager) registerComponentVersionMetric() { +func (mm *MetricsManager) registerComponentVersionMetric() { mm.registry.MustRegister(gkeComponentVersion) } -func (mm *metricsManager) recordComponentVersionMetric() error { +func (mm *MetricsManager) RegisterPDCSIMetric() { + mm.registry.MustRegister(pdcsiOperationErrorsMetric) +} + +func (mm *MetricsManager) recordComponentVersionMetric() error { v := getEnvVar(envGKEPDCSIVersion) if v == "" { klog.V(2).Info("Skip emitting component version metric") @@ -70,7 +86,14 @@ func (mm *metricsManager) recordComponentVersionMetric() error { return nil } -func (mm *metricsManager) EmitGKEComponentVersion() error { +func (mm *MetricsManager) RecordOperationErrorMetrics( + operationName string, + operationErr error, + diskType string) { + pdcsiOperationErrorsMetric.WithLabelValues(pdcsiDriverName, "/csi.v1.Controller/"+operationName, common.CodeForError(operationErr).String(), diskType).Inc() +} + +func (mm *MetricsManager) EmitGKEComponentVersion() error { mm.registerComponentVersionMetric() if err := mm.recordComponentVersionMetric(); err != nil { return err @@ -87,7 +110,7 @@ type Server interface { // RegisterToServer registers an HTTP handler for this metrics manager to the // given server at the specified address/path. -func (mm *metricsManager) registerToServer(s Server, metricsPath string) { +func (mm *MetricsManager) registerToServer(s Server, metricsPath string) { s.Handle(metricsPath, metrics.HandlerFor( mm.GetRegistry(), metrics.HandlerOpts{ @@ -95,7 +118,7 @@ func (mm *metricsManager) registerToServer(s Server, metricsPath string) { } // InitializeHttpHandler sets up a server and creates a handler for metrics. -func (mm *metricsManager) InitializeHttpHandler(address, path string) { +func (mm *MetricsManager) InitializeHttpHandler(address, path string) { mux := http.NewServeMux() mm.registerToServer(mux, path) go func() { @@ -123,3 +146,11 @@ func IsGKEComponentVersionAvailable() bool { return true } + +func GetDiskType(disk *gce.CloudDisk) string { + var diskType string + if disk != nil { + diskType = disk.GetPDType() + } + return diskType +}