diff --git a/go.mod b/go.mod index fd0cdf1..176d1a7 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( k8s.io/apimachinery v0.24.2 k8s.io/client-go v0.24.2 k8s.io/klog/v2 v2.70.1 - sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883 + sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49 sigs.k8s.io/controller-runtime v0.12.3 ) diff --git a/go.sum b/go.sum index 65ae442..0dc3b45 100644 --- a/go.sum +++ b/go.sum @@ -706,8 +706,8 @@ k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883 h1:CtqK7l2m9Atw8L5daJdsXvVgxxvQkRBbJbUz7NiadD8= -sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk= +sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49 h1:Ax4j3ThWolmk6yH6jvL3Yf0Fzxe0ZfVuDlSLNILU3GA= +sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk= sigs.k8s.io/controller-runtime v0.12.3 h1:FCM8xeY/FI8hoAfh/V4XbbYMY20gElh9yh+A98usMio= sigs.k8s.io/controller-runtime v0.12.3/go.mod h1:qKsk4WE6zW2Hfj0G4v10EnNB2jMG1C+NTb8h+DwCoU0= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y= diff --git a/pkg/bucketclaim/bucketclaim.go b/pkg/bucketclaim/bucketclaim.go index 963e2cb..6ab9456 100644 --- a/pkg/bucketclaim/bucketclaim.go +++ b/pkg/bucketclaim/bucketclaim.go @@ -2,33 +2,37 @@ package bucketclaim import ( "context" + "fmt" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1" bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned" objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1" - + "sigs.k8s.io/container-object-storage-interface-api/controller/events" "sigs.k8s.io/container-object-storage-interface-controller/pkg/util" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -// bucketClaimListener is a resource handler for bucket requests objects -type bucketClaimListener struct { +// BucketClaimListener is a resource handler for bucket requests objects +type BucketClaimListener struct { + eventRecorder record.EventRecorder + kubeClient kubeclientset.Interface bucketClient bucketclientset.Interface } -func NewBucketClaimListener() *bucketClaimListener { - return &bucketClaimListener{} +func NewBucketClaimListener() *BucketClaimListener { + return &BucketClaimListener{} } // Add creates a bucket in response to a bucketClaim -func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { +func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { klog.V(3).InfoS("Add BucketClaim", "name", bucketClaim.ObjectMeta.Name, "ns", bucketClaim.ObjectMeta.Namespace, @@ -39,7 +43,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc if err != nil { switch err { case util.ErrInvalidBucketClass: - klog.V(3).ErrorS(util.ErrInvalidBucketClass, + klog.V(3).ErrorS(err, "bucketClaim", bucketClaim.ObjectMeta.Name, "ns", bucketClaim.ObjectMeta.Namespace, "bucketClassName", bucketClaim.Spec.BucketClassName) @@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc } // update processes any updates made to the bucket request -func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error { +func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error { klog.V(3).InfoS("Update BucketClaim", "name", old.Name, "ns", old.Namespace) @@ -80,7 +84,7 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc klog.V(3).ErrorS(err, "Error deleting bucket", "bucket", bucketName, "bucketClaim", bucketClaim.ObjectMeta.Name) - return err + return b.recordError(bucketClaim, v1.EventTypeWarning, events.FailedDeleteBucket, err) } klog.V(5).Infof("Successfully deleted bucket: %s from bucketClaim: %s", bucketName, bucketClaim.ObjectMeta.Name) @@ -94,8 +98,8 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc } // Delete processes a bucket for which bucket request is deleted -func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { - klog.V(3).Info("Delete BucketClaim", +func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { + klog.V(3).InfoS("Delete BucketClaim", "name", bucketClaim.ObjectMeta.Name, "ns", bucketClaim.ObjectMeta.Namespace) @@ -103,13 +107,13 @@ func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1. } // provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim. -// Return values // -// nil - BucketClaim successfully processed -// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff] -// ErrBucketAlreadyExists - BucketClaim already processed -// non-nil err - Internal error [requeue'd with exponential backoff] -func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error { +// Return values +// - nil - BucketClaim successfully processed +// - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff] +// - ErrBucketAlreadyExists - BucketClaim already processed +// - non-nil err - Internal error [requeue'd with exponential backoff] +func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error { bucketClaim := inputBucketClaim.DeepCopy() if bucketClaim.Status.BucketReady { return util.ErrBucketAlreadyExists @@ -121,9 +125,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, if bucketClaim.Spec.ExistingBucketName != "" { bucketName = bucketClaim.Spec.ExistingBucketName bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{}) - if err != nil { + if kubeerrors.IsNotFound(err) { + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) + } else if err != nil { klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName) - return err + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } bucket.Spec.BucketClaim = &v1.ObjectReference{ @@ -141,7 +147,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, klog.V(3).ErrorS(err, "Error updating existing bucket", "bucket", bucket.ObjectMeta.Name, "bucketClaim", bucketClaim.ObjectMeta.Name) - return err + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } bucketClaim.Status.BucketName = bucketName @@ -149,13 +155,15 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, } else { bucketClassName := bucketClaim.Spec.BucketClassName if bucketClassName == "" { - return util.ErrInvalidBucketClass + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, util.ErrInvalidBucketClass) } bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{}) - if err != nil { + if kubeerrors.IsNotFound(err) { + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) + } else if err != nil { klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName) - return util.ErrInvalidBucketClass + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } bucketName = bucketClassName + string(bucketClaim.ObjectMeta.UID) @@ -180,11 +188,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, bucket.Spec.Protocols = protocolCopy bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{}) - if err != nil && !errors.IsAlreadyExists(err) { + if err != nil && !kubeerrors.IsAlreadyExists(err) { klog.V(3).ErrorS(err, "Error creationg bucket", "bucket", bucketName, "bucketClaim", bucketClaim.ObjectMeta.Name) - return err + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } bucketClaim.Status.BucketName = bucketName @@ -196,7 +204,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, bucketClaim, err = b.bucketClaims(bucketClaim.ObjectMeta.Namespace).UpdateStatus(ctx, bucketClaim, metav1.UpdateOptions{}) if err != nil { klog.V(3).ErrorS(err, "Failed to update status of BucketClaim", "name", bucketClaim.ObjectMeta.Name) - return err + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } // Add the finalizers so that bucketClaim is deleted @@ -205,38 +213,63 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, _, err = b.bucketClaims(bucketClaim.ObjectMeta.Namespace).Update(ctx, bucketClaim, metav1.UpdateOptions{}) if err != nil { klog.V(3).ErrorS(err, "Failed to add finalizer BucketClaim", "name", bucketClaim.ObjectMeta.Name) - return err + return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err) } klog.V(3).Infof("Finished creating Bucket %v", bucketName) return nil } -func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) { +// InitializeKubeClient initializes the kubernetes client +func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) { b.kubeClient = k } -func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) { +// InitializeBucketClient initializes the object storage bucket client +func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) { b.bucketClient = bc } -func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface { +// InitializeEventRecorder initializes the event recorder +func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) { + b.eventRecorder = er +} + +func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().Buckets() } panic("uninitialized listener") } -func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface { +func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().BucketClasses() } panic("uninitialized listener") } -func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface { +func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace) } panic("uninitialized listener") } + +// recordError during the processing of the objects +func (b *BucketClaimListener) recordError(subject runtime.Object, eventtype, reason string, err error) error { + if b.eventRecorder == nil { + return err + } + b.eventRecorder.Event(subject, eventtype, reason, err.Error()) + + return err +} + +// recordEvent during the processing of the objects +func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string, args ...any) { + if b.eventRecorder == nil { + return + } + b.eventRecorder.Event(subject, eventtype, reason, fmt.Sprintf(message, args...)) +} diff --git a/pkg/bucketclaim/bucketclaim_test.go b/pkg/bucketclaim/bucketclaim_test.go index f377a78..22a173d 100644 --- a/pkg/bucketclaim/bucketclaim_test.go +++ b/pkg/bucketclaim/bucketclaim_test.go @@ -2,14 +2,18 @@ package bucketclaim import ( "context" + "fmt" "testing" + v1 "k8s.io/api/core/v1" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1" types "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1" - bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake" - + fakebucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake" + "sigs.k8s.io/container-object-storage-interface-api/controller/events" "sigs.k8s.io/container-object-storage-interface-controller/pkg/util" ) @@ -65,29 +69,31 @@ var bucketClaim2 = types.BucketClaim{ // Test basic add functionality func TestAddBR(t *testing.T) { - runCreateBucket(t, "add") + runCreateBucket(t) } // Test add with multipleBRs func TestAddWithMultipleBR(t *testing.T) { - runCreateBucketWithMultipleBR(t, "addWithMultipleBR") + runCreateBucketWithMultipleBR(t) } // Test add idempotency func TestAddBRIdempotency(t *testing.T) { - runCreateBucketIdempotency(t, "addWithMultipleBR") + runCreateBucketIdempotency(t) } -func runCreateBucket(t *testing.T, name string) { +func runCreateBucket(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := bucketclientset.NewSimpleClientset() - kubeClient := fake.NewSimpleClientset() + client := fakebucketclientset.NewSimpleClientset() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil { @@ -121,16 +127,18 @@ func runCreateBucket(t *testing.T, name string) { } } -func runCreateBucketWithMultipleBR(t *testing.T, name string) { +func runCreateBucketWithMultipleBR(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := bucketclientset.NewSimpleClientset() - kubeClient := fake.NewSimpleClientset() + client := fakebucketclientset.NewSimpleClientset() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil { @@ -175,16 +183,18 @@ func runCreateBucketWithMultipleBR(t *testing.T, name string) { } } -func runCreateBucketIdempotency(t *testing.T, name string) { +func runCreateBucketIdempotency(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := bucketclientset.NewSimpleClientset() - kubeClient := fake.NewSimpleClientset() + client := fakebucketclientset.NewSimpleClientset() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil { @@ -225,3 +235,93 @@ func runCreateBucketIdempotency(t *testing.T, name string) { t.Fatalf("Expecting a single Bucket created but found %v", len(bucketList.Items)) } } + +// Test recording events +func TestRecordEvents(t *testing.T) { + t.Parallel() + + defaultBucketClaim := &v1alpha1.BucketClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucketClaim", + Namespace: "test-ns", + }, + Spec: v1alpha1.BucketClaimSpec{ + BucketClassName: "test-bucketClass", + }, + } + + for _, tc := range []struct { + name string + expectedEvent string + eventTrigger func(*testing.T, *BucketClaimListener) + }{ + { + name: "ExistingBucketNotFound", + expectedEvent: newEvent( + v1.EventTypeWarning, + events.FailedCreateBucket, + "buckets.objectstorage.k8s.io \"existing-bucket\" not found"), + eventTrigger: func(t *testing.T, bcl *BucketClaimListener) { + ctx := context.TODO() + + bucketClaim := defaultBucketClaim.DeepCopy() + bucketClaim.Spec.ExistingBucketName = "existing-bucket" + + err := bcl.Add(ctx, bucketClaim) + if !kubeerrors.IsNotFound(err) { + t.Errorf("expected Not Found error got %v", err) + } + }, + }, + { + name: "BucketClassNotFound", + expectedEvent: newEvent( + v1.EventTypeWarning, + events.FailedCreateBucket, + "bucketclasses.objectstorage.k8s.io \"test-bucketClass\" not found"), + eventTrigger: func(t *testing.T, listener *BucketClaimListener) { + ctx := context.TODO() + bucketClaim := defaultBucketClaim.DeepCopy() + + err := listener.Add(ctx, bucketClaim) + if !kubeerrors.IsNotFound(err) { + t.Errorf("expected Not Found error got %v", err) + } + }, + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + client := fakebucketclientset.NewSimpleClientset() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(1) + + listener := NewBucketClaimListener() + listener.InitializeKubeClient(kubeClient) + listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) + + tc.eventTrigger(t, listener) + + select { + case event, ok := <-eventRecorder.Events: + if ok { + if event != tc.expectedEvent { + t.Errorf("expected %s got %s", tc.expectedEvent, event) + } + } else { + t.Error("channel closed, no event") + } + default: + t.Errorf("no event after trigger") + } + }) + } +} + +func newEvent(eventType, reason, message string) string { + return fmt.Sprintf("%s %s %s", eventType, reason, message) +} diff --git a/pkg/util/const.go b/pkg/util/const.go index 0aa9bf7..082d779 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -10,7 +10,7 @@ const ( var ( // Error codes that the central controller will return - ErrBucketAlreadyExists = errors.New("A bucket already existing that matches the bucket claim") - ErrInvalidBucketClass = errors.New("Cannot find bucket class with the name specified in the bucket claim") - ErrNotImplemented = errors.New("Operation Not Implemented") + ErrBucketAlreadyExists = errors.New("a bucket already exists that matches the bucket claim") + ErrInvalidBucketClass = errors.New("cannot find bucket class with the name specified in the bucket claim") + ErrNotImplemented = errors.New("operation not implemented") )