Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Added EventRecorder to the Controller #101

Merged
merged 17 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.70.1
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20231116171305-97700454b010
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883 h1:CtqK7l2m9Atw8L5daJdsXvVgxxvQkRBbJbUz7NiadD8=
sigs.k8s.io/container-object-storage-interface-api v0.0.0-20220806044417-5d7517114883/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20231116171305-97700454b010 h1:8Lw3AyLbbkRGlB9GRu9prtSPEp8DLlXjUzaXN6b9gxM=
sigs.k8s.io/container-object-storage-interface-api v0.1.1-0.20231116171305-97700454b010/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/controller-runtime v0.12.3 h1:FCM8xeY/FI8hoAfh/V4XbbYMY20gElh9yh+A98usMio=
sigs.k8s.io/controller-runtime v0.12.3/go.mod h1:qKsk4WE6zW2Hfj0G4v10EnNB2jMG1C+NTb8h+DwCoU0=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
Expand Down
80 changes: 52 additions & 28 deletions pkg/bucketclaim/bucketclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,34 @@ import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"

"sigs.k8s.io/container-object-storage-interface-api/controller/events"
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// bucketClaimListener is a resource handler for bucket requests objects
type bucketClaimListener struct {
// BucketClaimListener is a resource handler for bucket requests objects
type BucketClaimListener struct {
eventRecorder record.EventRecorder

kubeClient kubeclientset.Interface
bucketClient bucketclientset.Interface
}

func NewBucketClaimListener() *bucketClaimListener {
return &bucketClaimListener{}
func NewBucketClaimListener() *BucketClaimListener {
return &BucketClaimListener{}
}

// Add creates a bucket in response to a bucketClaim
func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Add BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
Expand All @@ -39,7 +42,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
if err != nil {
switch err {
case util.ErrInvalidBucketClass:
klog.V(3).ErrorS(util.ErrInvalidBucketClass,
klog.V(3).ErrorS(err,
"bucketClaim", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
"bucketClassName", bucketClaim.Spec.BucketClassName)
Expand All @@ -65,7 +68,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
}

// update processes any updates made to the bucket request
func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Update BucketClaim",
"name", old.Name,
"ns", old.Namespace)
Expand Down Expand Up @@ -94,22 +97,22 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
}

// Delete processes a bucket for which bucket request is deleted
func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).Info("Delete BucketClaim",
func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Delete BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace)

return nil
}

// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
// Return values
//
// nil - BucketClaim successfully processed
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// ErrBucketAlreadyExists - BucketClaim already processed
// non-nil err - Internal error [requeue'd with exponential backoff]
func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
// Return values
// - nil - BucketClaim successfully processed
// - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// - ErrBucketAlreadyExists - BucketClaim already processed
// - non-nil err - Internal error [requeue'd with exponential backoff]
func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
bucketClaim := inputBucketClaim.DeepCopy()
if bucketClaim.Status.BucketReady {
return util.ErrBucketAlreadyExists
Expand All @@ -121,7 +124,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
if bucketClaim.Spec.ExistingBucketName != "" {
bucketName = bucketClaim.Spec.ExistingBucketName
bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
b.recordEvent(inputBucketClaim, v1.EventTypeWarning, events.ProvisioningFailed, "Bucket provided in the BucketClaim does not exist")
return err
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName)
return err
}
Expand Down Expand Up @@ -153,9 +159,12 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
}

bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
if kubeerrors.IsNotFound(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if IsNotFound() handles the nil case. Would it be better for this to be something like below, or am I missing something?

if err != nil {
    if kubeerrors.IsNotFound(err) {
        b.recordEvent( ... )  
    }
    klog.V(3)....
    return err
}

b.recordEvent(inputBucketClaim, v1.EventTypeWarning, events.ProvisioningFailed, "BucketClass provided in the BucketClaim does not exist")
return util.ErrInvalidBucketClass
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
return err
}

bucketName = bucketClassName + string(bucketClaim.ObjectMeta.UID)
Expand All @@ -180,7 +189,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,

bucket.Spec.Protocols = protocolCopy
bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
if err != nil && !kubeerrors.IsAlreadyExists(err) {
klog.V(3).ErrorS(err, "Error creationg bucket",
"bucket", bucketName,
"bucketClaim", bucketClaim.ObjectMeta.Name)
Expand Down Expand Up @@ -212,31 +221,46 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
return nil
}

func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
// InitializeKubeClient initializes the kubernetes client
func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
b.kubeClient = k
}

func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
// InitializeBucketClient initializes the object storage bucket client
func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
b.bucketClient = bc
}

func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
// InitializeEventRecorder initializes the event recorder
func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) {
b.eventRecorder = er
}

func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().Buckets()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClasses()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace)
}
panic("uninitialized listener")
}

// recordEvent during the processing of the objects
func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string) {
if b.eventRecorder == nil {
return
}
b.eventRecorder.Event(subject, eventtype, reason, message)
}
7 changes: 7 additions & 0 deletions pkg/bucketclaim/bucketclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"

types "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake"
Expand Down Expand Up @@ -84,10 +85,12 @@ func runCreateBucket(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -127,10 +130,12 @@ func runCreateBucketWithMultipleBR(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -181,10 +186,12 @@ func runCreateBucketIdempotency(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const (

var (
// Error codes that the central controller will return
ErrBucketAlreadyExists = errors.New("A bucket already existing that matches the bucket claim")
ErrInvalidBucketClass = errors.New("Cannot find bucket class with the name specified in the bucket claim")
ErrNotImplemented = errors.New("Operation Not Implemented")
ErrBucketAlreadyExists = errors.New("a bucket already existing that matches the bucket claim")
ErrInvalidBucketClass = errors.New("cannot find bucket class with the name specified in the bucket claim")
ErrNotImplemented = errors.New("operation not implemented")
)