Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Added EventRecorder to the Controller #101

Merged
merged 17 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.70.1
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883 h1:CtqK7l2m9Atw8L5daJdsXvVgxxvQkRBbJbUz7NiadD8=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49 h1:Ax4j3ThWolmk6yH6jvL3Yf0Fzxe0ZfVuDlSLNILU3GA=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20240208184109-05444273ee49/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/controller-runtime v0.12.3 h1:FCM8xeY/FI8hoAfh/V4XbbYMY20gElh9yh+A98usMio=
sigs.k8s.io/controller-runtime v0.12.3/go.mod h1:qKsk4WE6zW2Hfj0G4v10EnNB2jMG1C+NTb8h+DwCoU0=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
Expand Down
103 changes: 68 additions & 35 deletions pkg/bucketclaim/bucketclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@ package bucketclaim

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"

"sigs.k8s.io/container-object-storage-interface-api/controller/events"
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// bucketClaimListener is a resource handler for bucket requests objects
type bucketClaimListener struct {
// BucketClaimListener is a resource handler for bucket requests objects
type BucketClaimListener struct {
eventRecorder record.EventRecorder

kubeClient kubeclientset.Interface
bucketClient bucketclientset.Interface
}

func NewBucketClaimListener() *bucketClaimListener {
return &bucketClaimListener{}
func NewBucketClaimListener() *BucketClaimListener {
return &BucketClaimListener{}
}

// Add creates a bucket in response to a bucketClaim
func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Add BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
Expand All @@ -39,7 +43,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
if err != nil {
switch err {
case util.ErrInvalidBucketClass:
klog.V(3).ErrorS(util.ErrInvalidBucketClass,
klog.V(3).ErrorS(err,
"bucketClaim", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
"bucketClassName", bucketClaim.Spec.BucketClassName)
Expand All @@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
}

// update processes any updates made to the bucket request
func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Update BucketClaim",
"name", old.Name,
"ns", old.Namespace)
Expand All @@ -80,7 +84,7 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
klog.V(3).ErrorS(err, "Error deleting bucket",
"bucket", bucketName,
"bucketClaim", bucketClaim.ObjectMeta.Name)
return err
return b.recordError(bucketClaim, v1.EventTypeWarning, events.FailedDeleteBucket, err)
}

klog.V(5).Infof("Successfully deleted bucket: %s from bucketClaim: %s", bucketName, bucketClaim.ObjectMeta.Name)
Expand All @@ -94,22 +98,22 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
}

// Delete processes a bucket for which bucket request is deleted
func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).Info("Delete BucketClaim",
func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Delete BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace)

return nil
}

// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
// Return values
//
// nil - BucketClaim successfully processed
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// ErrBucketAlreadyExists - BucketClaim already processed
// non-nil err - Internal error [requeue'd with exponential backoff]
func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
// Return values
// - nil - BucketClaim successfully processed
// - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// - ErrBucketAlreadyExists - BucketClaim already processed
// - non-nil err - Internal error [requeue'd with exponential backoff]
func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
bucketClaim := inputBucketClaim.DeepCopy()
if bucketClaim.Status.BucketReady {
return util.ErrBucketAlreadyExists
Expand All @@ -121,9 +125,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
if bucketClaim.Spec.ExistingBucketName != "" {
bucketName = bucketClaim.Spec.ExistingBucketName
bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName)
return err
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

bucket.Spec.BucketClaim = &v1.ObjectReference{
Expand All @@ -141,21 +147,23 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
klog.V(3).ErrorS(err, "Error updating existing bucket",
"bucket", bucket.ObjectMeta.Name,
"bucketClaim", bucketClaim.ObjectMeta.Name)
return err
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

bucketClaim.Status.BucketName = bucketName
bucketClaim.Status.BucketReady = true
} else {
bucketClassName := bucketClaim.Spec.BucketClassName
if bucketClassName == "" {
return util.ErrInvalidBucketClass
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, util.ErrInvalidBucketClass)
}

bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if IsNotFound() handles the nil case. Would it be better for this to be something like below, or am I missing something?

if err != nil {
    if kubeerrors.IsNotFound(err) {
        b.recordEvent( ... )  
    }
    klog.V(3)....
    return err
}

return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
return util.ErrInvalidBucketClass
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

bucketName = bucketClassName + string(bucketClaim.ObjectMeta.UID)
Expand All @@ -180,11 +188,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,

bucket.Spec.Protocols = protocolCopy
bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
if err != nil && !kubeerrors.IsAlreadyExists(err) {
klog.V(3).ErrorS(err, "Error creationg bucket",
"bucket", bucketName,
"bucketClaim", bucketClaim.ObjectMeta.Name)
return err
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

bucketClaim.Status.BucketName = bucketName
Expand All @@ -196,7 +204,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
bucketClaim, err = b.bucketClaims(bucketClaim.ObjectMeta.Namespace).UpdateStatus(ctx, bucketClaim, metav1.UpdateOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Failed to update status of BucketClaim", "name", bucketClaim.ObjectMeta.Name)
return err
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

// Add the finalizers so that bucketClaim is deleted
Expand All @@ -205,38 +213,63 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
_, err = b.bucketClaims(bucketClaim.ObjectMeta.Namespace).Update(ctx, bucketClaim, metav1.UpdateOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Failed to add finalizer BucketClaim", "name", bucketClaim.ObjectMeta.Name)
return err
return b.recordError(inputBucketClaim, v1.EventTypeWarning, events.FailedCreateBucket, err)
}

klog.V(3).Infof("Finished creating Bucket %v", bucketName)
return nil
}

func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
// InitializeKubeClient initializes the kubernetes client
func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
b.kubeClient = k
}

func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
// InitializeBucketClient initializes the object storage bucket client
func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
b.bucketClient = bc
}

func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
// InitializeEventRecorder initializes the event recorder
func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) {
b.eventRecorder = er
}

func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().Buckets()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClasses()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace)
}
panic("uninitialized listener")
}

// recordError during the processing of the objects
func (b *BucketClaimListener) recordError(subject runtime.Object, eventtype, reason string, err error) error {
if b.eventRecorder == nil {
return err
}
b.eventRecorder.Event(subject, eventtype, reason, err.Error())

return err
}

// recordEvent during the processing of the objects
func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string, args ...any) {
if b.eventRecorder == nil {
return
}
b.eventRecorder.Event(subject, eventtype, reason, fmt.Sprintf(message, args...))
}
Loading