Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit adeb010

Browse files
committed
feat(bucket|bucketaccess): added EventRecorder
1 parent 8022a94 commit adeb010

File tree

4 files changed

+112
-35
lines changed

4 files changed

+112
-35
lines changed

Diff for: pkg/bucket/bucket_controller.go

+42-15
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ import (
2020
"fmt"
2121
"strings"
2222

23+
v1 "k8s.io/api/core/v1"
24+
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
2325
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/runtime"
2427
utilversion "k8s.io/apimachinery/pkg/util/version"
2528
kube "k8s.io/client-go/kubernetes"
29+
"k8s.io/client-go/tools/record"
2630
"k8s.io/klog/v2"
2731

2832
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
@@ -42,6 +46,8 @@ type BucketListener struct {
4246
provisionerClient cosi.ProvisionerClient
4347
driverName string
4448

49+
eventRecorder record.EventRecorder
50+
4551
kubeClient kube.Interface
4652
bucketClient buckets.Interface
4753
kubeVersion *utilversion.Version
@@ -58,9 +64,16 @@ func NewBucketListener(driverName string, client cosi.ProvisionerClient) *Bucket
5864
}
5965

6066
// Add attempts to create a bucket for a given bucket. This function must be idempotent
67+
//
68+
// Recorded events
69+
//
70+
// MissingBucketClassName - BucketClassName was not defined for the inputBucket
71+
// InvalidBucketClass - BucketClass provided in the BucketClaim does not exist
72+
//
6173
// Return values
62-
// nil - Bucket successfully provisioned
63-
// non-nil err - Internal error [requeue'd with exponential backoff]
74+
//
75+
// nil - Bucket successfully provisioned
76+
// non-nil err - Internal error [requeue'd with exponential backoff]
6477
func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket) error {
6578
bucket := inputBucket.DeepCopy()
6679

@@ -71,7 +84,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
7184

7285
if bucket.Spec.BucketClassName == "" {
7386
err = errors.New(fmt.Sprintf("BucketClassName not defined for bucket %s", bucket.ObjectMeta.Name))
74-
klog.V(3).ErrorS(err, "BucketClassName not defined")
87+
b.recordEvent(inputBucket, v1.EventTypeWarning, "MissingBucketClassName", "BucketClassName was not defined in the Bucket")
7588
return err
7689
}
7790

@@ -100,15 +113,18 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
100113
bucketID = bucket.Spec.ExistingBucketID
101114
if bucket.Spec.Parameters == nil {
102115
bucketClass, err := b.bucketClasses().Get(ctx, bucket.Spec.BucketClassName, metav1.GetOptions{})
103-
if err != nil {
116+
if kubeerrors.IsNotFound(err) {
117+
b.recordEvent(inputBucket, v1.EventTypeWarning, "InvalidBucketClass", "BucketClass provided in the BucketClaim does not exist")
118+
return err
119+
} else if err != nil {
104120
klog.V(3).ErrorS(err, "Error fetching bucketClass",
105121
"bucketClass", bucket.Spec.BucketClassName,
106122
"bucket", bucket.ObjectMeta.Name)
107123
return err
108124
}
109125

110126
if bucketClass.Parameters != nil {
111-
var param map[string]string
127+
param := make(map[string]string)
112128
for k, v := range bucketClass.Parameters {
113129
param[k] = v
114130
}
@@ -125,11 +141,9 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
125141
rsp, err := b.provisionerClient.DriverCreateBucket(ctx, req)
126142
if err != nil {
127143
if status.Code(err) != codes.AlreadyExists {
128-
klog.V(3).ErrorS(err, "Driver failed to create bucket",
129-
"bucket", bucket.ObjectMeta.Name)
144+
b.recordEvent(inputBucket, v1.EventTypeWarning, status.Code(err).String(), "Failed to create bucket")
130145
return errors.Wrap(err, "Failed to create bucket")
131146
}
132-
133147
}
134148

135149
if rsp == nil {
@@ -201,8 +215,9 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
201215

202216
// Update attempts to reconcile changes to a given bucket. This function must be idempotent
203217
// Return values
204-
// nil - Bucket successfully reconciled
205-
// non-nil err - Internal error [requeue'd with exponential backoff]
218+
//
219+
// nil - Bucket successfully reconciled
220+
// non-nil err - Internal error [requeue'd with exponential backoff]
206221
func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket) error {
207222
klog.V(3).InfoS("Update Bucket",
208223
"name", old.Name)
@@ -263,8 +278,9 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
263278
// Delete function is called when the bucket was not able to add finalizers while creation.
264279
// Hence we will take care of removing the BucketClaim finalizer before deleting the Bucket object.
265280
// Return values
266-
// nil - Bucket successfully deleted
267-
// non-nil err - Internal error [requeue'd with exponential backoff]
281+
//
282+
// nil - Bucket successfully deleted
283+
// non-nil err - Internal error [requeue'd with exponential backoff]
268284
func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucket) error {
269285
klog.V(3).InfoS("Delete Bucket",
270286
"name", inputBucket.ObjectMeta.Name,
@@ -314,6 +330,11 @@ func (b *BucketListener) InitializeBucketClient(bc buckets.Interface) {
314330
b.bucketClient = bc
315331
}
316332

333+
// InitializeEventRecorder initializes the event recorder
334+
func (b *BucketListener) InitializeEventRecorder(er record.EventRecorder) {
335+
b.eventRecorder = er
336+
}
337+
317338
func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bucket) error {
318339
if !strings.EqualFold(bucket.Spec.DriverName, b.driverName) {
319340
klog.V(5).InfoS("Skipping bucket for provisioner",
@@ -333,9 +354,7 @@ func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bu
333354

334355
if _, err := b.provisionerClient.DriverDeleteBucket(ctx, req); err != nil {
335356
if status.Code(err) != codes.NotFound {
336-
klog.V(3).ErrorS(err, "Failed to delete bucket",
337-
"bucket", bucket.ObjectMeta.Name,
338-
)
357+
b.recordEvent(bucket, v1.EventTypeWarning, status.Code(err).String(), "Failed to delete bucket")
339358
return err
340359
}
341360
}
@@ -396,3 +415,11 @@ func (b *BucketListener) bucketAccesses(namespace string) bucketapi.BucketAccess
396415
}
397416
panic("uninitialized listener")
398417
}
418+
419+
// recordEvent during the processing of the objects
420+
func (b *BucketListener) recordEvent(subject runtime.Object, eventtype, reason, message string) {
421+
if b.eventRecorder == nil {
422+
return
423+
}
424+
b.eventRecorder.Event(subject, eventtype, reason, message)
425+
}

Diff for: pkg/bucket/bucket_controller_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/apimachinery/pkg/version"
3232
fakediscovery "k8s.io/client-go/discovery/fake"
3333
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
34+
"k8s.io/client-go/tools/record"
3435

3536
"google.golang.org/grpc"
3637
)
@@ -71,6 +72,17 @@ func TestInitializeBucketClient(t *testing.T) {
7172
}
7273
}
7374

75+
func TestInitializeEventRecorder(t *testing.T) {
76+
eventRecorder := record.NewFakeRecorder(1)
77+
78+
bl := BucketListener{}
79+
bl.InitializeEventRecorder(eventRecorder)
80+
81+
if bl.eventRecorder == nil {
82+
t.Errorf("BucketClient not initialized, expected not nil")
83+
}
84+
}
85+
7486
func TestAddWrongProvisioner(t *testing.T) {
7587
driver := "driver1"
7688
mpc := struct{ fakespec.FakeProvisionerClient }{}

Diff for: pkg/bucketaccess/bucketaccess_controller.go

+46-20
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import (
2222
"strings"
2323
"time"
2424

25-
corev1 "k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2626
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/runtime"
2829
utilversion "k8s.io/apimachinery/pkg/util/version"
2930
kube "k8s.io/client-go/kubernetes"
3031
kubecorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
32+
"k8s.io/client-go/tools/record"
3133
"k8s.io/klog/v2"
3234

3335
cosiapi "sigs.k8s.io/container-object-storage-interface-api/apis"
@@ -48,6 +50,8 @@ type BucketAccessListener struct {
4850
provisionerClient cosi.ProvisionerClient
4951
driverName string
5052

53+
eventRecorder record.EventRecorder
54+
5155
kubeClient kube.Interface
5256
bucketClient buckets.Interface
5357
kubeVersion *utilversion.Version
@@ -62,9 +66,17 @@ func NewBucketAccessListener(driverName string, client cosi.ProvisionerClient) (
6266
}
6367

6468
// Add attempts to provision credentials to access a given bucket. This function must be idempotent
69+
//
70+
// Recorded events
71+
//
72+
// BucketNotReady - BucketAccess can't be granted to bucket not in Ready state and without a bucketID
73+
// MissingServiceAccountName - Must define ServiceAccountName when AuthenticationType is IAM
74+
// InvalidBucketAccessClass - BucketAccessClass provided in the BucketAccess does not exist
75+
//
6576
// Return values
66-
// nil - BucketAccess successfully granted
67-
// non-nil err - Internal error [requeue'd with exponential backoff]
77+
//
78+
// nil - BucketAccess successfully granted
79+
// non-nil err - Internal error [requeue'd with exponential backoff]
6880
func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1alpha1.BucketAccess) error {
6981
bucketAccess := inputBucketAccess.DeepCopy()
7082

@@ -91,7 +103,10 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
91103
}
92104

93105
bucketAccessClass, err := bal.bucketAccessClasses().Get(ctx, bucketAccessClassName, metav1.GetOptions{})
94-
if err != nil {
106+
if kubeerrors.IsNotFound(err) {
107+
bal.recordEvent(inputBucketAccess, v1.EventTypeWarning, "InvalidBucketAccessClass", "BucketAccessClass provided in the BucketAccess does not exist")
108+
return err
109+
} else if err != nil {
95110
klog.ErrorS(err, "Failed to fetch bucketAccessClass", "bucketAccessClass", bucketAccessClassName)
96111
return errors.Wrap(err, "Failed to fetch BucketAccessClass")
97112
}
@@ -129,6 +144,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
129144
}
130145

131146
if authType == cosi.AuthenticationType_IAM && bucketAccess.Spec.ServiceAccountName == "" {
147+
bal.recordEvent(inputBucketAccess, v1.EventTypeWarning, "MissingServiceAccountName", "Must define ServiceAccountName when AuthenticationType is IAM")
132148
return errors.New("Must define ServiceAccountName when AuthenticationType is IAM")
133149
}
134150

@@ -146,6 +162,9 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
146162
return errors.Wrap(err, "Failed to fetch bucket")
147163
}
148164

165+
if bucket.Status.BucketID == "" {
166+
bal.recordEvent(inputBucketAccess, v1.EventTypeWarning, "BucketNotReady", "BucketAccess can't be granted to bucket not in Ready state and without a bucketID")
167+
}
149168
if bucket.Status.BucketReady != true || bucket.Status.BucketID == "" {
150169
return errors.New("BucketAccess can't be granted to bucket not in Ready state and without a bucketID")
151170
}
@@ -163,11 +182,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
163182
rsp, err := bal.provisionerClient.DriverGrantBucketAccess(ctx, req)
164183
if err != nil {
165184
if status.Code(err) != codes.AlreadyExists {
166-
klog.V(3).ErrorS(err,
167-
"Failed to grant access",
168-
"bucketAccess", bucketAccess.ObjectMeta.Name,
169-
"bucketClaim", bucketClaimName,
170-
)
185+
bal.recordEvent(inputBucketAccess, v1.EventTypeWarning, status.Code(err).String(), "Failed to grant access")
171186
return errors.Wrap(err, "failed to grant access")
172187
}
173188

@@ -237,7 +252,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
237252
return errors.Wrap(err, "failed to fetch secrets")
238253
}
239254

240-
if _, err := bal.secrets(namespace).Create(ctx, &corev1.Secret{
255+
if _, err := bal.secrets(namespace).Create(ctx, &v1.Secret{
241256
ObjectMeta: metav1.ObjectMeta{
242257
Name: secretCredName,
243258
Namespace: namespace,
@@ -246,7 +261,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
246261
StringData: map[string]string{
247262
"BucketInfo": string(stringData),
248263
},
249-
Type: corev1.SecretTypeOpaque,
264+
Type: v1.SecretTypeOpaque,
250265
}, metav1.CreateOptions{}); err != nil {
251266
if !kubeerrors.IsAlreadyExists(err) {
252267
klog.V(3).ErrorS(err,
@@ -291,8 +306,9 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
291306

292307
// Update attempts to reconcile changes to a given bucketAccess. This function must be idempotent
293308
// Return values
294-
// nil - BucketAccess successfully reconciled
295-
// non-nil err - Internal error [requeue'd with exponential backoff]
309+
//
310+
// nil - BucketAccess successfully reconciled
311+
// non-nil err - Internal error [requeue'd with exponential backoff]
296312
func (bal *BucketAccessListener) Update(ctx context.Context, old, new *v1alpha1.BucketAccess) error {
297313
klog.V(3).InfoS("Update BucketAccess",
298314
"name", old.ObjectMeta.Name)
@@ -312,8 +328,9 @@ func (bal *BucketAccessListener) Update(ctx context.Context, old, new *v1alpha1.
312328

313329
// Delete attemps to delete a bucketAccess. This function must be idempotent
314330
// Return values
315-
// nil - BucketAccess successfully deleted
316-
// non-nil err - Internal error [requeue'd with exponential backoff]
331+
//
332+
// nil - BucketAccess successfully deleted
333+
// non-nil err - Internal error [requeue'd with exponential backoff]
317334
func (bal *BucketAccessListener) Delete(ctx context.Context, bucketAccess *v1alpha1.BucketAccess) error {
318335
klog.V(3).InfoS("Delete BucketAccess",
319336
"name", bucketAccess.ObjectMeta.Name,
@@ -346,11 +363,7 @@ func (bal *BucketAccessListener) deleteBucketAccessOp(ctx context.Context, bucke
346363

347364
// First we revoke the bucketAccess from the driver
348365
if _, err := bal.provisionerClient.DriverRevokeBucketAccess(ctx, req); err != nil {
349-
klog.V(3).ErrorS(err,
350-
"Failed to revoke bucket access",
351-
"bucketAccess", bucketAccess.ObjectMeta.Name,
352-
"bucketClaim", bucketClaimName,
353-
)
366+
bal.recordEvent(bucketAccess, v1.EventTypeWarning, status.Code(err).String(), "Failed to revoke bucket access")
354367
return errors.Wrap(err, "failed to revoke access")
355368
}
356369

@@ -446,3 +459,16 @@ func (bal *BucketAccessListener) InitializeKubeClient(k kube.Interface) {
446459
func (bal *BucketAccessListener) InitializeBucketClient(bc buckets.Interface) {
447460
bal.bucketClient = bc
448461
}
462+
463+
// InitializeEventRecorder initializes the event recorder
464+
func (bal *BucketAccessListener) InitializeEventRecorder(er record.EventRecorder) {
465+
bal.eventRecorder = er
466+
}
467+
468+
// recordEvent during the processing of the objects
469+
func (bal *BucketAccessListener) recordEvent(subject runtime.Object, eventtype, reason, message string) {
470+
if bal.eventRecorder == nil {
471+
return
472+
}
473+
bal.eventRecorder.Event(subject, eventtype, reason, message)
474+
}

Diff for: pkg/bucketaccess/bucketaccess_controller_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/apimachinery/pkg/version"
2929
fakediscovery "k8s.io/client-go/discovery/fake"
3030
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
31+
"k8s.io/client-go/tools/record"
3132

3233
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
3334
fakebucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake"
@@ -71,6 +72,17 @@ func TestInitializeBucketClient(t *testing.T) {
7172
}
7273
}
7374

75+
func TestInitializeEventRecorder(t *testing.T) {
76+
eventRecorder := record.NewFakeRecorder(1)
77+
78+
bal := BucketAccessListener{}
79+
bal.InitializeEventRecorder(eventRecorder)
80+
81+
if bal.eventRecorder == nil {
82+
t.Errorf("BucketClient not initialized, expected not nil")
83+
}
84+
}
85+
7486
func TestAddWrongProvisioner(t *testing.T) {
7587
driver := "driver1"
7688
bucketAccessClassName := "bucketAccessClass1"

0 commit comments

Comments
 (0)