Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Added EventRecorder to the Controller #101

Merged
merged 17 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.70.1
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883 h1:CtqK7l2m9Atw8L5daJdsXvVgxxvQkRBbJbUz7NiadD8=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49 h1:Ax4j3ThWolmk6yH6jvL3Yf0Fzxe0ZfVuDlSLNILU3GA=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/controller-runtime v0.12.3 h1:FCM8xeY/FI8hoAfh/V4XbbYMY20gElh9yh+A98usMio=
sigs.k8s.io/controller-runtime v0.12.3/go.mod h1:qKsk4WE6zW2Hfj0G4v10EnNB2jMG1C+NTb8h+DwCoU0=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
Expand Down
81 changes: 53 additions & 28 deletions pkg/bucketclaim/bucketclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@ package bucketclaim

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"

"sigs.k8s.io/container-object-storage-interface-api/controller/events"
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// bucketClaimListener is a resource handler for bucket requests objects
type bucketClaimListener struct {
// BucketClaimListener is a resource handler for bucket requests objects
type BucketClaimListener struct {
eventRecorder record.EventRecorder

kubeClient kubeclientset.Interface
bucketClient bucketclientset.Interface
}

func NewBucketClaimListener() *bucketClaimListener {
return &bucketClaimListener{}
func NewBucketClaimListener() *BucketClaimListener {
return &BucketClaimListener{}
}

// Add creates a bucket in response to a bucketClaim
func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Add BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
Expand All @@ -39,7 +43,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
if err != nil {
switch err {
case util.ErrInvalidBucketClass:
klog.V(3).ErrorS(util.ErrInvalidBucketClass,
klog.V(3).ErrorS(err,
"bucketClaim", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
"bucketClassName", bucketClaim.Spec.BucketClassName)
Expand All @@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
}

// update processes any updates made to the bucket request
func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Update BucketClaim",
"name", old.Name,
"ns", old.Namespace)
Expand Down Expand Up @@ -94,22 +98,22 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
}

// Delete processes a bucket for which bucket request is deleted
func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).Info("Delete BucketClaim",
func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Delete BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace)

return nil
}

// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
// Return values
//
// nil - BucketClaim successfully processed
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// ErrBucketAlreadyExists - BucketClaim already processed
// non-nil err - Internal error [requeue'd with exponential backoff]
func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
// Return values
// - nil - BucketClaim successfully processed
// - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// - ErrBucketAlreadyExists - BucketClaim already processed
// - non-nil err - Internal error [requeue'd with exponential backoff]
func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
bucketClaim := inputBucketClaim.DeepCopy()
if bucketClaim.Status.BucketReady {
return util.ErrBucketAlreadyExists
Expand All @@ -121,7 +125,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
if bucketClaim.Spec.ExistingBucketName != "" {
bucketName = bucketClaim.Spec.ExistingBucketName
bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
b.recordEvent(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err.Error())
return err
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName)
return err
}
Expand Down Expand Up @@ -153,9 +160,12 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
}

bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if IsNotFound() handles the nil case. Would it be better for this to be something like below, or am I missing something?

if err != nil {
    if kubeerrors.IsNotFound(err) {
        b.recordEvent( ... )  
    }
    klog.V(3)....
    return err
}

b.recordEvent(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err.Error())
return err
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
return util.ErrInvalidBucketClass
return err
}

bucketName = bucketClassName + string(bucketClaim.ObjectMeta.UID)
Expand All @@ -180,7 +190,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,

bucket.Spec.Protocols = protocolCopy
bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
if err != nil && !kubeerrors.IsAlreadyExists(err) {
klog.V(3).ErrorS(err, "Error creationg bucket",
"bucket", bucketName,
"bucketClaim", bucketClaim.ObjectMeta.Name)
Expand Down Expand Up @@ -212,31 +222,46 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
return nil
}

func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
// InitializeKubeClient initializes the kubernetes client
func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
b.kubeClient = k
}

func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
// InitializeBucketClient initializes the object storage bucket client
func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
b.bucketClient = bc
}

func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
// InitializeEventRecorder initializes the event recorder
func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) {
b.eventRecorder = er
}

func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().Buckets()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClasses()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace)
}
panic("uninitialized listener")
}

// recordEvent during the processing of the objects
func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string, args ...any) {
if b.eventRecorder == nil {
return
}
b.eventRecorder.Event(subject, eventtype, reason, fmt.Sprintf(message, args...))
}
120 changes: 110 additions & 10 deletions pkg/bucketclaim/bucketclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package bucketclaim

import (
"context"
"fmt"
"testing"

v1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

fakekubeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
types "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake"

fakebucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake"
"sigs.k8s.io/container-object-storage-interface-api/controller/events"
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
)

Expand Down Expand Up @@ -82,12 +86,14 @@ func runCreateBucket(t *testing.T, name string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
client := fakebucketclientset.NewSimpleClientset()
kubeClient := fakekubeclientset.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -125,12 +131,14 @@ func runCreateBucketWithMultipleBR(t *testing.T, name string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
client := fakebucketclientset.NewSimpleClientset()
kubeClient := fakekubeclientset.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -179,12 +187,14 @@ func runCreateBucketIdempotency(t *testing.T, name string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
client := fakebucketclientset.NewSimpleClientset()
kubeClient := fakekubeclientset.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -225,3 +235,93 @@ func runCreateBucketIdempotency(t *testing.T, name string) {
t.Fatalf("Expecting a single Bucket created but found %v", len(bucketList.Items))
}
}

// Test recording events
func TestRecordEvents(t *testing.T) {
t.Parallel()

defaultBucketClaim := &v1alpha1.BucketClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-bucketClaim",
Namespace: "test-ns",
},
Spec: v1alpha1.BucketClaimSpec{
BucketClassName: "test-bucketClass",
},
}

for _, tc := range []struct {
name string
expectedEvent string
eventTrigger func(*testing.T, *BucketClaimListener)
}{
{
name: "ExistingBucketNotFound",
expectedEvent: newEvent(
v1.EventTypeWarning,
events.FailedCreateBucket,
"buckets.objectstorage.k8s.io \"existing-bucket\" not found"),
eventTrigger: func(t *testing.T, bcl *BucketClaimListener) {
ctx := context.TODO()

bucketClaim := defaultBucketClaim.DeepCopy()
bucketClaim.Spec.ExistingBucketName = "existing-bucket"

err := bcl.Add(ctx, bucketClaim)
if !kubeerrors.IsNotFound(err) {
t.Errorf("expected Not Found error got %v", err)
}
},
},
{
name: "BucketClassNotFound",
expectedEvent: newEvent(
v1.EventTypeWarning,
events.FailedCreateBucket,
"bucketclasses.objectstorage.k8s.io \"test-bucketClass\" not found"),
eventTrigger: func(t *testing.T, listener *BucketClaimListener) {
ctx := context.TODO()
bucketClaim := defaultBucketClaim.DeepCopy()

err := listener.Add(ctx, bucketClaim)
if !kubeerrors.IsNotFound(err) {
t.Errorf("expected Not Found error got %v", err)
}
},
},
} {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

client := fakebucketclientset.NewSimpleClientset()
kubeClient := fakekubeclientset.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(1)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

tc.eventTrigger(t, listener)

select {
case event, ok := <-eventRecorder.Events:
if ok {
if event != tc.expectedEvent {
t.Errorf("expected %s got %s", tc.expectedEvent, event)
}
} else {
t.Error("channel closed, no event")
}
default:
t.Errorf("no event after trigger")
}
})
}
}

func newEvent(eventType, reason, message string) string {
return fmt.Sprintf("%s %s %s", eventType, reason, message)
}
6 changes: 3 additions & 3 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const (

var (
// Error codes that the central controller will return
ErrBucketAlreadyExists = errors.New("A bucket already existing that matches the bucket claim")
ErrInvalidBucketClass = errors.New("Cannot find bucket class with the name specified in the bucket claim")
ErrNotImplemented = errors.New("Operation Not Implemented")
ErrBucketAlreadyExists = errors.New("a bucket already exists that matches the bucket claim")
ErrInvalidBucketClass = errors.New("cannot find bucket class with the name specified in the bucket claim")
ErrNotImplemented = errors.New("operation not implemented")
)